• 线程池源码解析 3.excute() 方法


    线程池源码解析—excute()方法

    execute()

    • execute 方法是线程池的核心方法,所有的方法,包括包装的 FutureTask,都是调用这个方法。

    大致流程

    这里只是总结了一遍大致的流程,一些细节问题见下面的流程图或者参考源码。

    • 当提交任务时,首先判断当前线程池内的线程数是否达到了核心线程,没有达到核心线程数就开线程去执行任务,如果达到了核心线程数,就尝试将任务加入阻塞队列中。
    • 如果说队列也满了,就尝试继续开线程执行任务,如果此时线程池中的存活线程已经等于了 maximumPoolSize,那么直接走拒绝策略。没有到达最大线程,则开启线程执行任务。

    流程图

    execute

    源码解析

       /*
        * command可以是普通的Runnable实现类,也可以是FutureTask(本质也是一个Runnable)
        */   
       public void execute(Runnable command) {
            // command不能为null
            if (command == null)
                throw new NullPointerException(); 
           	/* 
           	 * 获取内部的ctl的值赋值给c
           	 * 高3位表示线程池状态,低29位表示当前线程池的线程数量
           	 */
            int c = ctl.get();
            /*
             * workerCountOf(c)就是获取ctl的低29位,即当前线程池中存活的线程数量
             * 条件成立:表示当前线程数量小于核心线程数量,此次提交任务,直接创建一个新的worker(线程),对应线程池中多了一个新的线程
             */
            if (workerCountOf(c) < corePoolSize) {
                /*
                 * addWorker(Runnable firstTask, boolean core) 为创建worker并执行任务的过程,会创建worker对象,并且将command作为firstTask
                 * @param firstTask 表示当前的任务
                 * @param core 表示此次创建的线程是否算到核心线程数的头上
                 *        core == true 表示采用核心线程数量限制  false表示采用 maximumPoolSize
                 */
                if (addWorker(command, true))
                    // 创建成功后,直接返回。addWorker方法内部会启动新创建的worker,执行firstTask
                    return;
                /*
                 * 执行到这条语句,说明addWorker()一定失败了...
                 * 有几种可能? 
                 *   1.存在并发现象,execute方法是可能有多个线程同时调用的,当workerCountOf(c) < corePoolSize条件成立时,其他线程可能也成立了,
                 *     并且向线程池中创建了worker,如果此时线程池中的线程数已经到达了上限等,那么就会失败。
                 *   2.当前线程池状态发生了改变
                 *     2.1 如果当前线程池状态是非RUNNING,addWorker(firstTask != null, true|false)一定会失败
                 *     2.2 SHUTDOWN状态下,也有可能创建成功,前提是firstTask == null 并且当前队列不为空(特殊情况)
                 */ 
                // 重新获取一下ctl的值
                c = ctl.get();
            }
            /*
             * 执行到这里有几种情况?
             *   1.当前线程数量已经达到了corePoolSize
             *   2.addWorker()失败
             */
           	/*
           	 * 条件成立:表示当前线程池处于running状态,则尝试将任务放到阻塞队列中
           	 */
            if (isRunning(c) && workQueue.offer(command)) {
                // 执行到这里,说明当前任务已经入队
                // 再次获取一下ctl的值
                int recheck = ctl.get();
                /*
                * 根据ctl的值 判断当前线程池处于非RUNNING状态,则尝试从队列中移除任务
                * 条件1:! isRunning(recheck) 成立:说明你提交到队列之后,线程池状态被外部线程给修改 比如:shutdown() shutdownNow()
                *        这种情况 需要把刚刚提交的任务删除掉。
                * 条件2:remove(command) 移除当前任务,有可能成功,也有可能失败
                *        成功:提交之后,线程池中的线程还未消费(处理)
                *        失败:提交之后,在shutdown() shutdownNow()之前,就被线程池中的线程 给处理了。
                */
                if (!isRunning(recheck) && remove(command))
                    // 任务移除成功,走拒绝策略
                    reject(command); 
                /*
                 * 有几种情况会到这里?
                 *   1.当前线程池是running状态(这个概率最大)
                 *   2.线程池状态是非running状态 但是remove提交的任务失败
                 *
                 * 这里是一个担保机制,担保线程池是running状态,
                 * 但是如果线程池中存活的线程是0的话,就会很尴尬,这里就调用addWorker()开一个线程去执行任务
                 * 保证线程池在running状态下,最起码得有一个线程在工作。
                 */
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            /*
             * 执行到这里有几种情况?
             *   1.当前线程池是非running状态
             *   2.任务入队失败
             * 
             * 1.如果线程池处于非running状态,调用addWorker()一定会失败,走拒绝策略
             * 2.如果说队列满了,这个时候如果当前线程数量没有达到maximumPoolSize,会创建新的worker直接执行任务(救急线程)
             *   如果达到了最大线程数,addWorker()就会失败,走拒绝策略
             */ 
            else if (!addWorker(command, false))
                // 走拒绝策略
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    addWorker()

    总体流程分析

    先判断当前线程池状态是否允许继续添加任务,允许添加的话,CAS操作ctl的值尝试+1CAS 成功然后传入任务通过 new Worker() 的方式构造一个 Worker,然后加锁尝试将 worker 对象放入线程池,如果加入线程池成功,释放锁后就调用 start() 执行任务。如果任务最终没有被执行,就需要执行后续的清理逻辑。

    源码解析

       /*
        * @param firstTask可以为null,表示启动worker之后,worker自动从队列中获取任务,如果不是null,worker优先执行firstTask(内部任务)
        * @param core 采用的线程数限制,如果为true,采用核心线程数限制,false采用maximumPoolSize线程数限制 (传入的参数一般都是false)
        * @return boolean 表示任务是否已经启动
        * 返回值总结:true表示创建worker成功且线程启动成功,加入到set集合中
        *            false: ① 线程池状态大于SHUTDOWN时,一定会失败
        *                    ② 当前状态 = SHUTDOWN 但是队列中已经没有任务了 或 当前状态是SHUTDOWN且队列未空,但是firstTask不为null
        *                    ③ 当前线程池已经达到了指定指标(core或max)
        *                    ④ ThreadFactory创建的线程是null
        */
       private boolean addWorker(Runnable firstTask, boolean core) {   
            // 自旋 判断当前线程池状态是否允许创建线程
            retry: for (;;) {
                // 获取当前的ctl值保存到c中
                int c = ctl.get();
                // rs:当前线程池的运行状态
                int rs = runStateOf(c);
                /*
                 * 这个判断主要是为了判断当前线程池是SHUTDOWN状态,但是队列里还有任务尚未处理完,这个时候是允许添加worker(线程)的,
                 * 但是不允许再次提交task
                 */
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null  &&
                       ! workQueue.isEmpty()))
                    return false;
    			
                // 上面的代码,就是判断当前线程池的状态是否允许继续添加线程
                
                // 内部自旋 获取创建线程令牌的过程
                for (;;) {
                    // 获取当前线程池中的线程数量
                    int wc = workerCountOf(c);
                    /*
                     * 条件1:wc >= CAPACITY 永远不成立,因为CAPACITY是一个5亿多大的数字
                     * 条件2:根据传来的core参数判断当前线程数是否大于等于核心线程数或者是最大线程数,大于等于的话直接return false
                     */
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        // 执行到这里,说明当前无法添加线程了,已经达到指定限制了
                        return false;
                    /*
                     * 尝试使用CAS方式更新ctl,将线程数量+1,成功,相当于申请到了一块令牌
                     * 成功:说明更新成功,结束外层自旋
                     * 失败:说明有竞争,其他线程已经修改了ctl的值 或者外部线程可能调用了
                     * shutdown或者shutdownNow(),导致线程池状态发生变化,CAS也会失败
                     */ 
                    if (compareAndIncrementWorkerCount(c))
                        // 走到这,一定是CAS成功,申请到令牌了。结束外层自旋,继续向下执行外部代码(创建Worker)
                        break retry;
                    // CAS失败,获取最新的ctl值
                    c = ctl.get();  
                    // 判断当前线程池状态是否发生过变化(如果外部在这之前调用过shutdown/shutdownNow会导致状态变化),如果变化了,继续自旋
                    if (runStateOf(c) != rs)
    					// 状态发生变化后,直接返回到外层循环,外层循环负责判断当前线程池状态 和 是否允许创建线程
                        continue retry;
                }
            }
            // 表示创建的worker是否已经启动
            boolean workerStarted = false;
            // 表示创建的worker是否添加到了workers(HashSet)中
            boolean workerAdded = false;
            // w表示后面创建worker的一个引用
            Worker w = null;
            try {
                // 创建Worker,执行完后,线程“应该”已经准备好了
                w = new Worker(firstTask); 
                // 将新创建的worker节点的线程 赋值给t
                final Thread t = w.thread;
                /*
                 * 为什么要做 t != null这个判断?
                 * 为了防止ThreadFactory实现类有bug,因为ThreadFactory是一个接口
                 */
                if (t != null) {
                    // 将全局锁的引用保存到mainLock中
                    final ReentrantLock mainLock = this.mainLock;
                    /*
                     * 尝试加锁,可能会阻塞,直到获取成功为止,同一时刻操纵线程池内部的相关操作,都必须持锁
                     */
                    mainLock.lock();  
             		// 只要走到了这里,说明当前线程已经加锁成功,其他线程是无法修改当前线程池状态的
                    try {
                        // 获取线程池运行状态保存到rs中
                        int rs = runStateOf(ctl.get());
                        /*
                         * 条件1:判断当前线程池的状态是否正常(RUNNING)
                         * 条件2:判断当前线程池为SHUTDOWN状态并且firstTask为null,其实就是判断是否是SHUTDOWN状态下的特殊情况(队列里还有任务)
                         */
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            // 这里防止程序员自定义ThreadFactory实现类时,创建线程返回给外部之前,将线程start了..
                            if (t.isAlive()) 
                                // 抛出异常
                                throw new IllegalThreadStateException();
                            // 将创建的worker添加到线程集合(就是线程池)中
                            workers.add(w);
                            // 获取最新线程池线程的数量
                            int s = workers.size(); 
                            // 条件成立,更新largestPoolSize 
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            // 更新标志位,表示线程已经追加到线程池中了
                            workerAdded = true;
                        }
                    } finally {
                        // 释放全局锁
                        mainLock.unlock();
                    }
                    /*
                     * 条件成立:workerAdded为true,表示添加worker成功
                     * 条件不成立:表示线程池在lock之前,状态发生了变化,导致添加线程失败
                     */
                    if (workerAdded) {
                        // 成功后,将线程启动
                        t.start();
                        // 启动标记设置为true
                        workerStarted = true;
                    }
                }
            } finally {
                // 判断启动标记
                if (!workerStarted)
    				// worker启动失败,需要做清理工作
                    addWorkerFailed(w);
            }
            // 返回新创建的线程是否启动 true|false
            return workerStarted;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128

    addWorkerFaild()

        /*
         * 在addWorker()方法的内部自旋中,我们使用 compareAndIncrementWorkerCount(c) 将线程数+1了,
         * 但执行到这个方法说明我们创建线程失败了,所以需要进行清理工作,将ctl的值-1
         */
    	private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                if (w != null)
                    // 从workers中移除失败的worker
                    workers.remove(w);
                // 使用CAS + 自旋将ctl的值-1
                decrementWorkerCount();
                // 见后续SHUTDOWN()
                tryTerminate();
            } finally {
                // 释放锁
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    参考

  • 相关阅读:
    5款开源、美观、强大的WPF UI组件库
    模糊神经网络算法matlab,模糊神经网络算法原理
    神经网络模型参数辨识,神经网络信号识别
    Java并发面试题:(七)ThreadLocal原理和内存泄漏
    【中间件】Redis监控以及指标
    【Java网络编程】二
    记录一次线上内存溢出排查详细过程
    SAP MM学习笔记37 - 请求书照合中的配送费用
    【git 使用】使用 git rebase -i 修改任意的提交信息/合并多个提交
    使用nodel实现前后端数据渲染
  • 原文地址:https://blog.csdn.net/weixin_53407527/article/details/127829809