CyclicBarrier也是一个多线程通信工具,它支持一组线程都到达一个点之后再继续执行;它的内部实现是通过ReentrantLock和Condition来实现的,接下来看一下它的源码分析;
// 可重入锁
private final ReentrantLock lock = new ReentrantLock();
// 条件队列
private final Condition trip = lock.newCondition();
// 参与的线程数量,它的值不会改变,当循环使用CyclicBarrier时,会将它的值赋值给count;
private final int parties;
// 由最后一个进入 barrier 的线程执行的操作
private final Runnable barrierCommand;
// 当前代
private Generation generation = new Generation();
// 正在等待进入屏障的线程数量
private int count;
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
它提供了两个有参构造器,第一个构造器允许传入一个int类型的个数,第二个构造器多了一个Runnable,它个Runnable是做什么 的?
CyclicBarrier支持当一组线程到达一个执行点之后,先执行Runnable,然后再让各自线程开始执行;
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 直接调了dowait,传入false和nanos
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
// 保存当前代,后面用来判断generation是否发生改变;
final Generation g = generation;
// 默认为false,如果为true,则表示屏障被破坏
if (g.broken)
throw new BrokenBarrierException();
// 当前await线程被中断
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 减少正在等待进入屏障的线程数量
int index = --count;
// 如果为0,表示屏障的线程数量为0,可以唤醒所有线程了
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 如果有传入Runnable,则先执行runnable的run方法(这是里执行run方法,并不是start)
if (command != null)
command.run();
ranAction = true;
// 设置下一代
nextGeneration();
return 0;
} finally {
// 如果被抛出异常,则标识屏障已被破坏
if (!ranAction)
breakBarrier();
}
}
// 当前count!=0,则进入线程需要等待;
for (;;) {
try {
if (!timed)
// 等待被唤醒
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 等于当前代并且屏障没有被损坏
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// 这里判断g!=generation才会退出循环,那在count==0的线程的时候,会进行signalAll,唤醒所有的await线程;
// 那这里会出现并发总是吗?
// 不会的,因为在进入dowait已经加了lock,虽然signalAll,但是它们还是会去排除等待;
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
// 损坏当前屏障,并且唤醒所有(只有拥有锁的时候才会调用)的线程
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
await整体是通过ReentrantLock和Condition进行控制,进入方法时进行lock,如果count!=0,则进行condtion.await();直到最后一个线程进来时–count为0,则进行conditon.signalAll;