一次开发过程中,送审之后向三方OA系统推送代办,其中由于优化的原因使用到线程池
ExecutorService todoMessageAsyncThread = ThreadPoolManager.getThreadPool("todoMessageAsyncThreadPool");
todoMessageAsyncThread.submit(() -> {
log.info("processKey:{}, processInstanceId:{}, 开启异步线程推送待办报文。", processKey, processInstanceId);
//....实现具体业务
log.info("processKey:{}, processInstanceId:{}, 结束异步线程推送待办报文。", processKey, processInstanceId);
});
中间有一个判断,该不该推送,然后使用el表达式进行判断,但是测试环境中el表达式配置的不标准,导致现象就是没有推送,也没有日志,子线程就像停住了,没啥动静了。
Executors线程池有两种提交线程的方式execute和submit方式,简单测试如下:
@Test
public void submitTest()throws InterruptedException {
Runnable runnable = () -> {
int i = 1/0;
};
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
System.out.println("execute开始执行");
threadPool1.execute(runnable);
Thread.sleep(1000);
System.out.println("--------------------------");
System.out.println("submit开始执行");
Future<?> submit = threadPool1.submit(runnable);
System.out.println("submit返回结果:"+submit);
/*
execute开始执行
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at test.java.util.concurrent.ExecutorsTest.lambda$submitTest$1(ExecutorsTest.java:40)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
--------------------------
submit开始执行
submit返回结果:java.util.concurrent.FutureTask@22eeefeb[Completed exceptionally: java.lang.ArithmeticException: / by zero]
*/
}
从测试的结果中可以看出来,execute方法中对异常信息进行的打印,而submit方法中没有对异常信息进行打印,而是将异常信息存储在了返回的future中,只有通过future.get()才能阻塞式的获取异常。
先看看execute的源码中的实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个任务来启动一个新线程。对addWorker的调用以原子方式检查runState和workerCount,从而通过返回false来防止错误警报,这些错误警报会在不应该添加线程的情况下添加线程。
* 2. 如果一个任务可以成功排队,那么我们仍然需要仔细检查我们是否应该添加一个线程(因为自上次检查以来已有的线程已经失效),或者池是否在进入该方法后关闭。因此,我们重新检查状态,如果有必要,如果停止,则回滚排队,如果没有,则启动一个新线程。
* 3. 如果我们无法对任务进行排队,那么我们将尝试添加一个新线程。如果它失败了,我们知道我们已经关闭或饱和了,所以拒绝执行任务
*/
int c = ctl.get(); //这里使用32位的int型数据,前3位代表状态,后29位代表线程数,在多线程环境下避免状态恶化线程数不一致
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //当前线程数少于核心线程数,直接添加到worker中
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { //如果不能插入核心线程中,就放入到queue中
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) //重新检查状态
reject(command);
else if (workerCountOf(recheck) == 0) //queue满了
addWorker(null, false);
}
else if (!addWorker(command, false)) //queue满了,放入最大线程中
reject(command);
}
其中最重要的启动子线程的方法是addWorker方法,将线程封装成Runable,传入execute方法中。
Worker也是一个线程,运行的时候调用worker的run方法:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run(); //自定义任务的run方法
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex; //执行后有异常抛异常
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
再来看submit方法的实现源码:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
submit方法在中间调用了execute方法,但是是将子线程封装成了FutureTask,然后调用的execute方法。
这样在执行这个子线程的时候会执行FutureTask的run方法,而在run方法中,callable.call()方法直接被catch,然后将异常信息使用setException方法获取,并将异常设置到outcome里,不会抛异常出去。源码如下:
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //callable的接口
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); //这里直接吃掉了
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
//1.创建一个自己定义的线程池
ExecutorService executorService = new ThreadPoolExecutor(
2,
3,
0,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(10)
) {
//重写afterExecute方法
@Override
protected void afterExecute(Runnable r, Throwable t) {
//这个是excute提交的时候
if (t != null) {
System.out.println("afterExecute里面获取到excute提交的异常信息,处理异常" + t.getMessage());
}
//如果r的实际类型是FutureTask 那么是submit提交的,所以可以在里面get到异常
if (r instanceof FutureTask) {
try {
Future<?> future = (Future<?>) r;
//get获取异常
future.get();
} catch (Exception e) {
System.out.println("afterExecute里面获取到submit提交的异常信息,处理异常" + e);
}
}
}
};
//当线程池抛出异常后 execute
executorService.execute(new task());
//当线程池抛出异常后 submit
executorService.submit(new task());
}