• 面试官: 线程池是如何做到线程复用的?有了解过吗,说说看


    ThreadPoolExecutor中是如何做到线程复用的?

    我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,一个线程执行完线程任务后不销毁,继续执行另外一个线程任务。那么它是如何做到的?这得从addWorker()说起

    addWorker()

    • 先看上半部分addWorker()
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 对边界设定的检查
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                    firstTask == null &&
                    ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    retry:可能有些同学没用过,它只是一个标记,它的下一个标记就是for循环,在for循环里面调用continue/break再紧接着retry标记时,就表示从这个地方开始执行continue/break操作,但这不是我们关注的重点。

    从上面的代码,我们可以看出,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。这个addWorker是excute方法中调用的

    • 我们接着看下半部分
    private boolean addWorker(Runnable firstTask, boolean core) {
            // 上半部分
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    // core是ture,需要创建的线程为核心线程,则先判断当前线程是否大于核心线程
                    // 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数
                    // 如果不小于,则返回false
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
    
            // 下半部分
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 创建worker对象
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    // 获取线程全局锁
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        
                        int rs = runStateOf(ctl.get());
                        // 判断线程池状态
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 将当前线程添加到线程组    
                            workers.add(w);
                            int s = workers.size();
                            // 如果线程组中的线程数大于最大线程池数 largestPoolSize赋值s
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            // 添加成功    
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 添加成功后执行线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 添加失败后执行 addWorkerFailed
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    再看 addWorkerFailed(),与上边相反,相当于一个回滚操作,会移除失败的工作线程

    private void addWorkerFailed(Worker w) {
            // 同样需要全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                decrementWorkerCount();
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    Worker

    我们接着看Worker对象

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    
            //.....
            // 省略下边代码
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程,回过头想想addWorker()里为啥可以t.start()应该很清楚了吧, 并且在构造方法中调用了线程工厂创建了一个线程实例,我们上节讲过线程工厂。其实这也不是关注的重点,重点是这个runWorker()

    final void runWorker(Worker w) {
            // 获取当前的线程实例
            Thread wt = Thread.currentThread();
            // 直接从第一个任务开始执行 
            Runnable task = w.firstTask;
            // 获取完之后把worker的firstTask置为null 防止下次获取到
            w.firstTask = null;
            // 线程启动之后,通过unlock方法释放锁
            w.unlock(); // allow interrupts
    
            // 线程异常退出时 为 true
            boolean completedAbruptly = true;
            try {
                // Worker执行firstTask或从workQueue中获取任务,直到任务为空
                while (task != null || (task = getTask()) != null) {
                    // 获取锁以防止在任务执行过程中发生中断
                    w.lock();
                    // 判断边界值 如果线程池中断 则中断线程
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // 相当于钩子方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while循环中,worker会不断地调用getTask方法从阻塞队列中获取任务然后调用task.run()执行任务,从而达到复用线程的目的。只要getTask方法不返回null,此线程就不会退出。

    我们接着看getTask()

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // 如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。
                // 如果有设置允许线程超时或者线程数量超过了核心线程数量,并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                // 如果timed为true,则会调用workQueue的poll方法获取任务.
                // 超时时间是keepAliveTime。如果超过keepAliveTime时长,
    
                // 如果timed为false, 则会调用workQueue的take方法阻塞在当前。
                // 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    大家有没有想过这里为啥要用take和poll,它们都是出队的操作,这么做有什么好处?

    take & poll

    我们说take()方法会将核心线程阻塞挂起,这样一来它就不会占用太多的cpu资源,直到拿到Runnable 然后返回。

    如果allowCoreThreadTimeOut设置为true,那么核心线程就会去调用poll方法,因为poll可能会返回null,所以这时候核心线程满足超时条件也会被销毁

    非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收 。

    再回头看一下runWorker()是不是设计的很巧妙~

    结束语

    本节内容不是很好理解,想继续探讨的同学可以继续阅读它的源码,这部分内容了解一下就好,其实我们从源码中可以看到大量的线程状态检查,代码写的很健壮,可以从中学习一下

  • 相关阅读:
    问题usr/bin/env: “python‘: Too many levels of symbolic links太多层链接的bug pycharm
    想去银行测试?那这套题目你必须要会
    GitLab搭建
    解读大模型(LLM)的token
    CSS高度塌陷问题和外边距重叠的解决方案(clear方法)
    设计加速!11个Adobe XD插件推荐!
    建信金融科技--软开笔试题1.答案--java--9.12
    matlab工具箱如何设置自己的BP神经网络初始权重
    3.cuBLAS开发指南中文版--cuBLAS数据类型引用
    React的类式组件和函数式组件之间有什么区别?
  • 原文地址:https://blog.csdn.net/weixin_57907028/article/details/125503266