• 多线程与高并发——并发编程(7)


    七、JUC并发工具

    1 CountDownLatch应用&源码分析

    1.1 CountDownLatch介绍

    • CountDownLatch 就是 JUC 包下的一个工具,整个工具最核心的功能就是计数器。
    • 假设,有三个业务需要并行处理,并且需要知道三个业务全部都处理完毕了,以及需要一个并发安全的计数器来操作,那么 CountDownLatch 就可以实现。给 CountDownLatch 设置一个数值 3,每个业务处理完毕之后,执行 countDown 方法,指定的 3 在每次执行 countDown 方法时,对 3 进行 -1。主线程可以在业务处理时,执行 await 方法,主线程会阻塞等待任务处理完毕。当设置的 3 基于 countDown 方法减到 0 之后,主线程就会被唤醒,继续处理后续业务。

    image.png

    • 当咱们得业务中,出现 2 个以上允许并行处理的任务,并且需要在任务都处理完毕之后,再做其他处理时,可以采用 CountDownLatch 去实现这个功能。

    1.2 CountDownLatch应用

    • 模拟三个任务需要并行处理,在三个任务全部处理完毕后,再执行后续操作。CountDownLatch 中,执行 countDown 方法,代表一个任务结束,对计数器 -1;执行 await 方法,代表等待计数器变为 0 时,再继续执行;执行 await(time,unit) 代表等待 time 时长,如果计数器不为 0,返回 false,如果在等待期间,计数器为 0,方法返回 true。
    static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
    
    static CountDownLatch countDownLatch = new CountDownLatch(3);
    
    public static void main(String[] args) throws InterruptedException {
        System.out.println("主业务开始执行");
        sleep(1000);
        executor.execute(CompanyTest::a);
        executor.execute(CompanyTest::b);
        executor.execute(CompanyTest::c);
        System.out.println("三个任务并行执行,主业务线程等待");
        // 死等任务结束
        // countDownLatch.await();
        // 如果在规定时间内,任务没有结束,返回false
        if (countDownLatch.await(10, TimeUnit.SECONDS)) {
            System.out.println("三个任务处理完毕,主业务线程继续执行");
        }else{
            System.out.println("三个任务没有全部处理完毕,执行其他的操作");
        }
    }
    
    private static void a() {
        System.out.println("A任务开始");
        sleep(1000);
        System.out.println("A任务结束");
        countDownLatch.countDown();
    }
    private static void b() {
        System.out.println("B任务开始");
        sleep(1500);
        System.out.println("B任务结束");
        countDownLatch.countDown();
    }
    private static void c() {
        System.out.println("C任务开始");
        sleep(2000);
        System.out.println("C任务结束");
        countDownLatch.countDown();
    }
    
    private static void sleep(long timeout){
        try {
            Thread.sleep(timeout);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    1.3 CountDownLatch源码分析

    • CountDownLatch 就是一个计数器,没有什么特殊功能,查看源码也只是查看计数器实现的方式。
    • 发现 CountDownLatch 的内部类 Sync 继承了 AQS,CountDownLatch 就是基于 AQS 实现的计数器。而 AQS 就是一个 state 属性,以及 AQS 双向链表,所以猜测计数器的数值实现就是基于 state 实现的,主线程阻塞的方式,也是阻塞在了 AQS 的双向链表中。
    1.3.1 有参构造
    • 就是构造内部类 Sync,并且给 AQS 的 state 赋值
    // CountDownLatch的有参构造
    public CountDownLatch(int count) {
        // 健壮性校验
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 构建内部类,Sync传入count
        this.sync = new Sync(count);
    }
    // AQS子类,Sync的有参构造
    Sync(int count) {
        // 就是给AQS中的state赋值
        setState(count);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    1.3.2 await 方法
    • await 方法就是判断当前 CountDownLatch 中的 state 是否为 0,如果为 0,直接正常执行后续任务;如果不为 0,以共享锁的方式,插入到 AQS 的双向链表中,并且挂起线程。
    // 一般主线程await的方法,阻塞主线程,等待state为0
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    // 执行了AQS的acquireSharedInterruptibly方法
    public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
        // 判断线程是否中断,如果中断标记位是true,直接抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            // 共享锁挂起的操作
            doAcquireSharedInterruptibly(arg);
    }
    // tryAcquireShared在CountDownLatch中的实现
    protected int tryAcquireShared(int acquires) {
        // 查看state是否为0,如果为0,返回1,不为0,返回-1
        return (getState() == 0) ? 1 : -1;
    }
    
    private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
        // 封装当前先成为Node,属性为共享锁
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                // 在这,就需要挂起当前线程。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    1.3.3 countDown方法
    • countDown 方法本质就是对 state -1,如果 state -1后变为 0,需要去 AQS 的链表中唤醒挂起的节点。
    // countDown对计数器-1
    public void countDown() {
        sync.releaseShared(1);	// 是-1
    }
    // AQS提供的功能
    public final boolean releaseShared(int arg) {
        // 对state - 1
        if (tryReleaseShared(arg)) {
            // state - 1后,变为0,执行doReleaseShared
            doReleaseShared();
            return true;
        }
        return false;
    }
    // CountDownLatch的tryReleaseShared实现
    protected boolean tryReleaseShared(int releases) {
        // 死循环是为了避免CAS并发问题
        for (;;) {
            int c = getState();		// 获取state
            if (c == 0)				// state已经为0,直接返回false
                return false;
            int nextc = c-1;		// 对获取到的state - 1
            // 基于CAS的方式,将值赋值给state
            if (compareAndSetState(c, nextc))
                // 赋值完,发现state为0了。此时可能会有线程在await方法处挂起,那边挂起,需要这边唤醒
                return nextc == 0;
        }
    }
    
    // 如何唤醒在await方法处挂起的线程
    private void doReleaseShared() {
        for (;;) {	// 死循环
            Node h = head;	// 拿到head
            // head不为null,有值,并且head != tail,代表至少2个节点
            // 一个虚拟的head,加上一个实质性的Node
            if (h != null && h != tail) {
                int ws = h.waitStatus;	// 说明AQS队列中有节点
                if (ws == Node.SIGNAL) {// 如果head节点的状态为 -1
                    // 先对head节点将状态从-1,修改为0,避免重复唤醒的情况
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;  
                    // 正常唤醒节点即可,先看head.next,能唤醒就唤醒,如果head.next有问题,从后往前找有效节点
                    unparkSuccessor(h);
                }
                // 会在Semaphore中谈到这个位置
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;  
            }
            // 会在Semaphore中谈到这个位置
            if (h == head)  
                break;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    2 CyclicBarrier应用&源码分析

    2.1 CyclicBarrier介绍

    从名字上来看 CyclicBarrier,就是代表循环屏障。

    • Barrier屏障:让一个或多个线程到达一个屏障点,会被阻塞。屏障点会有一个数值,当到达一个线程阻塞在屏障点时,就会对屏障点的数值进行 -1 操作。当屏障点的数值减到 0 时,屏障就会打开,唤醒所有阻塞在屏障点的线程。在释放屏障点之后,可以先执行一个任务,再让所有阻塞被唤醒的线程继续之后后续任务。
    • Cyclic循环:所有线程被释放后,屏障点的数值可以再次被重置。

    CyclicBarrier 一般被称为栅栏,是一种同步机制,允许一组线程相互等待。线程达到屏障点其实是基于 await 方法在屏障点阻塞。

    CyclicBarrier 并没有基于 AQS 实现,它是基于 ReentrantLock 锁机制实现了对屏障点的 --,以及线程挂起的操作。(CountDownLatch本身是基于 AQS,对 state 进行 release 操作后,可以 -1)

    CyclicBarrier 每来一个线程执行 await 方法,都会对屏障数值 -1 操作,每次 -1 后,立即查看数值是否为 0。如果为 0,直接唤醒所有的相互等待的线程。

    CyclicBarrier 和 CountDownLatch 区别?

    • 底层实现不同:CyclicBarrier 基于 ReentrantLock 实现,CountDownLatch 直接基于 AQS 实现。
    • 应用场景不同:CountDownLatch 的计数器只能使用一次,而 CyclicBarrier 在计数器达到 0 之后,可以重置计数器。CyclicBarrier 可以实现相比 CountDownLatch 更复杂的业务,执行业务时出现错误,可以充值 CyclicBarrier 计数器,再执行一次。
    • CyclicBarrier 还提供了很多其他功能:
      • 可以获取到阻塞的线程有多少;
      • 在线程相互等待是,如果等待的线程中断,可以抛异常,避免无限等待的问题。
    • CountDownLatch 一般是让主线程等待,让子线程对计数器 --。CyclicBarrier 更多的是让子线程一起技术和等待,等待的线程达到数值后,再统一唤醒。
    • CyclicBarrier 多个线程相互等待,直到达到同一个同步点,再一起执行。

    2.2 CyclicBarrier应用

    举个🌰:出国旅游

    导游小姐姐需要等待所有乘客都到位后,发护照、签证等文件,再一起出发。

    比如 Tom、Jack、Rose 三个人组团出门旅游

    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(3,() -> {
            System.out.println("等到各位大佬都到位之后,分发护照和签证等内容!");
        });
    
        new Thread(() -> {
            System.out.println("Tom到位!!!");
            try {
                barrier.await();
            } catch (Exception e) {
                System.out.println("悲剧,人没到齐!");
                return;
            }
            System.out.println("Tom出发!!!");
        }).start();
        Thread.sleep(100);
        new Thread(() -> {
            System.out.println("Jack到位!!!");
            try {
                barrier.await();
            } catch (Exception e) {
                System.out.println("悲剧,人没到齐!");
                return;
            }
            System.out.println("Jack出发!!!");
        }).start();
        Thread.sleep(100);
        new Thread(() -> {
            System.out.println("Rose到位!!!");
            try {
                barrier.await();
            } catch (Exception e) {
                System.out.println("悲剧,人没到齐!");
                return;
            }
            System.out.println("Rose出发!!!");
        }).start();
        /*
        tom到位,jack到位,rose到位
        导游发签证
        tom出发,jack出发,rose出发
         */
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 在构建 CyclicBarrier 时可以选择性指定 barrierAction,如果指定了,那么会在 barrier 归为 0 后,优先执行 barrierAction 任务,然后再去唤醒所有阻塞挂起的线程,并行去处理后续任务。
    • 所有相互等待的线程,可以指定等待时间,并且再等待过程中,如果有线程中断,所有相互等待的线程都会被唤醒。
    • 如果在等待期间,有线程终端了,唤醒所有线程后,CyclicBarrier 无法继续使用。如果需要继续使用当前 CyclicBarrier,需要调用 reset 方法,让 CyclicBarrier 重置。
    • 如果 CyclicBarrier 的屏障数值达到 0 之后,会默认重置屏障数值,CyclicBarrier 在没有线程中断时,是可以重复使用的。

    2.3 CyclicBarrier源码分析

    • 分成两块内容去查看,首先查看 CyclicBarrier 的一些核心属性,然后再去查看 CyclicBarrier 的核心方法。
    2.3.1 CyclicBarrier的核心属性
    public class CyclicBarrier {
       
        // 这个静态内部类是用来标记是否中断的
        private static class Generation {
       
            boolean broken = false;
        }
    	// CyclicBarrier是基于ReentrantLock实现的互斥操作,以及计数原子性操作
        private final ReentrantLock lock = new ReentrantLock();
        // 基于当前的Condition实现线程的挂起和唤醒
        private final Condition trip = lock.newCondition();
        // 记录有参构造出入的屏障数值,不会对这个数值做操作
        private final int parties;
        // 当屏障数值达到0之后,优先执行当前任务
        private final Runnable barrierCommand;
        // 初始化默认的Generation,用来标记线程中断情况
        
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
  • 相关阅读:
    抖音短视频账号矩阵seo分发系统--开发源代
    浅谈综合管廊建设提速后的运维管理
    八、mysql语句的DDL语句
    wsl2迁移镜像虚拟磁盘
    SpringBoot详解
    Python——飞机大战(day10)
    合并两个有序的单链表
    关于ETL的两种架构(ETL架构和ELT架构)
    每日五道java面试题之spring篇(五)
    2020 滴滴java面试笔试总结 (含面试题解析)
  • 原文地址:https://blog.csdn.net/yangwei234/article/details/132794779