• 【并发编程】ThreadPoolExecutor任务提交与停止流程及底层实现【新手探索版】



    测试代码:

    // 创建只含有单个线程的线程池
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // 提交任务
    executor.submit(()->{
        System.out.println("C");
    });
    // 优雅停机
    executor.shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1. ThreadPoolExecutor任务提交

    我们看submit方法

    // AbstractExecutorService
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • newTaskFor:把Runnable构建为FutureTask任务
    • execute:执行任务
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        // 1、线程数量小于核心线程,则添加Worker
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 2、插入任务到队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 3、队列也满了,则添加非核心work
        else if (!addWorker(command, false))
            // 4、添加非核心Worker失败则"拒绝"
            reject(command);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    从以上可以看到线程池的处理原则是:核心线程数 > 队列 > 非核心线程 > 拒绝策略

    1、先添加核心线程来处理

    2、核心线程不够,则添加到队列中

    3、队列也不够,则添加“非核心线程”

    4、非核心线程也不够,则执行拒绝策略

    2. 线程池状态[这部分是难点呀]

    public class ThreadPoolExecutor extends AbstractExecutorService {
        // 核心变量:该变量前3位表示"状态",后29为表示线程数量
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
        
        // 该变量控制ctl变量用多少bit位表示状态【bit位的分界线】
        private static final int COUNT_BITS = Integer.SIZE - 3;
        
        // 表示线程的最大容量【即0001 1111 1111...】
        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
        
        // 以下变量用前3位控制线程池的状态
        private static final int RUNNING    = -1 << COUNT_BITS; // [即111]
        private static final int SHUTDOWN   =  0 << COUNT_BITS; // [即000]
        private static final int STOP       =  1 << COUNT_BITS; // [即001]
        private static final int TIDYING    =  2 << COUNT_BITS; // [即010]
        private static final int TERMINATED =  3 << COUNT_BITS; // [即011]
        
        // 计算ctl变量的前3位,表示“状态”
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 计算ctl变量的后29位,表示"工作线程数量"
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • ctl:是control(控制)的缩写。是线程池的最核心变量,该int变量即包含状态也包含线程的数量
    • COUNT_BITS:该变量ctl变量bit位的分界线
    • CAPACITY:表示线程池最大线程数量
    • 各种状态:用ctl的前3个bit位表示状态
    • runStateOf、workerCountOf方法:计算状态、线程数量

    2.1. addWorker添加worker线程

    private boolean addWorker(Runnable firstTask, boolean core) {
        // <1>
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取状态
            int rs = runStateOf(c);
    
            // <1.1> 状态判断
            if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //增加c变量的线程数
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 一旦状态改变,则需要返回外层循环重新判断状态
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        // <2>
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask); // 创建Worker
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();// 添加"工作线程需要上锁"
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动工作线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    addWorker代码分2个部分

    • 维护ctl变量(在<1>部分)

    用cas给ctl变量表示线程数量的位置值加一。

    <1.1>处,状态判断我是感觉写法挺烧脑子的。

    if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
        return false;
    
    • 1
    • 2

    要使整体条件成立则rs >= SHUTDOWN成立且后面括号的判断为false。后面括号分3种情况:

    1、如果rs == SHUTDOWN为false(隐含条件有:rs >= SHUTDOWN成立。

    rs == SHUTDOWN,则rs可能得值有stop、tidying、terminated其中一个,即线程池已经终止,则表示终止状态下不必添加Worker线程。

    2、如果firstTask == null为false(隐含的条件有:rs >= SHUTDOWN,rs == SHUTDOWN成立。

    线程池是shutdown状态,且firstTask == null为false,则表示shutdown状态下不必添加Worker线程。

    3、如果workQueue.isEmpty()为true(隐含条件:rs=SHUTDOWN,firstTask == null成立。

    线程池是shutdown状态,且提交的任务也是null,且队列是空的,则不必添加Worker线程。【如果队列不是空的还是有必要添加Worker?】

    备注:第3个条件真的有必要吗???因为workQueue某些任务挺耗时,添加添加Worker是为了加快workQueue中任务的处理?

    如此复杂的条件判断的目的:特定条件下即使shutdown还是会添加Worker线程,并不是只要shutdown就不必添加Worker线程。

    批判:完全没有必要把条件搞这么复杂,只能说作者太追求极致了。完全可以只要状态>=shutdown就返回false。而且firstTask=null怎么会发生呢,这个方法是private啊。

    极致的条件是什么:①rs>=shutdown且②rs=shutdown且③firstTask=null且④workQueue非空,即这些条件都满足时还是可以添加Worker线程的。

    • 添加启动线程(在<2>部分)

    1、创建Worker

    firstTask变量表示Worker中的第一个任务

    每一个Worker表示一个工作线程。

    2、添加Worker到workers

    由线程池的workers变量统一维护所有线程

    3、添加线程的过程需要“上锁”,添加完线程之后调用t.start方法启动线程

    2.2. 内部类Worker

    /**
     * Worker类大体上管理着运行线程的中断状态 和 一些指标
     * Worker类投机取巧的继承了AbstractQueuedSynchronizer来简化在执行任务时的获取、释放锁
     * 这样防止了中断在运行中的任务,只会唤醒(中断)在等待从workQueue中获取任务的线程
     * 解释:
     *   为什么不直接执行execute(command)提交的command,而要在外面包一层Worker呢??
     *   主要是为了控制中断....重点主要原因啦
     *   用什么控制??
     *   用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁
     *   只有在等待从workQueue中获取任务getTask()时才能中断
     *
     * worker实现了一个简单的不可重入的互斥锁,而不是用ReentrantLock可重入锁
     * 因为我们不想让在调用比如setCorePoolSize()这种线程池控制方法时可以再次获取锁(重入)
     * 解释:
     *   setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock()
     *   如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:
     *   setMaximumPoolSize()
     *   setKeppAliveTime()
     *   allowCoreThreadTimeOut()
     *   shutdown()
     * 此外,为了让线程真正开始后才可以中断,初始化lock状态为负值(-1),在开始runWorker()时将state置为0,而state>=0才可以中断
     * 
     * Worker继承了AQS,实现了Runnable,说明其既是一个可运行的任务,也是一把锁(不可重入)
     */
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
        final Thread thread; //利用ThreadFactory和 Worker这个Runnable创建的线程对象
         
        Runnable firstTask;
         
        volatile long completedTasks;
     
        Worker(Runnable firstTask) {
            //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
            setState(-1);
            // 在调用runWorker()前,禁止interrupt中断,
            //在interruptIfStarted()方法中会判断 getState()>=0
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); 
            //根据当前worker创建一个线程对象
           //当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,
           //而是调用当前worker.run()时调用firstTask.run()
        }
     
        public void run() {
            runWorker(this); 
            //runWorker()是ThreadPoolExecutor的方法
        }
     
        // Lock methods
        // The value 0 represents the unlocked state. 0代表“没被锁定”状态
        // The value 1 represents the locked state. 1代表“锁定”状态
     
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
     
        /**
         * 尝试获取锁
         * 重写AQS的tryAcquire(),AQS本来就是让子类来实现的
         */
        protected boolean tryAcquire(int unused) {
            //尝试一次将state从0设置为1,即“锁定”状态,
            //但由于每次都是state 0->1,而不是+1,那么说明不可重入
            //且state==-1时也不会获取到锁
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread()); 
                //设置exclusiveOwnerThread=当前线程
                return true;
            }
            return false;
        }
     
        /**
         * 尝试释放锁
         * 不是state-1,而是置为0
         */
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null); 
            setState(0);
            return true;
        }
     
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
     
        /**
         * 中断(如果运行)
         * shutdownNow时会循环对worker线程执行
         * 且不需要获取worker锁,即使在worker运行时也可以中断
         */
        void interruptIfStarted() {
            Thread t;
            //如果state>=0、t!=null、且t没有被中断
            //new Worker()时state==-1,说明不能中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104

    Worker类本身既实现了Runnable,又继承了AbstractQueuedSynchronizer,所以其既是一个可执行的任务,又可以达到锁的效果

    主要是几个问题:

    • 为什么会有一把锁

    肯定不是为了控制并发的,因为只有一个线程。主要是为了控制中断。用AQS锁,当运行时上锁,就不能中断,TreadPoolExecutor的shutdown()方法中断前都要获取worker锁,只有在等待从workQueue中获取任务getTask()时才能中断

    • 控制中断体现在哪里

    1、初始AQS状态为-1,此时不允许中断interrupt(),只有在worker线程启动了,执行了runWoker(),将state置为0,才能中断

    不允许中断体现在:

    1. shutdown()线程池时,会对每个worker tryLock()上锁,而Worker类这个AQS的tryAcquire()方法是固定将state从0->1,故初始状态state=-1时tryLock()失败,没办法中断interrupt()
    2. shutdownNow()线程池时,不用tryLock()上锁,但调用worker.interruptIfStarted()终止worker,interruptIfStarted()也有state>=0才能interrupt的逻辑,而初始state=-1

    2、为了防止某种情况下,在运行中的worker被中断,runWorker()每次运行任务时都会lock()上锁,而shutdown()这类可能会终止worker的操作需要先获取worker的锁,这样就防止了中断正在运行的线程。Worker实现的AQS为不可重入锁,为了是在获得worker锁的情况下再进入其它一些需要加锁的方法

    • 有没有响应中断

    很明显ThreadPoolExecutor没有响应中断(即没有检测和处理中断)。没有响应中断与控制中断是两回事。至于为什么要多搞一层Work来控制中断作者也不清楚。

    • 没有中断、 没有锁行不行

    个人猜测:如果只是我们自己写代码完全可以,但这是大神写的代码,不可以

    • 为什么要自己继承AQS,而不是使用ReentrantLock

    ReentrantLock是可重入的,而ThreadPoolExecute要求不可重入

    2.3. runWorker():执行任务

    看一下Worker如何执行任务的

    // java.util.concurrent.ThreadPoolExecutor.Worker
    public void run() {
        runWorker(this);
    }
    
    • 1
    • 2
    • 3
    • 4
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // <1>
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // <2> 获取任务【重点】
            while (task != null || (task = getTask()) != null) {
                // <3> 上锁,不是为了防止并发执行任务,为了在shutdown()时不终止正在运行的worker
                w.lock();
                // <4> 极端判断条件是:由Running或shutdown状态,突然变成stop状态
                if ((runStateAtLeast(ctl.get(), STOP) || 
                     (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                    wt.interrupt();
                // <5>
                try {
                    // <5.1>
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // <5.2>
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // <5.3>
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // <6> completedAbruptly变量表示是否是"暴力结束线程",执行到此处说明线程是因为taskTask返回为null
            completedAbruptly = false;
        } finally {
            // <7> worker线程的退出【重点】
            processWorkerExit(w, completedAbruptly);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • <1>

    在创建Worker时添加的任务为第一个任务(firstTask)

    • <2>

    调用getTaskworkQueue取出任务。详情请看getTask方法详解。

    • <3>处,为什么要上锁

    是为了控制中断,是控制,控制,不是响应中断,线程池没有响应中断。

    • <4>

    此时的极端条件是什么:由Running或shutdown状态,突然变成stop状态

    1、如果状态达到stop级别,必须要对worker中断。

    2、可能暂时没有达到stop级别,但是别处立马调用了shutdownNow则立马达到stop级别,也要立刻中断

    注意:线程池本身不响应中断,是否中断用户自行在task.run方法决定

    • <5>

    1、<5.1>是任务执行开始之前的动作,默认空实现

    2、<5.2>是执行任务,即调用run方法

    3、<5.3>是任务结束之后的动作,默认空实现

    • <6>

    completedAbruptly变量表示是否是暴力完成任务的,执行到<4>处说明是正常完成,不是暴力完成。

    代码执行到<4>处位置,说明getTask方法取出任务为null,也就是任务不足了,就需要考虑销毁不必要的线程

    • <7>

    此时需要终止工作线程。这一段后面有详解。

    2.4. getTask():获取任务

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // <1> 极端条件是:rs == SHUTDOWN,且 workQueue不为空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // <2> 是否允许取出任务超时
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            // <3> 极端条件:wc <= maximumPoolSize 且 wc > corePoolSize 且 超时 且 队列为空,此时需要销毁当前线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // <2> 取出任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // <3> 到这里说明当前线程出现了等待超时,则可能要销毁非核心线程
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • <1>

    此时条件复杂,我们看极端情况,极端条件是:【rs == SHUTDOWN且 workQueue不为空】,说明了即使是SHUTDOWN状态只要队列workQueue不为空,还是需要等待workQueue执行。

    SHUTDOWN是优雅停机,会先等workQueue执行完毕

    • <2>处,

    timed变量主要是控制当线程空闲时是否需要销毁线程。默认如果线程数量>corePoolSize且空闲,则销毁当前线程;如果线程<=corePoolSize但空闲是不会销毁线程的。(是否销毁默认取决于线程数量)

    • <3>

    极端条件:wc <= maximumPoolSize 且 wc > corePoolSize 且 超时 且 队列为空,此时线程数量太多又没有任务所以需要销毁当前线程。

    • <4>处,取出任务。

    1、如果允许取出任务超时,则调用限时的poll方法。如果超时返回null

    2、如果不允许取出任务超时,则调用take方法阻塞获取task。

    也就是说如果返回null则一定就是**当前线程出现了等待的情况,则考虑是否需要销毁不必要的线程了

    2.5. processWorkerExit():worker线程退出

    注意:这才是worker工作线程真正退出的地方,而不是调用线程池的shutdown方法工作线程就会停止,工作线程会等待所有任务都执行完成后才会停止。

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        /**
         * 1、worker数量-1
         * 如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,
         *那么正在工作的worker线程数量需要-1
         * 如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
         */
        if (completedAbruptly) 
            // If abrupt, then workerCount wasn't adjusted 代码和注释正好相反啊
            decrementWorkerCount();
     
        /**
         * 2、从Workers Set中移除worker
         */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks; //把worker的完成任务数加到线程池的完成任务数
            workers.remove(w); //从HashSet中移除
        } finally {
            mainLock.unlock();
        }
     
        /**
         * 3、在对线程池有负效益的操作时,都需要“尝试终止”线程池
         * 主要是判断线程池是否满足终止的状态
         * 如果状态满足,但还有线程池还有线程,尝试对其发出中断响应,使其能进入退出流程
         * 没有线程了,更新状态为tidying->terminated
         */
        tryTerminate();
     
        /**
         * 4、是否需要增加worker线程
         * 线程池状态是running 或 shutdown
         * 如果当前线程是突然终止的,addWorker()
         * 如果当前线程不是突然终止的,但当前线程数量 < 要维护的线程数量,addWorker()
         * 故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程
         */
        int c = ctl.get();
        //如果状态是running、shutdown,即tryTerminate()没有成功终止线程池,尝试再添加一个worker
        if (runStateLessThan(c, STOP)) {
            //不是突然完成的,即没有task任务可以获取而完成的,计算min,并根据当前worker数量判断是否需要addWorker()
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //allowCoreThreadTimeOut默认为false,即min默认为corePoolSize
                 
                //如果min为0,即不需要维持核心线程数量,且workQueue不为空,至少保持一个线程
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                 
                //如果线程数量大于最少数量,直接返回,否则下面至少要addWorker一个
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
             
            //添加一个没有firstTask的worker
            //只要worker是completedAbruptly突然终止的,或者线程数量小于要维护的数量,就新添一个worker线程,即使是shutdown状态
            addWorker(null, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    processWorkerExit(Worker w, boolean completedAbruptly)

    worker:要结束的worker

    completedAbruptly:是否突然完成(是否因为异常退出)

    执行流程:

    1、worker数量-1

    A、如果是突然终止,说明是task执行时异常情况导致,即run()方法执行时发生了异常,那么正在工作的worker线程数量需要-1
    
    B、如果不是突然终止,说明是worker线程没有task可执行了,不用-1,因为已经在getTask()方法中-1了
    
    • 1
    • 2
    • 3

    2、从Workers Set中移除worker,删除时需要上锁mainlock
    3、tryTerminate()。大概逻辑:判断线程池是否满足终止的状态

    tryTerminate虽然很长,但是只有一个作用,通过设置线程池状态为terminal来真正终止线程,终止的前提条件是:线程池状态为shutdown + 队列是空的 + 工作线程为0

    操作:更新状态为tidying->terminated

    4、是否需要增加worker线程,如果线程池还没有完全终止,仍需要保持一定数量的线程

    如:我们自定义的任务task.run方法发生了异常导致线程退出,此时线程池需要补充新的线程进来

    故如果调用线程池shutdown(),直到workQueue为空前,线程池都会维持corePoolSize个线程,然后再逐渐销毁这corePoolSize个线程

    3.3. 关闭线程池

    注意:关闭线程池并不能让线程池中的线程停止,只是对其发送了中断信号,仅仅是发送了信号而已。而ThreadPoolExecute没有响应中断信号,所以最终只有在所有的task都被执行后才会真正的结束worker线程。

    3.3.1. shutdown方法

    shutdown()后线程池将变成shutdown状态,此时不接收新任务**,但会处理完正在运行的和在阻塞队列中等待处理的任务**。

    /**
     * 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock(); //上锁 
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne) break;
            }
        } finally {
            mainLock.unlock(); //解锁 } }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    shutdown()执行流程:

    1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

    2、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

    3、中断所有空闲线程 interruptIdleWorkers()

    正在运行中的线程不会被中断

    4、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

    5、解锁

    6、尝试终止线程池 tryTerminate()

    可以看到shutdown()方法最重要的几个步骤是:更新线程池状态为shutdown中断所有空闲线程tryTerminated()尝试终止线程池

    interruptIdleWorkers() 中断空闲线程过程如下:

        /**
         * 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
         */
        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock(); //上锁 
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne) break;
                }
            } finally {
                mainLock.unlock(); //解锁 } }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    关键是:worker.tryLock() 是否成功

    如果worker是空闲状态(即tryLock成功),AQS的state从0–>1,既然是空闲线程那么中断空闲线程。

    3.3.2. shutdownNow方法

    shutdownNow()后线程池将变成stop状态,此时不接收新任务**,不再处理在阻塞队列中等待的任务,还会尝试中断正在处理中的工作线程**。

    无论如何线程池ThreadPoolExecute本身没有响应中断,至于用户是否响应中断时用户自己的事情。

    /**
     * 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
     * 这个任务列表是从任务队列中排出(删除)的
     * 

    * 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination() *

    * 除了尽力尝试停止运行中的任务,没有任何保证 * 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束 */ public List <Runnable> shutdownNow() { List <Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //上锁 try { //判断调用者是否有权限shutdown线程池 checkShutdownAccess(); //CAS+循环设置线程池状态为stop advanceRunState(STOP); //中断所有线程,包括正在运行任务的 interruptWorkers(); tasks = drainQueue(); //将workQueue中的元素放入一个List并返回 } finally { mainLock.unlock(); //解锁 } //尝试终止线程池 tryTerminate(); return tasks; //返回workQueue中未执行的任务 }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    该方法会中断所有的线程,包括正在运行的线程,用户是否响应是用户自己的事情

    线程池中队列中等待执行的task会被移除,并返回给用户。

    shutdownNow() 和 shutdown()的大体流程相似,差别是:

    1、将线程池更新为stop状态

    2、调用**interruptWorkers()**中断所有线程,包括正在运行的线程

    interruptWorkers先对mainLock加锁,然后循环调用interruptIfStarted方法中断Worker的所有线程,其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用t.interrupt()。

    3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

    4. 其他方法

    4.1. reject方法

    private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
    
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    默认的handler是AbortPolicy,它的处理方式是“抛出异常”

    其它的handler:

    • DiscardPolicy

    直接丢弃

    • DiscardOldestPolicy

    丢弃”最老的“

    • CallerRunsPolicy

    调用者线程来执行。但是如果线程池被shutdown,则任务会被丢弃。

    5. 总结

    1、把握int型ctl变量的高3位表示线程池的状态,低29为表示线程池中线程数量是前提

    2、能了解线程池执行任务的总体流程(先core,在队列,在max,在拒绝)

    3、能了解为什么要有Work类(主要是为了中断)

    4、worker线程满足什么条件才会退出

    task发生异常会退出但是又会补充新的线程进来

    只有任务都执行完了且调用了shutdown方法,才会真正退出

    5、shutdown方法对worker线程的影响

    shutdown会对空闲线程interrupt

    shutdown会改变线程池状态为shutdown,worker线程会被状态作出反应

    • shutdown之后不能添加新的task(体现在addWorker中)

    • worker线程执行完任务之后就退出了,而不是在workQueue队列上等待新任务

    6、温故而知新,还是要经常看看经常想想

    6. 参考

    https://blog.csdn.net/qq_41573860/article/details/123291943

  • 相关阅读:
    MLAgents (0) Unity 安装及运行
    基于gpio的子系统编写led的驱动和应用程序测试
    QT中计算日期差,并进行加减
    ConcurrentHashMap(JDK1.8)中红黑树的实现
    SSM框架之MyBatis入门(Maven工程实现全查功能,快速入门,适合小白)
    使用服务器训练模型的注意事项
    uni-app项目由hbuilder项目转化为cli项目
    并发与多线程(5) 条件变量 condition_variable
    【算法leetcode】2325. 解密消息(rust和go重拳出击)
    Unity开发之C#基础-集合(字典)(Dictionary)
  • 原文地址:https://blog.csdn.net/yuchangyuan5237/article/details/133455708