• 09 JUC 之 CountDownLatch


    CountDownLatch说明

    CountDownLatch是一个用来统计线程个数的工具,可以看作是个门闩(latch,门闩),必须等到线程齐了后才打开门闩。

    CountDownLatch主要使用的方法是:

    • await,当前线程阻塞
    • countDown,当前线程释放锁。

    CountDownLatch的两个应用场景:

    1.乘车去买菜,也可看作是工人上工和下工。这种场景需要的使用两个CountDownLatch。代码如下:

    1. private static final CountDownLatch people = new CountDownLatch(3);
    2. private static final CountDownLatch car = new CountDownLatch(1);
    3. static class Run3 implements Runnable {
    4. private String name;
    5. Run3(String name) {
    6. this.name = name;
    7. }
    8. @Override
    9. public void run() {
    10. try {
    11. System.out.println(name + "上车");
    12. car.await();
    13. System.out.println(name + "去采购东西");
    14. // doWork部分,不同线程耗时不一样
    15. int second = new Random().nextInt(20);
    16. Thread.sleep(second * 1000);
    17. System.out.println(name + "采购东西完毕,回到车上");
    18. people.countDown();
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. }
    23. }
    24. public static void main(String[] args) throws InterruptedException {
    25. System.out.println("人员准备去采购");
    26. new Thread(new Run3("甲 ")).start();
    27. new Thread(new Run3("乙 ")).start();
    28. new Thread(new Run3("丙 ")).start();
    29. Thread.sleep(10 * 1000);
    30. // doWork
    31. car.countDown();
    32. System.out.println("车辆到达菜市场,人员下车");
    33. // 等待所有people完成采购
    34. people.await();
    35. System.out.println("车辆回家");
    36. }
    37. 输出日志:
    38. 人员准备去采购
    39. 甲 上车
    40. 乙 上车
    41. 丙 上车
    42. 车辆到达菜市场,人员下车
    43. 乙 去采购东西
    44. 甲 去采购东西
    45. 丙 去采购东西
    46. 甲 采购东西完毕,回到车上
    47. 丙 采购东西完毕,回到车上
    48. 乙 采购东西完毕,回到车上
    49. 车辆回家

    2.另一个案例是单独的CountDownLatch完成的,可以看作是上述代码中的people。这个案例是把一个大任务分成了多个小任务让不同的人(线程)去处理,等所有人(线程)完成后,这个任务才算完成。可以类比的案例是,赛场跑道上多个运动员比赛,每个运动员都完成了比赛。那这个比赛项目就结束了。又或者是一个宝藏密室需要多个钥匙都插入钥匙孔才能打开大门。

    CountDownLatch的源码分析

    CountDownLatch 内部也是有个Sync类继承了AbstractQueuedSynchronizer类,之前我们分析ReentrantLock、Semaphore时,发现这个父类他只提供模板方法。然而子类以不同的实现方式,来对应具体的业务案例。

    CountDownLatch 的Sync类如下:

    1. private static final class Sync extends AbstractQueuedSynchronizer {
    2. private static final long serialVersionUID = 4982264981922014374L;
    3. Sync(int count) {
    4. setState(count);
    5. }
    6. int getCount() {
    7. return getState();
    8. }
    9. protected int tryAcquireShared(int acquires) {
    10. return (getState() == 0) ? 1 : -1;
    11. }
    12. protected boolean tryReleaseShared(int releases) {
    13. // Decrement count; signal when transition to zero
    14. for (;;) {
    15. int c = getState();
    16. if (c == 0)
    17. return false;
    18. int nextc = c-1;
    19. if (compareAndSetState(c, nextc))
    20. return nextc == 0;
    21. }
    22. }
    23. }
    1. // CountDownLatch的await方法
    2. public void await() throws InterruptedException {
    3. sync.acquireSharedInterruptibly(1);
    4. }
    5. // AQS类的acquireSharedInterruptibly方法,也就是CountDownLatch实际执行的方法
    6. public final void acquireSharedInterruptibly(int arg)
    7. throws InterruptedException {
    8. if (Thread.interrupted())
    9. throw new InterruptedException();
    10. if (tryAcquireShared(arg) < 0)
    11. doAcquireSharedInterruptibly(arg);
    12. }
    13. private void doAcquireSharedInterruptibly(int arg)
    14. throws InterruptedException {
    15. final Node node = addWaiter(Node.SHARED);
    16. boolean failed = true;
    17. try {
    18. for (;;) {
    19. final Node p = node.predecessor();
    20. if (p == head) {
    21. int r = tryAcquireShared(arg);
    22. if (r >= 0) {
    23. setHeadAndPropagate(node, r);
    24. p.next = null; // help GC
    25. failed = false;
    26. return;
    27. }
    28. }
    29. if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
    30. throw new InterruptedException();
    31. }
    32. } finally {
    33. if (failed)
    34. cancelAcquire(node);
    35. }
    36. }
    37. // CountDownLatch方法
    38. public void countDown() {
    39. sync.releaseShared(1);
    40. }
    41. // AQS的releaseShared
    42. public final boolean releaseShared(int arg) {
    43. if (tryReleaseShared(arg)) {
    44. doReleaseShared();
    45. return true;
    46. }
    47. return false;
    48. }

    从这简单的代码里,我们可以了解到以下内容:

    1. 通过构造方法,直接设置了AQS类中state的具体值,同时要求了执行线程的个数要和这个值保持一致。
    2. 主线程通过await方法被阻塞,说明state !=0,方法返回了-1。当前线程被放在了队列中,且被挂起了。
    3. 子线程调用countDown方法,实际上是将state值进行减少,因为每个线程只能减1,那么当线程个数等于初始state的时候,state才能被减到0,那么此时AQS的releaseShared方法里的tryReleaseShared(arg)方法得到了true,此时才能执行doReleaseShared方法,唤醒主线程。
    4. 结合案例,主线程和子线程其实是相对的概念。不能说main方法里就一定是主线程。
  • 相关阅读:
    考察1学生学籍系统winform .net6 sqlserver
    qt判断当前日期的当月的最后一天是几号
    高校实验室设备管理系统设计与实现-计算机毕业设计源码+LW文档
    【torch.utils.data.sampler】采样器的解析和使用
    升级pip 升级pip3的快速方法
    ECCV 2022 | RFLA:基于高斯感受野的微小目标检测标签分配
    MySQL 新增表中的数据为另外一个或多个表的数据(业务场景:创建关系表,复制旧表数据到新表)
    LeetCode刷题之HOT100之组合总和
    基于Java的图书商城管理系统设计与实现(源码+lw+部署文档+讲解等)
    搭建简单的Jmeter性能监控平台(Influxdb+Grafana)
  • 原文地址:https://blog.csdn.net/liulimoyu/article/details/127711009