• java并发编程 CyclicBarrier详解



    java 并发编程系列文章目录

    1. CyclicBarrier是什么

    java的类注释上描述:一种同步辅助工具,允许一组线程都等待对方到达一个共同的障碍点。CyclicBarrier在涉及固定大小的线程组的程序中很有用,这些线程偶尔必须相互等待。屏障被称为循环的(Cyclic),因为它可以在等待线程释放后重新使用。

    CyclicBarrier支持一个可选的Runnable命令,该命令在参与方中的最后一个线程到达之后,但在释放任何线程之前,每个屏障点运行一次。此屏障操作有助于在任何一方继续之前更新共享状态。
    在这里插入图片描述
    注:最后一个到达的线程负责执行runnable
    示例在java的类注释中已经提供,可以复制粘贴运行看下

    2 核心属性详解

    	private static class Generation {
            boolean broken = false;
        }
    
        //线程同步使用的锁
        private final ReentrantLock lock = new ReentrantLock();
        //等待的条件队列
        private final Condition trip = lock.newCondition();
        //线程同步等待的个数
        private final int parties;
        //全部达到点的执行的runnable任务
        private final Runnable barrierCommand;
        //标识
        private Generation generation = new Generation();
    
        //统计的数量字段
        private int count;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    3 核心方法详解

    3.1 await()

        public int await() throws InterruptedException, BrokenBarrierException {
            try {
                return dowait(false, 0L);
            } catch (TimeoutException toe) {
                throw new Error(toe); // cannot happen
            }
        }
        //timed 是否是具有超时时间的阻塞,nanos超时时间
        private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
                   TimeoutException {
            final ReentrantLock lock = this.lock;
            //先获取到锁
            lock.lock();
            try {
                final Generation g = generation;
    			//如果当前g.broken == true 会抛出异常,这个变成true是栅栏被破坏,什么是破坏,下面可
    			//以看到,执行逻辑下面会看到 
    			//总结下:如果有线程被中断和runnable出现异常和超时时间已经到达没有和其他线程一起唤醒的会破坏这个栅栏
                if (g.broken)
                    throw new BrokenBarrierException();
    			//破坏条件1 线程被中断
                if (Thread.interrupted()) {
                    breakBarrier();
                    throw new InterruptedException();
                }
    			//初始状态下count = parties;此时一个线程await 就减-1
                int index = --count;
                //最后一个线程await count == 0成立 会执行Runnable 执行完成。
                //会nextGeneration 就是唤醒所有的线程 重置count 和generation
                if (index == 0) {  // tripped
                    boolean ranAction = false;
                    try {
                        final Runnable command = barrierCommand;
                        if (command != null)
                            command.run();
                        ranAction = true;
                        nextGeneration();
                        return 0;
                    } finally {
                    	//破坏条件2 runable抛出异常 会执行breakBarrier
                        if (!ranAction)
                            breakBarrier();
                    }
                }
    			//此时是count != 0
                for (;;) {
                    try {
                    	//如果不带超时时间的 直接await等待 带超时时间的 等待nanos时间
                        if (!timed)
                            trip.await();
                        else if (nanos > 0L)
                            nanos = trip.awaitNanos(nanos);
                    } catch (InterruptedException ie) {
                    	//破坏条件3 如果等待的过程中线程被中断 执行 breakBarrier
                        if (g == generation && ! g.broken) {
                            breakBarrier();
                            throw ie;
                        } else {
                            Thread.currentThread().interrupt();
                        }
                    }
    				//如果一些线程正常await() 但是有一个线程执行了breakBarrier 此时唤醒线程
    				//g.broken = true  所以其他线程被唤醒也会抛出该异常
                    if (g.broken)
                        throw new BrokenBarrierException();
                    if (g != generation)
                        return index;
    				//破坏条件4 如果传入的nanos <= 0 会直接执行breakBarrier,一般在await(long 
    				//timeout, TimeUnit unit)里防止传入小于0
                    if (timed && nanos <= 0L) {
                        breakBarrier();
                        throw new TimeoutException();
                    }
                }
            } finally {
                lock.unlock();
            }
        }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    3.1 breakBarrier()

        private void breakBarrier() {
        	//broken =true 让其他线程直接抛出异常
            generation.broken = true;
            //恢复count 
            count = parties;
            //唤醒所有线程
            trip.signalAll();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4 总结

    CyclicBarrier在执行过程中有一个线程如果出现破坏栅栏情况,之后的其他线程的await方法都会抛出异常,CyclicBarrier可以平替CountDownLatch使用,CountDownLatch是使用AQS的公平锁实现,CyclicBarrier使用可重入锁ReentrantLock 实现,且可以支持一个runnable运行。可以通过合适的场景选用合适的类

  • 相关阅读:
    Go 复合类型之切片类型介绍
    Flutter 自定义动画 — 数字递增动画和文字逐行逐字出现或消失动画
    第五十二章 开发自定义标签 - Using csr %CSP.AbstractAtom Write Methods
    “Fatal error compiling: 无效的目标发行版: 11“的解决
    C 语言程序的执行流程
    真正靠谱的手机清理APP与方法,轻松帮你干掉几个G垃圾!
    Airflow用于ETL的四种基本运行模式, 2022-11-20
    小程序制作(超详解!!!)第十六节 小程序的基本架构
    Nginx快速入门教程,域名转发、负载均衡
    MyBatis 的注解实现方法整合(完成数据的增删改查操作)
  • 原文地址:https://blog.csdn.net/weixin_46082526/article/details/132827928