• 线程池执行流程


    源码分析

    execute(提交)方法源码:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    addWorker(添加线程):

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (int c = ctl.get();;) {
                // Check if queue empty only if necessary.
                if (runStateAtLeast(c, SHUTDOWN)
                    && (runStateAtLeast(c, STOP)
                        || firstTask != null
                        || workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    if (workerCountOf(c)
                        >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateAtLeast(c, SHUTDOWN))
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int c = ctl.get();
    
                        if (isRunning(c) ||
                            (runStateLessThan(c, STOP) && firstTask == null)) {
                            if (t.getState() != Thread.State.NEW)
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            workerAdded = true;
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    addWorker方法

    先看addWorker方法的第一部分,这个方法弄懂了,execute方法就非常好理解:

    第一部分

    在这里插入图片描述
    先讲大致意思:

    1. 使用标志位并进入死循环
    2. 判断当前线程池状态、任务是否为空、阻塞队列是否为空
    3. 根据core判断当前线程量(区分核心线程、最大线程数量)是否达到最大限制
    4. 将当前线程数量+1,只有此方法成功才可退出死循环
    5. 判断当前状态,为true跳到标志位处,跳过本次循环,开启下次循环

    首先一进来方法,就是两个死循环:

    for(int c =ctl.get();;){
    	for(;;){
    		...
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    而唯一跳出死循环的办法就在第四步:

    break retry;
    
    • 1

    可能很多人不理解为什么break要这么写,这其实就是标志位的写法,很多源码中都能看到
    详情参考链接:java retry: 详解

    所以break retry的含义就是 跳入到标志位处,且不再进入标志位下方的循环
    continue retry:跳入到标志位处,跳过本次循环,进入下次循环

    我们再来看看方法块中成员变量ctl方法runstateAtleast()compareAndIncrementWorkerCount()

    成员变量ctl

    在这里插入图片描述
    源码中简单说明了ctl是一个成员变量,主要作用是记录线程池的生命周期状态当前线程数,ctl是原子性的,作者通过巧妙的设计,将一个整形变量按二进制分为两部分,也就是说ctl中的成员变量value通过拆装箱可以表达两种含义(线程池的生命周期状态、当前线程数),想了解更深的可以参考链接(反正我是看不懂):详解Java线程池的ctl(线程池控制状态)【源码分析】

    线程池的声明周期状态:

    • RUNNING:处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理。
    • SHUTDOWN:处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理。
    • STOP:处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
    • TIDYING:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
    • TERMINATED:线程池彻底终止的状态。

    拆装箱方法详解:线程池(二、ctl 的设计分析)

    runStateAtLeast()

    字面意思:线程池状态大于等于xx
    来看这个方法使用:

    runStateAtLeast(ctl.get(), SHUTDOWN))
    
      private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ctl.get():就是获取ctl的成员变量value,之前也说到成员value是可以表达两种含义的(线程池的生命周期状态、当前线程数),这里就有用到表达声明周期状态的用法。

    SHUTDOWN是有初始化的,所以方法可以简单理解成:若当前ctl的生命周期状态>=SHUTDOWN就证明当前线程池的生命周期状态是SHUTDOWN状态或者是其它状态,但绝不是RUNNING状态,线程池初始化的五种状态,值越大就代表当前线程池就越危险。
    在这里插入图片描述

    workerCountOf()

    获取当前线程池的已有的线程数量。

    compareAndIncrementWorkerCount©

    /**
         * Attempts to CAS-increment the workerCount field of ctl.
         */
        private boolean compareAndIncrementWorkerCount(int expect) {
            return ctl.compareAndSet(expect, expect + 1);
        }
    
    
    
    	/**
         * Atomically sets the value to {@code newValue}
         * if the current value {@code == expectedValue},
         * with memory effects as specified by {@link VarHandle#compareAndSet}.
         *
         * @param expectedValue the expected value
         * @param newValue the new value
         * @return {@code true} if successful. False return indicates that
         * the actual value was not equal to the expected value.
         */
        public final boolean compareAndSet(int expectedValue, int newValue) {
            return U.compareAndSetInt(this, VALUE, expectedValue, newValue);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    将当前ctl的value值加1,可以理解成当前线程数量+1

    第二部分

    在这里插入图片描述

    1. 新建线程,不区分核心线程、非核心线程,核心线程不会被删除,其实可以理解成保留核心线程大小的线程(保留maximumPoolSize个线程)
    2. 加锁,保证线程安全进行加锁。
    3. 判断当前线程池时状态,将新建的线程加入线程队列/集合(管理线程),线程队列在线程池创建时就创建,所以可以浅显的理解成线程池其实有两个队列,分别是阻塞队列(workQueue)-管理任务、线程队列(workers)-管理线程
    4. 解锁
    5. 执行任务,注意这里是直接执行任务了,并没有交给阻塞队列
    6. 回退,将刚加入的线程取消

    Worker

    Woker就是线程池的内部类,构造方法就是调用线程工厂新建线程

     /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
    
        /**
         * Returns the thread factory used to create new threads.
         *
         * @return the current thread factory
         * @see #setThreadFactory(ThreadFactory)
         */
        public ThreadFactory getThreadFactory() {
        	// 返回当前线程池的线程工厂
            return threadFactory;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    execute方法

    在这里插入图片描述

    1. 判断当前线程数量是否小于核心线程个数,小于则直接新增线程并直接执行任务(注意是直接,没有放入到阻塞队列中),
    2. 大于则判断当前线程池状态是否为RUNNING,并将任务当前加入阻塞队列中
    3. 加入阻塞队列成功后 需要判断当前线程池状态是否正常,不正常则回退阻塞队列(remove(command))并执行拒绝策略(reject(command))
    4. 加入阻塞队列成功后 需要判断当前线程数量是否为0(当核心线程数为0时,会导致任务无法被线程执行),为true则新增一个线程但并没有执行任务,因为任务已经添加进入阻塞队列了,且firstTask为null,所以addWorker方法执行的任务是空的
    5. 加入阻塞队列失败后 创建一个新线程并执行马上执行任务,若是创建线程失败(可能的原因:线程数量已达到最大限制、线程池状态异常)则采用拒绝策略

    这里可能大部分不会理解,为什么在第二步加入阻塞队列之后还需要再判断有没有线程、线程池状态,这里因为在创建线程池的时候,核心线程大小个数可能会被设置成0,若是设置为0,则可以跳过第一步的判断,若是没有第四步则会导致任务都会进入阻塞队列,但是却没有线程执行任务

    总结

    在这里插入图片描述

    1. 先查看当前线程池的线程数量是否小于核心线程数大小,小于则新建线程并直接执行任务
    2. 大于则放入阻塞队列中,加入阻塞后,任务在未来某个时间点被一个空闲的线程取出执行,但是在加入到阻塞队列后,还会检查当前是否有线程存在(因为当核心线程数量设置为0的时候,程序也会执行到一步,但是会出现一个情况:阻塞队列里的任务无法被提取执行,因为当前线程池并没有线程,所有这里会创建一个线程)
    3. 当阻塞队列也满了,则会新建线程并立即执行任务
    4. 若线程数量也达到最大限制就会执行对应的拒绝策略
  • 相关阅读:
    java递归简介说明
    Python中prettytable库
    MySQL 字段值按照分隔符拆分成列(行转列)
    【Idea】Idea 集成工具研发所使用的快捷键集合
    多御安全浏览器使用技巧,剖析7大优点特性
    精选30个大模型高频面试题
    导航【数据结构与算法】【精致版】
    【STM32】STM32F103C6T6标准外设库
    AUTOSAR知识点 之 AUTOSAR-OS (三):基础认知+任务调度博文知识点补充
    2023最新SSM计算机毕业设计选题大全(附源码+LW)之java家政服务系统dw173
  • 原文地址:https://blog.csdn.net/qq_60264381/article/details/130687546