我们先从测试代码到源码的角度进行分析,最后在进行总结
测试代码
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadFactory threadFactory = new ThreadFactory() {
AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("threadFactory-"+index);
index.incrementAndGet();
return thread;
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory);
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName());
int a = 1/0;
});
TimeUnit.SECONDS.sleep(2);
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
}
执行结果
threadFactory-1
Exception in thread "threadFactory-1" java.lang.ArithmeticException: 除以零
at com.zclvct.leetcode.ThreadPollTest.lambda$main$0(ThreadPollTest.java:28)
at com.zclvct.leetcode.ThreadPollTest$$Lambda$1/00000000037A8980.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:823)
threadFactory-2
分析结果
在执行中结果中也可看出,execute执行方式抛出异常显示在控制台了 ,并且再次提交任务,发现是另一个线程去执行的任务
源码分析
在线程中提交任务是把任务包装成 worker 对象, 调用 runWorker 来执行,一下是 runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
我们只看这里的主要部分
1. beforeExecute(wt, task); 执行调用前的钩子方法
2. task.run(); 执行任务
3. afterExecute(task, thrown); 执行调用后的钩子方法
4. processWorkerExit(w, completedAbruptly);
其中 afterExecute(task, thrown); 在 finally 代码块中 ,并且 传递了任务本身和异常信息,可以在发生异常时提供业务补偿的方式
同时 从代码中可以看出 虽然 catch 到异常了 但是没有处理 而是直接抛出。
在最外层调用了processWorkerExit(w, completedAbruptly);
来看一下这个方法中做了什么
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
processWorkerExit 方法有两个参数,第一个参数是执行的 worker 对象, 第二个参数 代表 执行过程中是否发生了异常
执行步骤
execute 提交任务时,当执行发生异常,那么会直接抛出异常,并且移除异常线程也就是 worker ,并且尝试放入一个新的线程
测试代码
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadFactory threadFactory = new ThreadFactory() {
AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("threadFactory-"+index);
index.incrementAndGet();
return thread;
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 2, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), threadFactory);
Future<Object> future = (Future<Object>) threadPoolExecutor.submit(() -> {
System.out.println(Thread.currentThread().getName());
int a = 1 / 0;
});
TimeUnit.SECONDS.sleep(2);
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName());
});
future.get();
}
运行结果
threadFactory-1
threadFactory-1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: 除以零
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.zclvct.leetcode.ThreadPollTest.main(ThreadPollTest.java:35)
Caused by: java.lang.ArithmeticException: 除以零
at com.zclvct.leetcode.ThreadPollTest.lambda$main$0(ThreadPollTest.java:28)
at com.zclvct.leetcode.ThreadPollTest$$Lambda$1/000000000432EA90.run(Unknown Source)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:823)
从运行结果上来看 使用submit提交的任务,发生异常 ,不会立刻抛出异常,而是当 调用future.get();时发生异常,同时执行异常的线程没有被抛弃
源码分析
submit 提交的任务 实际上是调用FutureTask类的run方法如下: 这里他能提交 Runable 还有 Callable也是使用了适配器模式
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
从以上代码执行中我们可以看出 result = c.call(); 被包裹在 try 代码块中,并且这里并没有和 runWorker 一样 经过异常抛出,而是在发生异常是调用了 setException(ex); 方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
在 setException 中 把异常信息保存了下来 ,并通过cas 操作 FutureTask 的状态为异常状态
当调用get方法时,查看是异常状态则抛出异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
使用 submit 提交任务 发生异常不会直接抛出,也不会移除当前执行异常的线程,而是将异常保存在 FutureTask中,当调用get时抛出异常。
先对对象维护了 一个 private volatile UncaughtExceptionHandler exceptionHandler; 异常处理器
thread 定义了 一个 uncaughtException方法 ,当线程执行出现异常的时候,相当于会回调 UncaughtExceptionHandler 接口,通过 getUncaughtExceptionHandler 方法查看当前线程是否设置了 UncaughtExceptionHandler。有就调用,由于线程在创建的时候都会属于一个 ThreadGroup,会尝试调用 ThreadGroup 的 UncaughtExceptionHandler,如果还是没有设置,那么会调用 getDefaultUncaughtExceptionHandler 获取全局默认的 UncaughtExceptionHandler。
Interface for handlers invoked when a Thread abruptly terminates due to an uncaught exception.
When a thread is about to terminate due to an uncaught exception the Java Virtual Machine will query the thread for its UncaughtExceptionHandler using Thread.getUncaughtExceptionHandler() and will invoke the handler’s uncaughtException method, passing the thread and the exception as arguments. If a thread has not had its UncaughtExceptionHandler explicitly set, then its ThreadGroup object acts as its UncaughtExceptionHandler. If the ThreadGroup object has no special requirements for dealing with the exception, it can forward the invocation to the default uncaught exception handler.
当线程由于未捕获的异常而突然终止时调用的处理程序接口。
当线程由于未捕获的异常而即将终止时,Java 虚拟机将使用 Thread.getUncaughtExceptionHandler() 查询线程的 UncaughtExceptionHandler 并将调用处理程序的 uncaughtException 方法,将线程和异常作为参数传递。 如果一个线程没有显式设置它的 UncaughtExceptionHandler,那么它的 ThreadGroup 对象充当它的 UncaughtExceptionHandler。 如果 ThreadGroup 对象对处理异常没有特殊要求,则可以将调用转发给默认的未捕获异常处理程序