• 每天一个知识点- 线程池中线程执行任务发生异常会怎么样


    在这里插入图片描述

    每日一题

    在这里插入图片描述

    线程池中线程执行任务发生异常会怎么样

    我们先从测试代码到源码的角度进行分析,最后在进行总结

    1. 线程池提交任务有两个方法
      • submit
      • execute

    使用 execute 提交任务

    1. 测试代码

         public static void main(String[] args) throws InterruptedException, ExecutionException {
              ThreadFactory threadFactory = new ThreadFactory() {
                  AtomicInteger index = new AtomicInteger(1);
                  @Override
                  public Thread newThread(Runnable r) {
                      Thread thread = new Thread(r);
                      thread.setName("threadFactory-"+index);
                      index.incrementAndGet();
                      return thread;
                  }
              };
              ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory);
      
              threadPoolExecutor.execute(() -> {
                  System.out.println(Thread.currentThread().getName());
                   int a = 1/0;
              });
              TimeUnit.SECONDS.sleep(2);
      
              threadPoolExecutor.execute(() -> {
                  System.out.println(Thread.currentThread().getName());
              });
      
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    2. 执行结果

      threadFactory-1
      Exception in thread "threadFactory-1" java.lang.ArithmeticException: 除以零
      	at com.zclvct.leetcode.ThreadPollTest.lambda$main$0(ThreadPollTest.java:28)
      	at com.zclvct.leetcode.ThreadPollTest$$Lambda$1/00000000037A8980.run(Unknown Source)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:823)
      threadFactory-2
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
    3. 分析结果
      在执行中结果中也可看出,execute执行方式抛出异常显示在控制台了 ,并且再次提交任务,发现是另一个线程去执行的任务

    4. 源码分析
      在线程中提交任务是把任务包装成 worker 对象, 调用 runWorker 来执行,一下是 runWorker 方法

      final void runWorker(Worker w) {
              Thread wt = Thread.currentThread();
              Runnable task = w.firstTask;
              w.firstTask = null;
              w.unlock(); // allow interrupts
              boolean completedAbruptly = true;
              try {
                  while (task != null || (task = getTask()) != null) {
                      w.lock();
                      if ((runStateAtLeast(ctl.get(), STOP) ||
                           (Thread.interrupted() &&
                            runStateAtLeast(ctl.get(), STOP))) &&
                          !wt.isInterrupted())
                          wt.interrupt();
                      try {
                          beforeExecute(wt, task);
                          Throwable thrown = null;
                          try {
                              task.run();
                          } catch (RuntimeException x) {
                              thrown = x; throw x;
                          } catch (Error x) {
                              thrown = x; throw x;
                          } catch (Throwable x) {
                              thrown = x; throw new Error(x);
                          } finally {
                              afterExecute(task, thrown);
                          }
                      } finally {
                          task = null;
                          w.completedTasks++;
                          w.unlock();
                      }
                  }
                  completedAbruptly = false;
              } finally {
                  processWorkerExit(w, completedAbruptly);
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39

    我们只看这里的主要部分
    1. beforeExecute(wt, task); 执行调用前的钩子方法
    2. task.run(); 执行任务
    3. afterExecute(task, thrown); 执行调用后的钩子方法
    4. processWorkerExit(w, completedAbruptly);
    其中 afterExecute(task, thrown); 在 finally 代码块中 ,并且 传递了任务本身和异常信息,可以在发生异常时提供业务补偿的方式
    同时 从代码中可以看出 虽然 catch 到异常了 但是没有处理 而是直接抛出
    在最外层调用了processWorkerExit(w, completedAbruptly);
    来看一下这个方法中做了什么

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    	        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    	            decrementWorkerCount();
    	
    	        final ReentrantLock mainLock = this.mainLock;
    	        mainLock.lock();
    	        try {
    	            completedTaskCount += w.completedTasks;
    	            workers.remove(w);
    	        } finally {
    	            mainLock.unlock();
    	        }
    	
    	        tryTerminate();
    	
    	        int c = ctl.get();
    	        if (runStateLessThan(c, STOP)) {
    	            if (!completedAbruptly) {
    	                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
    	                if (min == 0 && ! workQueue.isEmpty())
    	                    min = 1;
    	                if (workerCountOf(c) >= min)
    	                    return; // replacement not needed
    	            }
    	            addWorker(null, false);
    	        }
    	    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    processWorkerExit 方法有两个参数,第一个参数是执行的 worker 对象, 第二个参数 代表 执行过程中是否发生了异常
    执行步骤

    1. 判断是否是意外退出的,如果是意外退出的话,那么就需要把WorkerCount–
    2. 加完锁后,将completedTaskCount,表示总共完成的任务数,并且从WorkerSet中将对应的Worker移除
    3. 判断当前的线程池状态,是否终止线程池
    4. 判断线程池的状态是否小于STOP,也就是处于RUNNING或者SHUTDOWN状态,如果不是不执行
    5. 上一步判断返回true 则判断线程是否抛出异常
      1)如果allowCoreThreadTimeOut=true且队列不为空,那么需要至少保证有一个线程
      2)如果allowCoreThreadTimeOut=false,那么需要保证线程数大于等于corePoolSize
    6. 如果线程抛出异常 放一个空的 worker 尝试新建一个线程

    使用 execute 提交任务总结

    execute 提交任务时,当执行发生异常,那么会直接抛出异常,并且移除异常线程也就是 worker ,并且尝试放入一个新的线程

    使用 submit 提交任务

    1. 测试代码

       public static void main(String[] args) throws InterruptedException, ExecutionException {
              ThreadFactory threadFactory = new ThreadFactory() {
                  AtomicInteger index = new AtomicInteger(1);
                  @Override
                  public Thread newThread(Runnable r) {
                      Thread thread = new Thread(r);
                      thread.setName("threadFactory-"+index);
                      index.incrementAndGet();
                      return thread;
                  }
              };
              ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory);
      
              Future<Object> future = (Future<Object>) threadPoolExecutor.submit(() -> {
                  System.out.println(Thread.currentThread().getName());
                  int a = 1 / 0;
              });
      
              TimeUnit.SECONDS.sleep(2);
              threadPoolExecutor.execute(() -> {
                  System.out.println(Thread.currentThread().getName());
              });
              future.get();
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    2. 运行结果

      threadFactory-1
      threadFactory-1
      Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: 除以零
      	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
      	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
      	at com.zclvct.leetcode.ThreadPollTest.main(ThreadPollTest.java:35)
      Caused by: java.lang.ArithmeticException: 除以零
      	at com.zclvct.leetcode.ThreadPollTest.lambda$main$0(ThreadPollTest.java:28)
      	at com.zclvct.leetcode.ThreadPollTest$$Lambda$1/000000000432EA90.run(Unknown Source)
      	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
      	at java.util.concurrent.FutureTask.run(FutureTask.java)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      	at java.lang.Thread.run(Thread.java:823)
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15

      从运行结果上来看 使用submit提交的任务,发生异常 ,不会立刻抛出异常,而是当 调用future.get();时发生异常,同时执行异常的线程没有被抛弃

    3. 源码分析
      submit 提交的任务 实际上是调用FutureTask类的run方法如下: 这里他能提交 Runable 还有 Callable也是使用了适配器模式

      public void run() {
              if (state != NEW ||
                  !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                               null, Thread.currentThread()))
                  return;
              try {
                  Callable<V> c = callable;
                  if (c != null && state == NEW) {
                      V result;
                      boolean ran;
                      try {
                          result = c.call();
                          ran = true;
                      } catch (Throwable ex) {
                          result = null;
                          ran = false;
                          setException(ex);
                      }
                      if (ran)
                          set(result);
                  }
              } finally {
                  // runner must be non-null until state is settled to
                  // prevent concurrent calls to run()
                  runner = null;
                  // state must be re-read after nulling runner to prevent
                  // leaked interrupts
                  int s = state;
                  if (s >= INTERRUPTING)
                      handlePossibleCancellationInterrupt(s);
              }
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

      从以上代码执行中我们可以看出 result = c.call(); 被包裹在 try 代码块中,并且这里并没有和 runWorker 一样 经过异常抛出,而是在发生异常是调用了 setException(ex); 方法

    protected void setException(Throwable t) {
            if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = t;
                UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
                finishCompletion();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在 setException 中 把异常信息保存了下来 ,并通过cas 操作 FutureTask 的状态为异常状态
    当调用get方法时,查看是异常状态则抛出异常

       private V report(int s) throws ExecutionException {
            Object x = outcome;
            if (s == NORMAL)
                return (V)x;
            if (s >= CANCELLED)
                throw new CancellationException();
            throw new ExecutionException((Throwable)x);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    使用 submit 提交任务 总结

    使用 submit 提交任务 发生异常不会直接抛出,也不会移除当前执行异常的线程,而是将异常保存在 FutureTask中,当调用get时抛出异常。

    setUncaughtExceptionHandler 方法详解

    先对对象维护了 一个 private volatile UncaughtExceptionHandler exceptionHandler; 异常处理器

    thread 定义了 一个 uncaughtException方法 ,当线程执行出现异常的时候,相当于会回调 UncaughtExceptionHandler 接口,通过 getUncaughtExceptionHandler 方法查看当前线程是否设置了 UncaughtExceptionHandler。有就调用,由于线程在创建的时候都会属于一个 ThreadGroup,会尝试调用 ThreadGroup 的 UncaughtExceptionHandler,如果还是没有设置,那么会调用 getDefaultUncaughtExceptionHandler 获取全局默认的 UncaughtExceptionHandler。

    Interface for handlers invoked when a Thread abruptly terminates due to an uncaught exception.
    When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using Thread.getUncaughtExceptionHandler() and will invoke the handler’s uncaughtException method, passing the thread and the exception as arguments. If a thread has not had its UncaughtExceptionHandler explicitly set, then its ThreadGroup object acts as its UncaughtExceptionHandler. If the ThreadGroup object has no special requirements for dealing with the exception, it can forward the invocation to the default uncaught exception handler.

    当线程由于未捕获的异常而突然终止时调用的处理程序接口。
    当线程由于未捕获的异常而即将终止时,Java 虚拟机将使用 Thread.getUncaughtExceptionHandler() 查询线程的 UncaughtExceptionHandler 并将调用处理程序的 uncaughtException 方法,将线程和异常作为参数传递。 如果一个线程没有显式设置它的 UncaughtExceptionHandler,那么它的 ThreadGroup 对象充当它的 UncaughtExceptionHandler。 如果 ThreadGroup 对象对处理异常没有特殊要求,则可以将调用转发给默认的未捕获异常处理程序

  • 相关阅读:
    laravel composer require require-dev和APP_ENV的使用场景
    编译器-条件/循环代码生成
    IDEA提高工作效率的实用技巧
    功能测试【测试用例模板、Bug模板、手机App测试】
    【操作系统】数据校验码——奇偶校验和海明校验
    【c++提高1】数据结构之线段树详解
    Python之文件 打开与关闭
    「Python入门」Python多线程
    基于LSTM-Adaboost的电力负荷预测(Matlab代码实现)
    labview中循环停止事件的深入研究
  • 原文地址:https://blog.csdn.net/qq_44808472/article/details/126506908