• 2 线程池-ThreadPoolExector分析


    线程池系列文章

    1 手写线程池
    2 ThreadPoolExecutor分析
    3 线程池扩展

    ThreadPoolExecutor分析

    提交任务

    按照:核心线程->入队列->最大线程->拒绝策略

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
          	//ctl:使用一个AtomicInteger实现了线程池状态和线程数量的双重表示
          	//Intgeger32位,高3位表示线程池的状态(有7中方式),低29位表示线程的数量。
          	//1个变量表示两种含义,主要可以同步修改两种状态,不然单独修改,需要考虑相互之间的影响
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
            	//添加核心线程
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //添加核心线程失败,任务入队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //如果线程池被关闭,移除提交的任务,直接触发决绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                    //如果线程数量等于0,添加一个线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
            	//添加普通线程失败,触发拒绝策略
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    添加Worker

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            //当前for循环,主要处理添加worker的各种前置判断,因为大量使用的Atomic变量,因此需要通过for循环处理CAS。
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    //当前线程数是否大于最大线程数
                    //当前线程数是否大于核心线程数/最大线程数
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //线程数+1,处理CAS
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    //再次校验线程池状态
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    		//通过for循环,成功的把worker数量+1,这样就可以正式添加worker
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    //因为workers、largestPoolSize 都是普通变量,因此需要添加锁
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                    	//添加worker成功,启动
                        t.start();
                        //启动成功,如果启动失败,那么直接抛出异常,走addWorkerFailed
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    手动线程池中:添加worker,只是简单的判断一下当前线程数是否大于核心线程/最大线程数后,直接add,没有考虑相关的并发,类似于:for()的cas和ReentrantLock对workers和largestPoolSize的手动加锁。

    启动worker

    添加worker时,有:t.start(); 其中t是来自worker的Thread,手动线程中也是对Thread进行了包装。进入Worker中run方法查看:

    public void run() {
                runWorker(this);
            }
    
    • 1
    • 2
    • 3

    只是简单的调用线程池中的runWorker方法。

    final void runWorker(Worker w) {
    		//Thread.currentThread():其实就是Worker中包装的Thread,因为方法的入口是run方法,而run方法的触发是在start时,start的调用是在addWorker时的t。
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            //任务是否被突然终止,主要是因为执行任务时,任务有可能抛出异常。
            boolean completedAbruptly = true;
            try {
            	//while死循环,一直获取任务,直到任务为空,当前线程退出。
                while (task != null || (task = getTask()) != null) {
                	//获取任务后,立刻上锁,主要是表明当前worker是工作中,不允许打断,防止可能的打断影响业务功能。手动线程时通过Boolean变量进行表明
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 这块暂时不太清楚,后面再分析。
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                    	//提供前置钩子,可以在任务执行前进行一些处理。
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        	//真正的执行任务了
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                        	//提供后置钩子
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    获取任务

    启动任务跟手动线程池逻辑差不多,但是启动方法从worker中挪移到ThreadPoolExectuor中。

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    		//第一眼看到for循环,可能很困惑,如果手写之后,就明白了
    		//获取任务为空时,所有线程都退出,通过for循环,线程退出也需要进行条件判断。
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?        
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      			//淘汰线程:1 线程数大于最大线程数(手动降低了最大线程数,
                //有可能当前线程数大于最大线程数),减少线程到最大线程数。
                //2 线程数大于核心线程数,线程超时未获取任务,减少线程数到核心线程数
                //3 线程数不大于核心线程数,如果设置允许核心线程退出,并且队列为空,取消所有线程
                // && (wc>1 || workQueue.isEmpty())分析:当线程数等于核心线程数时,wc>1恒成立
                //所以线程数大于最大线程数,线程数大于核心线程数时,以前面的为准。
                //当线程数=核心线程数时,如果允许核心线程退出,timed恒成立,
                //那么(wc > maximumPoolSize || (timed && timedOut)也恒成立,
                //因此只需要考虑wc > 1 || workQueue.isEmpty(),空闲时间,workQueue.isEmpty也是恒成立的
                //因此允许核心线程退出,线程池空闲时,核心线程也可以退出。
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    //线程数-1,因为线程数是Atomic,因此理由上面的for循环实现减一
                    if (compareAndDecrementWorkerCount(c))
                    	//返回null,线程的while循环退出
                        return null;
                    continue;
                }
    
                try {
                	//线程超过核心线程数,采用超时获取任务策略,保证线程有退出的机会
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    //线程等待超时,for循环判断是否应该退出,
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    核心线程退出时:没有依靠timedOut判断队列是否为空,而是直接通过workQueue.isEmpty,强制依赖进行退出。

    线程退出

    在启动线程时,runWorker中while循环失败后,会自动退出。

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            //线程突然结束,主要因为任务的run方法抛出异常。
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();                
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                //正常结束
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    真正处理退出

    
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    		//正常退出时,已经在getTask进行线程数-1,但是异常退出没有机会修改线程数,
    		//需要事后补充。
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                //这个方法就是自定义while循环处理cas,而getTask是借助for循环
                decrementWorkerCount();
    
    		//移除线程
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    		//判断一下是否要关闭线程池
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
            	//正常退出,如果线程数小于核心线程数,仍然要加一个,配置了核心线程数可以退出除外。
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                //非正常退出,被删掉的线程再加回来
                addWorker(null, false);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    通过上述线程退出机制,一旦任务抛出异常,就会导致执行该任务的线程退出,同时会新创建个线程处理剩余的任务。但是整个流程,都没有说业务抛出的异常如何处理,所以这就是线程池会吞掉异常的说法。

    线程退出的顺序

    1. 超过最大线程数->最大线程数->超过核心线程数(线程池空闲)->核心线程数(设置允许核心线程退出+线程池空闲)->没有线程了。
    2. 线程池空闲:workQueue没有任务了,也没有新任务提交了。
  • 相关阅读:
    面试Python一定要记住的超基础知识点【速记】
    爬虫基础 & JS逆向
    量子OFFICE之q6platform讲座:1-FileSystem
    第2章-矩阵及其运算-矩阵运算(2)
    算法|图论 1 广度深度
    接扫理解.exe文件的结构原理即运行过程
    Nacos作为配置中心详解
    java计算机毕业设计基于ssm框架的校园闲置二手商品交易平台
    《HTML+CSS+JavaScript》之第14章 文本样式
    渗透测试学习day2
  • 原文地址:https://blog.csdn.net/u010652576/article/details/126715445