ThreadPoolExecutor执行execute()方法示意图,如下图所示(图片均来自百度地图,懒得画图):
源码解析
private final BlockingQueue<Runnable> workQueue; // 工作线程队列
@ReachabilitySensitive
private final HashSet<Worker> workers = new HashSet<>(); // 工作线程集合
private final ReentrantLock mainLock = new ReentrantLock(); // 锁
private final Condition termination = mainLock.newCondition(); // 信号
private int largestPoolSize; // 线程池最大容量
private volatile long keepAliveTime; // 存活时间长度
private volatile boolean allowCoreThreadTimeOut; // 允许核心线程时间长度
private volatile int corePoolSize; // 核心线程池大小
private volatile int maximumPoolSize; // 最大线程池的大小
private volatile ThreadFactory threadFactory; // 线程工厂--创建线程
// 拒绝策略,当ThreadPoolExecutor已经关闭或已d经饱和时(达到了最大线程池大小且工作队列已满),execute()方法将调用的Handler
private volatile RejectedExecutionHandler handler; //默认拒绝策略 AbortPolicy()
补充: AbortPolicy: 拒绝任务时直接抛出异常,让你感知到任务被拒绝
DiscardPolicy: 当新任务被提交后直接丢弃掉,也不会有任何通知
DiscardOldestPolicy: 如果线程池没被关闭且没有执行能力,则会丢弃任务队列的头结点,这种策略与上面
的策略不同之处在于它丢弃的不是最新提交的,而是队列中存活时间最长的
CallerRunPolicy: 相对而言它就比较完善了,当有新任务提交后,如果线程池没被关闭且没有能力执行,则
把这个任务交于提交任务的线程执行,也就是谁提交任务,谁就负责执行任务。这样做主要有两点好处。
第一点新提交的任务不会被丢弃,这样也就不会造成业务损失。
第二点好处是,由于谁提交任务谁就要负责执行任务,这样提交任务的线程就得负责执行任务,而执行任务
又是比较耗时的,在这段期间,提交任务的线程被占用,也就不会再提交新的任务,减缓了任务提交的速度,
相当于是一个负反馈。在此期间,线程池中的线程也可以充分利用这段时间来执行掉一部分任务,腾出一定
的空间,相当于是给了线程池一定的缓冲期。
private final class Worker extends AbstractQueuedSynchronizer
//执行线程
public void execute(Runnable command){
if(command==null){
throw new NullPointerException();
}
int c = ctl.get() // ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 原子操作
if(workerCountOf(c)<corePoolSize){ // 1. 是否小于核心线程数
// 小于核心线程数目 则 通过 ThreadFactory 创建新线程
if(addWorker(command,true)) return;
c = ctl.get();
}else{
// 大于核心线程数 则 添加到 workQueue 中
if(isRunning(c) && workQueue.offer(command){
int recheck = ctl.get();
if(!isRunning(recheck) && remove(command)){
reject(command);
}else if(workerCountOf(recheck)==0){
addWorker(null,false);
}
}else if(!addWorker(command,false){ //添加到
reject(command);
}
}
}
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
addWorker(Runnable firstTask,boolean core){
retry: // 计数,并且根据是否 core 判断 是否 大于 corePoolSize 或者 maximumPoolSize
for(;;){
for(;;){
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try{
w = new Worker(firstTask);
Thread t = w.thread;
if(t!=null){
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 锁住
try{
workers.add(w);
workerAdded = true;
}finally{
mainLock.unLock(); //解锁
}
if(workerAdded){
t.start();
workerStarted = true;
}
}
}finally{
if(!workerStarted){
addWorkerFailed(w);
}
}
return workerStarted;
}