一、Worker的真实身份
Worker是线程池的内部类。包装了一个线程来不断的执行任务。线程池中的每个线程都被包装成了Worker类,它具有以下特点:
1、它实现了Runnable,内部线程start后,执行的就是Worker本身的run方法。
2、它继承了AQS,实现了一个不可重入的独占锁,执行任务期间加锁。线程池可以通过tryLock来判断Worker是不是正在执行任务。
二、Worker的创建与销毁
1、Worker的创建
线程池在接收提交任务阶段,会根据当前线程数量,决定是否创建线程。如下代码:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
通过上面的源代码,我们会发现有三个地方调用addWorker()方法,而此方法的内部则会创建Worker对象,并完成提交任务的执行。下面,来看看Worker的内部结构:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** 线程池里的真正的线程,负责执行提交的任务。*/
final Thread thread;
/**Worker线程执行的第一个任务,如果创建的是核心线程的Worker则提交的任务会赋值给此属性。此值可为空*/
Runnable firstTask;
/** 当前线程已完成的任务数,每次执行玩一个提交的任务,此值+1 */
volatile long completedTasks;
}
接下来,我们看下构造函数:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
构造函数中,会把当前状态设置为-1,此状态来源于AbstractQueuedSynchronizer。表示锁是否被抢占,此处设置为-1,是为了让抢锁操作失败,不会导致刚刚创建就被中断,直到执行了runWorker。
firstTask 为当前提交的任务
thread 通过线程池构建时提供的线程工厂创建。此处需要注意,在newThread()时,把this当作传入参数,此时的this代表的是当前new出来的Worker对象。类似于new Thread(new Runnabel()),所以可以推断,当thread调用start方法时,会调用Worker的run()方法。
了解完worker的内部结构,我们继续回看addWorker()方法,我们会从里面找到new Worker(),并且获取到Worker里的Thread,并启动线程。如下图:
Worker w = null;
try {
// new 一个Worker对象,并将当前任务当作参数传给worker
w = new Worker(firstTask);
// 获取到worker里封装的Thread对象,即通过getThreadFactory().newThread(this)创建的Thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动当前worker里的线程,因为Thread里的Runnable是Worker对象,所以此处会执行Worker的run()方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
2、Worker的销毁
Worker对象的生命周期随着内部属性thread线程的结束而结束。当线程结束了,会从ThreadPoolExecutor的Worker集合属性workers中删除,从而回收Worker对象。
每个Worker都会从ThreadPoolExecutor的队列中获取任务(详见getTask()方法),当从队列中无法获取到任务时,根据是否是核心线程和allowCoreThreadTimeOut可以确定当前线程是否需要退出。如果线程需要退出,会执行processWorkerExit方法,如下:
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
// 省略执行获取到的任务代码
.
.
.
}
completedAbruptly = false;
} finally {
// 当无法获取到任务,while循环退出,执行此方法
processWorkerExit(w, completedAbruptly);
}
当Worker的processWorkerExit被调用,则表明一个线程退出了,第一个参数为当前线程的包装对象Worker,第二个参数代表是否突然、非正常、发生异常退出,如何理解都行。反正不是正常流程退出,就是发生了各种可能的意外。
processWorkerExit详解如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是突然退出的,需要修改线程池ctl的值,保证ctl的值与真实存在的线程数相等
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
// 从集合中删除封装当前线程的worker对象
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
// 如果当前的ctl值小于STOP,即表示线程池当前的状态为RUNNING或者SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 如果突然、非正常退出,此处不执行。否则需要保证线程池至少有一个核心线程
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 当前线程异常突然退出了,需要重新启动一个线程。会重新创建一个Worker对象哦!
addWorker(null, false);
}
}
三、Worker获取并执行任务
1、线程的重复利用
线程池的一个很重要的优点就是通过重复利用已创建的线程,降低线程创建和销毁造成的资源消耗。那么,此处容易产生疑问,一个线程是如何重复利用的呢?答案就在Worker的获取并执行任务方法runWorker()里。接下来,我们一起探秘。
首先,我们要确定一个知识点,通过上面的学习,我们知道线程池里的线程启动后,会执行Worker里的run()方法。
在Worker的run()方法里只有一行代码,那就是调用runWorker(this),把Worker本身当作参数传入。下面,我们来看runWorker方法是如何保障线程重复利用的,如下图:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 通过一个while循环,不断的调用getTask方法从线程池队列里获取提交的Runnabel任务
while (task != null || (task = getTask()) != null) {
// 省略代码,直接看重点
.
.
.
// 如果获取到了任务,直接调用run()方法。此处需要注意,调用run方法并不是启动线程,只是简单的接口调用!
// task.run()方法里就是我们提交的任务的具体实现,以执行我们的业务逻辑。
// 每执行完一个任务,就通过while循环继续从队列中获取下一个,依次执行run方法。
task.run();
// 省略后面的代码。。
.
.
.
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
通过上面的讲解,我们可以总结一句话:一个线程启动后,循环从线程池队列中获取提交的任务,并调用其run()方法。
此方法保证了一个线程可以执行多个任务,从而达到线程的重复利用。
2、获取提交任务
前面,我们讲完了线程池里的线程是如何执行一个提交的任务的。那么,线程是如何获取我们提交的任务呢?答案藏在Worker类的getTask()方法里。接下来,我们一起探秘。
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 此处判断的关键在于rs >= STOP || workQueue.isEmpty()
// 1、如果线程池为STOP状态,那么是不允许获取任务了,符合线程池STOP状态立马停止任务的执行
// 2、如果线程池空了,那么也不允许获取任务了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 修改线程池ctl的值,线程数量减1
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断allowCoreThreadTimeOut,是否所有线程允许超时退出,或者当前线程数大于核心线程数,也代表线程可以退出
// allowCoreThreadTimeOut默认为false;而wc > corePoolSize则表示线程里当前线程数是否已经超过了核心线程数
// 如果已经超出,则代表创建了最大线程。而最大线程是可以被回收的。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 获取任务的关键点:通过timed属性的值,以不同的方式从队列中获取任务。
// 1、当timed为true,代表线程允许结束,那么等待指定时间后,还没有新任务提交则返回null,线程就退出了
// 2、当timed为false,代表线程不允许结束,那么执行队列的take()方法,一直阻塞。直到有新任务提交
// 通过一直阻塞,可以保证当前线程永远不退出。
// 此处有一个经典面试题:线程池是如何保证核心线程不退出的。答案就在此处。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}