• 面试官: 有了解过线程池的工作原理吗?说说看


    本节主要带大家从ThreadPoolExecutor源码角度来了解一下线程池的工作原理,一起来看下吧~

    Executor 接口

    首先Executor这个接口是线程池实现的顶层接口类,我们上节遇到的ExecutorService也是继承了Executor

    public interface ExecutorService extends Executor {...}
    

    ExecutorService的上层AbstractExecutorService这个抽象类实现了接口ExecutorService

    public abstract class AbstractExecutorService implements ExecutorService {...}
    

    ThreadPoolExecutor继承了AbstractExecutorService

    public class ThreadPoolExecutor extends AbstractExecutorService {...}
    

    ThreadPoolExecutor这个类我们需要重点看一下,它是接口的实现类,我们以newCachedThreadPool为例

    1. public static ExecutorService newCachedThreadPool() {
    2. return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
    3. 60L, TimeUnit.SECONDS,
    4. new SynchronousQueue<Runnable>());
    5. }

    可以看到内部其实还是调用了ThreadPoolExecutor,我们再看newFixedThreadPool

    1. public static ExecutorService newFixedThreadPool(int nThreads) {
    2. return new ThreadPoolExecutor(nThreads, nThreads,
    3. 0L, TimeUnit.MILLISECONDS,
    4. new LinkedBlockingQueue<Runnable>());
    5. }

    内部也是调了它, 下面我们就看下这个类

    ThreadPoolExecutor

    首先我们从构造函数看起,它主要有四个构造函数

    构造函数一

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue) {
    6. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    7. Executors.defaultThreadFactory(), defaultHandler);
    8. }

    一共有五个参数:

    • corePoolSize - 保留在池中的线​​程数,即使是空闲的,除非设置allowCoreThreadTimeOut
    • maximumPoolSize – 池中允许的最大线程数
    • keepAliveTime – 当线程数大于核心时,这是多余的空闲线程在终止前等待新任务的最长时间。
    • unit – keepAliveTime参数的时间单位
    • workQueue – 用于在执行任务之前保存任务的队列。此队列将仅保存由execute方法提交的Runnable任务。

    构造函数二

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory) {
    7. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    8. threadFactory, defaultHandler);
    9. }

    其它参数同上

    • threadFactory 执行器创建新线程时使用的工厂

    构造函数三

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. RejectedExecutionHandler handler) {
    7. this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
    8. Executors.defaultThreadFactory(), handler);
    9. }
    • handler 由于达到线程边界和队列容量而阻塞执行时使用的处理程序

    构造函数四

    1. public ThreadPoolExecutor(int corePoolSize,
    2. int maximumPoolSize,
    3. long keepAliveTime,
    4. TimeUnit unit,
    5. BlockingQueue<Runnable> workQueue,
    6. ThreadFactory threadFactory,
    7. RejectedExecutionHandler handler) {
    8. if (corePoolSize < 0 ||
    9. maximumPoolSize <= 0 ||
    10. maximumPoolSize < corePoolSize ||
    11. keepAliveTime < 0)
    12. throw new IllegalArgumentException();
    13. if (workQueue == null || threadFactory == null || handler == null)
    14. throw new NullPointerException();
    15. this.acc = System.getSecurityManager() == null ?
    16. null :
    17. AccessController.getContext();
    18. this.corePoolSize = corePoolSize;
    19. this.maximumPoolSize = maximumPoolSize;
    20. this.workQueue = workQueue;
    21. this.keepAliveTime = unit.toNanos(keepAliveTime);
    22. this.threadFactory = threadFactory;
    23. this.handler = handler;
    24. }

    这个把之前都综合了一下,其实可以看到前几个内部都调用了this,调用自身,也就是调用这个构造函数,进行一些初始化

    BlockingQueue 阻塞队列

    有几个参数比较好理解,我们来看下这个参数workQueue, 它是一个阻塞队列,这里简要给大家提一下,这块内容也比较重要,后边会专门去讲

    BlockingQueue本身是一个还接口,它有几个比较常用的阻塞队列

    • LinkedBlockingQueue 链式阻塞队列,底层数据结构是链表

    • ArrayBlockingQueue 数组阻塞队列,底层数据结构是数组,需要指定队列的大小。

    • SynchronousQueue 同步队列,内部容量为0,每个put操作必须等待一个take操作

    • DelayQueue 延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素

    ThreadFactory 线程工厂

    这个是线程工厂类,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等, 同样它也是一个接口,我们在ThreadPoolExecutor内部看到了 Executors.defaultThreadFactory(),这个是一个默认工厂

    1. static class DefaultThreadFactory implements ThreadFactory {
    2. private static final AtomicInteger poolNumber = new AtomicInteger(1);
    3. private final ThreadGroup group;
    4. private final AtomicInteger threadNumber = new AtomicInteger(1);
    5. private final String namePrefix;
    6. DefaultThreadFactory() {
    7. SecurityManager s = System.getSecurityManager();
    8. group = (s != null) ? s.getThreadGroup() :
    9. Thread.currentThread().getThreadGroup();
    10. namePrefix = "pool-" +
    11. poolNumber.getAndIncrement() +
    12. "-thread-";
    13. }
    14. public Thread newThread(Runnable r) {
    15. Thread t = new Thread(group, r,
    16. namePrefix + threadNumber.getAndIncrement(),
    17. 0);
    18. if (t.isDaemon())
    19. t.setDaemon(false);
    20. if (t.getPriority() != Thread.NORM_PRIORITY)
    21. t.setPriority(Thread.NORM_PRIORITY);
    22. return t;
    23. }
    24. }

    没有指定参数,就会默认创建DefaultThreadFactory,还有其它的factory,大家可以自行看下,区别就在创建线程时指定的参数

    RejectedExecutionHandler 拒绝策略

    RejectedExecutionHandler同样是一个接口,这个处理器是用来专门处理拒绝的任务,也就是ThreadPoolExecutor无法处理的程序。同理,我们可以看到ThreadPoolExecutor内部有调了defaultHandler

    1. private static final RejectedExecutionHandler defaultHandler =
    2. new AbortPolicy();

    这个是默认的拒绝策略, 可以看到它的默认处理是抛出拒绝的异常

    1. public static class AbortPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates an {@code AbortPolicy}.
    4. */
    5. public AbortPolicy() { }
    6. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    7. throw new RejectedExecutionException("Task " + r.toString() +
    8. " rejected from " +
    9. e.toString());
    10. }
    11. }

    再带大家看下另外的策略, DiscardPolicy,这个策略不会抛出异常,它会丢弃这个任务

    1. public static class DiscardPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates a {@code DiscardPolicy}.
    4. */
    5. public DiscardPolicy() { }
    6. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    7. }
    8. }

    DiscardOldestPolicy 该策略丢弃最旧的未处理请求,然后重试execute ,除非执行程序被关闭,在这种情况下任务被丢弃。

    1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates a {@code DiscardOldestPolicy} for the given executor.
    4. */
    5. public DiscardOldestPolicy() { }
    6. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    7. // 判断是否关闭
    8. if (!e.isShutdown()) {
    9. e.getQueue().poll();
    10. // 任务重试
    11. e.execute(r);
    12. }
    13. }
    14. }

    CallerRunsPolicy 直接在execute方法的调用线程中运行被拒绝的任务,除非执行程序已关闭,在这种情况下,任务将被丢弃。

    1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
    2. /**
    3. * Creates a {@code CallerRunsPolicy}.
    4. */
    5. public CallerRunsPolicy() { }
    6. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    7. // 直接判断是否关闭 未关闭就执行
    8. if (!e.isShutdown()) {
    9. r.run();
    10. }
    11. }
    12. }

    线程调度策略

    看完构造函数,下面看下它的一些常量

    1. private static final int COUNT_BITS = Integer.SIZE - 3;
    2. // runState is stored in the high-order bits
    3. private static final int RUNNING = -1 << COUNT_BITS;
    4. private static final int SHUTDOWN = 0 << COUNT_BITS;
    5. private static final int STOP = 1 << COUNT_BITS;
    6. private static final int TIDYING = 2 << COUNT_BITS;
    7. private static final int TERMINATED = 3 << COUNT_BITS;

    通过变量名,我们大致知道是用来表示线程池的状态。线程池本身有一个调度线程,这个线程就是用于管理整个线程池的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等,所以它本身也有上面的状态值。

    当线程池被创建后就会处于RUNNING状态, 主池控制状态ctl是一个原子整数

     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    

    调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,不会等待阻塞队列的任务完成。

    1. public void shutdown() {
    2. final ReentrantLock mainLock = this.mainLock;
    3. mainLock.lock();
    4. try {
    5. checkShutdownAccess();
    6. advanceRunState(SHUTDOWN);
    7. interruptIdleWorkers();
    8. onShutdown(); // hook for ScheduledThreadPoolExecutor
    9. } finally {
    10. mainLock.unlock();
    11. }
    12. tryTerminate();
    13. }

    另外,还有一个shutdownNow,调用后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

    1. public List<Runnable> shutdownNow() {
    2. List<Runnable> tasks;
    3. final ReentrantLock mainLock = this.mainLock;
    4. mainLock.lock();
    5. try {
    6. checkShutdownAccess();
    7. advanceRunState(STOP);
    8. // 中断所有线程
    9. interruptWorkers();
    10. // 将任务队列排空到一个新列表中 这里要注意下
    11. tasks = drainQueue();
    12. } finally {
    13. mainLock.unlock();
    14. }
    15. tryTerminate();
    16. return tasks;
    17. }

    当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

    1. final void tryTerminate() {
    2. for (;;) {
    3. int c = ctl.get();
    4. if (isRunning(c) ||
    5. runStateAtLeast(c, TIDYING) ||
    6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
    7. return;
    8. // 不等于0时 中断任务线程
    9. if (workerCountOf(c) != 0) { // Eligible to terminate
    10. interruptIdleWorkers(ONLY_ONE);
    11. return;
    12. }
    13. final ReentrantLock mainLock = this.mainLock;
    14. mainLock.lock();
    15. try {
    16. // 将状态设置为 TIDYING
    17. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    18. try {
    19. // 终止
    20. terminated();
    21. } finally {
    22. // 执行完 terminated 转为 TERMINATED状态
    23. ctl.set(ctlOf(TERMINATED, 0));
    24. termination.signalAll();
    25. }
    26. return;
    27. }
    28. } finally {
    29. mainLock.unlock();
    30. }
    31. // else retry on failed CAS
    32. }
    33. }

    execute

    这个是执行任务的核心方法,我们一起看一下

    1. public void execute(Runnable command) {
    2. // 如果任务不存在 抛空异常
    3. if (command == null)
    4. throw new NullPointerException();
    5. // 获取当前状态值
    6. int c = ctl.get();
    7. // 当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
    8. if (workerCountOf(c) < corePoolSize) {
    9. if (addWorker(command, true))
    10. return;
    11. c = ctl.get();
    12. }
    13. // 如果不小于corePoolSize,则将任务添加到workQueue队列。
    14. if (isRunning(c) && workQueue.offer(command)) {
    15. int recheck = ctl.get();
    16. // 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
    17. if (! isRunning(recheck) && remove(command))
    18. reject(command);
    19. // 线程池处于running状态,但是没有线程,则创建线程
    20. else if (workerCountOf(recheck) == 0)
    21. addWorker(null, false);
    22. }
    23. // 如果放入workQueue失败,则创建非核心线程执行任务,
    24. else if (!addWorker(command, false))
    25. // 如果这时创建失败,就会执行拒绝策略。
    26. reject(command);
    27. }

    在源码中,我们可以看到,多次进行了isRunning判断。在多线程的环境下,线程池的状态是多变的。很有可能刚获取线程池状态后线程池状态就改变了

    总结

    下面给大家简要的总结一下线程池的处理流程

    1. 线程总数量小于线程池中保留的线程数量(corePoolSize),无论线程是否空闲,都会新建一个核心线程执行任务,这一步需要获取全局锁

    2. 线程总数量大于corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行,从而达到线程的复用

    3. 当缓存队列满了,会创建非核心线程去执行这个任务。

    4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取拒绝策略进行处理。

    结束语

    它的源码还是比较长的,一篇文章说不清楚,有兴趣的同学可以通过本篇文章的理解继续阅读它的源码。

    下一节, 继续带大家详探讨ThreadPoolExecutor中是如何进行线程复用

    关注关注,主页更多的java课程学习路线,笔记,面试等架构资料

     

  • 相关阅读:
    [附源码]Python计算机毕业设计Django大学生考勤管理系统论文
    stm32---外部中断
    MySQL最新2023年面试题及答案,汇总版(3)【MySQL最新2023年面试题及答案,汇总版-第三十三刊】
    记录vite下使用require报错和解决办法
    Javascript 笔记:object
    盘点52个Python各行各业管理系统源码Python爱好者不容错过
    kafka简介
    数据结构之堆
    深度讲解React Props
    使用枚举 代替简单工厂的switch或者if else
  • 原文地址:https://blog.csdn.net/uuqaz/article/details/125504913