• 看明白Java多线程:CountDownLatch和CyclicBarrier


    1、CountDownLatch

    主要用于:

    一个主线程等待多个子线程都执行完成后,才能继续执行的场景
    适用于框架启动前,进行多项初始化工作等场景:多项初始化工作都完成后,才能开始启动框架处理。
    或者用于统计多个线程是否都已执行完成:kafka消费者多线程执行消费,多个子线程并行执行,只有当子线程都执行完成后,才能提交offset

    1. package com.autoee.demo.javabase.mutiThread.CountDownLatch;
    2. import cn.hutool.core.thread.ThreadUtil;
    3. import cn.hutool.core.util.RandomUtil;
    4. import lombok.SneakyThrows;
    5. import java.util.concurrent.CountDownLatch;
    6. /**
    7. * Title:
    8. * Desc:
    9. * Date: 2022-8-26
    10. * @author Double
    11. * @version 1.0.0
    12. */
    13. public class CountDownLatchTest1 {
    14. // CountDownLatch主要用来解决:
    15. // 一个主线程等待多个子线程都执行完成后,才能继续执行的场景
    16. // 适用于框架启动前,进行多项初始化工作等场景:多项初始化工作都完成后,才能开始启动框架处理。
    17. // 或者用于统计多个线程是否都已执行完成:kafka消费者多线程执行消费,多个子线程并行执行,只有当子线程都执行完成后,才能提交offset
    18. public static void main(String[] args) throws InterruptedException {
    19. // CountDownLatch中的个数要和实际执行的子线程数一致,否则执行过程就混乱了。
    20. int threadCount = 5;
    21. CountDownLatch tCountDownLatch = new CountDownLatch(threadCount);
    22. for (int i = 0; i < threadCount; i++) {
    23. new Thread("thread" + i) {
    24. @Override
    25. public void run() {
    26. String name = Thread.currentThread().getName();
    27. ThreadUtil.sleep(RandomUtil.randomInt(100) * 100);
    28. System.out.println("子线程" + name + "执行完成。");
    29. // 当前子线程执行完成,减1
    30. tCountDownLatch.countDown();
    31. }
    32. }.start();
    33. }
    34. // 阻塞等待上面的多个线程都执行完成
    35. tCountDownLatch.await();
    36. System.out.println("上面的子线程都已执行完成,主线程开始后续处理。。。");
    37. }
    38. }

    执行结果

    子线程thread3执行完成。
    子线程thread0执行完成。
    子线程thread2执行完成。
    子线程thread4执行完成。
    子线程thread1执行完成。
    上面的子线程都已执行完成,主线程开始后续处理。。。

    2、CyclicBarrier

    主要用于

    多个线程并行执行多个阶段的任务,第1阶段任务所有子线程都执行完成后,所有子线程再开始第2阶段的任务,
    第2阶段任务所有子线程都执行完成后,再进入下一阶段任务...
    每个阶段所有子线程都执行完成后,主线程还可以执行一次该阶段的汇总任务。
    1. package com.autoee.demo.javabase.mutiThread.CyclicBarrier;
    2. import cn.hutool.core.thread.ThreadUtil;
    3. import lombok.SneakyThrows;
    4. import java.util.concurrent.BrokenBarrierException;
    5. import java.util.concurrent.ConcurrentHashMap;
    6. import java.util.concurrent.CountDownLatch;
    7. import java.util.concurrent.CyclicBarrier;
    8. /**
    9. * Title:
    10. * Desc:
    11. * Date: 2022-8-26
    12. * @author Double
    13. * @version 1.0.0
    14. */
    15. public class CyclicBarrierTest1 {
    16. // CyclicBarrier主要用于:
    17. // 多个线程并行执行多个阶段的任务,第1阶段任务所有子线程都执行完成后,所有子线程再开始第2阶段的任务,
    18. // 第2阶段任务所有子线程都执行完成后,再进入下一阶段任务...
    19. // 每个阶段所有子线程都执行完成后,主线程还可以执行一次该阶段的汇总任务。
    20. public static void main(String[] args) {
    21. // 存放每阶段所有子线程的执行结果
    22. ConcurrentHashMap resultMap = new ConcurrentHashMap();
    23. // CyclicBarrier中的parties要和实际执行的子线程数一致,否则执行过程就混乱了。
    24. int threadCount = 3;
    25. // 创建CyclicBarrier,并定义每阶段执行完成后的汇总任务
    26. CyclicBarrier tCyclicBarrier = new CyclicBarrier(threadCount, new Runnable() {
    27. @Override
    28. public void run() {
    29. long id = Thread.currentThread().getId();
    30. System.out.println("【开始】-每个阶段任务所有子线程都执行完成后-汇总线程" + id + "开始-当前阶段的汇总任务...");
    31. resultMap.forEach((k, v) -> {
    32. System.out.println("----------[获取]-" + v);
    33. });
    34. System.out.println("【完成】-每个阶段任务所有子线程都执行完成后-汇总线程" + id + "完成-当前阶段的汇总任务...");
    35. }
    36. });
    37. // 创建多个子线程,并发执行每个阶段的任务
    38. for (int i = 0; i < threadCount; i++) {
    39. new Thread(new Runnable() {
    40. @Override
    41. public void run() {
    42. long id = Thread.currentThread().getId();
    43. String name = Thread.currentThread().getName();
    44. System.out.println("[开始]-" + name + "-第1阶段任务...");
    45. ThreadUtil.sleep(1000 + id);
    46. System.out.println("[完成]-" + name + "-第1阶段任务...");
    47. // 将子线程当前阶段的执行结果放入resultMap,key为线程名称
    48. resultMap.put(name, "子线程-" + name + "-第1阶段-sleep时间=" + (1000 + id));
    49. try {
    50. // 所有子线程等待其他子线程完成第一阶段任务,等都完成后,再进入下一阶段
    51. tCyclicBarrier.await();
    52. } catch (Exception e) {
    53. e.printStackTrace();
    54. }
    55. System.out.println("[开始]-" + name + "-第2阶段任务...");
    56. ThreadUtil.sleep(1000 + id);
    57. System.out.println("[完成]-" + name + "-第2阶段任务...");
    58. // 将子线程当前阶段的执行结果放入resultMap,key为线程名称,覆盖上一阶段的执行结果
    59. resultMap.put(name, "子线程-" + name + "-第2阶段-sleep时间=" + (1000 + id));
    60. try {
    61. // 所有子线程等待其他子线程完成第一阶段任务,等都完成后,再进入下一阶段
    62. tCyclicBarrier.await();
    63. } catch (Exception e) {
    64. e.printStackTrace();
    65. }
    66. }
    67. }, "线程" + (i + 1)).start();
    68. }
    69. System.out.println("上面的子线程不会阻塞当前主线程的执行");
    70. }
    71. }

    执行结果

    上面的子线程不会阻塞当前主线程的执行
    [开始]-线程3-第1阶段任务...
    [开始]-线程2-第1阶段任务...
    [开始]-线程1-第1阶段任务...
    [完成]-线程1-第1阶段任务...
    [完成]-线程2-第1阶段任务...
    [完成]-线程3-第1阶段任务...
    【开始】-每个阶段任务所有子线程都执行完成后-汇总线程14开始-当前阶段的汇总任务...
    ----------[获取]-子线程-线程2-第1阶段-sleep时间=1013
    ----------[获取]-子线程-线程3-第1阶段-sleep时间=1014
    ----------[获取]-子线程-线程1-第1阶段-sleep时间=1012
    【完成】-每个阶段任务所有子线程都执行完成后-汇总线程14完成-当前阶段的汇总任务...
    [开始]-线程3-第2阶段任务...
    [开始]-线程1-第2阶段任务...
    [开始]-线程2-第2阶段任务...
    [完成]-线程1-第2阶段任务...
    [完成]-线程2-第2阶段任务...
    [完成]-线程3-第2阶段任务...
    【开始】-每个阶段任务所有子线程都执行完成后-汇总线程14开始-当前阶段的汇总任务...
    ----------[获取]-子线程-线程2-第2阶段-sleep时间=1013
    ----------[获取]-子线程-线程3-第2阶段-sleep时间=1014
    ----------[获取]-子线程-线程1-第2阶段-sleep时间=1012
    【完成】-每个阶段任务所有子线程都执行完成后-汇总线程14完成-当前阶段的汇总任务...

     如果对您有帮助,请我喝杯咖啡吧!

  • 相关阅读:
    ChatGPT 宕机?OpenAI 将中断归咎于 DDoS 攻击
    数字后端概念——ILM
    Android Binder通信机制学习(二)
    <b><strong>,<i><em>标签的区别
    如何理解与学习数学分析——第二部分——数学分析中的基本概念——第7章——连续性
    Docker数据卷&&自定义Docker镜像
    动态规划之-----年终奖
    负载均衡 dubbo
    三翼鸟:产品不会变,场景实时变
    C语言之qsort()函数的模拟实现
  • 原文地址:https://blog.csdn.net/heishuang/article/details/126546877