CyclicBarrire的意思是可循环(Cyclic)使用的屏障Barrire,主要作用是让一组线程达到一个屏障(也可以称为同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才能继续往下执行,线性进入屏障通过CyclicBarrire的await()方法实现的。
我们先写个例子看一下如何使用CyclicBarrire。
- public class CyclicBarrierExample {
- public static void main(String[] args) {
- int parties = 4;
- CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
- System.out.println("所有线程执行完毕,继续处理其他任务");
- });
- for (int i = 0; i < parties; i++) {
- new ImportDataTask(barrier).start();
- }
-
- }
-
- static class ImportDataTask extends Thread {
- private CyclicBarrier cyclicBarrier;
-
- public ImportDataTask(CyclicBarrier cyclicBarrier) {
- this.cyclicBarrier = cyclicBarrier;
- }
-
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- System.out.println("线程 " + Thread.currentThread().getName() + "数据导入完毕,等待其他线程");
- cyclicBarrier.await();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
该代码做的事情是:构建一个要求4个线程参与的CyclicBarrier实例,定义四个线程分别执行Writer写入,每个Writer线程执行写入完成后,调用cyclicBarrier.await()阻塞线程。当四个线程都调用await()方法之后,这四个线程都会被唤醒继续往下执行。
其中CyclicBarrier的构造参数代表参与的线程数量,当有线程调用await()方法时先阻塞线程,只有达到该数量的线程都调用await()方法后,这些线程才全部被唤醒。
我们通过下面的图示来进一步理解:
CyclicBarrier包含了两层意思,第一个是前面说的屏障点,线程调用await()方法都会阻塞再屏障点,知道所有线程都达到屏障点再放行。第二个层面是Cyclic循环,当所有线程通过当前屏障点之后 ,又可以进入下一轮的屏障点进行等待,可以不断循环。
在CyclicBarrier中定义了两个int类型的变量,分别是parties和count,这两个变量的作用如下:
parties表示每次要求达到屏障点的线程数,只有满足指定数量的线程,所有线程才会被唤醒。
count用来实现内部的计数器,初始值就是parties,后续再每个线程调用await()方法时,会对count--,当count=0时会唤醒所有的线程。
以下是CyclicBarrier中定义的成员变量,可以看到,内部使用了重入锁和Condition,也就是说CyclicBarrier中的线程阻塞和唤醒是基于Condition实现的。
- public class CyclicBarrier {
- private static class Generation {
- boolean broken = false;
- }
-
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition trip = lock.newCondition();
- private final int parties;
- private final Runnable barrierCommand;
- private Generation generation = new Generation();
-
- private int count;
另外,CyclicBarrier有一个静态内部类Generation,该类的对象代表屏障点当前generation(代),每次当所有线程通过屏障点后,表示当前generation已经过去了,会进入下一个generation,CyclicBarrier用其实现循环等待。
await方法的代码如下:
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
-
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- lock.lock();//获得重入锁
- try {
- final Generation g = generation;
- //确认当前generation的barrier是否失效
- if (g.broken)
- throw new BrokenBarrierException();
-
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- //统计已经到达当前generation的线程数量
- int index = --count;
- //如果是0,表示所有线程都达到了屏障点
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- if (command != null)
- //如果CyclicBarrier的回调不为空,则直接触发回调
- command.run();
- ranAction = true;
- //进入下一个屏障周期
- nextGeneration();
- return 0;
- } finally {
- //如果执行屏障点回调任务失败,则将屏障点失效
- if (!ranAction)
- breakBarrier();
- }
- }
-
- // 循环等待,直到所有线程达到屏障点,或者屏障点失效,线程中断、等待超时
- for (;;) {
- try {
- //是否带有等待超时时间,如果没有,则直接调用await()方法阻塞当前线程
- if (!timed)
- trip.await();
- else if (nanos > 0L)
- //否则采用超时等待
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) { //被其他线程通过interrupt()方法唤醒
- //如果是当前generation且没有被broken,则让屏障失效并抛出异常
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
- Thread.currentThread().interrupt();
- }
- }
- //有任何一个线程被中断时,都会调用breakBarrier()方法,而在该方法中会唤醒所有处于await()阻塞情况下的线程。
- //如果其他线程被唤醒,那么也需要抛出异常
- if (g.broken)
- throw new BrokenBarrierException();
- //被唤醒的generation和当前的不同,不做处理
- if (g != generation)
- return index;
- //如果在等待超时之后被唤醒,说明还有线程没有达到屏障点,则让屏障点失效
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- lock.unlock();
- }
- }
上面代码较长,整体逻辑如下:
正常情况下,线程调用cyclicBarrier.await()方法直接阻塞当前线程,所以在dowait()方法中调用trip.await()方法阻塞当前线程。
每个线程在调用cyclicBarrier.await()方法时,都会在代码中通过int index=-count对计数器进行递减,如果为0,则可以直接唤醒所有线程(nextGeneration()),并且如果异步回调任务barrierCommand不为空,则会同时执行该任务。
- private void nextGeneration() {
- // signal completion of last generation
- trip.signalAll();
- // set up next generation
- count = parties;
- generation = new Generation();
- }
被trip.await()方法阻塞的线程,除了可以通过trip.signalAll()方法唤醒外,还可以被interrupt()方法唤醒的,这属于异常唤醒。被环形的通过g == generation && ! g.broken判断是否是当前generation,以及屏障点是否失效。如果没有失效,则调用breakBarrier()方法让屏障点失效。
- private void breakBarrier() {
- generation.broken = true;
- count = parties;
- trip.signalAll();
- }
被中断的线程调用breakBarrier()方法,表示让当前屏障点失效,并且唤醒所有被阻塞的线程。接着被唤醒的线程需要通过if(g.broken)判断屏障点是否失效,如果是则意味着所有被唤醒的线程都要抛出异常。
最后一种情况,被唤醒的线程可能会调用带有超时机制的阻塞方法 nacos=trip.awaitNanos(nacos),所以如果超过指定时间后相关线程还没有到达当前generation的屏障点,则同样可以通过breakBarrier()让屏障点失效。
最后还有一个reset()方法再看一下。
- public void reset() {
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- breakBarrier(); //中断当前的 generation
- nextGeneration(); // 开始新的 generation
- } finally {
- lock.unlock();
- }
- }
这里的功能非常简单就是把原本阻塞在屏障点中的线程全部唤醒,然后进入下一个generation周期。
整体来看,CyclicBarrier的代码非常精简,实现的逻辑也不复杂,核心思想是通过Condition实现指定条件的线程等待和唤醒。通过CyclicBarrier的源码分析,我们可以更好的理解Condition作为基础组件如何灵活应用在不同的场景中。