• 25.CyclicBarrire的功能和作用


    CyclicBarrire的意思是可循环(Cyclic)使用的屏障Barrire,主要作用是让一组线程达到一个屏障(也可以称为同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才能继续往下执行,线性进入屏障通过CyclicBarrire的await()方法实现的。

    1 基本使用

    我们先写个例子看一下如何使用CyclicBarrire。

    1. public class CyclicBarrierExample {
    2. public static void main(String[] args) {
    3. int parties = 4;
    4. CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
    5. System.out.println("所有线程执行完毕,继续处理其他任务");
    6. });
    7. for (int i = 0; i < parties; i++) {
    8. new ImportDataTask(barrier).start();
    9. }
    10. }
    11. static class ImportDataTask extends Thread {
    12. private CyclicBarrier cyclicBarrier;
    13. public ImportDataTask(CyclicBarrier cyclicBarrier) {
    14. this.cyclicBarrier = cyclicBarrier;
    15. }
    16. @Override
    17. public void run() {
    18. try {
    19. Thread.sleep(1000);
    20. System.out.println("线程 " + Thread.currentThread().getName() + "数据导入完毕,等待其他线程");
    21. cyclicBarrier.await();
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. }
    26. }
    27. }

    该代码做的事情是:构建一个要求4个线程参与的CyclicBarrier实例,定义四个线程分别执行Writer写入,每个Writer线程执行写入完成后,调用cyclicBarrier.await()阻塞线程。当四个线程都调用await()方法之后,这四个线程都会被唤醒继续往下执行。

    其中CyclicBarrier的构造参数代表参与的线程数量,当有线程调用await()方法时先阻塞线程,只有达到该数量的线程都调用await()方法后,这些线程才全部被唤醒。

    我们通过下面的图示来进一步理解:

    2 实现原理

    CyclicBarrier包含了两层意思,第一个是前面说的屏障点,线程调用await()方法都会阻塞再屏障点,知道所有线程都达到屏障点再放行。第二个层面是Cyclic循环,当所有线程通过当前屏障点之后 ,又可以进入下一轮的屏障点进行等待,可以不断循环。

    在CyclicBarrier中定义了两个int类型的变量,分别是parties和count,这两个变量的作用如下:

    • parties表示每次要求达到屏障点的线程数,只有满足指定数量的线程,所有线程才会被唤醒。

    • count用来实现内部的计数器,初始值就是parties,后续再每个线程调用await()方法时,会对count--,当count=0时会唤醒所有的线程。

    以下是CyclicBarrier中定义的成员变量,可以看到,内部使用了重入锁和Condition,也就是说CyclicBarrier中的线程阻塞和唤醒是基于Condition实现的。

    1. public class CyclicBarrier {
    2. private static class Generation {
    3. boolean broken = false;
    4. }
    5. private final ReentrantLock lock = new ReentrantLock();
    6. private final Condition trip = lock.newCondition();
    7. private final int parties;
    8. private final Runnable barrierCommand;
    9. private Generation generation = new Generation();
    10. private int count;

    另外,CyclicBarrier有一个静态内部类Generation,该类的对象代表屏障点当前generation(代),每次当所有线程通过屏障点后,表示当前generation已经过去了,会进入下一个generation,CyclicBarrier用其实现循环等待。

    await方法的代码如下:

    1. public int await() throws InterruptedException, BrokenBarrierException {
    2. try {
    3. return dowait(false, 0L);
    4. } catch (TimeoutException toe) {
    5. throw new Error(toe); // cannot happen
    6. }
    7. }
    8. private int dowait(boolean timed, long nanos)
    9. throws InterruptedException, BrokenBarrierException,
    10. TimeoutException {
    11. final ReentrantLock lock = this.lock;
    12. lock.lock();//获得重入锁
    13. try {
    14. final Generation g = generation;
    15. //确认当前generation的barrier是否失效
    16. if (g.broken)
    17. throw new BrokenBarrierException();
    18. if (Thread.interrupted()) {
    19. breakBarrier();
    20. throw new InterruptedException();
    21. }
    22. //统计已经到达当前generation的线程数量
    23. int index = --count;
    24. //如果是0,表示所有线程都达到了屏障点
    25. if (index == 0) { // tripped
    26. boolean ranAction = false;
    27. try {
    28. final Runnable command = barrierCommand;
    29. if (command != null)
    30. //如果CyclicBarrier的回调不为空,则直接触发回调
    31. command.run();
    32. ranAction = true;
    33. //进入下一个屏障周期
    34. nextGeneration();
    35. return 0;
    36. } finally {
    37. //如果执行屏障点回调任务失败,则将屏障点失效
    38. if (!ranAction)
    39. breakBarrier();
    40. }
    41. }
    42. // 循环等待,直到所有线程达到屏障点,或者屏障点失效,线程中断、等待超时
    43. for (;;) {
    44. try {
    45. //是否带有等待超时时间,如果没有,则直接调用await()方法阻塞当前线程
    46. if (!timed)
    47. trip.await();
    48. else if (nanos > 0L)
    49. //否则采用超时等待
    50. nanos = trip.awaitNanos(nanos);
    51. } catch (InterruptedException ie) { //被其他线程通过interrupt()方法唤醒
    52. //如果是当前generation且没有被broken,则让屏障失效并抛出异常
    53. if (g == generation && ! g.broken) {
    54. breakBarrier();
    55. throw ie;
    56. } else {
    57. Thread.currentThread().interrupt();
    58. }
    59. }
    60. //有任何一个线程被中断时,都会调用breakBarrier()方法,而在该方法中会唤醒所有处于await()阻塞情况下的线程。
    61. //如果其他线程被唤醒,那么也需要抛出异常
    62. if (g.broken)
    63. throw new BrokenBarrierException();
    64. //被唤醒的generation和当前的不同,不做处理
    65. if (g != generation)
    66. return index;
    67. //如果在等待超时之后被唤醒,说明还有线程没有达到屏障点,则让屏障点失效
    68. if (timed && nanos <= 0L) {
    69. breakBarrier();
    70. throw new TimeoutException();
    71. }
    72. }
    73. } finally {
    74. lock.unlock();
    75. }
    76. }

    上面代码较长,整体逻辑如下:

    • 正常情况下,线程调用cyclicBarrier.await()方法直接阻塞当前线程,所以在dowait()方法中调用trip.await()方法阻塞当前线程。

    • 每个线程在调用cyclicBarrier.await()方法时,都会在代码中通过int index=-count对计数器进行递减,如果为0,则可以直接唤醒所有线程(nextGeneration()),并且如果异步回调任务barrierCommand不为空,则会同时执行该任务。

    1. private void nextGeneration() {
    2. // signal completion of last generation
    3. trip.signalAll();
    4. // set up next generation
    5. count = parties;
    6. generation = new Generation();
    7. }
    • 被trip.await()方法阻塞的线程,除了可以通过trip.signalAll()方法唤醒外,还可以被interrupt()方法唤醒的,这属于异常唤醒。被环形的通过g == generation && ! g.broken判断是否是当前generation,以及屏障点是否失效。如果没有失效,则调用breakBarrier()方法让屏障点失效。

    1. private void breakBarrier() {
    2. generation.broken = true;
    3. count = parties;
    4. trip.signalAll();
    5. }

    被中断的线程调用breakBarrier()方法,表示让当前屏障点失效,并且唤醒所有被阻塞的线程。接着被唤醒的线程需要通过if(g.broken)判断屏障点是否失效,如果是则意味着所有被唤醒的线程都要抛出异常。

    • 最后一种情况,被唤醒的线程可能会调用带有超时机制的阻塞方法 nacos=trip.awaitNanos(nacos),所以如果超过指定时间后相关线程还没有到达当前generation的屏障点,则同样可以通过breakBarrier()让屏障点失效。

    最后还有一个reset()方法再看一下。

    1. public void reset() {
    2. final ReentrantLock lock = this.lock;
    3. lock.lock();
    4. try {
    5. breakBarrier(); //中断当前的 generation
    6. nextGeneration(); // 开始新的 generation
    7. } finally {
    8. lock.unlock();
    9. }
    10. }

    这里的功能非常简单就是把原本阻塞在屏障点中的线程全部唤醒,然后进入下一个generation周期。

    整体来看,CyclicBarrier的代码非常精简,实现的逻辑也不复杂,核心思想是通过Condition实现指定条件的线程等待和唤醒。通过CyclicBarrier的源码分析,我们可以更好的理解Condition作为基础组件如何灵活应用在不同的场景中。

  • 相关阅读:
    报错:HikariPool-1 - Exception during pool initialization.
    STM32 从0开始移植FreeRTOS
    彻底掌握Makefile(二)
    redis悲观锁和乐观锁
    led灯珠型号及使用参数
    CPO和LPO谁将主宰未来数据中心光互连?
    C++面向对象 _ 成绩单系统
    为了保证openGauss的正确安装,请首先对主机环境进行配置
    python之字典的用法
    小白必看!企业开源知识库管理系统优势和选择
  • 原文地址:https://blog.csdn.net/xueyushenzhou/article/details/126943904