在日常开发中经常会遇到需要使用其它线程将大量任务异步处理的场景(异步化以及提升系统的吞吐量),而在使用线程的过程中却存在着两个痛点。
而线程池正是为解决上述痛点而生的,其通过两个手段来解决上述痛点。
池化线程资源,顾名思义就是维护一个存活线程的集合(池子)。提交任务的用户程序不直接控制线程的创建和销毁,不用每次执行任务时都申请创建一个新线程,而是通过线程池间接的获得线程去处理异步任务。
线程池中的线程在执行完任务后通常也不会被系统回收掉,而是继续待在池子中用于执行其它的任务(执行堆积的待执行任务或是等待新任务)。
线程池通过池化线程资源,避免了系统反复创建/销毁线程的开销,大幅提高了处理大规模异步任务时的性能。
如果程序都统一使用线程池来处理异步任务,则线程池内部便可以对系统资源的使用施加一定限制。
例如用户可以指定一个线程池最大可维护的线程数量,以避免耗尽系统资源。
当用户提交任务的速率过大,导致线程池中的线程数到达指定的最大值时依然无法满足需求时,线程池可以通过丢弃部分任务或限制提交任务的流量的方式来处理这一问题。
线程池通过对线程资源的使用进行统一收口,用户可以通过设置线程池的参数来控制系统资源的使用,从而避免系统资源耗尽。
前面介绍了线程池的概念,而要深入理解线程池的工作原理最好的办法便是找到一个优秀的线程池实现来加以研究。
而自jdk1.5中引入的通用线程池框架ThreadPoolExecutor便是一个很好的学习对象。其内部实现不算复杂,却在高效实现核心功能的同时还提供了较丰富的拓展能力。
下面从整体上介绍一下jdk通用线程池ThreadPoolExecutor的工作原理(基于jdk8)。
首先ThreadPoolExecutor允许用户从两个不同维度来控制线程资源的使用,即最大核心线程数(corePoolSize)和最大线程数(maximumPoolSize)。
最大核心线程数:核心线程指的是通常常驻线程池的线程。常驻线程在线程池没有任务空闲时也不会被销毁,而是处于idle状态,这样在新任务到来时就能很快的进行响应。
最大线程数:和第一节中提到的一样,即线程池中所能允许的活跃线程的最大数量。
在向ThreadPoolExecutor提交任务时(execute方法),会执行一系列的判断来决定任务应该如何被执行(源码在下一节中具体分析)。
线程池的优雅停止一般要能做到以下几点:
线程池自启动后便会有大量的工作线程在内部持续不断并发的执行提交的各种任务,而要想做到优雅停止并不是一件容易的事情。
因此ThreadPoolExecutor中最复杂、细节最多的部分并不在于上文中的正常工作流程,而在于分散在各个地方但又紧密协作的,控制优雅停止的逻辑。
除了正常的工作流程以及优雅停止的功能外,ThreadPoolExecutor还提供了一些比较好用的功能
如费曼所说:What I can not create I do not understand(我不能理解我创造不了的东西)。
通过模仿jdk的ThreadPoolExecutor实现,从零开始实现一个线程池,可以迫使自己去仔细的捋清楚jdk线程池中设计的各种细节,加深理解而达到更好的学习效果。
前面提到ThreadPoolExecutor的核心逻辑主要分为两部分,一是正常运行时处理提交的任务的逻辑,二是实现优雅停止的逻辑。
因此我们实现的线程池MyThreadPoolExecutor(以My开头用于区分)也会分为两个版本,v1版本只实现前一部分即正常运行时执行任务的逻辑,将有关线程池优雅停止的逻辑全部去除。
相比直接啃jdk最终实现的源码,v1版本的实现会更简单更易理解,让正常执行任务时的逻辑更加清晰而不会耦合太多关于优雅停止的逻辑。
ThreadPoolExecutor中有许多的成员变量,大致可以分为三类。
可由用户自定义的、用于控制线程池运行的配置参数
其中前6个配置参数都可以在ThreadPoolExecutor的构造函数中指定,而allowCoreThreadTimeOut则可以通过暴露的public方法allowCoreThreadTimeOut来动态的设置。
其中大部分属性都是volatile修饰的,目的是让运行过程中可以用过提供的public方法动态修改这些值后,线程池中的工作线程或提交任务的用户线程能及时的感知到变化(线程间的可见性),并进行响应(比如令核心线程自动的idle退出)
这些配置属性具体如何控制线程池行为的原理都会在下面的源码解析中展开介绍。理解这些参数的工作原理后才能在实际的业务中使用线程池时为其设置合适的值。
仅供线程池内部工作时使用的属性
这里重点介绍一下ctl属性。ctl虽然是一个32位的整型字段(AtomicInteger),但实际上却用于标识两个业务属性,即当前线程池的运行状态和worker线程的总数量。
在线程池初始化时状态位RUNNING,worker线程数量位0(private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));)。
ctl的32位中的高3位用于标识线程池当前的状态,剩余的29位用于标识线程池中worker线程的数量(因此理论上ThreadPoolExecutor最大可容纳的线程数并不是231-1(32位中符号要占一位),而是229-1)
由于聚合之后单独的读写某一个属性不是很方便,所以ThreadPoolExecutor中提供了很多基于位运算的辅助函数来简化这些逻辑。
ctl这样聚合的设计比起拆分成两个独立的字段有什么好处?
在ThreadPoolExecutor中关于优雅停止的逻辑中有很多地方是需要同时判断当前工作线程数量与线程池状态后,再对线程池状态工作线程数量进行更新的(具体逻辑在下一篇v2版本的博客中展开)。
且为了执行效率,不使用互斥锁而是通过cas重试的方法来解决并发更新的问题。而对一个AtomicInteger属性做cas重试的更新,要比同时控制两个属性进行cas的更新要简单很多,执行效率也高很多。
ThreadPoolExecutor共有五种状态,但有四种都和优雅停止有关(除了RUNNING)。
但由于v1版本的MyThreadPoolExecutorV1不支持优雅停止,所以不在本篇博客中讲解这些状态具体的含义以及其是如何变化的(下一篇v2版本的博客中展开)
记录线程池运行过程中的一些关键指标
- public class MyThreadPoolExecutorV1 implements MyThreadPoolExecutor{
-
- /**
- * 指定的最大核心线程数量
- * */
- private volatile int corePoolSize;
-
- /**
- * 指定的最大线程数量
- * */
- private volatile int maximumPoolSize;
-
- /**
- * 线程保活时间(单位:纳秒 nanos)
- * */
- private volatile long keepAliveTime;
-
- /**
- * 存放任务的工作队列(阻塞队列)
- * */
- private final BlockingQueue<Runnable> workQueue;
-
- /**
- * 线程工厂
- * */
- private volatile ThreadFactory threadFactory;
-
- /**
- * 拒绝策略
- * */
- private volatile MyRejectedExecutionHandler handler;
-
- /**
- * 是否允许核心线程在idle一定时间后被销毁(和非核心线程一样)
- * */
- private volatile boolean allowCoreThreadTimeOut;
-
- /**
- * 主控锁
- * */
- private final ReentrantLock mainLock = new ReentrantLock();
-
- /**
- * 当前线程池已完成的任务数量
- * */
- private long completedTaskCount;
-
- /**
- * 维护当前存活的worker线程集合
- * */
- private final HashSet<MyWorker> workers = new HashSet<>();
-
- /**
- * 当前线程池中存在的worker线程数量 + 状态的一个聚合(通过一个原子int进行cas,来避免对两个业务属性字段加锁来保证一致性)
- * v1版本只关心前者,即worker线程数量
- */
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- private static final int COUNT_BITS = Integer.SIZE - 3;
-
- /**
- * 32位的有符号整数,有3位是用来存放线程池状态的,所以用来维护当前工作线程个数的部分就只能用29位了
- * 被占去的3位中,有1位原来的符号位,2位是原来的数值位。
- * */
- private static final int CAPACITY = (1 << COUNT_BITS) - 1;
-
- /**
- * 线程池状态poolStatus常量(状态值只会由小到大,单调递增)
- * 线程池状态迁移图:
- * ↗ SHUTDOWN ↘
- * RUNNING ↓ TIDYING → TERMINATED
- * ↘ STOP ↗
- * 1 RUNNING状态,代表着线程池处于正常运行的状态。能正常的接收并处理提交的任务
- * 线程池对象初始化时,状态为RUNNING
- * 对应逻辑:private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- *
- * 2 SHUTDOWN状态,代表线程池处于停止对外服务的状态。不再接收新提交的任务,但依然会将workQueue工作队列中积压的任务处理完
- * 调用了shutdown方法时,状态由RUNNING -> SHUTDOWN
- * 对应逻辑:shutdown方法中的advanceRunState(SHUTDOWN);
- *
- * 3 STOP状态,代表线程池处于停止状态。不再接受新提交的任务,同时也不再处理workQueue工作队列中积压的任务,当前还在处理任务的工作线程将收到interrupt中断通知
- * 之前未调用shutdown方法,直接调用了shutdownNow方法,状态由RUNNING -> STOP
- * 之前先调用了shutdown方法,后调用了shutdownNow方法,状态由SHUTDOWN -> STOP
- * 对应逻辑:shutdownNow方法中的advanceRunState(STOP);
- *
- * 4 TIDYING状态,代表着线程池即将完全终止,正在做最后的收尾工作
- * 当前线程池状态为SHUTDOWN,任务被消费完工作队列workQueue为空,且工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由SHUTDOWN->TIDYING
- * 当前线程池状态为STOP,工作线程全部退出完成工作线程集合workers为空时,tryTerminate方法中将状态由STOP->TIDYING
- * 对应逻辑:tryTerminate方法中的ctl.compareAndSet(c, ctlOf(TIDYING, 0)
- *
- * 5 TERMINATED状态,代表着线程池完全的关闭。之前线程池已经处于TIDYING状态,且调用的钩子函数terminated已返回
- * 当前线程池状态为TIDYING,调用的钩子函数terminated已返回
- * 对应逻辑:tryTerminate方法中的ctl.set(ctlOf(TERMINATED, 0));
- * */
- private static final int RUNNING = -1 << COUNT_BITS;
- private static final int SHUTDOWN = 0 << COUNT_BITS;
- private static final int STOP = 1 << COUNT_BITS;
- private static final int TIDYING = 2 << COUNT_BITS;
- private static final int TERMINATED = 3 << COUNT_BITS;
-
- // Packing and unpacking ctl
- private static int workerCountOf(int c) { return c & CAPACITY; }
- private static int ctlOf(int rs, int wc) { return rs | wc; }
-
- /**
- * 跟踪线程池曾经有过的最大线程数量(只能在mainLock的并发保护下更新)
- */
- private int largestPoolSize;
-
- private boolean compareAndIncrementWorkerCount(int expect) {
- return this.ctl.compareAndSet(expect, expect + 1);
- }
- private boolean compareAndDecrementWorkerCount(int expect) {
- return ctl.compareAndSet(expect, expect - 1);
- }
-
- private void decrementWorkerCount() {
- do {
- // cas更新,workerCount自减1
- } while (!compareAndDecrementWorkerCount(ctl.get()));
- }
-
- public MyThreadPoolExecutorV1(int corePoolSize,
- int maximumPoolSize,
- long keepAliveTime,
- TimeUnit unit,
- BlockingQueue<Runnable> workQueue,
- ThreadFactory threadFactory,
- MyRejectedExecutionHandler handler) {
- // 基本的参数校验
- if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) {
- throw new IllegalArgumentException();
- }
-
- if (unit == null || workQueue == null || threadFactory == null || handler == null) {
- throw new NullPointerException();
- }
-
- // 设置成员变量
- this.corePoolSize = corePoolSize;
- this.maximumPoolSize = maximumPoolSize;
- this.workQueue = workQueue;
- this.keepAliveTime = unit.toNanos(keepAliveTime);
- this.threadFactory = threadFactory;
- this.handler = handler;
- }
-
- public ThreadFactory getThreadFactory() {
- return threadFactory;
- }
- }
Worker工作线程
ThreadPoolExecutor中的工作线程并不是裸的Thread,而是被封装在了一个Worker的内部类中。
Worker实现了Runnable所以可以作为一个普通的线程来启动,在run方法中只是简单的调用了一下runWorker(runWorker后面再展开)。
Worker类有三个成员属性:
Worker内封装的实际的工作线程对象thread,其在构造函数中由线程池的线程工厂threadFactory生成,传入this,所以thread在start后,便会调用run方法进而执行runWorker。
线程工厂可以由用户在创建线程池时通过参数指定,因此用户在自由控制所生成的工作线程的同时,也需要保证newThread能正确的返回一个可用的线程对象。
除此之外,Worker对象还继承了AbstractQueuedSynchronizer(AQS)类,简单的实现了一个不可重入的互斥锁。
AQS中维护了一个volatile修饰的int类型的成员变量state,其具体的含义可以由使用者自己定义。
在Worker中,state的值有三种状态:
具体这三种情况分别在什么时候出现会在下面解析提交任务源码的那部分里详细介绍。
- /**
- * jdk的实现中令Worker继承AbstractQueuedSynchronizer并实现了一个不可重入的锁
- * AQS中的state属性含义
- * -1:标识工作线程还未启动
- * 0:标识工作线程已经启动,但没有开始处理任务(可能是在等待任务,idle状态)
- * 1:标识worker线程正在执行任务(runWorker中,成功获得任务后,通过lock方法将state设置为1)
- * */
- private final class MyWorker extends AbstractQueuedSynchronizer implements Runnable{
-
- final Thread thread;
- Runnable firstTask;
- volatile long completedTasks;
-
- public MyWorker(Runnable firstTask) {
- // Worker初始化时,state设置为-1,用于interruptIfStarted方法中作为过滤条件,避免还未开始启动的Worker响应中断
- // 在runWorker方法中会通过一次unlock将state修改为0
- setState(-1);
-
- this.firstTask = firstTask;
-
- // newThread可能是null
- this.thread = getThreadFactory().newThread(this);
- }
-
- @Override
- public void run() {
- runWorker(this);
- }
-
- protected boolean isHeldExclusively() {
- return getState() != 0;
- }
-
- protected boolean tryAcquire(int unused) {
- if (compareAndSetState(0, 1)) {
- setExclusiveOwnerThread(Thread.currentThread());
- return true;
- }
- return false;
- }
-
- protected boolean tryRelease(int unused) {
- setExclusiveOwnerThread(null);
- setState(0);
- return true;
- }
-
- public void lock(){
- acquire(1);
- }
-
- public boolean tryLock(){
- return tryAcquire(1);
- }
-
- public void unlock(){
- release(1);
- }
-
- public boolean isLocked(){
- return isHeldExclusively();
- }
-
- void interruptIfStarted() {
- Thread t;
- // 三个条件同时满足,才去中断Worker对应的thread
- // getState() >= 0,用于过滤还未执行runWorker的,刚入队初始化的Worker
- // thread != null,用于过滤掉构造方法中ThreadFactory.newThread返回null的Worker
- // !t.isInterrupted(),用于过滤掉那些已经被其它方式中断的Worker线程(比如用户自己去触发中断,提前终止线程池中的任务)
- if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
- try {
- t.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- }
- }
execute执行提交的任务
下面介绍本篇博客的重点,即线程池是如何执行用户所提交的任务的。
用户提交任务的入口是public的execute方法,Runnable类型的参数command就是提交的要执行的任务。
MyThreadPoolExecutorV1的execute方法(相比jdk的实现v1版本去掉了关于优雅停止的逻辑)
- /**
- * 提交任务,并执行
- * */
- public void execute(Runnable command) {
- if (command == null){
- throw new NullPointerException("command参数不能为空");
- }
-
- int currentCtl = this.ctl.get();
- if (workerCountOf(currentCtl) < this.corePoolSize) {
- // 如果当前存在的worker线程数量低于指定的核心线程数量,则创建新的核心线程
- boolean addCoreWorkerSuccess = addWorker(command,true);
- if(addCoreWorkerSuccess){
- // addWorker添加成功,直接返回即可
- return;
- }
- }
-
- // 走到这里有两种情况
- // 1 因为核心线程超过限制(workerCountOf(currentCtl) < corePoolSize == false),需要尝试尝试将任务放入阻塞队列
- // 2 addWorker返回false,创建核心工作线程失败
- if(this.workQueue.offer(command)){
- // workQueue.offer入队成功
-
- if(workerCountOf(currentCtl) == 0){
- // 在corePoolSize为0的情况下,当前不存在存活的核心线程
- // 一个任务在入队之后,如果当前线程池中一个线程都没有,则需要兜底的创建一个非核心线程来处理入队的任务
- // 因此firstTask为null,目的是先让任务先入队后创建线程去拉取任务并执行
- addWorker(null,false);
- }else{
- // 加入队列成功,且当前存在worker线程,成功返回
- return;
- }
- }else{
- // 阻塞队列已满,尝试创建一个新的非核心线程处理
- boolean addNonCoreWorkerSuccess = addWorker(command,false);
- if(!addNonCoreWorkerSuccess){
- // 创建非核心线程失败,执行拒绝策略(失败的原因和前面创建核心线程addWorker的原因类似)
- reject(command);
- }else{
- // 创建非核心线程成功,成功返回
- return;
- }
- }
- }
-
- /**
- * 根据指定的拒绝处理器,执行拒绝策略
- * */
- private void reject(Runnable command) {
- this.handler.rejectedExecution(command, this);
- }
可以看到,execute方法源码中对于任务处理的逻辑很清晰,也能与ThreadPoolExecutor运行时工作流程中所介绍的流程所匹配。
addWorker方法(创建新的工作线程)
在execute方法中当需要创建核心线程或普通线程时,便需要通过addWorker方法尝试创建一个新的工作线程。
- /**
- * 向线程池中加入worker
- * */
- private boolean addWorker(Runnable firstTask, boolean core) {
- // retry标识外层循环
- retry:
- for (;;) {
- int currentCtl = ctl.get();
-
- // 用于cas更新workerCount的内层循环(注意这里面与jdk的写法不同,改写成了逻辑一致但更可读的形式)
- for (;;) {
- // 判断当前worker数量是否超过了限制
- int workerCount = workerCountOf(currentCtl);
- if (workerCount >= CAPACITY) {
- // 当前worker数量超过了设计上允许的最大限制
- return false;
- }
- if (core) {
- // 创建的是核心线程,判断当前线程数是否已经超过了指定的核心线程数
- if (workerCount >= this.corePoolSize) {
- // 超过了核心线程数,创建核心worker线程失败
- return false;
- }
- } else {
- // 创建的是非核心线程,判断当前线程数是否已经超过了指定的最大线程数
- if (workerCount >= this.maximumPoolSize) {
- // 超过了最大线程数,创建非核心worker线程失败
- return false;
- }
- }
-
- // cas更新workerCount的值
- boolean casSuccess = compareAndIncrementWorkerCount(currentCtl);
- if (casSuccess) {
- // cas成功,跳出外层循环
- break retry;
- }
-
- // compareAndIncrementWorkerCount方法cas争抢失败,重新执行内层循环
- }
- }
-
- boolean workerStarted = false;
-
- MyWorker newWorker = null;
- try {
- // 创建一个新的worker
- newWorker = new MyWorker(firstTask);
- final Thread myWorkerThread = newWorker.thread;
- if (myWorkerThread != null) {
- // MyWorker初始化时内部线程创建成功
-
- // 加锁,防止并发更新
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
-
- try {
- if (myWorkerThread.isAlive()) {
- // 预检查线程的状态,刚初始化的worker线程必须是未唤醒的状态
- throw new IllegalThreadStateException();
- }
-
- // 加入worker集合
- this.workers.add(newWorker);
-
- int workerSize = workers.size();
- if (workerSize > largestPoolSize) {
- // 如果当前worker个数超过了之前记录的最大存活线程数,将其更新
- largestPoolSize = workerSize;
- }
-
- // 创建成功
- } finally {
- // 无论是否发生异常,都先将主控锁解锁
- mainLock.unlock();
- }
-
- // 加入成功,启动worker线程
- myWorkerThread.start();
- // 标识为worker线程启动成功,并作为返回值返回
- workerStarted = true;
- }
- }finally {
- if (!workerStarted) {
- addWorkerFailed(newWorker);
- }
- }
-
- return workerStarted;
- }
addWorker可以分为两部分:判断当前是否满足创建新工作线程的条件、创建并启动新的Worker工作线程。
判断当前是否满足创建新工作线程的条件
入口处开始的retry标识的for循环部分,便是用于判断是否满足创建新工作线程的条件。
需要注意的是:这里面有两个for循环的原因在于v1版本省略了优雅停止的逻辑(所以实际上v1版本能去掉内层循环的)。如果线程池处于停止状态则不能再创建新工作线程了,因此也需要判断线程池当前的状态,
不满足条件则也需要返回false,不创建工作线程。
而且compareAndIncrementWorkerCount中cas更新ctl时,如果并发的线程池被停止而导致线程池状态发生了变化,也会导致cas失败重新检查。
这也是jdk的实现中为什么把线程池状态和工作线程数量绑定在一起的原因之一,这样在cas更新时可以原子性的同时检查两个字段的并发争抢。(更具体的细节会在下一篇博客的v2版本中介绍)
创建并启动新的Worker工作线程
在通过retry那部分的层层条件检查后,紧接着便是实际创建新工作线程的逻辑。
虽然在前面线程池工作流程的分析中提到了核心线程与非核心线程的概念,但Worker类中实际上并没有核心/非核心的标识。
经过了工作线程启动前的条件判断后,新创建的工作线程实际上并没有真正的核心与非核心的差别。
addWorkerFailed(addWorker的逆向回滚操作)
addWorker中工作线程可能会启动失败,所以要对addWorker中对workers集合以及workerCount等数据的操作进行回滚。
- /**
- * 当创建worker出现异常失败时,对之前的操作进行回滚
- * 1 如果新创建的worker加入了workers集合,将其移除
- * 2 减少记录存活的worker个数(cas更新)
- * 3 检查线程池是否满足中止的状态,防止这个存活的worker线程阻止线程池的中止(v1版本不考虑,省略了tryTerminate)
- */
- private void addWorkerFailed(MyWorker myWorker) {
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- if (myWorker != null) {
- // 如果新创建的worker加入了workers集合,将其移除
- workers.remove(myWorker);
- }
- // 减少存活的worker个数
- decrementWorkerCount();
-
- // 尝试着将当前worker线程终止(addWorkerFailed由工作线程自己调用)
- // tryTerminate();
- } finally {
- mainLock.unlock();
- }
- }
runWorker(工作线程核心执行逻辑)
前面介绍了用户如何向线程池提交任务,以及如何创建新工作线程Worker,下面介绍工作线程在线程池中是如何运行的。
- /**
- * worker工作线程主循环执行逻辑
- * */
- private void runWorker(MyWorker myWorker) {
- // 时worker线程的run方法调用的,此时的current线程的是worker线程
- Thread workerThread = Thread.currentThread();
-
- Runnable task = myWorker.firstTask;
- // 已经暂存了firstTask,将其清空(有地方根据firstTask是否存在来判断工作线程中负责的任务是否是新提交的)
- myWorker.firstTask = null;
-
- // 将state由初始化时的-1设置为0
- // 标识着此时当前工作线程开始工作了,这样可以被interruptIfStarted选中
- myWorker.unlock();
-
- // 默认线程是由于中断退出的
- boolean completedAbruptly = true;
- try {
- // worker线程处理主循环,核心逻辑
- while (task != null || (task = getTask()) != null) {
- // 将state由0标识为1,代表着其由idle状态变成了正在工作的状态
- // 这样interruptIdleWorkers中的tryLock会失败,这样工作状态的线程就不会被该方法中断任务的正常执行
- myWorker.lock();
-
- // v1版本此处省略优雅停止相关的核心逻辑
-
- try {
- // 任务执行前的钩子函数
- beforeExecute(workerThread, task);
- Throwable thrown = null;
- try {
- // 拿到的任务开始执行
- task.run();
- } catch (RuntimeException | Error x) {
- // 使用thrown收集抛出的异常,传递给afterExecute
- thrown = x;
- // 同时抛出错误,从而中止主循环
- throw x;
- } catch (Throwable x) {
- // 使用thrown收集抛出的异常,传递给afterExecute
- thrown = x;
- // 同时抛出错误,从而中止主循环
- throw new Error(x);
- } finally {
- // 任务执行后的钩子函数,如果任务执行时抛出了错误/异常,thrown不为null
- afterExecute(task, thrown);
- }
- } finally {
- // 将task设置为null,令下一次while循环通过getTask获得新任务
- task = null;
- // 无论执行时是否存在异常,已完成的任务数加1
- myWorker.completedTasks++;
- // 无论如何将myWorker解锁,标识为idle状态
- myWorker.unlock();
- }
-
- }
- // getTask返回了null,说明没有可执行的任务或者因为idle超时、线程数超过配置等原因需要回收当前线程。
- // 线程正常的退出,completedAbruptly为false
- completedAbruptly = false;
- }finally {
- // getTask返回null,线程正常的退出,completedAbruptly值为false
- // task.run()执行时抛出了异常/错误,直接跳出了主循环,此时completedAbruptly为初始化时的默认值true
- processWorkerExit(myWorker, completedAbruptly);
-
- // processWorkerExit执行完成后,worker线程对应的run方法(run->runWorker)也会执行完毕
- // 此时线程对象会进入终止态,等待操作系统回收
- // 而且processWorkerExit方法内将传入的Worker从workers集合中移除,jvm中的对象也会因为不再被引用而被GC回收
- // 此时,当前工作线程所占用的所有资源都已释放完毕
- }
- }
getTask尝试获取任务执行
runWorker中是通过getTask获取任务的,getTask中包含着工作线程是如何从工作队列中获取任务的关键逻辑。
- /**
- * 尝试着从阻塞队列里获得待执行的任务
- * @return 返回null代表工作队列为空,没有需要执行的任务; 或者当前worker线程满足了需要退出的一些条件
- * 返回对应的任务
- * */
- private Runnable getTask() {
- boolean timedOut = false;
-
- for(;;) {
- int currentCtl = ctl.get();
-
- // 获得当前工作线程个数
- int workCount = workerCountOf(currentCtl);
-
- // 有两种情况需要指定超时时间的方式从阻塞队列workQueue中获取任务(即timed为true)
- // 1.线程池配置参数allowCoreThreadTimeOut为true,即允许核心线程在idle一定时间后被销毁
- // 所以allowCoreThreadTimeOut为true时,需要令timed为true,这样可以让核心线程也在一定时间内获取不到任务(idle状态)而被销毁
- // 2.线程池配置参数allowCoreThreadTimeOut为false,但当前线程池中的线程数量workCount大于了指定的核心线程数量corePoolSize
- // 说明当前有一些非核心的线程正在工作,而非核心的线程在idle状态一段时间后需要被销毁
- // 所以此时也令timed为true,让这些线程在keepAliveTime时间内由于队列为空拉取不到任务而返回null,将其销毁
- boolean timed = allowCoreThreadTimeOut || workCount > corePoolSize;
-
- // 有共四种情况不需要往下执行,代表
- // 1 (workCount > maximumPoolSize && workCount > 1)
- // 当前工作线程个数大于了指定的maximumPoolSize(可能是由于启动后通过setMaximumPoolSize调小了maximumPoolSize的值)
- // 已经不符合线程池的配置参数约束了,要将多余的工作线程回收掉
- // 且当前workCount > 1说明存在不止一个工作线程,意味着即使将当前工作线程回收后也还有其它工作线程能继续处理工作队列里的任务,直接返回null表示自己需要被回收
-
- // 2 (workCount > maximumPoolSize && workCount <= 1 && workQueue.isEmpty())
- // 当前工作线程个数大于了指定的maximumPoolSize(maximumPoolSize被设置为0了)
- // 已经不符合线程池的配置参数约束了,要将多余的工作线程回收掉
- // 但此时workCount<=1,说明将自己这个工作线程回收掉后就没有其它工作线程能处理工作队列里剩余的任务了
- // 所以即使maximumPoolSize设置为0,也需要等待任务被处理完,工作队列为空之后才能回收当前线程,否则还会继续拉取剩余任务
-
- // 3 (workCount <= maximumPoolSize && (timed && timedOut) && workCount > 1)
- // workCount <= maximumPoolSize符合要求
- // 但是timed && timedOut,说明timed判定命中,需要以poll的方式指定超时时间,并且最近一次拉取任务超时了timedOut=true
- // 进入新的一次循环后timed && timedOut成立,说明当前worker线程处于idle状态等待任务超过了规定的keepAliveTime时间,需要回收当前线程
- // 且当前workCount > 1说明存在不止一个工作线程,意味着即使将当前工作线程回收后也还有其它工作线程能继续处理工作队列里的任务,直接返回null表示自己需要被回收
-
- // 4 (workCount <= maximumPoolSize && (timed && timedOut) && workQueue.isEmpty())
- // workCount <= maximumPoolSize符合要求
- // 但是timed && timedOut,说明timed判定命中,需要以poll的方式指定超时时间,并且最近一次拉取任务超时了timedOut=true
- // 进入新的一次循环后timed && timedOut成立,说明当前worker线程处于idle状态等待任务超过了规定的keepAliveTime时间,需要回收当前线程
- // 但此时workCount<=1,说明将自己这个工作线程回收掉后就没有其它工作线程能处理工作队列里剩余的任务了
- // 所以即使timed && timedOut超时逻辑匹配,也需要等待任务被处理完,工作队列为空之后才能回收当前线程,否则还会继续拉取剩余任务
- if ((workCount > maximumPoolSize || (timed && timedOut))
- && (workCount > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(currentCtl)) {
- // 满足上述条件,说明当前线程需要被销毁了,返回null
- return null;
- }
-
- // compareAndDecrementWorkerCount方法由于并发的原因cas执行失败,continue循环重试
- continue;
- }
-
- try {
- // 根据上面的逻辑的timed标识,决定以什么方式从阻塞队列中获取任务
- Runnable r = timed ?
- // timed为true,通过poll方法指定获取任务的超时时间(如果指定时间内没有队列依然为空,则返回)
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- // timed为false,通过take方法无限期的等待阻塞队列中加入新的任务
- workQueue.take();
- if (r != null) {
- // 获得了新的任务,getWork正常返回对应的任务对象
- return r;
- }else{
- // 否则说明timed=true,且poll拉取任务时超时了
- timedOut = true;
- }
- } catch (InterruptedException retry) {
- // poll or take任务等待时worker线程被中断了,捕获中断异常
- // timeout = false,标识拉取任务时没有超时
- timedOut = false;
- }
- }
- }
processWorkerExit(处理工作线程退出)
在runWorker中,如果getTask方法没有拿到任务返回了null或者任务在执行时抛出了异常就会在最终的finally块中调用processWorkerExit方法,令当前工作线程销毁退出。
工作线程退出时所占用资源的回收
- /**
- * 处理worker线程退出
- * @param myWorker 需要退出的工作线程对象
- * @param completedAbruptly 是否是因为中断异常的原因,而需要回收
- * */
- private void processWorkerExit(MyWorker myWorker, boolean completedAbruptly) {
- if (completedAbruptly) {
- // 如果completedAbruptly=true,说明是任务在run方法执行时出错导致的线程退出
- // 而正常退出时completedAbruptly=false,在getTask中已经将workerCount的值减少了
- decrementWorkerCount();
- }
-
- ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- // 线程池全局总完成任务数累加上要退出的工作线程已完成的任务数
- this.completedTaskCount += myWorker.completedTasks;
- // workers集合中将当前工作线程剔除
- workers.remove(myWorker);
-
- // completedTaskCount是long类型的,workers是HashSet,
- // 都是非线程安全的,所以在mainLock的保护进行修改
- } finally {
- mainLock.unlock();
- }
-
- int currentCtl = this.ctl.get();
-
- if (!completedAbruptly) {
- // completedAbruptly=false,说明不是因为中断异常而退出的
- // min标识当前线程池允许的最小线程数量
- // 1 如果allowCoreThreadTimeOut为true,则核心线程也可以被销毁,min=0
- // 2 如果allowCoreThreadTimeOut为false,则min应该为所允许的核心线程个数,min=corePoolSize
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- if (min == 0 && ! workQueue.isEmpty()) {
- // 如果min为0了,但工作队列不为空,则修正min=1,因为至少需要一个工作线程来将工作队列中的任务消费、处理掉
- min = 1;
- }
- if (workerCountOf(currentCtl) >= min) {
- // 如果当前工作线程数大于了min,当前线程数量是足够的,直接返回(否则要执行下面的addWorker恢复)
- return;
- }
- }
- // 两种场景会走到这里进行addWorker操作
- // 1 completedAbruptly=true,说明线程是因为中断异常而退出的,需要重新创建一个新的工作线程
- // 2 completedAbruptly=false,且上面的workerCount<min,则说明当前工作线程数不够,需要创建一个
- // 为什么参数core传的是false呢?
- // 因为completedAbruptly=true而中断退出的线程,无论当前工作线程数是否大于核心线程,都需要创建一个新的线程来代替原有的被退出的线程
- addWorker(null, false);
- }
动态修改配置参数
ThreadPoolExecutor除了支持启动前通过构造函数设置配置参数外,也允许在线程池运行的过程中动态的更改配置。而要实现动态的修改配置,麻烦程度要比启动前静态的指定大得多。
举个例子,在线程池的运行过程中如果当前corePoolSize=20,且已经创建了20个核心线程时(workerCount=20),现在将corePoolSize减少为10或者增大为30时应该如何实时的生效呢?
下面通过内嵌于代码中的注释,详细的说明了allowCoreThreadTimeOut、corePoolSize、maximumPoolSize这三个关键配置参数实现动态修改的原理。
- /**
- * 设置是否允许核心线程idle超时后退出
- * */
- public void allowCoreThreadTimeOut(boolean value) {
- if (value && keepAliveTime <= 0) {
- throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
- }
- // 判断一下新旧值是否相等,避免无意义的volatile变量更新,导致不必要的cpu cache同步
- if (value != allowCoreThreadTimeOut) {
- allowCoreThreadTimeOut = value;
- if (value) {
- // 参数值value为true,说明之前不允许核心线程由于idle超时而退出
- // 而此时更新为true说明现在允许了,则通过interruptIdleWorkers唤醒所有的idle线程
- // 令其走一遍runWorker中的逻辑,尝试着让idle超时的核心线程及时销毁
- interruptIdleWorkers();
- }
- }
- }
-
- /**
- * 动态更新核心线程最大值corePoolSize
- * */
- public void setCorePoolSize(int corePoolSize) {
- if (corePoolSize < 0) {
- throw new IllegalArgumentException();
- }
-
- // 计算差异
- int delta = corePoolSize - this.corePoolSize;
- // 赋值
- this.corePoolSize = corePoolSize;
- if (workerCountOf(this.ctl.get()) > corePoolSize) {
- // 更新完毕后,发现当前工作线程数超过了指定的值
- // 唤醒所有idle线程,让目前空闲的idle超时的线程在workerCount大于maximumPoolSize时及时销毁
- interruptIdleWorkers();
- } else if (delta > 0) {
- // 差异大于0,代表着新值大于旧值
-
- // We don't really know how many new threads are "needed".
- // As a heuristic, prestart enough new workers (up to new
- // core size) to handle the current number of tasks in
- // queue, but stop if queue becomes empty while doing so.
- // 我们无法确切的知道有多少新的线程是所需要的。
- // 启发式的预先启动足够的新工作线程用于处理工作队列中的任务
- // 但当执行此操作时工作队列为空了,则立即停止此操作(队列为空了说明当前负载较低,再创建更多的工作线程是浪费资源)
-
- // 取差异和当前工作队列中的最小值为k
- int k = Math.min(delta, workQueue.size());
-
- // 尝试着一直增加新的工作线程,直到和k相同
- // 这样设计的目的在于控制增加的核心线程数量,不要一下子创建过多核心线程
- // 举个例子:原来的corePoolSize是10,且工作线程数也是10,现在新值设置为了30,新值比旧值大20,理论上应该直接创建20个核心工作线程
- // 而工作队列中的任务数只有10,那么这个时候直接创建20个新工作线程是没必要的,只需要一个一个创建,在创建的过程中新的线程会尽量的消费工作队列中的任务
- // 这样就可以以一种启发性的方式创建合适的新工作线程,一定程度上节约资源。后面再有新的任务提交时,再从runWorker方法中去单独创建核心线程(类似惰性创建)
- while (k-- > 0 && addWorker(null, true)) {
- if (workQueue.isEmpty()) {
- // 其它工作线程在循环的过程中也在消费工作线程,且用户也可能不断地提交任务
- // 这是一个动态的过程,但一旦发现当前工作队列为空则立即结束
- break;
- }
- }
- }
- }
-
- /**
- * 动态更新最大线程数maximumPoolSize
- * */
- public void setMaximumPoolSize(int maximumPoolSize) {
- if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
- throw new IllegalArgumentException();
- }
- this.maximumPoolSize = maximumPoolSize;
- if (workerCountOf(this.ctl.get()) > maximumPoolSize) {
- // 更新完毕后,发现当前工作线程数超过了指定的值
- // 唤醒所有idle线程,让目前空闲的idle超时的线程在workerCount大于maximumPoolSize时及时销毁
- interruptIdleWorkers();
- }
- }
目前为止,通过v1版本的MyThreadPoolExecutor源码,已经将jdk线程池ThreadPoolExecutor在RUNNING状态下提交任务,启动工作线程执行任务相关的核心逻辑讲解完毕了(不考虑优雅停止)。
jdk线程池默认支持的四种拒绝策略
jdk线程池支持用户传入自定义的拒绝策略处理器,只需要传入实现了RejectedExecutionHandler接口的对象就行。
而jdk在ThreadPoolExecutor中提供了默认的四种拒绝策略方便用户使用。
上面介绍的四种jdk默认拒绝策略分别适应不同的业务场景,需要用户仔细考虑最适合的拒绝策略。同时灵活的、基于接口的设计也开放的支持用户去自己实现更贴合自己业务的拒绝策略处理器。
- /**
- * 默认的拒绝策略:AbortPolicy
- * */
- private static final MyRejectedExecutionHandler defaultHandler = new MyAbortPolicy();
-
- /**
- * 抛出RejectedExecutionException的拒绝策略
- * 评价:能让提交任务的一方感知到异常的策略,比较通用,也是jdk默认的拒绝策略
- * */
- public static class MyAbortPolicy implements MyRejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
- // 直接抛出异常
- throw new RejectedExecutionException("Task " + command.toString() +
- " rejected from " + executor.toString());
- }
- }
-
- /**
- * 令调用者线程自己执行command任务的拒绝策略
- * 评价:在线程池压力过大时,让提交任务的线程自己执行该任务(异步变同步),
- * 能够有效地降低线程池的压力,也不会丢失任务,但可能导致整体业务吞吐量大幅降低
- * */
- public static class MyCallerRunsPolicy implements MyRejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
- if (!executor.isShutdown()) {
- // 如果当前线程池不是shutdown状态,则令调用者线程自己执行command任务
- command.run();
- }else{
- // 如果已经是shutdown状态了,就什么也不做直接丢弃任务
- }
- }
- }
-
- /**
- * 直接丢弃任务的拒绝策略
- * 评价:简单的直接丢弃任务,适用于对任务执行成功率要求不高的场合
- * */
- public static class MyDiscardPolicy implements MyRejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
- // 什么也不做的,直接返回
- // 效果就是command任务被无声无息的丢弃了,没有异常
- }
- }
-
- /**
- * 丢弃当前工作队列中最早入队的任务,然后将当前任务重新提交
- * 评价:适用于后出现的任务能够完全代替之前任务的场合(追求最终一致性)
- * */
- public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandler {
- @Override
- public void rejectedExecution(Runnable command, MyThreadPoolExecutor executor) {
- if (!executor.isShutdown()) {
- // 如果当前线程池不是shutdown状态,则丢弃当前工作队列中最早入队的任务,然后将当前任务重新提交
- executor.getQueue().poll();
- executor.execute(command);
- }else{
- // 如果已经是shutdown状态了,就什么也不做直接丢弃任务
- }
- }
- }
jdk默认的四种线程池实现
jdk中除了提供了默认的拒绝策略,还在Executors类中提供了四种基于ThreadPoolExecutor的、比较常用的线程池,以简化用户对线程池的使用。
这四种线程池可以通过Executors提供的public方法来分别创建:
newFixedThreadPool
newFixedThreadPool方法创建一个工作线程数量固定的线程池,其创建ThreadPoolExecutor时传入的核心线程数corePoolSize和最大线程数maximumPoolSize是相等的。
因此其工作队列传入是一个无界的LinkedBlockingQueue,无界的工作队列意味着永远都不会创建新的非核心线程。
在默认allowCoreThreadTimeOut为false的情况下,线程池中的所有线程都是不会因为idle超时而销毁的核心线程。
适用场景:由于工作线程数量固定,“fixedThreadPool”适用于任务流量较为稳定的场景
- public static ExecutorService newFixedThreadPool(int nThreads) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
()); - }
newCachedThreadPool
newCachedThreadPool方法创建一个工作线程数量有巨大弹性的线程池,其核心线程数corePoolSize=0而最大线程数maximumPoolSize为Integer.MAX_VALUE,60s的保活时间。
同时其工作队列是SynchronousQueue,是一种队列容量为0、无法缓存任何任务的阻塞队列(任何时候插入数据(offer)时必须有消费者线程消费,否则生产者线程将会被阻塞)。
这也意味着“cachedThreadPool”中没有核心线程,所有工作线程在任务负载较低时都会在60s的idle后被销毁;同时当负载较高,新任务到来时由于所有的工作线程都在执行其它任务,将会立即创建一个新的非核心线程来处理任务。
适用场景:由于可以无限制的创建新线程来做到及时响应任务,“cachedThreadPool”适用于任务流量较大且不稳定,对任务延迟容忍度较低的场景
- public static ExecutorService newCachedThreadPool() {
- return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue
()); - }
newSingleThreadExecutor
newSingleThreadExecutor方法创建一个单线程的线程池,其核心线程数corePoolSize=1且最大线程数maximumPoolSize也为1,其工作队列是无界队列。
这意味着“singleThreadExecutor”中任何提交的任务都将严格按照先入先出的顺序被执行。
适用场景:“singleThreadExecutor”适用于任务量较小、对任务延迟容忍度较高、并要求任务顺序执行的场景。
- public static ExecutorService newSingleThreadExecutor() {
- return new FinalizableDelegatedExecutorService
- (new ThreadPoolExecutor(1, 1,
- 0L, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue
())); - }
newScheduledThreadPool
newScheduledThreadPool方法创建一个支持定时任务、延迟任务执行的线程池(关于jdk定时任务线程池ScheduledThreadPoolExecutor的工作原理会在未来的博客中展开)
适用场景:“scheduledThreadPool”适用于需要任务定时或者延迟执行的场景。
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
- return new ScheduledThreadPoolExecutor(corePoolSize);
- }
-
- public ScheduledThreadPoolExecutor(int corePoolSize) {
- super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
- new DelayedWorkQueue());
- }
jdk默认提供的线程池的缺陷