1.实现原理
- public class ThreadPoolExecutor extends AbstractExecutorService {
- //...
- private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
- // 存放任务的阻塞队列
- private final BlockingQueue<Runnable> workQueue;
- // 对线程池内部各种变量进⾏互斥访问控制
- private final ReentrantLock mainLock = new ReentrantLock();
- // 线程集合
- private final HashSet<Worker> workers = new HashSet<Worker>();
- //...
- }
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
- // ...
- final Thread thread; // Worker封装的线程
- Runnable firstTask; // Worker接收到的第1个任务
- volatile long completedTasks; // Worker执⾏完毕的任务个数
- // ...
- }

- protected void beforeExecute(Thread t, Runnable r) { }
- protected void afterExecute(Runnable r, Throwable t) { }
- protected void terminated() { }




shutdownNow()调⽤了 interruptWorkers(); ⽅法:
interruptIfStarted() ⽅法的实现:
在上⾯的代码中,shutdown() 和shutdownNow()都调⽤了tryTerminate()⽅法,如下所示:
所以,TIDYING和TREMINATED的区别是在⼆者之间执⾏了⼀个钩⼦⽅法terminated(),⽬前是⼀个空实现。
- public void execute(Runnable command) {
- if (command == null)
- throw new NullPointerException();
- int c = ctl.get();
- // 如果当前线程数⼩于corePoolSize,则启动新线程
- if (workerCountOf(c) < corePoolSize) {
- // 添加Worker,并将command设置为Worker线程的第⼀个任务开始执⾏。
- if (addWorker(command, true))
- return;
- c = ctl.get();
- }
- // 如果当前的线程数⼤于或等于corePoolSize,则调⽤workQueue.offer放⼊队列
- if (isRunning(c) && workQueue.offer(command)) {
- int recheck = ctl.get();
- // 如果线程池正在停⽌,则将command任务从队列移除,并拒绝command任务请求。
- if (! isRunning(recheck) && remove(command))
- reject(command);
- // 放⼊队列中后发现没有线程执⾏任务,开启新线程
- else if (workerCountOf(recheck) == 0)
- addWorker(null, false);
- }
- // 线程数⼤于maxPoolSize,并且队列已满,调⽤拒绝策略
- else if (!addWorker(command, false))
- reject(command);
- }
- // 该⽅法⽤于启动新线程。如果第⼆个参数为true,则使⽤corePoolSize作为上限,否则使⽤maxPoolSize
- 作为上限。
- private boolean addWorker(Runnable firstTask, boolean core) {
- retry:
- for (int c = ctl.get();;) {
- // 如果线程池状态值起码是SHUTDOWN和STOP,或则第⼀个任务不是null,或者⼯作队列为空
- // 则添加worker失败,返回false
- if (runStateAtLeast(c, SHUTDOWN)
- && (runStateAtLeast(c, STOP)
- || firstTask != null
- || workQueue.isEmpty()))
- return false;
- for (;;) {
- // ⼯作线程数达到上限,要么是corePoolSize要么是maximumPoolSize,启动线程失败
- if (workerCountOf(c)
- >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
- return false;
- // 增加worker数量成功,返回到retry语句
- if (compareAndIncrementWorkerCount(c))
- break retry;
- c = ctl.get(); // Re-read ctl
- // 如果线程池运⾏状态起码是SHUTDOWN,则重试retry标签语句,CAS
- if (runStateAtLeast(c, SHUTDOWN))
- continue retry;
- // else CAS failed due to workerCount change; retry inner loop
- }
- }
- // worker数量加1成功后,接着运⾏:
- boolean workerStarted = false;
- boolean workerAdded = false;
- Worker w = null;
- try {
- // 新建worker对象
- w = new Worker(firstTask);
- // 获取线程对象
- final Thread t = w.thread;
- if (t != null) {
- final ReentrantLock mainLock = this.mainLock;
- // 加锁
- mainLock.lock();
- try {
- // Recheck while holding lock.
- // Back out on ThreadFactory failure or if
- // shut down before lock acquired.
- int c = ctl.get();
- if (isRunning(c) ||
- (runStateLessThan(c, STOP) && firstTask == null)) {
- // 由于线程已经在运⾏中,⽆法启动,抛异常
- if (t.isAlive()) // precheck that t is startable
- throw new IllegalThreadStateException();
- // 将线程对应的worker加⼊worker集合
- workers.add(w);
- int s = workers.size();
- if (s > largestPoolSize)
- largestPoolSize = s;
- workerAdded = true;
- }
- } finally {
- // 释放锁
- mainLock.unlock();
- }
- // 如果添加worker成功,则启动该worker对应的线程
- if (workerAdded) {
- t.start();
- workerStarted = true;
- }
- }
- } finally {
- // 如果启动新线程失败
- if (! workerStarted)
- // workCount - 1
- addWorkerFailed(w);
- }
- return workerStarted; }
- private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
- // 当前Worker对象封装的线程
- final Thread thread;
- // 线程需要运⾏的第⼀个任务。可以是null,如果是null,则线程从队列获取任务
- Runnable firstTask;
- // 记录线程执⾏完成的任务数量,每个线程⼀个计数器
- volatile long completedTasks;
- /**
- * 使⽤给定的第⼀个任务并利⽤线程⼯⼚创建Worker实例
- * @param firstTask 线程的第⼀个任务,如果没有,就设置为null,此时线程会从队列获取任务。
- */
- Worker(Runnable firstTask) {
- setState(-1); // 线程处于阻塞状态,调⽤runWorker的时候中断
- this.firstTask = firstTask;
- this.thread = getThreadFactory().newThread(this);
- }
- // 调⽤ThreadPoolExecutor的runWorker⽅法执⾏线程的运⾏
- public void run() {
- runWorker(this);
- }
- }
- final void runWorker(Worker w) {
- Thread wt = Thread.currentThread();
- Runnable task = w.firstTask;
- w.firstTask = null;
- // 中断Worker封装的线程
- w.unlock();
- boolean completedAbruptly = true;
- try {
- // 如果线程初始任务不是null,或者从队列获取的任务不是null,表示该线程应该执⾏任务。
- while (task != null || (task = getTask()) != null) {
- // 获取线程锁
- w.lock();
- // 如果线程池停⽌了,确保线程被中断
- // 如果线程池正在运⾏,确保线程不被中断
- if ((runStateAtLeast(ctl.get(), STOP) ||
- (Thread.interrupted() &&
- runStateAtLeast(ctl.get(), STOP))) &&
- !wt.isInterrupted())
- // 获取到任务后,再次检查线程池状态,如果发现线程池已经停⽌,则给⾃⼰发中断信号
- wt.interrupt();
- try {
- // 任务执⾏之前的钩⼦⽅法,实现为空
- beforeExecute(wt, task);
- try {
- task.run();
- // 任务执⾏结束后的钩⼦⽅法,实现为空
- afterExecute(task, null);
- } catch (Throwable ex) {
- afterExecute(task, ex);
- throw ex;
- }
- } finally {
- // 任务执⾏完成,将task设置为null
- task = null;
- // 线程已完成的任务数加1
- w.completedTasks++;
- // 释放线程锁
- w.unlock();
- }
- }
- // 判断线程是否是正常退出
- completedAbruptly = false;
- } finally {
- // Worker退出
- processWorkerExit(w, completedAbruptly);
- }
- }
- private Runnable getTask() {
- boolean timedOut = false; // Did the last poll() time out?
- for (;;) {
- int c = ctl.get();
- // 如果线程池调⽤了shutdownNow(),返回null
- // 如果线程池调⽤了shutdown(),并且任务队列为空,也返回null
- if (runStateAtLeast(c, SHUTDOWN)
- && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
- // ⼯作线程数减⼀
- decrementWorkerCount();
- return null;
- }
- int wc = workerCountOf(c);
- // Are workers subject to culling?
- boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
- if ((wc > maximumPoolSize || (timed && timedOut))
- && (wc > 1 || workQueue.isEmpty())) {
- if (compareAndDecrementWorkerCount(c))
- return null;
- continue;
- }
- try {
- // 如果队列为空,就会阻塞pool或者take,前者有超时时间,后者没有超时时间
- // ⼀旦中断,此处抛异常,对应上⽂场景1。
- Runnable r = timed ?
- workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
- workQueue.take();
- if (r != null)
- return r;
- timedOut = true;
- } catch (InterruptedException retry) {
- timedOut = false;
- }
- }
- }
- private void processWorkerExit(Worker w, boolean completedAbruptly) {
- // 如果线程正常退出,不会执⾏if的语句,这⾥⼀般是⾮正常退出,需要将worker数量减⼀
- if (completedAbruptly)
- decrementWorkerCount();
- final ReentrantLock mainLock = this.mainLock;
- mainLock.lock();
- try {
- completedTaskCount += w.completedTasks;
- // 将⾃⼰的worker从集合移除
- workers.remove(w);
- } finally {
- mainLock.unlock();
- }
- // 每个线程在结束的时候都会调⽤该⽅法,看是否可以停⽌线程池
- tryTerminate();
- int c = ctl.get();
- // 如果在线程退出前,发现线程池还没有关闭
- if (runStateLessThan(c, STOP)) {
- if (!completedAbruptly) {
- int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
- // 如果线程池中没有其他线程了,并且任务队列⾮空
- if (min == 0 && ! workQueue.isEmpty())
- min = 1;
- // 如果⼯作线程数⼤于min,表示队列中的任务可以由其他线程执⾏,退出当前线程
- if (workerCountOf(c) >= min)
- return; // replacement not needed
- }
- // 如果当前线程退出前发现线程池没有结束,任务队列不是空的,也没有其他线程来执⾏
- // 就再启动⼀个线程来处理。
- addWorker(null, false);
- }
- }
12.线程池的4种拒绝策略
在execute(Runnable command)的最后,调⽤了reject(command)执⾏拒绝策略,代码如下所示:

