• CountDownLatch 使用例子和代码流程


    CountDownLatch意思理解

    单词1: countdown

    
    常见释义:[ˈkaʊntdaʊn][ˈkaʊntdaʊn]
    n.	倒数读秒,倒计时(如发射宇宙飞船时); 大事临近的时期;
    [例句]The countdown to Christmas starts here.
    现在开始圣诞节倒计时。
    [其他]	复数:countdowns
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    单词2: latch

    常见释义 英[lætʃ][lætʃ]
    n.	插销; 门闩; 弹簧锁; 碰锁;
    vt.	用插销插上; 用碰锁锁上;
    [例句]He lifted the latch and opened the door.
    他拉起门闩开了门。
    [其他]	第三人称单数:latches 复数:latches 现在分词:latching 过去式:latched 过去分词:latched
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    场景:完成某件事情时,前面的一些事情都要完成,然后自己才能继续。

    远离:使用一个计数器进行实现,计数器初始值就是其它线程的数量。当每个被计数的线程完成任务后,计数器值减一,当计数器的值为0时,表示所有线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行。

    普通多线程运行

     public static void main(String[] args) throws InterruptedException {
    
            Runnable runnable = new Runnable() {
                @Override
                public void run () {
                    try {
                        // 业务处理
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + ":" + " finished");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
    
                    }
                }
            };
            List<Thread> threads = new ArrayList<>(4);
            for(int i= 0;i<4;i++) {
                Thread thread = new Thread(runnable);
                threads.add(thread);
            }
            for(Thread thread: threads){
                thread.start();
            }
    
            System.out.println("main end");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    Thread.join()实现

    如下,main线程和其它线程运行时序不定,达不到其它线程都结束后main县城再结束的效果。

      public static void main(String[] args) throws InterruptedException {
    
            Runnable runnable = new Runnable() {
                @Override
                public void run () {
                    try {
                        // 业务处理
                        TimeUnit.SECONDS.sleep(3);
                        System.out.println(Thread.currentThread().getName() + ":" + " finished");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
    
                    }
                }
            };
            List<Thread> threads = new ArrayList<>(4);
            for(int i= 0;i<4;i++) {
                Thread thread = new Thread(runnable);
                threads.add(thread);
            }
            for(Thread thread: threads){
                thread.start();
            }
            for(Thread thread: threads){
                thread.join();
            }
    
            System.out.println("main end");
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    join原理:

    • A线程中执行 B.join(),则A线程阻塞住,直到线程B完成,A线程才能再继续。

    • join方法的本质调用的是Object中的wait方法实现线程的阻塞,即有monitor锁的概念。所以B线程执行一半要通知A线程继续则无法实现。

    CountDownLatch实现

    public class Test {
    
        final static int CNT = 3;
    
        static CountDownLatch countDownLatch = new CountDownLatch(CNT);
    
    
        public static void main(String[] args) throws InterruptedException {
    
            Runnable runnable = new Runnable() {
                @Override
                public void run () {
                    try {
                        // 业务处理
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println(Thread.currentThread().getName() + ":" + "business finished");
                        countDownLatch.countDown();
                        // 其它处理
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println(Thread.currentThread().getName() + ":" + " finished");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                    }
                }
            };
            List<Thread> threads = new ArrayList<>(4);
            for (int i = 0; i < CNT; i++) {
                Thread thread = new Thread(runnable);
                threads.add(thread);
            }
            for (Thread thread : threads) {
                thread.start();
            }
            countDownLatch.await();
            System.out.println("main end");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    输出如下: main线程只等待业务线程的业务逻辑结束就行。(当然应该写两个try finally, 避免结束不了)
    在这里插入图片描述

    改进如下:

    public class Test {
    
        final static int CNT = 3;
    
        static CountDownLatch countDownLatch = new CountDownLatch(CNT);
    
    
        public static void main(String[] args) throws InterruptedException {
    
            Runnable runnable = new Runnable() {
                @Override
                public void run () {
                    // 业务处理
                    try {
                        TimeUnit.SECONDS.sleep(2);
                        System.out.println(Thread.currentThread().getName() + ":" + "business finished");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        countDownLatch.countDown();
                    }
                    // 其它处理
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println(Thread.currentThread().getName() + ":" + " finished");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                    }
    
                }
            };
            List<Thread> threads = new ArrayList<>(4);
            for (int i = 0; i < CNT; i++) {
                Thread thread = new Thread(runnable);
                threads.add(thread);
            }
            for (Thread thread : threads) {
                thread.start();
            }
            countDownLatch.await();
            System.out.println("main end");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    CountDownLatch流程

    new CountDownLatch(3)

    public CountDownLatch(int count) {
       if (count < 0) throw new IllegalArgumentException("count < 0");
       this.sync = new Sync(count);
    }
    
    • 1
    • 2
    • 3
    • 4

    CountDownLatch内部自己实现了继承AbstractQueuedSynchronizer的同步器,如下

     private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c - 1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    countDown 方法

    在这里插入图片描述

    await方法

    在这里插入图片描述

    CountDownLatch 并没有往aqs队列加入节点,而是使用aqs的共享模式

    aqs的java.util.concurrent.locks.AbstractQueuedSynchronizer#doAcquireSharedInterruptibly方法

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    只要countDown没有减到0,那么Node.SHARED就是head, 则一直for(;😉 死循环的执行判断countDown为0,只要为0,就返回退出了

      if (p == head) {
       int r = tryAcquireShared(arg);
          if (r >= 0) {
              setHeadAndPropagate(node, r);
              p.next = null; // help GC
              return;
          }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    其中tryAcquireShared(arg);则是CountDownLatch中d的判断逻辑 java.util.concurrent.CountDownLatch.Sync#tryAcquireShared

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }
    
    • 1
    • 2
    • 3
  • 相关阅读:
    【OS】进程通信
    SCAU Java 实验7 银行账户存取款业务
    TCP/IP网络编程(8) 基于Linux的多进程服务器
    一次nginx文件打开数的问题排查处理
    shiro550反序列学习
    什么是智能视频美颜SDK?
    vue3代码编写
    【附源码】Python计算机毕业设计旅游组团管理系统
    【广州华锐互动】利用VR开展细胞基础实验教学有什么好处?
    一款优秀的盲盒APP源码成品需要具备什么条件
  • 原文地址:https://blog.csdn.net/qq_26437925/article/details/132938979