• ThreadPoolExecutor 运行源码分析


    本文基于版本为 1.8.0_281 的 JDK 对 ThreadPoolExecutor 的源码进行分析
    在这里插入图片描述
    不知道比较新的 JDK 内 ThreadPoolExecutor 内实现咋样的,应该大差不差吧,研究好主流的 JDK 1.8 一通百通。好像 11 用了几个新的判断状态的方法,但是整体思路没变。

    阅读以下内容建立在你已经知道 线程池大概的流程 基础上。

    基础铺垫

    先说下可以咱们平常使用时自定义的内容:

    
    	// 阻塞队列
        private final BlockingQueue<Runnable> workQueue;
    
    	// 全局锁,创建新的线程时使用
        private final ReentrantLock mainLock = new ReentrantLock();
    
    	// 存放线程池中所有活跃的工作线程
        private final HashSet<Worker> workers = new HashSet<Worker>();
    
    	// awaitTermination 方式使用的表变量
        private final Condition termination = mainLock.newCondition();
    
    	// 峰值线程数
        private int largestPoolSize;
    
    	// 记录已经成功执行完毕的任务数
        private long completedTaskCount;
    
    	// 创建线程的工程,默认使用 DefaultThreadFactory
        private volatile ThreadFactory threadFactory;
    
        // 非核心线程闲置超时时长,超过后被回收
        private volatile long keepAliveTime;
    
    
        // 核心线程数最大值
        private volatile int corePoolSize;
    
        // 线程总数最大值
        private volatile int maximumPoolSize;
    	
    	// 拒绝策略,默认是 丢弃任务并抛出RejectedExecutionException异常
        private static final RejectedExecutionHandler defaultHandler =
            new AbortPolicy();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    状态流转

    再来说下有关状态流转的内容,各状态标识主要是通过位运算实现。

    COUNT_BITS :此值为 29.

    private static final int COUNT_BITS = Integer.SIZE - 3;
    
    • 1

    CAPACITY:此值的二进制表示为 000 1 1111 1111 1111 1111 1111 1111 1111

        private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    
    • 1

    再来说下几个具体的状态。

        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
        // 运行中状态,可以接收新的任务和执行任务队列中的任务
        private static final int RUNNING    = -1 << COUNT_BITS;
        // shutdown状态,不再接收新的任务,但是会执行任务队列中的任务
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        // 停止状态,不再接收新的任务,也不会执行任务队列中的任务,中断所有执行中的任务
        private static final int STOP       =  1 << COUNT_BITS;
        // 整理中状态,所有任务已经终结,工作线程数为0,过渡到此状态的工作线程会调用钩子方法terminated()
        private static final int TIDYING    =  2 << COUNT_BITS;
        // 终结状态,钩子方法terminated()执行完毕
        private static final int TERMINATED =  3 << COUNT_BITS;
    
    	
        // 获取运行状态:~ CAPACITY 按位
        private static int runStateOf(int c)     { return c & ~CAPACITY; }
        // 获取工作线程数量
        private static int workerCountOf(int c)  { return c & CAPACITY; }
        // 计算 ctl 的值
        private static int ctlOf(int rs, int wc) { return rs | wc; }
    
        /*
         * Bit field accessors that don't require unpacking ctl.
         * These depend on the bit layout and on workerCount being never negative.
         */
    	// c 是不是比 s 小,用来状态判断
        private static boolean runStateLessThan(int c, int s) {
            return c < s;
        }
    	// c 是否大于等于 s,用来状态判断
        private static boolean runStateAtLeast(int c, int s) {
            return c >= s;
        }
    	// 判断是否运行
        private static boolean isRunning(int c) {
            return c < SHUTDOWN;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    其实比较几个状态大小的时候直接比「从左往右数的前三位」就行。

    再贴张来自 Java线程池实现原理及其在美团业务中的实践 的图片说明各个状态特征和的流转吧:
    在这里插入图片描述

    在这里插入图片描述

    源码详解

    execute(Runnable command)

    执行

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn't, by returning false.
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             *
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.
             */
             
             // 获取 ctl 的值
            int c = ctl.get();
            // 1 如果当前工作的线程数小于设置的核心线程数
            if (workerCountOf(c) < corePoolSize) {
            	// 创建新的线程(addWorker 的第二个参数用来决定是根据 corePoolSize 判断还是用 maximumPoolSize 判断)
                if (addWorker(command, true))
                // 添加成功,直接返回
                    return;
                // 添加失败,重新获取 ctl 的值更新 c
                c = ctl.get();
            }
    
    		// 2 此时当前工作线程数量大于等于核心线程数
    		
            // 2.1 如果当前线程池是运行状态并且添加到队列成功
            if (isRunning(c) && workQueue.offer(command)) {
            	// 再次获取 ctl 的值
                int recheck = ctl.get();
                // 再次判断线程池运行状态,如果状态是非运行并且将刚刚入队的任务从队列中移除成功
                if (! isRunning(recheck) && remove(command))
                	// 调用拒绝策略
                    reject(command);
                    
                // 此时可能的情况:
                // 线程池运行、任务入队成功
                // 线程池非运行、入队的任务移除失败(可能被执行了?)
                
         		// 如果当前线程池工作线程数量是 0,添加一个线程(但是这个线程没有被指定的第一个任务)
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    
            // 2.2 可能的情况:线程池运行但是任务入队失败、线程池不在 running 状态
            // 添加线程(不是核心线程)失败,执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    addWorker(Runnable firstTask, boolean core);

    添加任务

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
    			// 先判断 rs >= SHUTDOWN:查看运行状态是不是大于等于 SHUTDOWN
    			// 再判断以下三个条件:
    			// 1. 看线程池状态是不是 SHUTDOWN 状态
    			// 2. 看 firsttask 是不是为 null
    			// 3. 看任务队列是否为空
    
    			// 情况 1:线程池状态是 SHUTDOWN,如果 firstTask 不是 null 直接返回 false;
    			// 情况 2:线程池状态是 SHUTDOWN,如果 firstTask 是 null 但是队列为空,返回 false
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                	// 获取工作线程数
                    int wc = workerCountOf(c);
                    // 如果工作线程数超过最大容量或者工作线程数大于等于核心线程数(最大线程数),创建核心线程(非核心线程失败)
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 线程数 +1 成功
                    if (compareAndIncrementWorkerCount(c))
                    	// 跳出
                        break retry;
                    // CAS 将线程数加 1 失败,重新获取 ctl 值
                    c = ctl.get();  // Re-read ctl
                    // 如果运行状态变了
                    if (runStateOf(c) != rs)
                    	// 继续循环
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    		// 线程是否启动标识
            boolean workerStarted = false;
            // 线程是否添加标识
            boolean workerAdded = false;
            Worker w = null;
            try {
            	// 创建 worker 对象
                w = new Worker(firstTask);
                // 获取 worker 对象的线程
                final Thread t = w.thread;
                if (t != null) {
                	// 这里加锁的目的是为了防止多线程下更新 workers 等属性出问题
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        // 再次看线程池状态(可能出问题的原因:在获取到锁之前线程池关了或者工厂出问题)
                        int rs = runStateOf(ctl.get());
    					// rs 小于 shutdown -> rs 是 running 状态
    					// 如果 rs 是 RUNNING 状态或 rs 是 SHUTDOWN 状态并且firstTask 为 null,向线程池中添加线程(此处体现了 shutdown 虽然不接受新任务,但是可以继续处理之前任务)
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // hashset 添加
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                            // 线程池中最大的线程数量
                                largestPoolSize = s;
                            // 更新工作线程是否添加成功标识为 true,后面才会调用Thread#start()方法启动真实的线程实例
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                    	// 线程启动
                        t.start();
                        // 标记线程启动成功
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                	// 线程没有启动成功,
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    addWorkerFailed(Worker w)

    添加 worker 失败的处理

        private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                	// 移除一个 worker
                    workers.remove(w);
                // 将 workerCount 减一
                decrementWorkerCount();
                // 
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    worker 类

    再来说下 worker 这个类方便下文展开叙述。

    线程池中的每个线程都会被封装成一个 Worker 对象,ThreadPool 中通过 private final HashSet workers = new HashSet(); 维护了 Worker 集合。

    基于 AQS 实现,欢迎前往阅读斧正笔者之前的文章: AQS 源码解析

    	// 继承了 AQS,实现了 Runnable 接口
        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            // thread 通过构造方法中的工厂产生,可以为 null
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            // 要执行的第一个任务,可以为 null
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
             // 构造函数
            Worker(Runnable firstTask) {
            	// 禁止线程中断,直到 runWorker 方法执行
            	// state 为 1 表示持有,0 表示未持有,runWorker() 中会先解锁在加锁
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                // 产生 thread 实例
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            // 由于实现了 Runnable 接口,启动时会调用,详细看 runworker(Worker w) 方法
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    		// 是否有独占锁,state 为 1 表示持有,0 表示未持有
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    		// 获取资源
            protected boolean tryAcquire(int unused) {
            	// 如果更新状态成功(未持有 -> 持有)
                if (compareAndSetState(0, 1)) {
                	// 成功就讲独占线程设置为当前线程
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    		// 释放资源
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    		// 加锁
            public void lock()        { acquire(1); }
            // 尝试加锁
            public boolean tryLock()  { return tryAcquire(1); }
            // 解锁
            public void unlock()      { release(1); }
            // 是否被加锁
            public boolean isLocked() { return isHeldExclusively(); }
    		// 启动后线程中断
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    补充点注释中的内容:

    Class Worker mainly maintains interrupt control state for threads running tasks, along with other minor bookkeeping.This class opportunistically extends AbstractQueuedSynchronizer to simplify acquiring and releasing a lock surrounding each task execution. This protects against interrupts that are intended to wake up a worker thread waiting for a task from instead interrupting a task being run. We implement a simple non-reentrant mutual exclusion lock rather than use ReentrantLock because we do not want worker tasks to be able to reacquire the lock when they invoke pool control methods like setCorePoolSize. Additionally, to suppress interrupts until the thread actually starts running tasks, we initialize lock state to a negative value, and clear it upon start (in runWorker).

    这个 Worker 主要维护任务线程的中断控制状态,以及其他信息。

    扩展了 AQS 来简化每个执行任务获取和释放锁的工程,这样可以防止中断在运行的线程。

    实现了一个简单的不可重入互斥锁,不是重入锁的原因是不想在设置 setCorePoolSize 等一些控制类的方法时重新获取锁。(我理解的是各干各的不影响,你改核心线程数改你的,不影响当前某个线程)

    runWorker(Worker w)

    运行

        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            // 这个时候运行中断
            w.unlock(); // allow interrupts
            // 记录是否有异常(因为某些异常导致退出循环)
            boolean completedAbruptly = true;
            try {
            	// 任务不为空,直接执行;如果任务为空,去获取任务,获取到任务后执行以下内容
                while (task != null || (task = getTask()) != null) {
                	// 加锁,加了之后就不能被中断了
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
    
                    // 如果线程池状态值最小是 stop(1) 确保线程被中断;如果不是,确保不被中断
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                        	// 执行任务
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                    	// 将当前 task 置 null,方便下次循环取任务
                        task = null;
                        // 累加 worker 完成的任务数
                        w.completedTasks++;
                        //解锁,将 state 置 0
                        w.unlock();
                    }
                }
                // 如果 task 为 null,getTask() 也为 null,说明有异常
                completedAbruptly = false;
            } finally {
            	// 处理线程退出的情况
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    getTask()

    这个方法是为了获取任务

        private Runnable getTask() {
    	    // 上次获取任务是否超时
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                // 条件 1:当前线程池状态为 SHUTDOWN、STOP、TIDYING、TERMINATED
                // 条件 2:当前线程池状态为 STOP、TIDYING、TERMINATED 或者 队列为空
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                	// 将 worker 数量减 1
                	
                	// 这里我的理解是如果线程池处于 STOP、TIDYING、TERMINATED 状态,或者线程池处于 SHUTDOWN 状态但是队列为空,说明任务都有被执行的,当前的 worker 可以下岗了,所以要减一。
                	
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                // allowCoreThreadTimeOut 默认为 false
                // 当前工作线程数量是否大于核心线程数
                // 有一个 true,timed 就为 true
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
    			// 条件 1:工作线程数大于之前的最大线程数「这个是因为可以重新设置」,或者(核心线程允许超时或者当前工作线程数大于核心线程数)并且上一次 getTask 时超时
    			// 条件 2:工作线程数大于 1 或者队列为空
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    // 将工作线程数减 1 是否成功
                    if (compareAndDecrementWorkerCount(c))
                    	// 成功就直接返回 null
                        return null;
                   	// 失败去重试
                    continue;
                }
    
                try {
                	// TRUE 的话通过 poll 拉取,指定时间没拿到返回 null;
                	// FALSE 的话,一直等到 take 到了才返回
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                    	// 不为 null 直接返回
                        return r;
                    // 拿到的为 null,说明超时
                    timedOut = true;
                } catch (InterruptedException retry) {
                	// 线程出异常,置为 false,继续循环
                    timedOut = false;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    processWorkerExit(Worker w, boolean completedAbruptly)

    这个方法主要是为了处理 worker 退出

        private void processWorkerExit(Worker w, boolean completedAbruptly) {
       		// runworker() 方法执行异常,completedAbruptly 为 true,没异常 completedAbruptly 为 false
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            	// 减少一个
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
           		// 统计完成任务梳理
                completedTaskCount += w.completedTasks;
                // 移除当前 worker,或者说线程中减少个线程
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    		// 判断是否要 terminate 线程池
            tryTerminate();
    
            int c = ctl.get();
            // 如果当前状态是 shutdown 或者 running
            if (runStateLessThan(c, STOP)) {
            	// runworker() 执行的时候没异常
                if (!completedAbruptly) {
                	// 允许核心线程超时,最小值为 0,否则为 corePoolSize
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    // 工作线程数大于等于最小值,直接返回不新增非核心线程
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                // runworker() 执行有异常,添加个 worker
                // 还有种情况:当前工作线程数为 0,调用 addWorker() 添加个新的线程,消耗队列中的任务(此时不为队列不为空)
                addWorker(null, false);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    写在最后

    再重复下岗开始说的内容吧:按照需求新建了线程池之后,execute 先正常 addworker,并启动,如果超过设置的核心线程数之后将任务放到队列中,如果还来任务,再将新的任务和线程进行 addworker 并执行,然后各个线程执行完自己任务后再去队列中取任务(getTask() 方法保证了线程复用)。

    本来想着一篇把线程池的一些基础特性(概念)、运行、关闭,后来想了想,但是运行就肝了将近一周,所以内容还是分开写吧。基础的内容慢慢补充,关闭的内容就留在 ThreadPoolExecutor 关闭源码分析 中说吧,敬请期待。

  • 相关阅读:
    Verilog 实现CDC中单bit 跨时钟域,从慢时钟域到快时钟域
    STM32 DMA从存储器发送数据到串口
    数据库事务
    CocosCreator 面试题(三)JavaScript闭包原理和作用
    MFC 注册表
    一、MyBatis-Plus(未完成)
    计算机毕业设计springboot+vue基本微信小程序的考试系统
    python基于django的汽车租赁系统nodejs+vue+element
    计算机是如何工作的??(多进程编程)
    使用MONAI轻松加载医学公开数据集,包括医学分割十项全能挑战数据集和MedMNIST分类数据集
  • 原文地址:https://blog.csdn.net/MrBaymax/article/details/125978748