• 当线程池任务抛出异常


    最近在应用场景中需要用线程池开多线程,但是有时候通过日志和监控会发现,异步线程的任务突然停止,搞得我排查起来一脸懵逼,无从下手,后来师兄帮我翻业务代码才发现,原来是新线程里的任务抛出运行时异常了,导致我开的用户线程直接“跪”了。那么为什么线程池中的线程不会将异常抛出来呢,抛出异常的线程又会是什么状态呢?此贴特地分析一下

    情况复现

    #submit

    1. 线程执行的任务抛出异常,但是没有被处理
    2. 发生业务异常的用户线程并没有“挂掉”,而是变成了WATTING
    3. 程序还在运行

    #execute

    1. 线程执行的任务抛出异常,被成功捕获
    2. 发生业务异常的用户线程直接结束,状态变成了TERMINATED
    3. 系统没有结束,还在运行

    #schedule

    1. 线程执行的任务抛出异常,但是没有被处理
    2. 线程池中线程的状态为WATTING
    3. 程序继续运行,但是抛出异常的定时任务不再被执行

    原因分析

    #execute

    对于execute来说,这个比较好理解,从源码中可以发现,线程池中的线程在执行runWorker的时候,如果任务中抛出异常,则线程会直接将异常抛出:

    1. final void runWorker(Worker w) {
    2. // ... 省略代码
    3. try {
    4. while (task != null || (task = getTask()) != null) {
    5. // ... 省略代码
    6. try {
    7. beforeExecute(wt, task);
    8. Throwable thrown = null;
    9. try {
    10. task.run();
    11. } catch (RuntimeException x) {
    12. thrown = x; throw x; // 这里抛出异常
    13. } catch (Error x) {
    14. thrown = x; throw x;
    15. } catch (Throwable x) {
    16. thrown = x; throw new Error(x);
    17. } finally {
    18. afterExecute(task, thrown);
    19. }
    20. } finally {
    21. task = null;
    22. w.completedTasks++;
    23. w.unlock();
    24. }
    25. }
    26. completedAbruptly = false;
    27. } finally {
    28. processWorkerExit(w, completedAbruptly); // 抛出异常后,执行这里
    29. }
    30. }

    并且,在抛出异常后,为了避免异常任务的线程被污染,执行该任务的worker线程会被销毁,然后重新创建一个无任务非核心的worker线程。所以,即使线程池中的所有任务都失败了,只要核心线程数不为0,程序就将会一直被workQueue#poll阻塞,导致Jvm不会退出

    1. private void processWorkerExit(Worker w, boolean completedAbruptly) {
    2. if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    3. decrementWorkerCount();
    4. final ReentrantLock mainLock = this.mainLock;
    5. mainLock.lock();
    6. try {
    7. completedTaskCount += w.completedTasks;
    8. workers.remove(w); // 移除当前异常Worker
    9. } finally {
    10. mainLock.unlock();
    11. }
    12. tryTerminate();
    13. int c = ctl.get();
    14. if (runStateLessThan(c, STOP)) { // 如果不是STOP状态
    15. if (!completedAbruptly) {
    16. int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    17. if (min == 0 && ! workQueue.isEmpty())
    18. min = 1;
    19. if (workerCountOf(c) >= min) // 且线程数小于核心线程
    20. return;
    21. }
    22. addWorker(null, false); // 新增一个线程执行
    23. }
    24. }

    #submit

    对于submit来说,可能要相对特殊一点,因为它执行的任务是有返回值的,在一开始的提交任务的时候,线程池就通过FutureTask对其进行了封装

    1. public Future submit(Runnable task) {
    2. if (task == null) throw new NullPointerException();
    3. RunnableFuture ftask = newTaskFor(task, null); // 封装该应用
    4. execute(ftask);
    5. return ftask;
    6. }

    所以,在线程池执行runWorker的方法时,实际进入的下面的方法:

    1. public void run() {
    2. // ... 省略代码
    3. try {
    4. Callable c = callable;
    5. if (c != null && state == NEW) {
    6. V result;
    7. boolean ran;
    8. try {
    9. result = c.call();
    10. ran = true;
    11. } catch (Throwable ex) {
    12. result = null;
    13. ran = false;
    14. setException(ex); // FutureTask会catch异常
    15. }
    16. if (ran)
    17. set(result);
    18. }
    19. } finally {
    20. // ... 省略代码
    21. }
    22. }

    这个时候,真相就慢慢浮出水面,FutureTask会将异常封装成outcome的Object对象,在使用FuntureTask#get调用report方法的时候,将异常封装成ExecutionException重新抛出:

    1. public V get() throws InterruptedException, ExecutionException {
    2. int s = state;
    3. if (s <= COMPLETING)
    4. s = awaitDone(false, 0L);
    5. return report(s);
    6. }
    7. private V report(int s) throws ExecutionException {
    8. Object x = outcome;
    9. if (s == NORMAL)
    10. return (V)x;
    11. if (s >= CANCELLED)
    12. throw new CancellationException();
    13. throw new ExecutionException((Throwable)x); // 抛出该异常
    14. }

    至于此时为啥线程池中的线程还处于WAITING状态,则是因为当前任务执行结束,Worker在调用workQueue#poll方法被阻塞了而已

    #schedule

    #schedule是定时线程池的执行方法,按照道理讲,即使任务抛出异常,线程池也应该定时执行该任务,那为什么任务被停止了呢?

    这要回到FutureTask的子类ScheduledFutureTask中去看,我们可以发现,正常情况下,当定时任务被执行完成的时候,线程会重新把该任务放到执行队列中

    1. public void run() {
    2. boolean periodic = isPeriodic();
    3. if (!canRunInCurrentRunState(periodic))
    4. cancel(false);
    5. else if (!periodic)
    6. ScheduledFutureTask.super.run();
    7. else if (ScheduledFutureTask.super.runAndReset()) {
    8. setNextRunTime();
    9. reExecutePeriodic(outerTask);// 重新将任务放到队列中执行
    10. }
    11. }

    但是我们深入进#runAndReset方法中就会发现,当任务抛出异常,#runAndReset返回的是false,此时就不会将定时任务入列了,所以,抛出异常的定时任务,是不会被重新执行的

    1. protected boolean runAndReset() {
    2. // ...省略代码
    3. boolean ran = false;
    4. int s = state;
    5. try {
    6. Callable c = callable;
    7. if (c != null && s == NEW) {
    8. try {
    9. c.call(); // don't set result
    10. ran = true;
    11. } catch (Throwable ex) {
    12. setException(ex); // 抛出异常后,ran仍然是false
    13. }
    14. }
    15. } finally {
    16. // ...省略代码
    17. }
    18. return ran && s == NEW; // 此处返回false
    19. }

    解决方案

    1. 在任务中捕获异常并处理,防止干扰到线程池线程的执行
    2. 如果是#execute方法的话,可以利用ThreadFactory设置处理线程unCatch异常的逻辑
    3. 如果是#submit或者是#schedule这样通过Callable的调用,则需要调用其#get方法,对任务的异常进行显示的处理

    一些思考

    这次的问题,只要了解线程池,还是比较简单的;相对来说,比较充分地暴露出来我基础知识不够展示的地方,以前都是背八股文,虽然在背的时候,对线程池的ctl,各种配置张口就来,但是过段时间后,在真正使用,遇见问题时还是一脸懵逼。果然有句话说得好,“看过的,只能是别人的,唯有经历了,才是自己的”~

    如何关闭线程

    线程池shutdown的时候,就会关闭一些线程,这个是如何做的呢?

    当调用线程池的#shutdown方法的时候,线程池会遍历所有的workers,然后将每个workers中的闲置线程中断,当线程接收到中断的时候,会从#getTask的阻塞中跳出来,重新检查两个地方:

    1. 队列是否为空
    2. 当前状态是否是STOP(调用shutDownNow方法)
    1. private Runnable getTask() {
    2. boolean timedOut = false; // Did the last poll() time out?
    3. for (;;) {
    4. int c = ctl.get();
    5. int rs = runStateOf(c);
    6. // Check if queue empty only if necessary.
    7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    8. decrementWorkerCount();
    9. return null; // 如果是shutDown则退出
    10. }
    11. // ... 省略代码
    12. }
    13. }

    如果符合,则#getTask会返回null,此时,工作线程进入进入#processWorkerExit关闭当前工作线程,再进入#addWorker方法,会发现,此时也有两点:

    1. STOP状态
    2. SHUTDOWN且队列为空
    1. private boolean addWorker(Runnable firstTask, boolean core) {
    2. retry:
    3. for (;;) {
    4. int c = ctl.get();
    5. int rs = runStateOf(c);
    6. // Check if queue empty only if necessary.
    7. if (rs >= SHUTDOWN &&
    8. ! (rs == SHUTDOWN &&
    9. firstTask == null &&
    10. ! workQueue.isEmpty()))
    11. return false; // 此时便不会新增worker
    12. // ... 省略代码
    13. return workerStarted;
    14. }

    如果符合,则不会再新创建线程。

    核心通过中断线程,将闲置线程从#poll中解放出来,并且关闭;如果是shutDown状态,则需要等到任务执行完才可以关闭;如果是stop状态,则不会等到任务执行完,就会关闭

    线程池相关知识

    看完ThreadPoolExecutor的代码后,我特地抽时间画了一张图,希望可以加深自己的理解:

    同理,ScheduledThreadPoolExecutor作为ThreadPoolExecutor的子类,基本上大差不差。与其最大的不同之处就在于队列:定时线程池自定义了DelayedWorkQueue队列,work线程从队列中获取任务时,队列会去判断时间,只有时间满足,才会将任务出列。同时在任务被执行完成后,定时线程池会将该任务重新入队,便于之后重新调用

  • 相关阅读:
    MWC 2024 | 广和通携手意法半导体发布智慧家居解决方案
    【快速学习系列】Mybatis缓存和使用SpringBoot开启MyBatis缓存+ehcache
    人工智能基础_机器学习044_使用逻辑回归模型计算逻辑回归概率_以及_逻辑回归代码实现与手动计算概率对比---人工智能工作笔记0084
    Zookeeper 单机集群Windows和Linux部署搭建教程
    这款键盘你真的要考虑一下!——Keychron K3测评
    剑指offer(C++)-JZ69:跳台阶(算法-动态规划)
    华为云云耀云服务器L实例评测|轻量级应用服务器对决:基于 Geekbench 深度测评华为云云耀云服务器L实例的处理器性能
    Maven构建多模块项目
    SpringCloud Alibaba - 分布式事务理论(CAP 定理 和 BASE 理论)
    React过渡动画
  • 原文地址:https://blog.csdn.net/coder_what/article/details/126256420