handler就是我们可以设置的拒绝策略管理器:

ThreadPoolExecutor类中默认的实现是:
策略2:线程池抛异常:
策略3:线程池直接丢掉任务,神不知⻤不觉:
策略4:删除队列中最早的任务,将当前任务⼊队列

示例程序:
-
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.TimeUnit;
- public class ThreadPoolExecutorDemo {
- public static void main(String[] args) {
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- 3,
- 5,
- 1,
- TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(3),
- // new ThreadPoolExecutor.AbortPolicy()
- // new ThreadPoolExecutor.CallerRunsPolicy()
- // new ThreadPoolExecutor.DiscardOldestPolicy()
- new ThreadPoolExecutor.DiscardPolicy()
- );
- for (int i = 0; i < 20; i++) {
- int finalI = i;
- executor.execute(new Runnable() {
- @Override
- public void run() {
- System.out.println(Thread.currentThread().getId() + "[" + finalI
- + "] -- 开始");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(Thread.currentThread().getId() + "[" + finalI
- + "] -- 结束");
- }
- });
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- executor.shutdown();
- boolean flag = true;
- try {
- do {
- flag = !executor.awaitTermination(1, TimeUnit.SECONDS);
- System.out.println(flag);
- } while (flag);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println("线程池关闭成功。。。");
- System.out.println(Thread.currentThread().getId());
- }
- }