• 12 多线程与高并发 - ScheduledThreadPoolExecutor 源码解析


    ScheduledThreadPoolExecutor 介绍

    ScheduledThreadPoolExecutor 是 ThreadPoolExecutor 的一个子类,在线程池的基础上实现了延迟执行任务以及周期性执行任务的功能。

    简单使用

    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
    
        //1. execute
        // 和普通线程池执行一样
        executor.execute(() -> {
            System.out.println("execute");
        });
    
        //2. schedule
        // 指定延迟时间,一次性执行任务
        executor.schedule(() -> {
            System.out.println("schedule");
        },2000,TimeUnit.MILLISECONDS);
    
        //3. AtFixedRate
        // 周期性执行任务(周期时间:执行时间和延迟时间的最大值)
        executor.scheduleAtFixedRate(() -> {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("at:" + System.currentTimeMillis());
        },3000,2000,TimeUnit.MILLISECONDS);
    
        //4. WithFixedDelay
        // 周期性执行任务(周期时间:执行时间 + 延迟时间)
        executor.scheduleWithFixedDelay(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("with:" + System.currentTimeMillis());
        },3000,2000,TimeUnit.MILLISECONDS);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    核心内容

    在这里插入图片描述

    ScheduledFutureTask - 任务

    ScheduledFutureTask 实现了 RunnableScheduledFuture 接口,间接的实现了 Delayed 接口,让任务可以放到延迟队列中,并且基于二叉堆做排序

    private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            /** 计数器,每个任务都有一个全局唯一的序号
            	如果任务的执行时间一模一样,比对sequenceNumber */
            private final long sequenceNumber;
    
            /** 任务执行的时间,单位是纳秒 */
            private long time;
    
            /* 
            * period == 0:表示一次性执行的任务 
            * period > 0:表示使用的是 scheduleAtFixedRate
            * period < 0:表示使用的是 scheduleWithFixedDelay 
            * */
            private final long period;
    
            /** 周期性执行任务时,引用具体任务,方便后面重新扔到阻塞队列 */
            RunnableScheduledFuture<V> outerTask = this;
    
            /**
             * Index into delay queue, to support faster cancellation.
             */
            int heapIndex;
    		// 实现Delayed接口重写的方法,执行时间
            public long getDelay(TimeUnit unit) {
                return unit.convert(time - now(), NANOSECONDS);
            }
    		// 实现Delayed接口重写的方法,比较方式
            public int compareTo(Delayed other) {
                if (other == this) // compare zero if same object
                    return 0;
                if (other instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                    long diff = time - x.time;
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    else if (sequenceNumber < x.sequenceNumber)
                        return -1;
                    else
                        return 1;
                }
                long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
                return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    DelayedWorkQueue - 队列

    DelayedWorkQueue 包装了 RunnableScheduledFuture[]

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
            implements BlockingQueue<Runnable> {
    		// 初始长度
            private static final int INITIAL_CAPACITY = 16;
            // 数组
            private RunnableScheduledFuture<?>[] queue =
                new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
            // 锁
            private final ReentrantLock lock = new ReentrantLock();
            // 长度
            private int size = 0;
    		// 等待拿堆顶数据的线程
            private Thread leader = null;
    		// Condition 队列
            private final Condition available = lock.newCondition();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    源码分析

    execute()

    直接调用 schedule(),入参 delay = 0

    public void execute(Runnable command) {
            schedule(command, 0, NANOSECONDS);
        }
    
    • 1
    • 2
    • 3

    schedule()

    延迟一段时间,执行一次任务

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay,
                                               TimeUnit unit) {
            // 非空校验
            if (callable == null || unit == null)
                throw new NullPointerException();
            // 将任务封装成 ScheduledFutureTask 
            RunnableScheduledFuture<V> t = decorateTask(callable,
                new ScheduledFutureTask<V>(callable,
                                           triggerTime(delay, unit)));
            // 延迟执行任务                               
            delayedExecute(t);
            return t;
        }
    
    
    	// 返回当前任务要执行的系统时间
        private long triggerTime(long delay, TimeUnit unit) {
            return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
        }
        long triggerTime(long delay) {
            return now() +
                ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    delayedExecute() 延迟执行任务

    private void delayedExecute(RunnableScheduledFuture<?> task) {
    		// 线程池状态不是RUNNING,就拒绝任务
            if (isShutdown())
                reject(task);
            else {
    			// 将任务放入延迟队列中(二叉堆)
                super.getQueue().add(task);
    			// 1.线程池状态
    			// 2.根据策略决定是否能执行
    			// 3.将任务从延迟队列移除
                if (isShutdown() &&
                    !canRunInCurrentRunState(task.isPeriodic()) &&
                    remove(task))
                    task.cancel(false);
                else
    				// 是否需要创建线程
                    ensurePrestart();
            }
        }
    	
    	
    	
    // periodic - true:代表是周期性执行的任务
    // periodic - false:代表是一次性的延迟任务
    boolean canRunInCurrentRunState(boolean periodic) {
    	// 默认情况下,如果任务扔到了延迟队列中,有两个策略
        // 如果任务是周期性执行的,默认为false(continueExistingPeriodicTasksAfterShutdown)
        // 如果任务是一次性的延迟任务,默认为true(executeExistingDelayedTasksAfterShutdown)
    	
    	// 此时,周期性任务返回false,一次性任务返回true
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
        
    }
    
    // 判断当前任务到底执行不执行
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        // 重新拿到线程池的ctl
        int rs = runStateOf(ctl.get());
        // 如果线程池是RUNNING,返回true
        // 如果线程池状态是SHUTDOWN,那么就配合策略返回true、false
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }
    
    
    
    // 是否需要创建线程
    void ensurePrestart() {
        // 获取线程池中的工作线程个数。
        int wc = workerCountOf(ctl.get());
        // 如果工作线程个数,小于核心线程数,
        if (wc < corePoolSize)
            // 创建核心线程,一致在阻塞队列的位置take,等待拿任务执行
            addWorker(null, true);
        // 如果工作线程数不小于核心线程,但是值为0,创建非核心线程执行任务
        else if (wc == 0)
            // 创建非核心线程处理阻塞队列任务,而且只要阻塞队列没有任务了,当前线程立即销毁
            addWorker(null, false);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    scheduleAtFixedRate()

    scheduleAtFixedRate 在包装 ScheduledFutureTask 时会将 period 设置为正数,代表固定周期执行

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
            sft.outerTask = t;
            // 延迟执行任务
            delayedExecute(t);
            return t;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    scheduleWithFixedDelay()

    scheduleWithFixedDelay 在包装 ScheduledFutureTask 时会将 period 设置为负数,代表在执行任务完毕后,再计算下次执行的时间

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (delay <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(-delay));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            // 将任务设置给outerTask属性,方便后期重新扔到延迟队列
            sft.outerTask = t;
            // 延迟执行任务
            delayedExecute(t);
            return t;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    run()

    执行addWorker方法,会创建一个工作线程,工作线程在创建成功后,会执行start方法。在start方法执行后,会调用Worker的run方法,最终执行了runWorker方法,在runWorker方法中会在阻塞队列的位置执行take方法一直阻塞拿Runnable任务,拿到任务后就返回,然后执行。

    // 执行任务
    public void run() {
        // 获取任务是否是周期执行
        // true:周期执行
        // false:一次的延迟执行
        boolean periodic = isPeriodic();
        // 再次判断线程池状态是否不是RUNNING,如果不是RUNNING,并且SHUTDOWN情况也不允许执行,或者是STOP状态
        if (!canRunInCurrentRunState(periodic))
            // 取消任务
            cancel(false);
        else if (!periodic)
            // 当前任务是一次性的延迟执行。执行任务具体的run方法
            ScheduledFutureTask.super.run();
            // 周期性任务
        else if (ScheduledFutureTask.super.runAndReset()) {
        			// 计算下次任务运行时间
                    setNextRunTime();
                    // 重新将任务扔到延迟队列中
                    reExecutePeriodic(outerTask);
                }
    }
    
    
    // 计算任务下次执行时间,time是任务执行的时间,而这里是time的上次的执行时间
    private void setNextRunTime() {
        // 拿到当前任务的period
        long p = period;
        // period > 0:At
        if (p > 0)
            // 直接拿上次执行的时间,添加上周期时间,来计算下次执行的时间。
            time = time + p;
        else
            // period < 0:With
            // 任务执行完,拿当前系统时间计算下次执行的时间点
            time = now() + p;
    }
    
    // 重新将任务扔到延迟队列中
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        // 线程池状态的判断
        if (canRunInCurrentRunState(true)) {
            // 将任务扔到了延迟队列中
            super.getQueue().add(task);
            // 扔到延迟队列后,再次判断线程池状态,是否需要取消任务!
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                // 是否需要创建线程
                ensurePrestart();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
  • 相关阅读:
    软考高级系统架构设计师系列论文二十三:论数据中心集中存储架构
    有上下界的最小(最大)流| INIT: up[][]为容量上界; low[][]为容量下界;
    SOME/IP 协议介绍(五)指南
    2022-2028年全球与中国太阳能光伏消费产品市场现状及未来发展趋势分析报告
    网络编程套接字之二【UDP】
    范畴论(Category Theory)的基本介绍
    nginx基础篇
    离谱的 CSS!从表盘刻度到艺术剪纸
    华为认证大数据工程师(HCIA-Big Data)--填空题
    K8s自动化集群环境搭建
  • 原文地址:https://blog.csdn.net/qq_33512765/article/details/126498917