• 线程池的异常处理机制


    起因

    一次开发过程中,送审之后向三方OA系统推送代办,其中由于优化的原因使用到线程池

    ExecutorService todoMessageAsyncThread = ThreadPoolManager.getThreadPool("todoMessageAsyncThreadPool");
    		todoMessageAsyncThread.submit(() -> {
    			log.info("processKey:{}, processInstanceId:{}, 开启异步线程推送待办报文。", processKey, processInstanceId);
    			//....实现具体业务
    			log.info("processKey:{}, processInstanceId:{}, 结束异步线程推送待办报文。", processKey, processInstanceId);
    		});
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    中间有一个判断,该不该推送,然后使用el表达式进行判断,但是测试环境中el表达式配置的不标准,导致现象就是没有推送,也没有日志,子线程就像停住了,没啥动静了。

    原因分析

    Executors线程池有两种提交线程的方式execute和submit方式,简单测试如下:

        @Test
        public void submitTest()throws InterruptedException {
            Runnable runnable = () -> {
               int i = 1/0;
            };
            ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
            System.out.println("execute开始执行");
            threadPool1.execute(runnable);
    
            Thread.sleep(1000);
            System.out.println("--------------------------");
    
            System.out.println("submit开始执行");
            Future<?> submit = threadPool1.submit(runnable);
            System.out.println("submit返回结果:"+submit);
    /*
    execute开始执行
    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    	at test.java.util.concurrent.ExecutorsTest.lambda$submitTest$1(ExecutorsTest.java:40)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    --------------------------
    submit开始执行
    submit返回结果:java.util.concurrent.FutureTask@22eeefeb[Completed exceptionally: java.lang.ArithmeticException: / by zero]
    
     */
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    从测试的结果中可以看出来,execute方法中对异常信息进行的打印,而submit方法中没有对异常信息进行打印,而是将异常信息存储在了返回的future中,只有通过future.get()才能阻塞式的获取异常。

    先看看execute的源码中的实现:

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             *
             * 1 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务来启动一个新线程。对addWorker的调用以原子方式检查runState和workerCount,从而通过返回false来防止错误警报,这些错误警报会在不应该添加线程的情况下添加线程。
             * 2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为自上次检查以来已有的线程已经失效),或者池是否在进入该方法后关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
             * 3.  如果我们无法对任务进行排队,那么我们将尝试添加一个新线程。如果它失败了,我们知道我们已经关闭或饱和了,所以拒绝执行任务
             */
            int c = ctl.get();  //这里使用32位的int型数据,前3位代表状态,后29位代表线程数,在多线程环境下避免状态恶化线程数不一致
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true)) //当前线程数少于核心线程数,直接添加到worker中
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) { //如果不能插入核心线程中,就放入到queue中
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command)) //重新检查状态
                    reject(command);
                else if (workerCountOf(recheck) == 0) //queue满了
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))  //queue满了,放入最大线程中
                reject(command);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    其中最重要的启动子线程的方法是addWorker方法,将线程封装成Runable,传入execute方法中。
    Worker也是一个线程,运行的时候调用worker的run方法:

        public void run() {
            runWorker(this);
        }
    
        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        try {
                            task.run();     //自定义任务的run方法
                            afterExecute(task, null);
                        } catch (Throwable ex) {
                            afterExecute(task, ex);
                            throw ex;   //执行后有异常抛异常
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    再来看submit方法的实现源码:

        public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    submit方法在中间调用了execute方法,但是是将子线程封装成了FutureTask,然后调用的execute方法。
    这样在执行这个子线程的时候会执行FutureTask的run方法,而在run方法中,callable.call()方法直接被catch,然后将异常信息使用setException方法获取,并将异常设置到outcome里,不会抛异常出去。源码如下:

        public void run() {
            if (state != NEW ||
                !RUNNER.compareAndSet(this, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call(); //callable的接口
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex); //这里直接吃掉了
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    解决方案

    1. 如果没有返回值,建议直接使用execute,不要使用submit
    2. 自己在业务代码中try-catch-finally
    3. 重写Runnable的afterExecute
    
            //1.创建一个自己定义的线程池
            ExecutorService executorService = new ThreadPoolExecutor(
                    2,
                    3,
                    0,
                    TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue(10)
            ) {
                //重写afterExecute方法
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    //这个是excute提交的时候
                    if (t != null) {
                        System.out.println("afterExecute里面获取到excute提交的异常信息,处理异常" + t.getMessage());
                    }
                    //如果r的实际类型是FutureTask 那么是submit提交的,所以可以在里面get到异常
                    if (r instanceof FutureTask) {
                        try {
                            Future<?> future = (Future<?>) r;
                            //get获取异常
                            future.get();
    
                        } catch (Exception e) {
                            System.out.println("afterExecute里面获取到submit提交的异常信息,处理异常" + e);
                        }
                    }
                }
            };
            //当线程池抛出异常后 execute
            executorService.execute(new task());
            
            //当线程池抛出异常后 submit
            executorService.submit(new task());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
  • 相关阅读:
    2022-27-27——SpringBoot2.0集成WebSocket,实现后台向前端推送信息
    uniapp封装mixins实现H5和微信小程序的微信支付
    嵌入式系统,ARM微处理器特点,ARM体系结构,特征、状态、操作模式等,中断分类,JTAG调试接口
    8、Linux:一起玩转压缩/解压命令2
    K210入门必看(MAIX DOCK)(一)
    Zookeeper
    Flask+Echarts搭建全国疫情可视化大屏
    React基础教程:TodoList案例
    开源游戏服务器框架NoahGameFrame(NF)客户端的Log日志系统(五)
    vue3 中 ref、toRef、toRefs 和 reactive 的区别
  • 原文地址:https://blog.csdn.net/weixin_46399870/article/details/134517679