接上篇 https://pushkin.blog.csdn.net/article/details/127823364?spm=1001.2014.3001.5502,继续来探究下java另一个定时调度的框架
ScheduledThreadPoolExecutor (JDK1.5版本引进的JUC下定时任务实现类),相比于Timer,ScheduledThreadPoolExecutor 其实就是多线程版的Timer,主要解决的就是多任务执行相互影响的问题。
先直接来用ScheduledThreadPoolExecutor 实现一个定时调度需求的Demo
ScheduledThreadPoolExecutor 主要的调度方法如下:
实现一个指定延时执行 + 周期调度 + 指定时间停止的demo
@Slf4j
public class ScheduledThreadPoolExecutorDemo {
public static void main(String[] args) {
log.info("主线程{} - 开始...", Thread.currentThread().getName());
// 任务1
Job job1 = new Job(1);
// 任务2
Job job2 = new Job(2);
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(10);
RunnableScheduledFuture<?> callJob1 = (RunnableScheduledFuture<?>)scheduledThreadPoolExecutor.scheduleWithFixedDelay(job1, 2, 5, TimeUnit.SECONDS);
RunnableScheduledFuture<?> callJob2 = (RunnableScheduledFuture<?>)
scheduledThreadPoolExecutor.scheduleWithFixedDelay(job2, 2, 5, TimeUnit.SECONDS);
Runnable removeJob = ()->{
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
scheduledThreadPoolExecutor.remove(callJob1);
log.info("{} - 移除job1", Thread.currentThread().getName());
};
Thread removeJobThread = new Thread(removeJob);
removeJobThread.start();
log.info("主线程{} - 结束...", Thread.currentThread().getName());
}
static class Job implements Runnable{
private final int id;
public Job(int id) {
this.id = id;
}
@Override
public void run() {
log.info("{} - 开始 job{} 任务 >>>>>>>>>>>", id, Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("<<<<<<<<<<<<< job{} 任务完成 - {} ...", id, Thread.currentThread().getName());
}
}
}
实现:指定延迟2s后,以5s作为周期进行定时调度执行,然后指定10s后移除任务1(模仿取消任务)
ScheduledThreadPoolExecutor的使用也比较简单,从日志中也能看到不能的任务分配给了不同的线程进行异步去执行,任务之间消除了之前Timer中的单线程引起的任务。
21:48:45.680 [main] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 主线程main - 开始...
21:48:45.686 [main] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 主线程main - 结束...
21:48:47.693 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 1 - 开始 jobpool-1-thread-1 任务 >>>>>>>>>>>
21:48:47.693 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-2 任务 >>>>>>>>>>>
21:48:48.709 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job1 任务完成 - pool-1-thread-1 ...
21:48:48.709 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-2 ...
21:48:53.725 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 1 - 开始 jobpool-1-thread-2 任务 >>>>>>>>>>>
21:48:53.725 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-1 任务 >>>>>>>>>>>
21:48:54.738 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job1 任务完成 - pool-1-thread-2 ...
21:48:54.738 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-1 ...
21:48:55.701 [Thread-0] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - Thread-0 - 移除job1
21:48:59.741 [pool-1-thread-4] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-4 任务 >>>>>>>>>>>
21:49:00.745 [pool-1-thread-4] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-4 ...
21:49:05.756 [pool-1-thread-3] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-3 任务 >>>>>>>>>>>
21:49:06.770 [pool-1-thread-3] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-3 ...
21:49:11.783 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-2 任务 >>>>>>>>>>>
21:49:12.789 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-2 ...
21:49:17.792 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-1 任务 >>>>>>>>>>>
21:49:18.795 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-1 ...
21:49:23.798 [pool-1-thread-5] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-5 任务 >>>>>>>>>>>
21:49:24.805 [pool-1-thread-5] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-5 ...
代码的设计上应该和Timer大差不差,无非应该就是多加了个线程池。我们来看下ScheduledThreadPoolExecutor的实现设计。
ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService。除了正常的线程池功能以外。ScheduledThreadPoolExecutor 调度功能的实现主要就由 ScheduledFutureTask 与DelayedWorkQueue这两个内部类了。
ScheduledFutureTask内部类继承了FutureTask,实现了Runnable,以此实现了可回调结果的任务线程,同时还实现了Delayed接口(getDelay方法用于返回距离下次任务执行时间的时间间隔)
DelayedWorkQueue内部类就是一个阻塞队列,不过自己实现了类似Timer里面的基于任务执行时间动态排序的优先级队列。
ScheduledThreadPoolExecutor 这块核心除了线程池,其实就是内部优先级队列的实现了
其主要属性如下:
其中:
看到属性和方法大致应该都能猜到其含义与功能,这里重点看下leader线程与队列的任务获取;添加;移除;排序的过程:
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 自循环,实现对队列的监控 保证返回根节点
for (;;) {
// 获取根节点任务
RunnableScheduledFuture<?> first = queue[0];
// 如果队列为空,则通知其他线程等待
if (first == null)
available.await();
else {
// 获取根节点任务等待时间与系统时间的差值
long delay = first.getDelay(NANOSECONDS);
// 如果等待时间已经到,则返回根节点任务并重排序队列
if (delay <= 0)
return finishPoll(first);
// 如果等待时间还没有到,则继续等待且不拥有任务的引用
first = null; // don't retain ref while waiting
// 如果此时等待根节点的leader线程不为空则通知其他线程继续等待
if (leader != null)
available.await();
else {
// 如果此时leader线程为空,则把当前线程置为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当前线程等待延迟的时间
available.awaitNanos(delay);
} finally {
// 延迟时间已到 则把当前线程变成非leader线程
// 当前线程继续用于执行for循环的逻辑
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader为null 则唤醒一个线程成为leader
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取任务x的heapIndex
int i = indexOf(x);
// 如果为-1 则表明不在队列内
if (i < 0)
return false;
// 设置删除任务的heapIndex为-1
setIndex(queue[i], -1);
// 队列深度-1
int s = --size;
// 获取队列末尾的节点任务
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
// 如果删除的任务节点不是末尾的节点,则重排序
if (s != i) {
siftDown(i, replacement);
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
其实根Timer优先级队列排序一样,还是数组堆排序,复杂度 o(logn)
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// 取队列当前深度的一半 相当于size / 2
int half = size >>> 1;
// 索引k(初值为0)的值大于half时 退出循环
while (k < half) {
// 获取左节点的索引
int child = (k << 1) + 1;
// 获取左节点的任务
RunnableScheduledFuture<?> c = queue[child];
// 获取右节点的索引
int right = child + 1;
// 如果右节点在范围内 且 左节点大于右节点,
if (right < size && c.compareTo(queue[right]) > 0)
// 更新child的值为右节点索引值 且更新c为右节点的任务
c = queue[child = right];
// 如果任务key小于任务c 则退出循环(最小堆)
if (key.compareTo(c) <= 0)
break;
// 否则把任务c放到k上(较小的任务放到父节点上)
queue[k] = c;
// 设置任务c的堆索引
setIndex(c, k);
// 更新k的值为child
k = child;
}
// 任务key插入k的位置
queue[k] = key;
// 设置任务key的堆索引k
setIndex(key, k);
}
ScheduledThreadPoolExecutor 与Timer的实现流程差不多,都是维护一个优先级队列来获取最早需要执行的任务,然后通过线程进行消费,区别就是ScheduledThreadPoolExecutor 实现多线程去运行任务,任务之间不会相互影响。其中周期性执行都是通过重置任务下一次执行时间来完成的。
同时需要注意的是,使用ScheduledThreadPoolExecutor 去执行任务,由于使用的是多线程,默认当任务执行出错,不会发生像Timer一样某个任务异常导致整体的失败 (所以注意使用ScheduledThreadPoolExecutor 去调度的任务,重写任务的业务时,记得自己捕捉好异常进行处理,否则程序不会报任何的错误)
其中不管是Timer,还是ScheduledThreadPoolExecutor ,其优先级队列任务添加、删除复杂度还是O(logn), 这在需要操作调度大量任务的一些框架将成为其性能瓶颈。下一篇我们在来看下时间轮算法是如何优化这个问题的。