• JUC——CyclicBarrier


    CyclicBarrier也是一个多线程通信工具,它支持一组线程都到达一个点之后再继续执行;它的内部实现是通过ReentrantLock和Condition来实现的,接下来看一下它的源码分析;

    1. 构造器和属性

    // 可重入锁
    private final ReentrantLock lock = new ReentrantLock();
    // 条件队列
    private final Condition trip = lock.newCondition();
    // 参与的线程数量,它的值不会改变,当循环使用CyclicBarrier时,会将它的值赋值给count;
    private final int parties;
    // 由最后一个进入 barrier 的线程执行的操作
    private final Runnable barrierCommand;
    // 当前代
    private Generation generation = new Generation();
    // 正在等待进入屏障的线程数量
    private int count;

    public CyclicBarrier(int parties) {
    	this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
    	if (parties <= 0) throw new IllegalArgumentException();
    	this.parties = parties;
    	this.count = parties;
    	this.barrierCommand = barrierAction;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    它提供了两个有参构造器,第一个构造器允许传入一个int类型的个数,第二个构造器多了一个Runnable,它个Runnable是做什么 的?
    CyclicBarrier支持当一组线程到达一个执行点之后,先执行Runnable,然后再让各自线程开始执行;

    2. await

    public int await() throws InterruptedException, BrokenBarrierException {
    	try {
    		// 直接调了dowait,传入false和nanos
    		return dowait(false, 0L);
    	} catch (TimeoutException toe) {
    		throw new Error(toe); // cannot happen
    	}
    }
    
    private int dowait(boolean timed, long nanos)
    	throws InterruptedException, BrokenBarrierException,
    		   TimeoutException {
    	final ReentrantLock lock = this.lock;
    	// 加锁
    	lock.lock();
    	try {
    		// 保存当前代,后面用来判断generation是否发生改变;
    		final Generation g = generation;
    		// 默认为false,如果为true,则表示屏障被破坏
    		if (g.broken)
    			throw new BrokenBarrierException();
    		// 当前await线程被中断
    		if (Thread.interrupted()) {
    			breakBarrier();
    			throw new InterruptedException();
    		}
    		// 减少正在等待进入屏障的线程数量
    		int index = --count;
    		// 如果为0,表示屏障的线程数量为0,可以唤醒所有线程了
    		if (index == 0) {  // tripped
    			boolean ranAction = false;
    			try {
    				final Runnable command = barrierCommand;
    				// 如果有传入Runnable,则先执行runnable的run方法(这是里执行run方法,并不是start)
    				if (command != null)
    					command.run();
    				ranAction = true;
    				// 设置下一代
    				nextGeneration();
    				return 0;
    			} finally {
    				// 如果被抛出异常,则标识屏障已被破坏
    				if (!ranAction)
    					breakBarrier();
    			}
    		}
    		
    		// 当前count!=0,则进入线程需要等待;
    		for (;;) {
    			try {
    				if (!timed)
    					// 等待被唤醒
    					trip.await();
    				else if (nanos > 0L)
    					nanos = trip.awaitNanos(nanos);
    			} catch (InterruptedException ie) {
    				// 等于当前代并且屏障没有被损坏
    				if (g == generation && ! g.broken) {
    					breakBarrier();
    					throw ie;
    				} else {
    					// We're about to finish waiting even if we had not
    					// been interrupted, so this interrupt is deemed to
    					// "belong" to subsequent execution.
    					Thread.currentThread().interrupt();
    				}
    			}
    			
    			if (g.broken)
    				throw new BrokenBarrierException();
    
    			// 这里判断g!=generation才会退出循环,那在count==0的线程的时候,会进行signalAll,唤醒所有的await线程;
    			// 那这里会出现并发总是吗?
    			// 不会的,因为在进入dowait已经加了lock,虽然signalAll,但是它们还是会去排除等待;
    			if (g != generation)
    				return index;
    
    			if (timed && nanos <= 0L) {
    				breakBarrier();
    				throw new TimeoutException();
    			}
    		}
    	} finally {
    		lock.unlock();
    	}
    }
    
    // 损坏当前屏障,并且唤醒所有(只有拥有锁的时候才会调用)的线程
    private void breakBarrier() {
    	generation.broken = true;
    	count = parties;
    	trip.signalAll();
    }
    
    private void nextGeneration() {
    	// signal completion of last generation
    	trip.signalAll();
    	// set up next generation
    	count = parties;
    	generation = new Generation();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    await整体是通过ReentrantLock和Condition进行控制,进入方法时进行lock,如果count!=0,则进行condtion.await();直到最后一个线程进来时–count为0,则进行conditon.signalAll;

  • 相关阅读:
    Unity中神秘的Transform和transform(小写)的关系
    Servlet的生命周期
    Redis专题(六):Redis主从复制、哨兵搭建以及原理
    字符设备驱动框架(字符设备基础一)
    大二Web课程设计期末考试——基于HTML+CSS+JavaScript+jQuery电商类化妆品购物商城
    自动驾驶学习笔记(五)——绕行距离调试
    Cy3-PEG-maleimide,Cy3-聚乙二醇-马来酰亚胺,MAL-PEG-Cy3
    在嵌入式开发中如何提高自己的代码水平
    小米6安装Ubuntu Touch系统也不是很难嘛
    PT_随机变量函数的分布_随机变量线性函数的正态分布
  • 原文地址:https://blog.csdn.net/shixiaoling123/article/details/125462559