Java线程池ThreadPoolExecutor源码解析
1.ThreadPoolExecutor的构造实现
以jdk8为准,常说线程池有七大参数,通常而言,有四个参数是比较重要的
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
corePoolSize
:核心线程数,具体含义理解代码maximumPoolSize
:最大线程数keepAliveTime
:线程空闲的存活时间unit
:时间单位BlockingQueue
:阻塞队列,用来保存等待执行的任务
接下来去看完整参数的构造实现:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
ThreadFactory
:线程工厂,用来创造线程 -
RejectedExecutionHandler
:拒绝策略-
如果核心线程数等其他参数非法化就会抛出相应的异常
if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException();
-
之后进行初始化赋值
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;
- 注:acc是一个成员变量,用来管理线程池中线程的访问控制上下文,其实现类是
AccessControlContext
- 注:acc是一个成员变量,用来管理线程池中线程的访问控制上下文,其实现类是
-
2.线程池的执行execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
总共大致分为三步:要想理解线程池的执行,要先去理解控制字段其具体含义
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
先声明线程池的五种状态,再看其他字段方法具体执行了其他什么操作
- RUNNING: -1 << COUNT_BITS,即高3位为111
- SHUTDOWN: 0 << COUNT_BITS,即高3位为000
- STOP : 1 << COUNT_BITS,即高3位为001
- TIDYING : 2 << COUNT_BITS,即高3位为010
- TERMINATED: 3 << COUNT_BITS,即高3位为011
至于其每种空置状态的具体意义,根据英文释义结合代码具体理解,而非直接理解,通过位移位的操作将高3位与低29位分离开来,高三位表示此时整个线程池的运行状态,低29位表示线程池中线程的数量,再去看execute执行过程即可.
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
- 用于获取此时线程中的线程数,如果小于核心线程数,就添加任务,添加任务成功则返回,失败则重新获取控制字段,
addworker
后续了解,复杂的东西简单化,理解大致操作思想最为核心.
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
- 根据控制字段c去判断线程池的运行状态是否正在运行,如果添加任务成功则不会执行失败,或者说此时线程数有可能已经大于了核心线程数也有可能走到这,所以会将任务添加到阻塞队列中去,然后重新去获得控制字段,再去做校验,如果此时线程池不是正在运行的状态并且删除任务成功,这一步主要是为了防止阻塞队列添加任务成功这个过程,可能线程池不运行了,那么这时候就需要将添加的那个任务删除,并对他执行拒绝策略,又或者是此时线程池中的线程数已经为0,说明没有线程在工作了,因此添加一个空任务,至于第二个参数在
addWorker
中在做说明
else if (!addWorker(command, false))
reject(command);
- 字面意思就是添加任务失败,执行拒绝策略,则是为了应对线程池已经到了满负载的状态
3.线程池的任务添加addworker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
代码量很长,但是大致可分为两部分,且逻辑很清晰
-
这里使用了标签语法,前半段大致是是否需要添加线程做一系列准备
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
- 获取控制字段
c
:这个字段包含了运行状态信息和线程池数量信息,是一个复合字段,而rs
则是获取高三位的线程池状态信息 - 根据前面线程池状态信息,运行
RUNNING
值最小,因此判断线程池如果处于非运行的状态,则去判断是否处于关闭的状态,判断第一个任务是否为空,队列不为空,但是由于前面取反操作,其真正含义是:如果线程池的状态不是SHUTDOWN
,或者任务队列为空,或者有待执行的任务,那么就不会拒绝新任务的提交,否则就返回false,表示添加任务失败 - 接下来死循环表示需要去添加执行的任务,首先获取线程池中的线程数,关键的地方在这,如果此时的线程数大于等于容量或者(这里根据传递的参数
core
来选则比较的目标是核心线程数还是最大线程数),比较失败,则说明超过了接受的范畴,添加任务失败,如果没有失败,则通过底层CAS操作使得线程数加1,然后直接结束调用,跳出循环,,如果CAS失败,则说明ctl字段受到了变化,此期间有其他任务参与,重新获取此字段,去判断一下重新获取的ctl字段和之前的rs字段是否相等,这是为了保障多线程情况下出现的一种并发竞争问题导致的线程数发生错乱.
- 获取控制字段
-
至此,上半部分的核心已经解决,下半部分此时真正去实现任务的添加,通过线程池中的内部类Worker去实现
boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
-
两个布尔类型暂时不用管,大致猜测意思即可,将firstask任务交付给worker,由worker内部的thread线程去执行,因此需要去理解worker的实现
3.1Worker内部的工作者
3.11构造方法实现
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
接收一个
Runnable
参数做为任务进行初始化,这里用到了AQS的一些实现,然后通过线程工厂创造一个新的线程,赋予给内部的成员变量引用- 还有一些锁的一些操作,后续再看
-
如果工作者的内部线程已经被创造好,实现就绪,要先获得线程池的互斥锁,然后对接下来的操作进行互斥访问
-
重新获取最新的线程池的运行状态,只有当线程池处于运行状态或者处于关闭状态但没有待执行的任务时,才能将新工作线程添加到线程池中,也就是worker中去,因此一个worker内部具备一个thread,如果想要实现许多线程去完成线程池的相应操作,需要将worker封装成集合,因此线程池内部还有一成员变量:
private final HashSet
workers = new HashSet (); 这样就确保了每一个
worker
都是独一无二的,不会重复的,也就意味着每一个线程都不一样. -
而最后一个
largestPoolSize
则是保留历史的最大线程数的,用来记录,至此就已经添加成功了,只不过此时还没有执行 -
之后解锁,用之前标志位
workerAdded
表示添加成功,然后启动线程,也就是去执行这个任务,再用另一个标志workerStarted
表示启动成功. -
最后则是检查是否有什么异常在启动期间,如果没启动成功,则调用
addWorkerFiled
方法去处理3.12 addWorkerFailed方法
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); tryTerminate(); } finally { mainLock.unlock(); } }
- 紧接上文,也就是启动失败的话,会将执行任务的worker
remove
(底层通过HashMap实现键的删除),然后减少线程数,等待一会,这个过程是互斥的,因为牵扯到控制字段
- 紧接上文,也就是启动失败的话,会将执行任务的worker
-
至此,添加任务如果成功,则进行执行,如果成功开启执行,则成功返回
因此,根据线程池的执行添加流程,大致可以将此过程通过绘图的方式表现出来:
-
4.工作者的run方法是如何运行的
在worker
中还有一个方法
public void run() {
runWorker(this);
}
是其执行的具体操作
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
- 上锁之前的操作都很容易看懂,处了
getTask
,这个方法用来获取阻塞队列中的任务,后续再理解- 首先看第一段if,就是用来查看此时线程池的状态,如果不处于关闭或者运行的状态,或者线程处于中断的状态,则确保线程中断
- 接下来是一部分异常和错误的处理以及执行一些前置任务和一些后置任务
- 最后完成的任务数加一,解锁,将标志位是否中断,改为false,表示执行成功.
5.获取任务的getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
-
首先标志位用来判断是否超时,默认情况下不超时,跟之前的参数挂钩,后续再看,然后进入死循环,不断循环去执行后续操作
-
获取控制位c和rs运行状态,之后的if操作含义是如果线程池处于关闭的状态或者此时队列为空,就说明没有任务需要处理了,此时让线程池中的线程数减一,返回,另一种情况则是线程池的状态处于关闭状态之上,则说明线程池现在不执行任务了,不需要管队列中是否还有任务存在,则同上减一,返回。
-
然后重新获取线程池的线程数,接下来的time布尔这个字段有些作用,后面的
allowCoreThreadTimeOut
是一个控制字段,用来表示线程是否允许超时而返回的一个字段,如:线程池中的核心线程如果因为长时间没有得到任务的滋养,就如同线程之间会发生饥饿
一样,因此存在一个字段用来控制超时是否生效.因此如果线程数大于核心线程数或者开启超时控制字段,就说明会执行超时退出. -
接下来的if判断是用来执行递减线程数的一个操作,底层采取CAS就不多说了,
wc > maximumPoolSize
:用来表示如果大于了最大线程数,说明需要减少线程数,至于为什么会出现这种情况,等会理解.(timed && timedOut)
:说明开启超时退出,且上一次获取任务因超时返回,这个需看后面代码理解.上面的两个条件满足其中之一即可.wc > 1 || workQueue.isEmpty())
而这个操作则是为了减少不必要的线程开销,如果阻塞队列为空说明没有任务,那自然不需要多余的线程数去执行,因此会发生接下来的操作,递减线程数,然后跳到下一次循环. -
之后接下来就是从阻塞队列中获取任务的核心了,第一步是根据超时控制字段来决定行为方式,允许超时退出的话,通过
poll
方式,不允许则通过take
方式,两种方式大致是一个等待一定时间,如果为空是前提.另一个是无限等待,会阻塞线程.其具体实现通过阻塞队列的真正实现类别去实现.如果获取到了任务,就返回,如果没有则timeout设置为true,表示没有接受到任务,因此前文的timeout就理解了.通常而言线程池中的线程数是不允许超过最大线程数的,但通常而言这是一种机制的完整性和规范,假如是自定义线程池的情况下,就有可能出现这种情况,另外一种是本人推测虽然由于增加工作线程数的操作底层是通过CAS去实现的,底层是原子性的,同时进行CAS操作就有可能导致ABA问题出现,或者操作失败,或者不断自旋的可能,
6.任务的提交submit
众所周知,任务需要进行提交给线程池,再有线程池去执行,而Runnable接口实现的run方法是没有返回值的,而在线程中Callable通常具备返回值,且配备Future去接受结果.因此submit具备不同的操作
这里以AbstractExecutorService(线程池的父类)接口为例:
public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
RunnableFuture
接口的实现类FutureTask
,总而言之就是转换为一个Runnable,然后进行提交,最后返回一个future,至于FutureTask
具体内容自行详解.
7.线程池的关闭
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
实现逻辑也很清楚,检查是否可以关闭线程,然后设置线程的状态,interruptIdleWorkers()
这个方法算是关键的,他会去中断worker;onShutdown
是一个空方法,留给子类去实现的.
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
他会去遍历集合workers,获取每一个worker的工作线程,然后尝试去中断,最后结束.