• AQS源码解析 7.共享模式_CyclicBarrier重复屏障


    AQS源码解析 —共享模式_CyclicBarrier重复屏障

    简介

    CyclicBarrier:循环屏障、循环栅栏,用来进行线程协作,等待线程满足某个计数。构造时设置『计数个数』,每个线程执行到某个需要“同步”的时刻调用 await() 方法进行等待,当等待的线程数满足『计数个数』时,继续执行。

    应用:可以实现多线程中,某个任务在等待其他线程执行完毕以后触发。

    CyclicBarrier 与 CountDownLatch 的异同

    • 两者都能实现阻塞一组线程等待被唤醒;
    • 前者是最后一个线程到达时自动唤醒;
    • 后者是通过显式地调用 countDown() 实现的;
    • 前者是通过重入锁及其条件锁实现的,后者是直接基于 AQS 实现的;
    • 前者具有“代”的概念,可以重复使用,后者只能使用一次;
    • 前者只能实现多个线程到达栅栏处一起运行;
    • 后者不仅可以实现多个线程等待一个线程条件成立,还能实现一个线程等待多个线程条件成立(详见 CountDownLatch 那章使用案例);

    CountDownLatch 主要区别:CyclicBarrier 是可以重用的。

    • CountDownLatch 是一次性的,循环多次使用,每次都需要重新 new 一次(因为会清空计数),无法重用

      ExecutorService service = Executors.newFixedThreadPool(2);
      for (int i = 0; i < 3; i++) {
          CountDownLatch latch = new CountDownLatch(2); // 每循环一次就需要重新 new 一次
          service.submit(() -> {
              log.debug("task1 start...");
              sleep(1);
              latch.countDown();
          });
          service.submit(() -> {
              log.debug("task2 start...");
              sleep(2);
              latch.countDown();
          });
          try {
              latch.await();
          } catch (InterruptedException e) {
              e.printStackTrace();
          }
          log.debug("task1 task2 finish...");
      }
      service.shutdown();
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
    • CyclicBarrier,可以重用

      ExecutorService service = Executors.newFixedThreadPool(2);
          // 可重用 下次被调用不会清空计数 恢复为 2
          CyclicBarrier barrier = new CyclicBarrier(2, () -> { // 个数为2时才会继续执行
              System.out.println("task1, task2 finish..."); // 这里的任务执行时机是在其余的所有任务都执行完成后
          });
          for (int i = 0; i < 3; i++) {
              service.submit(() -> {
                  log.debug("task1 begin...");
                  sleep(1);
                  try {
                      barrier.await(); // 2-1=1
                  } catch (InterruptedException | BrokenBarrierException e) {
                      e.printStackTrace();
                  }
              });
              service.submit(() -> {
                  log.debug("task2 begin...");
                  sleep(2);
                  try {
                      barrier.await(); // 1-1=0
                  } catch (InterruptedException | BrokenBarrierException e) {
                      e.printStackTrace();
                  }
              });
          }
          service.shutdown();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

      执行结果

      21:20:03.323 c.TestCyclicBarrier [pool-1-thread-2] - task2 begin...
      21:20:03.323 c.TestCyclicBarrier [pool-1-thread-1] - task1 begin...
      21:20:03.323 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin...
      21:20:04.340 c.TestCyclicBarrier [pool-1-thread-1] - task1, task2 finish...
      21:20:04.340 c.TestCyclicBarrier [pool-1-thread-1] - task2 begin...
      21:20:04.340 c.TestCyclicBarrier [pool-1-thread-3] - task1 begin...
      21:20:05.347 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish...
      21:20:05.347 c.TestCyclicBarrier [pool-1-thread-3] - task2 begin...
      21:20:07.356 c.TestCyclicBarrier [pool-1-thread-3] - task1, task2 finish...
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

      可以看到 只有 task1 和 task2 这两个任务都完成时,才会打破 CyclicBarrier 的屏障,执行 await() 后续代码逻辑。并且屏障可以重用。

    工作原理图

    image-20221120145216352

    入门案例

    public class CyclicBarrierDemo {
        /**
         * 案例:模拟英雄联盟 游戏开始逻辑
         */
        public static void main(String[] args) {
            // 第1步:定义5个英雄
            String[] heroes = { "青钢影", "武器大师", "剑姬", "腕豪", "剑魔" };
            // 第2步:创建固定的线程池,线程数量为5
            ExecutorService service = Executors.newFixedThreadPool(5);
            // 第3步:创建barrier,parties 设置为5
            CyclicBarrier barrier = new CyclicBarrier(5);
            // 第4步:通过for循环开启5任务,模拟开始游戏,传递给每个任务 英雄名称和barrier
            for (int i = 0; i < 5; i++) {
                service.execute(new Player(heroes[i], barrier));
            }
            service.shutdown();
        }
    
        static class Player implements Runnable {
            private String hero;
            private CyclicBarrier barrier;
    
            public Player(String hero, CyclicBarrier barrier) {
                this.hero = hero;
                this.barrier = barrier;
            }
    
            @Override
            public void run() {
                try {
                    // 每个玩家加载进度不一样,这里使用随机数来模拟
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(hero + " 加载进度100% 等待其他玩家加载中...");
                    barrier.await();
                    System.out.println(hero + " 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛");
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    执行结果

    武器大师 加载进度100% 等待其他玩家加载中...
    青钢影 加载进度100% 等待其他玩家加载中...
    腕豪 加载进度100% 等待其他玩家加载中...
    剑姬 加载进度100% 等待其他玩家加载中...
    剑魔 加载进度100% 等待其他玩家加载中...
    剑魔 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
    武器大师 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
    剑姬 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
    腕豪 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
    青钢影 加载完成,欢迎来到英雄联盟,点击左侧录制按钮,即可开始录制本局比赛
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    源码解析

    属性及构造方法

    public class CyclicBarrier {  
        
        // 静态内部类 Generation 表示 “代” 这个概念
        // 一开始到来的线程 都会在某一个代中挂起,当最后一个线程到达时,这个代中的线程就可以全部通过了,随后会开启新的一个代。
    	private static class Generation {
            /*
             * 表示当前代是否被打破,如果代被打破,那么再来到这一代的线程,就会直接抛出BrokenException异常,
             * 且在这一代挂起的线程都会被唤醒,然后抛出异常BrokenException。
             *
             * 在await()方法时,正常情况下被阻塞的线程被唤醒后,如果跳出await()就会判断
             * 原代和新代是否是一个,因为最后一个达到的线程会将创建新代。
             */
            boolean broken = false;
        }
    	
        // 因为CyclicBarrier的实现是依赖于Condition等待队列的,而Condition等待队列必须依赖lock
        private final ReentrantLock lock = new ReentrantLock();
    	
        /*
         * 线程挂起实现使用的等待队列,条件:当前代所有线程到位(count = 0),这个等待队列的线程才会被唤醒
         */
        private final Condition trip = lock.newCondition();
    	
        // barrier需要参与进来的线程数量(可以比作『人满发车』)
        private final int parties;
    	
        // 当前代 最后一个到位的线程需要执行的事件
        private final Runnable barrierCommand;
    	
        // 表示barrier对象,当前代
        private Generation generation = new Generation();
    	
        // 当前代还有多少个线程未到位(初始值为parties)
        private int count;
        
        /*
         * 构造器
         * @param parties 表示需要参与的线程数量,每次屏障需要参与的线程数
         * @param barrierAction 当前 “代” 最后一个到位的线程,需要执行的事件(可以为null)
         */
        public CyclicBarrier(int parties, Runnable barrierAction) {
            // parties <= 0 抛出异常
            if (parties <= 0) throw new IllegalArgumentException();
            // 为内部属性赋值
            this.parties = parties;
            this.count = parties; // count的初始值 就是parties,后面当前代每到位一个线程,count--
            this.barrierCommand = barrierAction;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    简单小方法

        /*
         * 开启下一代,当这一代 所有线程到位后(假设barrierCommond不为空,还需要最后一个线程执行完事件), 会调用nextGeneration()开启下一代。
         */
    	private void nextGeneration() {
            // 将在trip条件队列内挂起的线程 全部唤醒
            trip.signalAll();
            // 重置count为parties
            count = parties;  
            // 开启下一代 使用一个新的generation对象 表示新的一代,新的一代和上一代没有任何关系
            generation = new Generation();
        }
    	/*
    	 * 打破barrier屏障,在屏障内部的线程 都会抛出异常
    	 */
        private void breakBarrier() {
            // 将"代"中的broken设置为true,表示这一代是被打破了的,再来到这一代的线程直接抛出异常
            generation.broken = true;
            // 重置count为parties
            count = parties;
            /*
             * 将等待队列中的线程全部唤醒,唤醒后的线程会检查当前代是否是被打破的,
             * 如果是被打破的话,接下来的逻辑和开启下一代唤醒的逻辑不一样。
             */
            trip.signalAll();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    await()

    	public int await() throws InterruptedException, BrokenBarrierException {
            try {
                // 底层调用的是dowait()方法,这里分析一个不带超时时间的dowait()
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); 
            }
        }
                       ||
                       ||
                       \/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    dowait()

    	/*
         * @param timed 表示当前调用await()方法的线程是否指定了超时时长
         * @param nanos 表示线程等待超时时长,如果timed == false,那么nanos == 0
         */
    	private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException { // 抛出3种异常:中断、打破、超时
            // 获取全局锁
            final ReentrantLock lock = this.lock;        
            // 加锁                 
            // 为什么要加锁呢?
            // 因为 barrier的挂起 和 唤醒 依赖的组件是 Condition
            lock.lock();
            try {
                // 获取barrier当前的 “代”
                final Generation g = generation;
                // 如果当前代已经被打破状态,则当前调用await()方法的线程,直接抛出Broken异常
                if (g.broken)
                    throw new BrokenBarrierException();
                // 如果当前线程的中断标志位为true,则打破当前代,然后当前线程抛出中断异常
                if (Thread.interrupted()) {
                    /*
                     * 此方法将代中的broken打破标志设置为true,并重置count,唤醒等待队列的所有节点
                     */
                    breakBarrier();
                    // 抛出中断异常
                    throw new InterruptedException();
                }
                
                /*
                 * 线程执行到这里,说明当前线程中断状态是正常的(false),并且当前“代”的broken为false(未打破状态)
                 */
                
                // 将count - 1 赋值给 index
                int index = --count;
                /*
                 * 条件成立:表示当前线程是最后一个到达barrier的线程
                 */
                if (index == 0) {  // tripped  
                    // ranAction -> true 表示最后一个到达barrier的线程在执行内部的barrierCommand任务时没有抛出异常,否则抛出了异常。
                    boolean ranAction = false;
                    try {
                        // 拿到创建的barrierCommand
                        final Runnable command = barrierCommand;     
                        // barrierCommand不为null,将其执行
                        if (command != null)
                            command.run();
                        // 如果执行barrierCommand => command.run()未抛出异常,线程会执行到这里
                        // 执行完成,设置标记位为true
                        ranAction = true;   
                        /*
                         * 开启新一代
                         * 1.唤醒等待队列内的线程,被唤醒的线程会依次获取到锁(state),然后依次退出await方法
                         * 2.重置count为parties
                         * 3.创建一个新的generation,表示新的一代
                         */
                        nextGeneration();
                        // 返回0,因为当前线程是此代最后一个到达的线程,所以index == 0
                        return 0;
                    } finally {
                        // 如果执行barrierCommand => command.run()出现异常,会进入到这里
                        if (!ranAction)
                            // 打破屏障
                            breakBarrier();
                    }
                }
    			
                /*
                 * 执行到这里,说明当前线程并不是最后一个到达barrier的线程,此时需要进入自旋。
                 */
                
                // 自旋,一直到条件满足 或者 当前代被打破、线程被中断、等待超时
                for (;;) {
                    try {
                        // 条件成立:说明当前线程是不指定超时时间的
                        if (!timed)
                        	// 当前线程会释放掉lock,然后进入等待队列的尾部,然后挂起 等待被唤醒
                            trip.await();
                        // 响应超时时间
                        else if (nanos > 0L)
                            // 调用awaitNanos方法(带超时的阻塞)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                        /*
                         * 抛出中断异常,会进来这里
                         * 什么时候会抛出中断异常呢?
                         * Node节点在等待队列内收到中断信号,会抛出中断异常
                         */
                        /*
                         * 条件1:g == generation 成立,说明当前代并没有变化
                         * 条件2:!g.broken,当前代如果没有被打破,那么当前线程就去打破,并且抛出异常
                         */ 
                        if (g == generation && ! g.broken) {
                            // 打破barrier
                            breakBarrier();
                            // 抛出中断异常
                            throw ie;
                        } else {
    						/*
    						 * 执行到else有几种情况?
    						 * 1.代发生了变化,这个时候就不需要抛出中断异常了,因为代已经更新了,这里唤醒后就走正常逻辑了,只不过设置下中断标记
    						 * 2.代未发生变化,但是代被打破了,此时也不用返回中断异常,执行到下面的时候会抛出brokenBarrier异常,也记录下中断标志位
    						 */
                            Thread.currentThread().interrupt();
                        }
                    }
                    /*
                     * 唤醒后执行到这里 有几种情况?
                     * 1.正常情况,当前barrier开启了新的一代
                     * 2.当前generation被打破,此时也会唤醒所有在trip等待队列上挂起的线程
                     * 3.当前线程在等待队列中超时,然后主动转移到同步队列,然后获取到锁 唤醒
                     */      
                    // 表示当前代已经被打破
                    if (g.broken)
                        // 线程唤醒后依次抛出BrokenBarrier异常
                        throw new BrokenBarrierException();
                    // 条件成立:说明当前线程挂起期间,最后一个线程到位了,然后触发了开启新一轮的逻辑,此时唤醒等待队列中的线程
                    // 这是一次正常的线程被唤醒后退出的逻辑
                    if (g != generation)
                        return index;
                    // 超时判断
                    if (timed && nanos <= 0L) {
                        // 打破barrier
                        breakBarrier();
                        // 抛出超时异常
                        throw new TimeoutException();
                    }
                }
            } finally {
                // 解锁
                lock.unlock();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133

    总结

    • CyclicBarrier 有一个 “代” 的概念,一开始到来的线程,都会在某一个代中挂起,当最后一个线程到达时,这个代中的线程就可以全部通过了,随后会开启新的一个代。
    • CyclicBarrier 会使一组线程阻塞在 await() 处,当最后一个线程到达时唤醒(只是从条件队列转移到 AQS 队列中)前面的线程大家再继续往下走;
    • CyclicBarrier 不是直接使用 AQS 实现的一个同步器,是基于 Lock 和 Condition 的一个实践案例,实现整个同步逻辑;
    • 整体来说,只有一个方法 dowait() 。



    参考

  • 相关阅读:
    HarmonyOS USB DDK助你轻松实现USB驱动开发
    CCF CSP认证 历年题目自练 Day20
    PCBA涂覆三防漆的工作需要注意什么?
    【信创】麒麟v10(arm)-mysql8-mongo-redis-oceanbase
    ISIS协议的基础配置实验,原来做ISIS基础配置还可以这么有趣
    binder通信之Messenger介绍
    19 | spark 统计 每列的数据非缺失值
    主流的开发语言和开发环境介绍
    MYSQL多表联查on和where的区别
    umich cv-6-1 循环神经网络基本知识
  • 原文地址:https://blog.csdn.net/weixin_53407527/article/details/127949016