• ThreadPoolExecutor源码分析


    为什么要使用线程池?

    减少开销,便于管理。

    一、线程池的核心属性

    为了可以更清晰的去分析线程池的源码,核心属性必须掌握

    ctl属性,维护这线程池状态以及工作线程个数。

    1. //线程池的核心属性
    2. // 核心属性就是ctl,如果不认识AtomicInteger,就把ctl当成int看
    3. // ctl一个int类型表示了线程池的2大核心内容
    4. // 第一个:线程池的状态
    5. // 第二个:工作线程个数
    6. // ctl是int类型,而int是32个bit位组成的数值
    7. // 高3位记录:线程池的状态,低29位记录:工作线程个数
    8. private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    9. // 这个29就是为了方便对int类型数值进程分割。
    10. private static final int COUNT_BITS = 29;
    11. // 工作线程最大个数
    12. private static final int CAPACITY = (1 << COUNT_BITS) - 1;
    13. 00000000 00000000 00000000 00000001
    14. 00100000 00000000 00000000 00000000
    15. 00011111 11111111 11111111 11111111
    16. // 高3位记录的线程池状态
    17. // RUNNING:属于正常状态,可以正常接收任务,并且处理任务
    18. private static final int RUNNING = -1 << COUNT_BITS;
    19. // SHUTDOWN:处理关闭状态,不接收新任务,但是会处理完线程池中之前接收到的任务
    20. private static final int SHUTDOWN = 0 << COUNT_BITS;
    21. // STOP:处理停止状态,不接收新任务,中断正在执行的任务,之前接收的任务也不去处理
    22. private static final int STOP = 1 << COUNT_BITS;
    23. // TIDYING:过渡状态,是从SHUTDOWN或者STOP转换过来的,工作线程都凉凉了,任务也处理完了(了解)
    24. private static final int TIDYING = 2 << COUNT_BITS;
    25. // TERMINATED:终止状态,从TIDYING执行了terminated方法,转换到TERMINATED(了解)
    26. private static final int TERMINATED = 3 << COUNT_BITS;
    27. // 线程池最大允许有多少个工作线程?

    二、线程池的7个参数

    JUC包下的Executors提供了几种JDK自带的线程池构建方式,可以不用手动去构建。

    but,不推荐使用上述方式去构建线程池。

    为了让咱们可以更好的去控制线程池对象,推荐直接采用new的方式去构建ThreadPoolExecutor

    为了去手动new,就要对ThreadPoolExecutor对象的有参构造参数有掌握

    1. // 线程池的7个参数
    2. // 核心线程数,默认不会被干掉
    3. public ThreadPoolExecutor(int corePoolSize,
    4. // 最大线程数,在核心线程数之外的非核心线程个数,
    5. // maximumPoolSize - corePoolSize = 非核心线程个数
    6. int maximumPoolSize,
    7. // 最大空闲时间(一般针对非核心线程)
    8. long keepAliveTime,
    9. // 时间单位(一般针对非核心线程)
    10. TimeUnit unit,
    11. // 工作队列,核心线程个数满足corePoolSize,再来任务,就扔到工作队列
    12. BlockingQueue workQueue,
    13. // 线程工厂,为了更方面后期出现故障时,定位问题,在构建线程时,指定好
    14. // 一些细节信息,给个名字~~
    15. ThreadFactory threadFactory,
    16. // 拒绝策略,核心满了,队列满了,非核心到位了,再来任务走拒绝策略
    17. RejectedExecutionHandler handler) {
    18. // 省略部分代码
    19. }
    20. // 在线程池中,核心线程和非核心线程都属于工作线程。
    21. new ThreadPoolExecutor(5,7,1,TimeUnit.SECOND,new ArrayBlockingQueue<>(10),线程工厂,拒绝策略)

    三、线程池的执行流程(execute方法)

    要将任务提交给线程池执行时,需要调用线程池的execute方法,将任务传递进去

    1. // 线程池执行任务的核心方法
    2. // 线程池的执行流程图就是基于这个方法画出来的。
    3. public void execute(Runnable command) {
    4. // 非空判断~
    5. if (command == null) throw new NullPointerException();
    6. // 获取ctl属性,命名c
    7. int c = ctl.get();
    8. // =======================创建核心线程===============================
    9. // workerCountOf(c):获取工作线程的个数
    10. // 如果工作线程数 小于 核心线程数
    11. if (workerCountOf(c) < corePoolSize) {
    12. // 通过addWorker方法创建 核心线程。
    13. // command是传递的任务,true代表核心线程
    14. // 如果创建工作线程成功:返回true
    15. // 如果创建工作线程失败:返回false
    16. if (addWorker(command, true))
    17. // 告辞,任务交给线程执行了
    18. return;
    19. // 到这,说明失败了,走下一流程
    20. c = ctl.get();
    21. }
    22. // =======================将任务添加到工作队列===============================
    23. // 判断线程池状态是不是RUNNING
    24. // 只有状态正常,才会走offer方法(添加任务到工作队列)
    25. // offer方法,添加成功返回true,添加失败返回false
    26. if (isRunning(c) && workQueue.offer(command)) {
    27. // 任务已经放到工作队列了。
    28. // 任务放到队列后,要重新检查一次,重新拿到ctl
    29. int recheck = ctl.get();
    30. // 判断线程池状态是不是RUNNING
    31. // 如果状态不是RUNNING,将刚刚放进来的任务从队列移除~~~
    32. if (!isRunning(recheck) && remove(command))
    33. // 执行拒绝策略。
    34. reject(command);
    35. // 如果状态是RUNNING,同时工作线程数是0个。
    36. else if (workerCountOf(recheck) == 0)
    37. // 如果没有工作线程,添加一个非核心,空任务线程去解决掉工作队列没人处理的任务
    38. addWorker(null, false);
    39. }
    40. // =======================创建非核心线程===============================
    41. // addWorker创建非核心线程,如果成功,返回true
    42. // 如果添加非核心线程失败,执行reject方法
    43. else if (!addWorker(command, false))
    44. // =======================拒绝策略===============================
    45. // 执行拒绝策略
    46. reject(command);
    47. }

    四、线程池如何添加工作线程(addWorker方法)

    前面就提到了添加工作线程,查看addWorker的源码

    1. // 添加工作线程的方式
    2. private boolean addWorker(Runnable firstTask, boolean core) {
    3. // 判断是否可以正常添加工作线程
    4. retry:
    5. for (;;) {
    6. //=======================判断线程池状态===================================
    7. // 获取ctl属性
    8. int c = ctl.get();
    9. // 获取线程状态
    10. int rs = runStateOf(c);
    11. // 如果线程池状态不是RUNNING(正常到这,就不能添加工作线程了)
    12. if (rs >= SHUTDOWN &&
    13. // 可能出现工作队列有任务,但是线程池没有工作线程
    14. // 需要添加一个非核心空任务的工作线程,这里就是。
    15. // 只要一个不满足,就打破了之前效果,不需要添加
    16. !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
    17. // 当前状态不允许添加工作线程,返回false
    18. return false;
    19. for (;;) {
    20. //=======================判断工作线程个数===================================
    21. // 阿巴阿巴~~
    22. int wc = workerCountOf(c);
    23. // 如果工作线程个数,大于线程池允许的工作线程最大数,满足,告辞,不能添加
    24. if (wc >= CAPACITY ||
    25. // core:true添加核心,false添加非核心
    26. // 根据不同情况,比较不同的参数
    27. // 如果线程数不满足new线程池时的参数,告辞,不能添加
    28. wc >= (core ? corePoolSize : maximumPoolSize))
    29. // 不能添加。
    30. return false;
    31. // 没进到if,可以添加工作线程
    32. // 基于CAS的方式,对ctl进行 + 1操作
    33. // 线程A,线程B
    34. if (compareAndIncrementWorkerCount(c))
    35. // 如果CAS成功,直接跳出外层循环,执行添加工作线程,并且启动工作线程
    36. break retry;
    37. // 重新获取ctl
    38. c = ctl.get();
    39. // 如果线程池状态不变,直接正常重新执行内部循环
    40. if (runStateOf(c) != rs)
    41. // 如果线程池状态变了,重新判断线程池状态,走外部循环。
    42. continue retry;
    43. }
    44. }
    45. // 真正的去添加工作线程,并且启动工作线程
    46. // 声明了三个标识
    47. // workerStarted:工作线程启动了吗?
    48. boolean workerStarted = false;
    49. // workerAdded:工作线程创建了吗?
    50. boolean workerAdded = false;
    51. // Worker:工作线程
    52. Worker w = null;
    53. try {
    54. // 创建工作线程
    55. w = new Worker(firstTask);
    56. // 获取到工作线程中的Thread
    57. final Thread t = w.thread;
    58. // 这个if几乎不会发生,除非你写ThreadFactory有问题,或者是业务没通过
    59. if (t != null) {
    60. try {
    61. // 拿到线程池状态
    62. int rs = runStateOf(ctl.get());
    63. // rs < SHUTDOWN:满足,代表线程池状态ok是RUNNING
    64. if (rs < SHUTDOWN ||
    65. // 状态是SHUTDOWN,任务是空,阿巴阿巴~~~
    66. (rs == SHUTDOWN && firstTask == null)) {
    67. // 除非你写ThreadFactory有问题,或者是业务没通过,线程运行了,这里就扔异常
    68. if (t.isAlive())
    69. throw new IllegalThreadStateException();
    70. // 添加Worker工作线程到workers。
    71. // private final HashSet workers = new HashSet();
    72. workers.add(w);
    73. // 在记录工作线程的历史最大值
    74. int s = workers.size();
    75. if (s > largestPoolSize)
    76. largestPoolSize = s;
    77. // 工作线程创建了!!!
    78. workerAdded = true;
    79. }
    80. }
    81. // 添加工作线程成功,那就启动线程
    82. if (workerAdded) {
    83. t.start();
    84. // 工作线程启动了!!!
    85. workerStarted = true;
    86. }
    87. }
    88. } finally {
    89. // 工作线程启动失败
    90. if (!workerStarted)
    91. // 将Worker从HashSet移除,并且对ctl - 1
    92. addWorkerFailed(w);
    93. }
    94. return workerStarted;
    95. }

    五、工作线程执行任务方式(runWorker方法)

    runWorker其实就是启动工作线程做的事。

    核心就是Worker对象在调用了start方法后,会执行Worker类中的run方法。

    1. // 工作线程干活的方法
    2. final void runWorker(Worker w) {
    3. // 获取当前线程
    4. Thread wt = Thread.currentThread();
    5. // 第一次从Worker中拿任务,赋值给task属性
    6. Runnable task = w.firstTask;
    7. // 将Worker中的任务置位空
    8. w.firstTask = null;
    9. try {
    10. // task != null:说明任务是最开始addWorker传递过来的。
    11. // (task = getTask()) != null:说明任务是从工作队列中获取到的
    12. while (task != null || (task = getTask()) != null) {
    13. // 删掉了部分catch~~~
    14. try {
    15. // 前置增强(勾子函数)
    16. beforeExecute(wt, task);
    17. try {
    18. // 执行任务
    19. task.run();
    20. } finally {
    21. // 后置增强(勾子函数)
    22. afterExecute(task, thrown);
    23. }
    24. } finally {
    25. // 任务值为空
    26. task = null;
    27. // 记录当前Worker完成了一个任务
    28. w.completedTasks++;
    29. }
    30. }
    31. }
    32. }

    六、工作线程获取任务方式&结束工作线程方式(getTask方法)

    Worker工作线程除了addWorker自带的任务之外,就是从工作队列获取

    1. // 工作线程从工作队列获取任务
    2. private Runnable getTask() {
    3. // 超时了咩? 默认没!
    4. boolean timedOut = false;
    5. // 死循环。
    6. for (;;) {
    7. // 拿到ctl,获取线程池状态
    8. int c = ctl.get();
    9. int rs = runStateOf(c);
    10. // 第一个:线程池状态是SHUTDOWN,并且阻塞队列为空
    11. // 第二个:线程池是STOP状态
    12. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
    13. // ctl - 1,当前线程可以销毁了~
    14. decrementWorkerCount();
    15. return null;
    16. }
    17. // 线程池状态正常。
    18. // 重新获取工作线程个数
    19. int wc = workerCountOf(c);
    20. // 回头看!
    21. // allowCoreThreadTimeOut:默认为false,如果为true,核心线程也要超时
    22. // wc > corePoolSize:工作线程大于核心线程数(现在有非核心线程)
    23. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    24. // 如果工作线程大于最大线程数,基本不会满足,就是false
    25. // (timed && timedOut):true代表之前走了poll,但是没拿到任务
    26. if ((wc > maximumPoolSize || (timed && timedOut))
    27. // 工作线程至少2个
    28. // 工作队列为空
    29. && (wc > 1 || workQueue.isEmpty())) {
    30. // 干掉一个非核心线程。(循环结束拿不到任务即可)
    31. // CAS对ctl - 1,销毁线程
    32. if (compareAndDecrementWorkerCount(c))
    33. return null;
    34. continue;
    35. }
    36. try {
    37. // 基于阻塞队列的poll或者是take获取任务
    38. // poll可以指定等待多久拿任务(非核心线程)
    39. // take死等任务。(核心线程)
    40. Runnable r = timed ?
    41. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
    42. workQueue.take();
    43. // 如果拿到任务,正常返回
    44. if (r != null)
    45. return r;
    46. // 没拿到任务,timeOut是true
    47. timedOut = true;
    48. } catch (InterruptedException retry) {
    49. timedOut = false;
    50. }
    51. }
    52. }

    一般线程池参数设置多少?

    这个没有固定的公式,虽然很多书上有设置公式,但是不一定适合你的业务。

    如果要搞一个合适的线程池参数设置,你需要去动态的监控线程池,并且可以动态修改。

    只能根据细粒度的测试慢慢调试一个比较合适的参数。

    线程池提供了核心属性的get方法,还有核心参数动态set的功能。

  • 相关阅读:
    自动化部署的艺术:Conda包依赖管理的终极指南
    Elasticsearch:使用 docker compose 来实现热温冷架构的 Elasticsearch 集群
    Linux开发工具之编辑器vim
    VS Code实用插件推荐
    网络攻防中黑客常用技术跨站脚本技术:html, js 的自解码机制,解码顺序,浏览器urlencode 的影响,测试样例
    SpringBoot请求参数与响应返回值,ResponseEntity<T>自定义响应
    BizWorks 应⽤平台基于 KubeVela 的实践
    云服务器ECS的简介
    重要公告|投票委托已经上线,应该如何选择社区代表?
    c#简易学生管理系统
  • 原文地址:https://blog.csdn.net/weixin_39372979/article/details/126717504