• 线程池如何优雅关闭


    先在 ThreadPoolExecutor 基础入门 中介绍了线程池相关的基础内容;又在 ThreadPoolExecutor 运行源码分析 中,分析了 ThreadPoolExecutor 的部分源码,包括状态流转、任务执行。这篇文章将介绍线程池应该如何优雅关闭。

    先说结论

    死循环版本:

    xxx(){
    
    	executor.shutdown();
    	// 或
    	// executor.shutdownNow();
    
    	try {
    		boolean loop = true;
    		do {
    			loop != executor.awaitTermination(2, TimeUnit.SECONDS);
    		} while(loop);
    	} catch (interruptedException e){
    
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    贴下 nacos 中的代码,非死循环版本:

        public static void shutdownThreadPool(ExecutorService executor, Logger logger) {
            executor.shutdown();
            int retry = 3;
            while (retry > 0) {
                retry--;
                try {
                    if (executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
                        return;
                    }
                } catch (InterruptedException e) {
                    executor.shutdownNow();
                    Thread.interrupted();
                } catch (Throwable ex) {
                    if (logger != null) {
                        logger.error("ThreadPoolManager shutdown executor has error : ", ex);
                    }
                }
            }
            executor.shutdownNow();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    分析

    shutdown()

    调用 shutdown() 方法之后线程池并不是立刻就被关闭。这时线程池中可能还有很多任务正在被执行,或是任务队列中有大量正在等待被执行的任务,调用 shutdown() 方法后线程池会在执行完正在执行的任务和队列中等待的任务后才彻底关闭,调用 shutdown() 方法后如果还有新的任务被提交,线程池则会根据拒绝策略直接拒绝后续新提交的任务。

        public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
            	// 检查有没有关闭线程池的权限
                checkShutdownAccess();
                // 设置状态为 SHUTDOWN
                advanceRunState(SHUTDOWN);
                // 下行代码实际执行:interruptIdleWorkers(false),不中断闲置的线程
                interruptIdleWorkers();
                // 给 ScheduledThreadPoolExecutor 用的钩子函数
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    interruptIdleWorkers()

        private void interruptIdleWorkers(boolean onlyOne) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers) {
                    Thread t = w.thread;
                    // 如果 tryLock() 成功,则说明线程处于空闲状态;不成功,说明线程在持有锁,执行某个任务。
                    if (!t.isInterrupted() && w.tryLock()) {
                        try {
                            t.interrupt();
                        } catch (SecurityException ignore) {
                        } finally {
                            w.unlock();
                        }
                    }
                    if (onlyOne)
                        break;
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    shutdownNow()

    在执行 shutdownNow() 之后,首先会给所有线程池中的线程发送 interrupt 中断信号,尝试中断这些任务的执行,然后会将任务队列中正在等待的所有任务转移到一个 List 中并返回,我们可以根据返回的任务 List 来进行后续补救操作(先记录落库,需要时取出来进行重试之类的)。

        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                // 设置为 STOP 状态
                advanceRunState(STOP);
                // 中断所有线程
                interruptWorkers();
                // 清空队列
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    interruptWorkers()

        private void interruptWorkers() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                	// 不管线程在不在执行任务,都发送中断信号
                    w.interruptIfStarted();
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    tryTerminate()

    再来看下 shutdown() 以及 shutdownNow() 都调用到了的 tryTerminate() 方法。

    	final void tryTerminate() {
            for (;;) {
                int c = ctl.get();
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { // Eligible to terminate
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
    			// 如果 workQueue 为空,workCount 为 0,才会执行下述代码
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                	// 将状态切换到 TIDYING
                    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                            terminated();
                        } finally {
                        	// 将状态改为 TERMINATED
                            ctl.set(ctlOf(TERMINATED, 0));
                            // 通知
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    awaitTermination()

    在指定时间内阻塞,指定时间内线程池关闭返回 true。

        public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (;;) {
                	// 判断状态是不是 TERMINATED,是则返回 true
                    if (runStateAtLeast(ctl.get(), TERMINATED))
                        return true;
                    if (nanos <= 0)
                        return false;
    				
        			// private final Condition termination = mainLock.newCondition();
                    // 通过 termination 条件变量阻塞,termination 会在 tryTerminate() 中使用
                    nanos = termination.awaitNanos(nanos);
                }
            } finally {
                mainLock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    参考

    Java并发实现原理:JDK源码剖析

    如何优雅的关闭 Java 线程池

  • 相关阅读:
    洛谷-P1255-数楼梯
    Django项目MySQL数据库字段从Char改为TEXT,报错详解 Row size too large
    mysql面试题8:MySQL的日志有哪些?MySQL的bin log、redo log、undo log区别和联系
    JWT原理和整合Springboot实现登录认证
    【Python3】基础 - 基本数据类型
    什么是蜘蛛池?-免费蜘蛛池搭建软件
    idea移除许可证
    天龙八部科举答题问题和答案(全3/8)
    fastTEXT论文解读并附实例代码
    vue2使用change事件监听不了回车事件的问题
  • 原文地址:https://blog.csdn.net/MrBaymax/article/details/126575672