• AQS详解


    AQS详解

    开始之前,先来看几个常见的面试题!建议带着问题来学习

    • 何为AQS?AQS原理了解吗?
    • CountDownLatch和CyclicBarrier了解吗?两者的区别是什么?
    • 用过Semaphore吗?应用场景了解吗?

    AQS简单介绍

    AQS的全称是AbStractQueueSynchronizer,翻译过来的意思就是抽象队列同步器。

    这个类在java.util.concurrent.locks包下边。

    在这里插入图片描述

    AQS就是一个抽象类,主要用来构建锁和同步器。

    public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    }
    
    • 1
    • 2

    AQS为构建锁和同步器提供了一些通用功能的实现,因此,使用AQS能简单且高效的构建vhu应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask(jdk1.7)等等都是基于AQS的。

    AQS原理

    在面试中被问到并发知识的时候,大多都会被问到“请你说一下自己对于AQS原理的理解‘。下边给大家一个示例仅供参考。

    AQS原理概览

    AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

    CLH(Craig Landin and Hagersten)队列是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系)。AQS是将每条请求共享资源的线程封装成一个CLH锁队列的一个结点(node)来实现锁的分配。

    看一下AQS原理图:

    在这里插入图片描述

    AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。

    private volatile int state;//共享变量,使用volatile修饰保证线程可见性
    
    • 1

    状态信息通过protected类型的getState(),setState(),compareAndSetState()进行操作

    //返回同步状态的当前值
    protected final int getState() {
           return state;
    }
    // 设置同步状态的值
    protected final void setState(int newState) {
           state = newState;
    }
    //原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
    protected final boolean compareAndSetState(int expect, int update) {
           return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    AQS对资源的共享方式
    AQS定义两种资源共享方式
    Exclusive(独占)

    只有一个线程能执行,如ReentrantLock。又可分为公平锁和非公平锁,ReentrantLock用时支持两种锁,下面以ReentrantLock对这两种锁的定义做介绍:

    • 公平锁 :按照线程在队列中的排队顺序,先到者先拿到锁。
    • 非公平锁:当前线程要获取锁时,先通过两次CAS操作取获取锁,如果没抢到,当前线程再加入到队列中等待唤醒。

    下面看一下ReentrantLock中相关的源码:

    ReentrantLock默认采用非公平锁,因为考虑获取更好的性能,通过boolean来决定是否使用公平锁(传入true用公平锁)。

    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;
    public ReentrantLock() {
       // 默认非公平锁
       sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
       sync = fair ? new FairSync() : new NonfairSync();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    RenntrantLock中公平锁的lock方法:

    static final class FairSync extends Sync {
       final void lock() {
           acquire(1);
       }
       // AbstractQueuedSynchronizer.acquire(int arg)
       public final void acquire(int arg) {
           if (!tryAcquire(arg) &&
               acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
               selfInterrupt();
       }
       protected final boolean tryAcquire(int acquires) {
           final Thread current = Thread.currentThread();
           int c = getState();
           if (c == 0) {
               // 1. 和非公平锁相比,这里多了一个判断:是否有线程在等待
               if (!hasQueuedPredecessors() &&
                   compareAndSetState(0, acquires)) {
                   setExclusiveOwnerThread(current);
                   return true;
               }
           }
           else if (current == getExclusiveOwnerThread()) {
               int nextc = c + acquires;
               if (nextc < 0)
                   throw new Error("Maximum lock count exceeded");
               setState(nextc);
               return true;
           }
           return false;
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    ReentrantLock中非公平锁的lock方法:

    static final class NonfairSync extends Sync {
       final void lock() {
           // 2. 和公平锁相比,这里会直接先进行一次CAS,成功就返回了
           if (compareAndSetState(0, 1))
               setExclusiveOwnerThread(Thread.currentThread());
           else
               acquire(1);
       }
       // AbstractQueuedSynchronizer.acquire(int arg)
       public final void acquire(int arg) {
           if (!tryAcquire(arg) &&
               acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
               selfInterrupt();
       }
       protected final boolean tryAcquire(int acquires) {
           return nonfairTryAcquire(acquires);
       }
    }
    /**
    * Performs non-fair tryLock.  tryAcquire is implemented in
    * subclasses, but both need nonfair try for trylock method.
    */
    final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
           // 这里没有对阻塞队列进行判断
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    **总结:**非公平锁和公平锁只有两处不同:

    • 非公平锁被调用lock后,首先会调用CAS进行一次抢锁,如果这个时候恰巧锁没有被占用,那么直接就获取到锁返回了。

    • 非公平锁在CAS失败后,和公平锁一样都会进入tryAcquire方法,在tryAcquire方法中,如果发现锁这个时候被释放(state==0),非公平锁会直接CAS抢锁,但是公平锁会判断等待队列是否有线程处于等待状态,如果有则不去抢锁,乖乖排到后面。

    公平锁和非公平锁就这两点区别,如果这两次CAS都不成功,那么后面非公平锁和公平锁都是一样的,都要进入到阻塞队列等待唤醒。

    相对来说,非公平锁会有更好的性能,因为他的吞吐量比较大,当然,非公平锁让获取锁的时间变得更加不确定,可能会导致在阻塞队列的线程长期处于饥饿状态。

    Share(共享)

    多个线程可同时执行,如Semaphore/CountDownLatch.Semaphore、CountDownLatch、CyclicBarrier、ReadWriteLock后续讲。

    ReentranReadWriteLock可以看成时组合式,因为ReentrantReadWriteLock也就是读写锁允许对各线程同时对某一个资源进行读取。

    不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队),AQS已在上层实现好了。

    AQS底层使用了模板方法模式

    模板方法模式讲解链接:用Java8改造后的模板方法模式真的是yyds! (qq.com)

    同步器的设计是基于模板方法模式的,如果需要自定义同步器一般的方式时这样(模板方法模式很经典的一个应用):

    • 使用者继承AbstractQueueSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
    • 将AQS组合在自定义同步组件的实现中,并调用模板方法,而这些模板方法会调用使用者重写的方法。

    这和我们以往通过实现接口的方式有很大区别,这是模板方法模式很经典的一个应用。

    AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的钩子方法:

    protected boolean tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    protected boolean tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    protected int tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    protected boolean tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
    protected boolean isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    
    • 1
    • 2
    • 3
    • 4
    • 5

    **什么时钩子方法呢?**钩子方法是一种被声明在抽象类中的方法,一般使用protected关键字修饰,他可以时空方法(由子类实现),也可以时默认实现方法。模板设计模式通过钩子方法控制固定步骤的实现。

    除了上边提到的钩子方法之外,AQS类中的其他方法都是final,所以无法被其他类重写。

    • 以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1.此后,其他线程再tryAcquire()时就会失败,知道A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state可以累加),这就时可重入的概念。但要注意的是,获取多少次就要释放多少次,这样才能保证state能回到0.
    • 再以CountDwonLatch为例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程数一致)。这N个子线程并行执行,每个子线程执行完后countDown()一次,state会CAS(Compare and swap)减1.等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程会从await()函数返回,继续后余动作。

    一般来说,自定义同步器要么是独占方法,要么是共享方式,他们只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。

    推荐两篇AQS原理和相关源码分析的文章:

    Java并发之AQS详解 - waterystone - 博客园 (cnblogs.com)

    Java并发包基石-AQS详解 - dreamcatcher-cx - 博客园 (cnblogs.com)

    Semaphore(信号量)

    synchronized和ReentrantLock都是一次只允许一个线程访问某个资源。

    Semaphore(信号量)可以指定多个线程同时访问某个资源。计数器是递增的。

    public class SemaphoreTest {
       // 定义线程数
       private static int num = 3;
       // 初始数量
       private static int initNum = 0;
       // 定义初始信号量,此时可获取的信号量为0
       private static Semaphore semaphore = new Semaphore(initNum);
       // 创建一个具有固定线程数量的线程池对象
       private static ExecutorService executorService = Executors.newFixedThreadPool(num);
       public static void main(String[] args) throws Exception{
           executorService.submit(() -> {
               System.out.println("A在上厕所");
               try {
                   Thread.sleep(4000);
                   // 释放一个信号量
                   semaphore.release();
                   System.out.println("A上完了");
               } catch (Exception e) {
                   e.printStackTrace();
               }finally {
    
               }
           });
           executorService.submit(()->{
               System.out.println("B在上厕所");
               try {
                   Thread.sleep(2000);
                   // 释放第二个信号量
                   semaphore.release();
                   System.out.println("B上完了");
               } catch (Exception e) {
                   e.printStackTrace();
               }finally {
    
               }
           });
           executorService.submit(()->{
               System.out.println("C在上厕所");
               try {
                   Thread.sleep(3000);
                   // 释放第三个信号量
                   semaphore.release();
                   System.out.println("C上完了");
               } catch (Exception e) {
                   e.printStackTrace();
               }finally {
    
               }
           });
    
           System.out.println("等待所有人从厕所回来开会...");
           // 获取信号量(此时已经释放了3个信号量,则获取3个信号量是不会阻塞的)
           semaphore.acquire(num);
           System.out.println("所有人都好了,开始开会...");
    
           executorService.shutdown();
    
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    输出结果为:

    A在上厕所
    B在上厕所
    等待所有人从厕所回来开会...
    C在上厕所
    B上完了
    C上完了
    A上完了
    所有人都好了,开始开会...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    构造函数传入初始值为0, 当子线程调用release()方法时,信号量计数器递增,主程序acquire()传参为3则说明主线程一直阻塞,直到计数器为3才返回。

    • 可以理解为Semaphore只是维持一个可获得许可证的一个数量,每个release()方法都会增加一个许可证,这时都可能会释放一个阻塞acquire()方法。
    • 除了acquire()方法之外,另一个比较常用的与之对应的方法时tryAcquire()方法,该方法如果获取不到许可就立即返回false。

    Semaphore有两种模式,公平模式和非公平模式。

    • 公平模式:调用acquire()方法的顺序就是获取许可证的顺序,遵循FIFO;

    • 非公平模式:抢占式的;

    Semaphore对应的两个构造方法如下:

    	 public Semaphore(int permits) {
            sync = new NonfairSync(permits);
        }
    
        public Semaphore(int permits, boolean fair) {
            sync = fair ? new FairSync(permits) : new NonfairSync(permits);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。

    SemaphoreCountDownLatch 一样,也是共享锁的一种实现。它默认构造 AQS 的 state 为 permits。当执行任务的线程数量超出 permits,那么多余的线程将会被放入阻塞队列 Park,并自旋判断 state 是否大于 0。只有当 state 大于 0 的时候,阻塞的线程才能继续执行,此时先前执行任务的线程继续执行 release() 方法,release() 方法使得 state 的变量会加 1,那么自旋的线程便会判断成功。 如此,每次只有最多不超过 permits 数量的线程能自旋成功,便限制了执行任务线程的数量

    CountDownLatch(倒计时器)

    CountDownLatch允许count个线程阻塞在一个地方,直至所有线程的任务都执行完毕。

    适用于在多线程的场景需要等待所有子线程全部执行完毕之后再做操作的场景。

    CountDownLatch是共享锁的一种实现,它默认构造AQS的state值为count。当线程使用countDown()方法时,其实使用了tryReleaseShared方法以CAS的操作来减少state,直至state为0.当调用await()方法的时候,如果state不为0,那就证明任务还没有执行完毕,await()方法就会一直阻塞,也就是说await()方法之后的语句不会被执行,然后,CountDownLatch会自旋CAS判断state0,如果state0的话,就会释放所有等待的线程,await()方法之后的语句得到执行。

    CountDownLatch的两种典型用法
    • 某一线程在开始运行前等待n个线程执行完毕
    • 将CountDownLatch的计数器初始化为n(new CountDownLatch(n)),每当一个任务线程执行完毕,就将计数器减1(countdownlatch.countDown()),当计数器的值变为0时,在CountDownLatch上await()的线程就会被唤醒。一个典型应用场景就是启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
    • 实现多个线程开始执行任务的最大并行性
    • 注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。做法是初始化一个共享的CountDownLatch对象,将计数器初始化为1(new CountDownLatch(1)),多个线程在开始执行任务前首先countdownlatch.await(),当主线程调用countDown()时,计数器变为0,多个线程同时被唤醒。
    CountDownLatch的使用示例

    举个例子,早上部门开会,有人在上厕所,这时候需要等待所有人从厕所回来之后才能开始会议。

    public class CountDownLatchTest {
       // 定义线程数量以及计数器初始值
       private static int num = 3;
       // 初始化计数器
       private static CountDownLatch countDownLatch = new CountDownLatch(num);
       // 初始化线程池
       private static ExecutorService executorService = Executors.newFixedThreadPool(num);
       public static void main(String[] args) throws Exception{
           executorService.submit(() -> {
               System.out.println("A在上厕所");
               try {
                   Thread.sleep(4000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }finally {
                   // 表示一个请求已经被完成   此时计数器-1
                   countDownLatch.countDown();
                   System.out.println("A上完了");
               }
           });
           executorService.submit(()->{
               System.out.println("B在上厕所");
               try {
                   Thread.sleep(2000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }finally {
                   // 表示一个请求已经被完成   此时计数器-1
                   countDownLatch.countDown();
                   System.out.println("B上完了");
               }
           });
           executorService.submit(()->{
               System.out.println("C在上厕所");
               try {
                   Thread.sleep(3000);
               } catch (InterruptedException e) {
                   e.printStackTrace();
               }finally {
                   // 表示一个请求已经被完成   此时计数器-1
                   countDownLatch.countDown();
                   System.out.println("C上完了");
               }
           });
    
           System.out.println("等待所有人从厕所回来开会...");
           // 调用await判断阻塞与否
           countDownLatch.await();
           System.out.println("所有人都好了,开始开会...");
           executorService.shutdown();
    
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    代码执行结果:

    A在上厕所
    B在上厕所
    等待所有人从厕所回来开会...
    C在上厕所
    B上完了
    C上完了
    A上完了
    所有人都好了,开始开会...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    初始化一个CountDownLatch实例传参3,因为我们有3个子线程,每次子线程执行完毕之后调用countDown()方法给计数器-1,主线程调用await()方法后会被阻塞,直到最后计数器变为0,await()方法返回,执行完毕。他和join()方法的区别就是join会阻塞子线程直到运行结束,而CountDownLatch可以在任何时候让await()返回,而且用ExecutorService没法用join了,相比起来,CountDownLatch更灵活。

    注意:CountDownLatch的await()方法使用不当很容易产生死锁,当count的值没办法等于0的时候,会导致一直等待。

    CountDownLatch是基于AQS实现的,volatile变量state维持倒数状态,多线程共享变量可见。

    • CountDownLatch通过构造函数初始化传入参数实际为AQS的state变量赋值,维持计数器倒数状态
    • 当主线程调用await()方法时,当前线程会被阻塞,当state不为0时进入AQS阻塞队列等待4
    • 其他线程调用countDown()时,state值原子性递减,当state值为0时,唤醒所有调用await()方法阻塞的线程
    CountDownLatch的不足

    CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

    CountDownLatch常见的面试题
    • CountDownLatch怎么用?应用场景是什么?
    • CountDownLatch和CycliBarrier的不同之处?
    • CountDownLatch类中主要的方法?

    CyclicBarrier(循环栅栏)

    CyclicBarrier和CountDownLatch非常类似,它也可以实现线程间的技术等待,但是它的功能比CountDownLatch’更加复杂强大。主要应用场景和CountDownLatch类似。

    CountDown Latch的实现是基于AQS的,而CycliBarrier是基于ReentrantLock(ReentrantLock也属于AQS同步

    器)和Condition的。

    CyclicBarrier理解

    CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是:让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。

    CyclicBarrier叫做回环屏障,它的作用是让一组线程全部达到一个状态之后再全部同时执行,而且他有一个特点就是所有线程执行完毕之后是可以重用的

    CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await()方法告诉CyclicBarrier我已经到达屏障了,然后当前线程被阻塞。

    public CyclicBarrier(int parties) {
       this(parties, null);
    }
    
    public CyclicBarrier(int parties, Runnable barrierAction) {
       if (parties <= 0) throw new IllegalArgumentException();
       this.parties = parties;
       this.count = parties;
       this.barrierCommand = barrierAction;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    其中,parties就代表了有拦截的线程的数量,当拦截的线程数量达到这个值的时候就打开栅栏,让所有线程通过。

    CyclicBarrier的应用场景

    CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个账号近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的技算结果,计算出整个Excel的日均银行流水。

    CyclicBarrier的使用示例:
    让一组线程全部达到一个状态之后再全部同时执行的效果。
    public class CyclicBarrierTest {
       // 定义初始化屏障3
       private static int num = 3;
       // 需要同步的线程数量  
       // 使用CyclicBarrier(int parties, Runnable barrierAction),在线程到达屏障后优先执行barrierAction.
       private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
           System.out.println("所有人都好了,开始开会...");
           System.out.println("-------------------");
       });
       private static ExecutorService executorService = Executors.newFixedThreadPool(num);
       public static void main(String[] args) throws Exception{
           executorService.submit(() -> {
               System.out.println("A在上厕所");
               try {
                   Thread.sleep(4000);
                   System.out.println("A上完了");
                   // 线程到达1个,此时await阻塞不向下执行
                   cyclicBarrier.await();
                   System.out.println("会议结束,A退出");
               } catch (Exception e) {
                   e.printStackTrace();
               }finally {
    
               }
           });
           executorService.submit(()->{
               System.out.println("B在上厕所");
               try {
                   Thread.sleep(2000);
                   System.out.println("B上完了");
                   // 线程到达2个
                   cyclicBarrier.await();
                   System.out.println("会议结束,B退出");
               } catch (Exception e) {
                   e.printStackTrace();
               }finally {
    
               }
           });
           executorService.submit(()->{
               System.out.println("C在上厕所");
               try {
                   Thread.sleep(3000);
                   System.out.println("C上完了");
                   // 线程到达3个
                   cyclicBarrier.await();
                   System.out.println("会议结束,C退出");
               } catch (Exception e) {
                   e.printStackTrace();
               }finally {
    
               }
           });
    
           executorService.shutdown();
    
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    输出结果为:

    A在上厕所
    B在上厕所
    C在上厕所
    B上完了
    C上完了
    A上完了
    所有人都好了,开始开会...
    -------------------
    会议结束,A退出
    会议结束,B退出
    会议结束,C退出
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    从结果来和CountDownLacth非常相似,初始化传入3个线程和一个任务,线程调用await()之后进入阻塞,计数器-1,当线程为0时,就去执行CyclicBarrier中构造函数的任务,当任务执行完毕后,唤醒所有阻塞中的线程。

    这验证了CyclicBarrier让一组线程全部达到一个状态之后再全部同时执行的效果。

    在这里使用的是CyclicBarrier 提供的一个更高级的构造函数 CyclicBarrier(int parties, Runnable barrierAction),用于在线程到达屏障时,优先执行 barrierAction,方便处理更复杂的业务场景

    验证CyclicBarrier可重用的结果
    public class CyclicBarrierTest2 {
       private static int num = 3;
       private static CyclicBarrier cyclicBarrier = new CyclicBarrier(num, () -> {
           System.out.println("-------------------");
       });
       private static ExecutorService executorService = Executors.newFixedThreadPool(num);
    
       public static void main(String[] args) throws Exception {
           executorService.submit(() -> {
               System.out.println("A在上厕所");
               try {
                   Thread.sleep(4000);
                   System.out.println("A上完了");
                   cyclicBarrier.await();
                   System.out.println("会议结束,A退出,开始撸代码");
                   cyclicBarrier.await();
                   System.out.println("C工作结束,下班回家");
                   cyclicBarrier.await();
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
    
               }
           });
           executorService.submit(() -> {
               System.out.println("B在上厕所");
               try {
                   Thread.sleep(2000);
                   System.out.println("B上完了");
                   cyclicBarrier.await();
                   System.out.println("会议结束,B退出,开始摸鱼");
                   cyclicBarrier.await();
                   System.out.println("B摸鱼结束,下班回家");
                   cyclicBarrier.await();
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
    
               }
           });
           executorService.submit(() -> {
               System.out.println("C在上厕所");
               try {
                   Thread.sleep(3000);
                   System.out.println("C上完了");
                   cyclicBarrier.await();
                   System.out.println("会议结束,C退出,开始摸鱼");
                   cyclicBarrier.await();
                   System.out.println("C摸鱼结束,下班回家");
                   cyclicBarrier.await();
               } catch (Exception e) {
                   e.printStackTrace();
               } finally {
    
               }
           });
    
           executorService.shutdown();
    
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    输出结果为:

    A在上厕所
    B在上厕所
    C在上厕所
    B上完了
    C上完了
    A上完了
    -------------------
    会议结束,A退出,开始撸代码
    会议结束,B退出,开始摸鱼
    会议结束,C退出,开始摸鱼
    -------------------
    C摸鱼结束,下班回家
    C工作结束,下班回家
    B摸鱼结束,下班回家
    -------------------
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    从结果看,每个子线程调用await()计数器减为0后,才开始继续一起往下执行,会议结束之后一起进入摸鱼状态,最后一天结束一起下班,这就是可重用。

    Cyclic Barrier还是基于AQS实现的,内部维护parties记录总线程数,count用于计数,最开始count= parties,调用await()之后count原子递减,当count为0之后,再次将parties赋值给count,这就是复用的原理。

    • 当子线程调用await()方法时,获取独占锁,同时对count递减,进入阻塞队列,然后释放锁
    • 当第一个线程被阻塞同时释放锁之后,其他子线程竞争获取锁,操作同1
    • 直到最后count为0,执行CyclicBarrier构造函数中的任务,执行完毕之后子线程继续向下执行
    CyclicBarrier源码分析

    当调用CyclicBarrier对象调用await()方法时,实际上调用的是dowait(false,0L)方法。await()方法就行树立起一个栅栏的行为一样,将线程挡住了,当拦住的线程数量达到parties的值时,栅栏才会打开,线程才得以通过执行。

    public int await() throws InterruptedException, BrokenBarrierException {
     try {
       	return dowait(false, 0L);
     } catch (TimeoutException toe) {
      	 throw new Error(toe); // cannot happen
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    dowait(false,0L);

    // 当线程数量或者请求数量达到 count 时 await 之后的方法才会被执行。上面的示例中 count 的值就为 5。
       private int count;
       /**
        * Main barrier code, covering the various policies.
        */
       private int dowait(boolean timed, long nanos)
           throws InterruptedException, BrokenBarrierException,
                  TimeoutException {
           final ReentrantLock lock = this.lock;
           // 锁住
           lock.lock();
           try {
               final Generation g = generation;
    
               if (g.broken)
                   throw new BrokenBarrierException();
    
               // 如果线程中断了,抛出异常
               if (Thread.interrupted()) {
                   breakBarrier();
                   throw new InterruptedException();
               }
               // cout减1
               int index = --count;
               // 当 count 数量减为 0 之后说明最后一个线程已经到达栅栏了,也就是达到了可以执行await 方法之后的条件
               if (index == 0) {  // tripped
                   boolean ranAction = false;
                   try {
                       final Runnable command = barrierCommand;
                       if (command != null)
                           command.run();
                       ranAction = true;
                       // 将 count 重置为 parties 属性的初始化值
                       // 唤醒之前等待的线程
                       // 下一波执行开始
                       nextGeneration();
                       return 0;
                   } finally {
                       if (!ranAction)
                           breakBarrier();
                   }
               }
    
               // loop until tripped, broken, interrupted, or timed out
               for (;;) {
                   try {
                       if (!timed)
                           trip.await();
                       else if (nanos > 0L)
                           nanos = trip.awaitNanos(nanos);
                   } catch (InterruptedException ie) {
                       if (g == generation && ! g.broken) {
                           breakBarrier();
                           throw ie;
                       } else {
                           // We're about to finish waiting even if we had not
                           // been interrupted, so this interrupt is deemed to
                           // "belong" to subsequent execution.
                           Thread.currentThread().interrupt();
                       }
                   }
    
                   if (g.broken)
                       throw new BrokenBarrierException();
    
                   if (g != generation)
                       return index;
    
                   if (timed && nanos <= 0L) {
                       breakBarrier();
                       throw new TimeoutException();
                   }
               }
           } finally {
               lock.unlock();
           }
       }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    总结:CyclicBarrier 内部通过一个 count 变量作为计数器,count 的初始值为 parties 属性的初始化值,每当一个线程到了栅栏这里了,那么就将计数器减一。如果 count 值为 0 了,表示这是这一代最后一个线程到达栅栏,就尝试执行我们构造方法中输入的任务。

    CyclicBarrier和CountDownLatch的区别

    下面这个是国外一个大佬的回答:

    CountDownLatch 是计数器,只能使用一次,而 CyclicBarrier 的计数器提供 reset 功能,可以多次使用。但是我不那么认为它们之间的区别仅仅就是这么简单的一点。我们来从 jdk 作者设计的目的来看,javadoc 是这么描述它们的:

    CountDownLatch: A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.(CountDownLatch: 一个或者多个线程,等待其他多个线程完成某件事情之后才能执行;) CyclicBarrier : A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.(CyclicBarrier : 多个线程互相等待,直到到达同一个同步点,再继续一起执行。)

    对于 CountDownLatch 来说,重点是“一个线程(多个线程)等待”,而其他的 N 个线程在完成“某件事情”之后,可以终止,也可以等待。而对于 CyclicBarrier,重点是多个线程,在任意一个线程没有完成,所有的线程都必须等待。

    CountDownLatch 是计数器,线程完成一个记录一个,只不过计数不是递增而是递减,而 CyclicBarrier 更像是一个阀门,需要所有线程都到达,阀门才能打开,然后继续执行。

    ReentrantLock 和 ReentrantReadWriteLock

    ReentrantLocksynchronized 的区别在上面已经讲过了这里就不多做讲解。另外,需要注意的是:读写锁 ReentrantReadWriteLock 可以保证多个线程可以同时读,所以在读操作远大于写操作的时候,读写锁就非常有用了。

  • 相关阅读:
    如何去除数据库中重复的数据
    卷积神经网络CNN学习笔记-MaxPool2D函数解析
    音频领域的50个关键词
    Linux Shell 基础语法 流程控制 逻辑运算 字符串操作详细解析
    Linux多线程【线程互斥与同步】
    Linux/Windows中根据端口号关闭进程及关闭Java进程
    16位ADC的积分非线性做到0.5LSB的难度有多大
    为什么说阿里云服务器5M带宽是最划算的?
    阅读openfoam框图
    解决IDEA中java的system.properties乱码问题
  • 原文地址:https://blog.csdn.net/qq_43410878/article/details/127451436