• 从一道面试题进入Java并发新机制---J.U.C


    什么是 J.U.C ?

    它是 Java java.util.concurrent 包的缩写,从包的名称就可以看出,它应该主要提供一些 线程同步 的类。

    这个包下面的类提供了多种实现线程同步的方式,还有诸如 ExecutorCallableFutureExecutorService 等耳熟能详的接口。

    一道面试题

    实现一个容器,提供两个方法 addsize ,实现两个线程:

    线程1,向容器中添加 10 个元素到容器中;

    线程2,实时监控容器元素个数,当元素个数到5个时,线程2给出提示并结束。

    看到这道题,我首先想到的是 synchronized + wait/notify ,具体实现为:

    1. public class NiubilityContainer<T> {
    2. private List list = new ArrayList<>();
    3. /**
    4. * add方法,借用list的add方法相容器添加元素
    5. * @param t 待添加的元素
    6. */
    7. public void add(T t) {
    8. list.add(t);
    9. }
    10. /**
    11. * 借助list的size方法返回当前容器的元素个数
    12. * @return int 容器元素个数
    13. */
    14. public int size(){
    15. return list.size();
    16. }
    17. public static void main(String[] args) {
    18. NiubilityContainer c = new NiubilityContainer();
    19. //定义一个需要上锁的对象,线程持有该对象的锁才能执行
    20. final Object lock = new Object();
    21. //启动一个监控线程
    22. new Thread(() -> {
    23. System.out.println("监控线程启动...");
    24. synchronized (lock) {
    25. //只要元素个数不为5,就调用wait方法让出CPU
    26. if (c.size() != 5) {
    27. try {
    28. lock.wait();
    29. } catch (InterruptedException e) {
    30. e.printStackTrace();
    31. }
    32. }
    33. //通知添加元素的线程继续执行
    34. lock.notify();
    35. }
    36. System.out.println("容器元素个数为5,监控线程退出!");
    37. }, "MonitorThread").start();
    38. //启动一个添加元素的线程
    39. new Thread(() -> {
    40. System.out.println("添加元素线程启动...");
    41. synchronized (lock) {
    42. for (int i = 0; i < 10; i++) {
    43. c.add(i);
    44. System.out.println("添加元素线程 add " + i);
    45. if (c.size() == 5) {
    46. //先唤醒当前线程
    47. lock.notify();
    48. try {
    49. //释放锁,使得监控线程得以执行
    50. lock.wait();
    51. } catch (InterruptedException e) {
    52. e.printStackTrace();
    53. }
    54. }
    55. }
    56. }
    57. }, "AddThread").start();
    58. }
    59. }
    60. 复制代码

    运行结果:

    1. 监控线程启动...
    2. 添加元素线程启动...
    3. 添加元素线程 add 0
    4. 添加元素线程 add 1
    5. 添加元素线程 add 2
    6. 添加元素线程 add 3
    7. 添加元素线程 add 4
    8. 容器元素个数为5,监控线程退出!
    9. 添加元素线程 add 5
    10. 添加元素线程 add 6
    11. 添加元素线程 add 7
    12. 添加元素线程 add 8
    13. 添加元素线程 add 9
    14. 复制代码

    这种方法要注意 waitnotify 的顺序,而且他们都必须放在 synchronized 内。

    CyclicBarrier

    CyclicBarrier的官网描述:

    A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released.

    A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

    转译总结一下,大概就是这个意思:

    它允许一组线程互相等待,直到到达某个公共屏障点 (Common Barrier Point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 Barrier 在释放等待线程后可以重用,所以称它为循环( Cyclic ) 的屏障( Barrier ) 。

    构造函数

    1. /**
    2. * 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
    3. *
    4. * @param parties 拦截线程的总数量
    5. * @param barrierAction 为 CyclicBarrier 接收的 Runnable 命令,用于在线程到达屏障时,优先执行
    6. * @throws IllegalArgumentException 当拦截线程数量小于1时抛出异常
    7. */
    8. public CyclicBarrier(int parties, Runnable barrierAction) {
    9. if (parties <= 0) throw new IllegalArgumentException();
    10. this.parties = parties;
    11. this.count = parties;
    12. this.barrierCommand = barrierAction;
    13. }
    14. /**
    15. * 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
    16. * @param parties 表示拦截线程的总数量。
    17. * @throws IllegalArgumentException 当拦截线程数量小于1时抛出异常
    18. */
    19. public CyclicBarrier(int parties) {
    20. this(parties, null);
    21. }
    22. 复制代码

    CyclicBarrier 中最重要的方法莫过于 await() 方法,在所有参与者 parties 都已经在此 barrier 上调用 await 方法之前,将一直等待。

    我举个栗子,一个小伙子( 线程1 )骑自行车到了一个红绿灯路口,红灯( Barrier )亮了,他需要停下来等待( await );一个白富美( 线程2 )开着兰博基尼也路过此路口,恰好此时绿灯亮了( 拦截的最后一个线程到达barrier )。小伙子和白富美确认过眼神后,过绿灯可以去干别的事( barrierAction )了。

    基于以上对 CyclicBarrier 的了解,我们要强行使用 CyclicBarrier 来解决开始的那道题(==只需要实现效果即可==),怎么办?

    我的思路是,我只定义一个需要拦截的线程,让它去做添加元素的操作,当元素添加到5个之后,await,执行一个Runnable,也就是barrierAction,用来提示已经有5个元素了。

    具体实现:

    1. public class TestCyclicBarrier {
    2. static CyclicBarrier barrier ;
    3. static List lists = new LinkedList();
    4. static void add(Object o) {
    5. lists.add(o);
    6. }
    7. static int size() {
    8. return lists.size();
    9. }
    10. static class ReactThread implements Runnable {
    11. @Override
    12. public void run() {
    13. System.out.println("============== 元素个数已到达5,监控退出!=============");
    14. }
    15. }
    16. static class AddThread extends Thread {
    17. @Override
    18. public void run() {
    19. for (int i = 1; i < 11; i++) {
    20. add(new Object());
    21. System.out.println("添加元素线程 add 第" + i + "个元素");
    22. if (size() == 5) {
    23. try {
    24. barrier.await();
    25. } catch (InterruptedException e) {
    26. e.printStackTrace();
    27. } catch (BrokenBarrierException e) {
    28. e.printStackTrace();
    29. }
    30. }
    31. }
    32. }
    33. }
    34. public static void main(String[] args) {
    35. barrier = new CyclicBarrier(1, new ReactThread());
    36. new AddThread().start();
    37. }
    38. }
    39. 复制代码

    执行结果:

    CountDownLatch

    CountDownLatch的API描述:

    A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

    一种同步机制,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。

    A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

    用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await() 方法会一直受阻塞。之后,会释放所有等待的线程,await() 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier 。

    CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候,需要带入该计数器值,该值就表示了线程的数量。

    每当一个线程完成自己的任务后,计数器的值就会减 1 。 当计数器的值变为0时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。

    再次强行使用 CountDownLatch 来解决一下开始的那道题,尝试代码如下:

    1. public class TestCountDownLatch {
    2. static CountDownLatch latch = new CountDownLatch(1);
    3. static List lists = new LinkedList();
    4. static void add(Object o) {
    5. lists.add(o);
    6. }
    7. static int size() {
    8. return lists.size();
    9. }
    10. static class ReactThread extends Thread {
    11. @Override
    12. public void run() {
    13. try {
    14. latch.await();
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. System.out.println("============== 元素个数已到达5,监控退出!=============");
    19. }
    20. }
    21. static class AddThread extends Thread {
    22. @Override
    23. public void run() {
    24. for (int i = 1; i < 11; i++) {
    25. add(new Object());
    26. System.out.println("添加元素线程 add 第" + i + "个元素");
    27. if (size() == 5) {
    28. latch.countDown();
    29. }
    30. //(监控线程已经准备打印退出了,添加元素的线程还在继续添加)
    31. //加个睡眠时间,方便观察,因为打印的动作也需要耗时
    32. try {
    33. TimeUnit.MILLISECONDS.sleep(100);
    34. } catch (InterruptedException e) {
    35. e.printStackTrace();
    36. }
    37. }
    38. }
    39. }
    40. public static void main(String[] args) {
    41. new ReactThread().start();
    42. new AddThread().start();
    43. }
    44. }
    45. 复制代码

    运行结果也能达到预期:

    1. 添加元素线程 add1个元素
    2. 添加元素线程 add2个元素
    3. 添加元素线程 add3个元素
    4. 添加元素线程 add4个元素
    5. 添加元素线程 add5个元素
    6. ============== 元素个数已到达5,监控退出!=============
    7. 添加元素线程 add6个元素
    8. 添加元素线程 add7个元素
    9. 添加元素线程 add8个元素
    10. 添加元素线程 add9个元素
    11. 添加元素线程 add10个元素
    12. 复制代码

    CountDownLatch的关键类图:

    通过这个图,我们试着看一下它的实现源码。

    • 构造方法:
    1. /**
    2. * 构造一个用给定计数初始化的 CountDownLatch
    3. *
    4. * @param count the number of times {@link #countDown} must be invoked
    5. * before threads can pass through {@link #await}
    6. * @throws IllegalArgumentException if {@code count} is negative
    7. */
    8. public CountDownLatch(int count) {
    9. if (count < 0) throw new IllegalArgumentException("count < 0");
    10. this.sync = new Sync(count);
    11. }
    12. 复制代码
    • Sync

    SyncCountDownLatch 的一个内部类,它实现了 AbstractQueuedSynchronizerAQS )。

    1. /**
    2. * Synchronization control For CountDownLatch.
    3. * Uses AQS state to represent count.
    4. */
    5. private static final class Sync extends AbstractQueuedSynchronizer {
    6. private static final long serialVersionUID = 4982264981922014374L;
    7. Sync(int count) {
    8. setState(count);
    9. }
    10. //获取同步状态
    11. int getCount() {
    12. return getState();
    13. }
    14. //获取同步状态
    15. protected int tryAcquireShared(int acquires) {
    16. return (getState() == 0) ? 1 : -1;
    17. }
    18. //释放同步状态
    19. protected boolean tryReleaseShared(int releases) {
    20. // Decrement count; signal when transition to zero
    21. for (;;) {
    22. int c = getState();
    23. if (c == 0)
    24. return false;
    25. int nextc = c-1;
    26. if (compareAndSetState(c, nextc))
    27. return nextc == 0;
    28. }
    29. }
    30. }
    31. 复制代码

    由此可见,CountDownLatch 内部是采用 共享锁 来实现的。

    • await方法
    1. public void await() throws InterruptedException {
    2. sync.acquireSharedInterruptibly(1);
    3. }
    4. 复制代码

    acquireSharedInterruptibly 这个方法在其父类 AQS 里,来看一下:

    1. // java.util.concurrent.locks.AbstractQueuedSynchronizer.java
    2. public final void acquireSharedInterruptibly(int arg)
    3. throws InterruptedException {
    4. if (Thread.interrupted())
    5. throw new InterruptedException();
    6. if (tryAcquireShared(arg) < 0)
    7. doAcquireSharedInterruptibly(arg);
    8. }
    9. //...
    10. private void doAcquireSharedInterruptibly(int arg)
    11. throws InterruptedException {
    12. final Node node = addWaiter(Node.SHARED);
    13. boolean failed = true;
    14. try {
    15. for (;;) {
    16. final Node p = node.predecessor();
    17. if (p == head) {
    18. int r = tryAcquireShared(arg);
    19. if (r >= 0) {
    20. setHeadAndPropagate(node, r);
    21. p.next = null; // help GC
    22. failed = false;
    23. return;
    24. }
    25. }
    26. if (shouldParkAfterFailedAcquire(p, node) &&
    27. parkAndCheckInterrupt())
    28. throw new InterruptedException();
    29. }
    30. } finally {
    31. if (failed)
    32. cancelAcquire(node);
    33. }
    34. }
    35. 复制代码

    当进入到 doAcquireSharedInterruptibly 方法后,它会自旋 for (;;) ,一直尝试去获取同步状态。

    • countDown
    1. public void countDown() {
    2. sync.releaseShared(1);
    3. }
    4. 复制代码

    这里调用的 releaseShared 也是父类 AQS 的方法:

    1. // AQS
    2. public final boolean releaseShared(int arg) {
    3. if (tryReleaseShared(arg)) {
    4. doReleaseShared();
    5. return true;
    6. }
    7. return false;
    8. }
    9. 复制代码
    1. // CountDownLatch内部类Sync重写tryReleaseShared方法
    2. protected boolean tryReleaseShared(int releases) {
    3. // Decrement count; signal when transition to zero
    4. for (;;) {
    5. //获取锁状态
    6. int c = getState();
    7. //c == 0 直接返回,释放锁成功
    8. if (c == 0)
    9. return false;
    10. int nextc = c-1;
    11. //比较并替换CAS,更新锁状态(计数器)
    12. if (compareAndSetState(c, nextc))
    13. return nextc == 0;
    14. }
    15. }
    16. 复制代码

    Semaphore

    Semaphore 也是并发工具类之一,按惯例,我们来看一下它的API描述:

    计数信号量。从概念上讲,信号量维护了一个许可集。

    如有必要,在许可可用前会阻塞每一个 acquire,然后再获取该许可。 每个 release 添加一个许可,从而可能释放一个正在阻塞的获取者。 但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

    信号量通常用于限制线程数量,使其无法访问某些(物理或逻辑)资源,例如 API 上举了一个例子,通过 Semaphore 来控制资源池中数据的访问:

    1. class Pool {
    2. private static final int MAX_AVAILABLE = 100;
    3. private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    4. public Object getItem() throws InterruptedException {
    5. available.acquire();
    6. return getNextAvailableItem();
    7. }
    8. public void putItem(Object x) {
    9. if (markAsUnused(x))
    10. available.release();
    11. }
    12. // Not a particularly efficient data structure; just for demo
    13. protected Object[] items = ... whatever kinds of items being managed
    14. protected boolean[] used = new boolean[MAX_AVAILABLE];
    15. protected synchronized Object getNextAvailableItem() {
    16. for (int i = 0; i < MAX_AVAILABLE; ++i) {
    17. if (!used[i]) {
    18. used[i] = true;
    19. return items[i];
    20. }
    21. }
    22. return null; // not reached
    23. }
    24. protected synchronized boolean markAsUnused(Object item) {
    25. for (int i = 0; i < MAX_AVAILABLE; ++i) {
    26. if (item == items[i]) {
    27. if (used[i]) {
    28. used[i] = false;
    29. return true;
    30. } else
    31. return false;
    32. }
    33. }
    34. return false;
    35. }
    36. }
    37. 复制代码

    从中我们可以领会到,Semaphore 的关键方法就是 acquire()release()

    • acquire

    获取信号量的方法。

    • release

    释放信号量的方法。

    再再次强行使用 Semaphore 来完成我们之前那道题,上代码:

    1. public class TestSemaphore {
    2. static List lists = new LinkedList();
    3. static void add(Object o) {
    4. lists.add(o);
    5. }
    6. static int size() {
    7. return lists.size();
    8. }
    9. static Thread t1 = null, t2 = null;
    10. public static void main(String[] args) {
    11. //定义一个只能有1个线程能获得许可的信号量
    12. Semaphore semaphore = new Semaphore(1);
    13. t1 = new Thread(() -> {
    14. try {
    15. //尝试获得许可
    16. semaphore.acquire();
    17. //添加5个元素之后,释放锁
    18. for (int i = 0; i < 5; i++) {
    19. add(new Object());
    20. System.out.println("线程t1 已经 add " + size() + " 个元素");
    21. }
    22. //释放锁,等待t2打印退出
    23. semaphore.release();
    24. //需要让t2执行
    25. t2.start();
    26. t2.join();
    27. //t2退出后,继续获得许可,添加元素
    28. semaphore.acquire();
    29. for (int i = 0; i < 5; i++) {
    30. add(new Object());
    31. System.out.println("线程t1 已经 add " + size() + " 个元素");
    32. }
    33. } catch (InterruptedException e) {
    34. e.printStackTrace();
    35. }
    36. });
    37. t2 = new Thread(() -> {
    38. try {
    39. //首先获得许可
    40. semaphore.acquire();
    41. //打印 退出
    42. System.out.println("------- 线程t2已知容器中有5个元素了,t2退出。-------");
    43. //释放锁,等t1接着添加剩余的元素
    44. semaphore.release();
    45. } catch (InterruptedException e) {
    46. e.printStackTrace();
    47. }
    48. });
    49. t1.start();
    50. }
    51. }
    52. 复制代码

    执行结果:

    1. 线程t1 已经 add 1 个元素
    2. 线程t1 已经 add 2 个元素
    3. 线程t1 已经 add 3 个元素
    4. 线程t1 已经 add 4 个元素
    5. 线程t1 已经 add 5 个元素
    6. ------- 线程t2已知容器中有5个元素了,t2退出。-------
    7. 线程t1 已经 add 6 个元素
    8. 线程t1 已经 add 7 个元素
    9. 线程t1 已经 add 8 个元素
    10. 线程t1 已经 add 9 个元素
    11. 线程t1 已经 add 10 个元素
    12. 复制代码

    小结

    这次从一道线程同步的题目,切入 Java 并发与线程同步新机制,使用了 CyclicBarrireCountDownLatchSemaphore 实现了预期的效果。

    其实还有像 ExchangerPhaser 等等并发工具类没有介绍到,他们的底层实现都是继承了 AQS 这个强大的类。


     

  • 相关阅读:
    动手学深度学习-深度学习基础
    皕杰报表在tomcat的server.xml中配置了什么?
    LeetCode Python - 31.下一个排列
    3.Docker 镜像及镜像分层
    一文带你掌握 优先级队列
    探索 Java 线程的创建
    Selenium自动化测试框架工作原理你明白了吗?
    APP自动化测试 ---- Appium介绍及运行原理
    荣耀回应强迫员工购买股份:不实消息;​微软为Win11 引入云操作系统;苹果定于6月6日举行开发者大会 |极客头条
    C++中的继承
  • 原文地址:https://blog.csdn.net/weixin_72753070/article/details/126175754