• 任务调度之ScheduledThreadPoolExecutor源码分析


    1. 概述

    接上篇 https://pushkin.blog.csdn.net/article/details/127823364?spm=1001.2014.3001.5502,继续来探究下java另一个定时调度的框架
    ScheduledThreadPoolExecutor (JDK1.5版本引进的JUC下定时任务实现类),相比于Timer,ScheduledThreadPoolExecutor 其实就是多线程版的Timer,主要解决的就是多任务执行相互影响的问题。

    2. 案例

    先直接来用ScheduledThreadPoolExecutor 实现一个定时调度需求的Demo
    ScheduledThreadPoolExecutor 主要的调度方法如下:

    在这里插入图片描述

    2.1 需求

    实现一个指定延时执行 + 周期调度 + 指定时间停止的demo

    2.2 Demo

    @Slf4j
    public class ScheduledThreadPoolExecutorDemo {
    
        public static void main(String[] args) {
            log.info("主线程{} - 开始...", Thread.currentThread().getName());
            // 任务1
            Job job1 = new Job(1);
            // 任务2
            Job job2 = new Job(2);
    
            ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(10);
            RunnableScheduledFuture<?> callJob1 = (RunnableScheduledFuture<?>)scheduledThreadPoolExecutor.scheduleWithFixedDelay(job1, 2, 5, TimeUnit.SECONDS);
    
            RunnableScheduledFuture<?> callJob2 = (RunnableScheduledFuture<?>)
                    scheduledThreadPoolExecutor.scheduleWithFixedDelay(job2, 2, 5, TimeUnit.SECONDS);
    
            Runnable removeJob = ()->{
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                scheduledThreadPoolExecutor.remove(callJob1);
                log.info("{} - 移除job1", Thread.currentThread().getName());
            };
    
            Thread removeJobThread = new Thread(removeJob);
            removeJobThread.start();
    
    
            log.info("主线程{} - 结束...", Thread.currentThread().getName());
        }
    
        static class Job implements Runnable{
            private final int id;
    
            public Job(int id) {
                this.id = id;
            }
    
            @Override
            public void run() {
                log.info("{} - 开始 job{} 任务 >>>>>>>>>>>", id, Thread.currentThread().getName());
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("<<<<<<<<<<<<< job{} 任务完成 - {} ...", id, Thread.currentThread().getName());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    2.3 运行结果

    实现:指定延迟2s后,以5s作为周期进行定时调度执行,然后指定10s后移除任务1(模仿取消任务)
    ScheduledThreadPoolExecutor的使用也比较简单,从日志中也能看到不能的任务分配给了不同的线程进行异步去执行,任务之间消除了之前Timer中的单线程引起的任务。

    21:48:45.680 [main] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 主线程main - 开始...
    21:48:45.686 [main] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 主线程main - 结束...
    21:48:47.693 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 1 - 开始 jobpool-1-thread-1 任务 >>>>>>>>>>>
    21:48:47.693 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-2 任务 >>>>>>>>>>>
    21:48:48.709 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job1 任务完成 - pool-1-thread-1 ...
    21:48:48.709 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-2 ...
    21:48:53.725 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 1 - 开始 jobpool-1-thread-2 任务 >>>>>>>>>>>
    21:48:53.725 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-1 任务 >>>>>>>>>>>
    21:48:54.738 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job1 任务完成 - pool-1-thread-2 ...
    21:48:54.738 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-1 ...
    21:48:55.701 [Thread-0] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - Thread-0 - 移除job1
    21:48:59.741 [pool-1-thread-4] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-4 任务 >>>>>>>>>>>
    21:49:00.745 [pool-1-thread-4] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-4 ...
    21:49:05.756 [pool-1-thread-3] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-3 任务 >>>>>>>>>>>
    21:49:06.770 [pool-1-thread-3] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-3 ...
    21:49:11.783 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-2 任务 >>>>>>>>>>>
    21:49:12.789 [pool-1-thread-2] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-2 ...
    21:49:17.792 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-1 任务 >>>>>>>>>>>
    21:49:18.795 [pool-1-thread-1] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-1 ...
    21:49:23.798 [pool-1-thread-5] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - 2 - 开始 jobpool-1-thread-5 任务 >>>>>>>>>>>
    21:49:24.805 [pool-1-thread-5] INFO com.example.demo202206.schedule.timer.ScheduledThreadPoolExecutorDemo - <<<<<<<<<<<<< job2 任务完成 - pool-1-thread-5 ...
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    3. 源码探究

    代码的设计上应该和Timer大差不差,无非应该就是多加了个线程池。我们来看下ScheduledThreadPoolExecutor的实现设计。
    在这里插入图片描述

    ScheduledThreadPoolExecutor 继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService。除了正常的线程池功能以外。ScheduledThreadPoolExecutor 调度功能的实现主要就由 ScheduledFutureTask 与DelayedWorkQueue这两个内部类了。

    ScheduledFutureTask内部类继承了FutureTask,实现了Runnable,以此实现了可回调结果的任务线程,同时还实现了Delayed接口(getDelay方法用于返回距离下次任务执行时间的时间间隔)

    DelayedWorkQueue内部类就是一个阻塞队列,不过自己实现了类似Timer里面的基于任务执行时间动态排序的优先级队列。

    ScheduledThreadPoolExecutor 这块核心除了线程池,其实就是内部优先级队列的实现了

    3.1 DelayedWorkQueue

    其主要属性如下:
    在这里插入图片描述

    其中:

    在这里插入图片描述

    看到属性和方法大致应该都能猜到其含义与功能,这里重点看下leader线程与队列的任务获取;添加;移除;排序的过程:

    take() 等待获取任务

    public RunnableScheduledFuture<?> take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
    	    // 自循环,实现对队列的监控 保证返回根节点
            for (;;) {
    			// 获取根节点任务
                RunnableScheduledFuture<?> first = queue[0];
    			// 如果队列为空,则通知其他线程等待
                if (first == null)
                    available.await();
                else {
    				// 获取根节点任务等待时间与系统时间的差值
                    long delay = first.getDelay(NANOSECONDS);
    				// 如果等待时间已经到,则返回根节点任务并重排序队列
                    if (delay <= 0)
                        return finishPoll(first);
    				// 如果等待时间还没有到,则继续等待且不拥有任务的引用
                    first = null; // don't retain ref while waiting
    				// 如果此时等待根节点的leader线程不为空则通知其他线程继续等待
                    if (leader != null)
                        available.await();
                    else {
    					// 如果此时leader线程为空,则把当前线程置为leader
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
    						// 当前线程等待延迟的时间
                            available.awaitNanos(delay); 
                        } finally {
    						// 延迟时间已到 则把当前线程变成非leader线程
    						// 当前线程继续用于执行for循环的逻辑
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
    } finally {
    	// 如果leader为null 则唤醒一个线程成为leader
            if (leader == null && queue[0] != null)
                available.signal();
            lock.unlock();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    remove() 移除任务

    public boolean remove(Object x) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
    	    // 获取任务x的heapIndex
            int i = indexOf(x);
    		// 如果为-1 则表明不在队列内
            if (i < 0)
                return false;
    
    		// 设置删除任务的heapIndex为-1
            setIndex(queue[i], -1);
    		// 队列深度-1
            int s = --size;
    		// 获取队列末尾的节点任务
            RunnableScheduledFuture<?> replacement = queue[s];
            queue[s] = null;
    		// 如果删除的任务节点不是末尾的节点,则重排序
            if (s != i) {
                siftDown(i, replacement);
                if (queue[i] == replacement)
                    siftUp(i, replacement);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    siftDown(int,RunnableScheduledFuture)-移除元素后重排序

    其实根Timer优先级队列排序一样,还是数组堆排序,复杂度 o(logn)

    private void siftDown(int k, RunnableScheduledFuture<?> key) {
    	// 取队列当前深度的一半 相当于size / 2
        int half = size >>> 1;
        // 索引k(初值为0)的值大于half时 退出循环
        while (k < half) {
    	// 获取左节点的索引
            int child = (k << 1) + 1;
    		// 获取左节点的任务
            RunnableScheduledFuture<?> c = queue[child];
    		// 获取右节点的索引
            int right = child + 1;
    		// 如果右节点在范围内 且 左节点大于右节点,
            if (right < size && c.compareTo(queue[right]) > 0)
    			// 更新child的值为右节点索引值 且更新c为右节点的任务
                c = queue[child = right];
    		// 如果任务key小于任务c 则退出循环(最小堆)
            if (key.compareTo(c) <= 0)
                break;
    		// 否则把任务c放到k上(较小的任务放到父节点上)
            queue[k] = c;
    		// 设置任务c的堆索引
            setIndex(c, k);
    		// 更新k的值为child
            k = child;
        }
        // 任务key插入k的位置
        queue[k] = key;
        // 设置任务key的堆索引k
        setIndex(key, k);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    4. 总结

    ScheduledThreadPoolExecutor 与Timer的实现流程差不多,都是维护一个优先级队列来获取最早需要执行的任务,然后通过线程进行消费,区别就是ScheduledThreadPoolExecutor 实现多线程去运行任务,任务之间不会相互影响。其中周期性执行都是通过重置任务下一次执行时间来完成的。

    同时需要注意的是,使用ScheduledThreadPoolExecutor 去执行任务,由于使用的是多线程,默认当任务执行出错,不会发生像Timer一样某个任务异常导致整体的失败 (所以注意使用ScheduledThreadPoolExecutor 去调度的任务,重写任务的业务时,记得自己捕捉好异常进行处理,否则程序不会报任何的错误)

    其中不管是Timer,还是ScheduledThreadPoolExecutor ,其优先级队列任务添加、删除复杂度还是O(logn), 这在需要操作调度大量任务的一些框架将成为其性能瓶颈。下一篇我们在来看下时间轮算法是如何优化这个问题的。

  • 相关阅读:
    什么是容错性(Fault Tolerance)?解释容错性的实现方法
    nginx的使用配置
    招投标系统简介 企业电子招投标采购系统源码之电子招投标系统 —降低企业采购成本
    MyBatis分页插件
    Spring Security(6)
    如何进行跨平台开发和移植性处理?
    后台日志打印配置
    Open CASCADE学习|一个点的坐标变换
    (课程笔记)C++后端知识点大纲
    Linux进程概念(一)
  • 原文地址:https://blog.csdn.net/qq_31557939/article/details/127827181