任务调度有很多的实现方式以及框架,比如DelayQueue配合线程池;Spring;Quartz; ScheduledThreadPool等等,今天来研究下Java自带的调度工具类Timer实现的原理以及案例。
先来实现一个指定延时执行 + 周期调度 + 指定时间停止的demo
@Slf4j
public class TimerDemo1 {
public static void main(String[] args) {
// 创建一个定时器
Timer timer = new Timer();
TimerTask job1 = new TimerTask() {
@Override
public void run() {
log.info("{} - 开始 job1 任务 >>>>>>>>>>>", Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("<<<<<<<<<<<<< job1 任务完成 - {} ...", Thread.currentThread().getName());
}
};
Runnable scheduleThread1 = ()->{
log.info("调度线程{} - 开始调度...", Thread.currentThread().getName());
// 延时3s开始执行,执行周期为5s
timer.schedule(job1, 3000, 5000);
};
Runnable scheduleThread2 = ()->{
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("调度线程{} - 结束调度...", Thread.currentThread().getName());
timer.cancel();
};
scheduleThread1.run();
scheduleThread2.run();
}
}
效果:延时3s开始执行, 执行周期为5s, 运行30s后结束执行
16:02:06.583 [main] INFO com.example.demo202206.schedule.timer.TimerDemo1 - 调度线程main - 开始调度...
16:02:09.603 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - Timer-0 - 开始 job1 任务 >>>>>>>>>>>
16:02:10.610 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - <<<<<<<<<<<<< job1 任务完成 - Timer-0 ...
16:02:14.618 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - Timer-0 - 开始 job1 任务 >>>>>>>>>>>
16:02:15.630 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - <<<<<<<<<<<<< job1 任务完成 - Timer-0 ...
16:02:19.626 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - Timer-0 - 开始 job1 任务 >>>>>>>>>>>
16:02:20.631 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - <<<<<<<<<<<<< job1 任务完成 - Timer-0 ...
16:02:24.641 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - Timer-0 - 开始 job1 任务 >>>>>>>>>>>
16:02:25.644 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - <<<<<<<<<<<<< job1 任务完成 - Timer-0 ...
16:02:29.649 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - Timer-0 - 开始 job1 任务 >>>>>>>>>>>
16:02:30.663 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - <<<<<<<<<<<<< job1 任务完成 - Timer-0 ...
16:02:34.661 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - Timer-0 - 开始 job1 任务 >>>>>>>>>>>
16:02:35.670 [Timer-0] INFO com.example.demo202206.schedule.timer.TimerDemo1 - <<<<<<<<<<<<< job1 任务完成 - Timer-0 ...
16:02:36.591 [main] INFO com.example.demo202206.schedule.timer.TimerDemo1 - 调度线程main - 结束调度...
Timer类使用非常简单,主要用到了Timer工具类进行定时执行,TimerTask实现具体的业务逻辑。
核心属性主要以下两个:优先级队列+ 消费线程
// 根据时间进行优先排序的队列
private final TaskQueue queue = new TaskQueue();
// 对TaskQueue里面的定时任务进行编排和触发执行,它是一个内部无限循环的线程。
private final TimerThread thread = new TimerThread(queue);
其中 TaskQueue任务队列就是基于数组实现的一个优先级队列
TaskQueue此类表示计时器任务队列:TimerTasks的优先级队列,
在nextExecutionTime上订阅。每个Timer对象都有其中一个,它与TimerThread共享。
在内部,该类使用堆为add、removeMin和rescheduleMin
提供日志(n)性能以及getMin操作的恒定时间性能。
优先级队列表现为平衡二进制堆的优先级队列:两个子队列
队列[n]是队列[2*n], 和队列[2*n+1]。
优先级队列为在nextExecutionTime字段上排序:具有最低值的TimerTask
nextExecutionTime在队列[1]中(假定队列为非空)。对于
堆中的每个节点n,n.nextExecutionTime<=d.nextExecutionTime。
实现一个添加任务时,根据执行时间进行排序的队列,每次消费都是获取执行时间最小的任务
class TaskQueue {
// 默认128长度数组的队列
private TimerTask[] queue = new TimerTask[128];
private int size = 0;
void add(TimerTask var1) {
// 扩容
if (this.size + 1 == this.queue.length) {
this.queue = (TimerTask[])Arrays.copyOf(this.queue, 2 * this.queue.length);
}
// 加到数组最后位置
this.queue[++this.size] = var1;
// 调整
this.fixUp(this.size);
}
TimerTask getMin() {
return this.queue[1];
}
// 堆插入操作 时间复杂度为o(logn)
private void fixUp(int var1) {
while(true) {
if (var1 > 1) {
int var2 = var1 >> 1;
// 通过任务执行时间进行比较排序
if (this.queue[var2].nextExecutionTime > this.queue[var1].nextExecutionTime) {
TimerTask var3 = this.queue[var2];
this.queue[var2] = this.queue[var1];
this.queue[var1] = var3;
var1 = var2;
continue;
}
}
return;
}
}
}
消费线程主要就是要实现一个消费TaskQueue队列任务的消费线程
/**
* The main timer loop. (See class comment.)
*/
private void mainLoop() {
while (true) {
try {
TimerTask task;
boolean taskFired;
synchronized(queue) {
// 如果queue里面没有要执行的任务,则挂起TimerThread线程
while (queue.isEmpty() && newTasksMayBeScheduled)
queue.wait();
if (queue.isEmpty())
break; // Queue is empty and will forever remain; die
// Queue nonempty; look at first evt and do the right thing
long currentTime, executionTime;
// 获取queue队列里面下一个要执行的任务(根据时间排序,也就是接下来最近要执行的任务)
task = queue.getMin();
synchronized(task.lock) {
if (task.state == TimerTask.CANCELLED) {
queue.removeMin();
continue; // No action required, poll queue again
}
currentTime = System.currentTimeMillis();
executionTime = task.nextExecutionTime;
// 执行条件
if (taskFired = (executionTime<=currentTime)) {
// 是否需要周期运行?
if (task.period == 0) { // Non-repeating, remove
// 执行完便从队列中移除
queue.removeMin();
task.state = TimerTask.EXECUTED;
} else { // Repeating task, reschedule
//针对task.period不等于0的任务,则计算它的下次执行时间点
//task.period<0表示是fixed delay模式的任务
//task.period>0表示是fixed rate模式的任务
queue.rescheduleMin(
task.period<0 ? currentTime - task.period
: executionTime + task.period);
}
}
}
// // 如果任务的下次执行时间还没有到达,则挂起TimerThread线程executionTime - currentTime毫秒数,到达执行时间点再自动激活
if (!taskFired) // Task hasn't yet fired; wait
queue.wait(executionTime - currentTime);
}
// 如果任务的下次执行时间到了,则执行任务
// 注意:这里任务执行没有另起线程,还是在TimerThread线程执行的,所以当有任务在同时执行时会出现阻塞
if (taskFired) // Task fired; run it, holding no locks
// 注意:这里没有try catch异常,当TimerTask抛出异常会导致整个TimerThread跳出循环,从而导致Timer失效
task.run();
} catch(InterruptedException e) {
}
}
}
}
Timer的原理主要是实现一个小顶堆的TaskQueue任务队列,然后通过TimerThread不断获取队列第一个任务的执行时间与当前时间进行对比,如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除。最后执行任务。如果时间还未到则调用 wait() 等待。
Timer属于Java比较早期的一个定时调度的工具类,由其源码分析还有以下几点不足