• JUC并发工具类在大厂的应用场景详解


            jdk提供了比synchronized更加高级的各种同步工具,包括 ReentrantLock Semaphore CountDownLatch 、 CyclicBarrier等,可以实现更加丰富的多线程操作。
    (前三个是重点)

            一. ReentrantLock

            ReentrantLock是一种可重入的独占锁,它允许同一个线程多次获取同一个锁而不会被阻塞。 它的功能类似于synchronized是一种互斥锁,可以保证线程安全。相对于 synchronized, ReentrantLock具备如下特点:

            1.1 可中断

            1.2 可以设置超时时间

            1.3 可以设置为公平锁(默认非公平锁)
            1.4 支持多个条件变量
            1.5 与 synchronized 一样,都支持可重入
    它的主要应用场景是 在多线程环境下对共享资源进行独占式访问,以保证数据的一致性和安全性。
            常用API
    void lock()
    获取锁,调用该方法当前线程会获取锁,当锁获
    得后,该方法返回
    void lockInterruptibly()
    可中断的获取锁,和lock()方法不同之处在于该方
    法会响应中断,即在锁的获取中可以中断当前线
    boolean tryLock()
    尝试非阻塞的获取锁,调用该方法后立即返回。
    如果能够获取到返回true,否则返回false
    boolean tryLock(long time, TimeUnit unit)
    超时获取锁,当前线程在以下三种情况下会被返
    回:
    当前线程在超时时间内获取了锁
    当前线程在超时时间内被中断
    超时时间结束,返回false
    void unlock()
    释放锁
    Condition newCondition()
    获取等待通知组件,该组件和当前的锁绑定,当 前线程只有获取了锁,才能调用该组件的await() 方法,而调用后,当前线程将释放锁

    注意点:   

            1. 默认情况下 ReentrantLock 为非公平锁而非公平锁;
            2. 加锁次数和释放锁次数一定要保持一致,否则会导致线程阻塞或程序异常;
            3. 加锁操作一定要放在 try 代码之前,这样可以避免未加锁成功又释放锁的异常;
            4. 释放锁一定要放在 finally 中,否则会导致线程阻塞。

            1.1 ReentrantLock使用

            独占锁:模拟抢票场景

    1. package com.laoyang.Thread;
    2. import java.util.concurrent.locks.ReentrantLock;
    3. /**
    4. * @author:Kevin
    5. * @create: 2023-10-11 18:17
    6. * @Description: 模拟抢票
    7. */
    8. public class ReentrantLockDemo {
    9. private final ReentrantLock lock = new ReentrantLock();//默认非公平
    10. private static int tickets = 8; // 总票数
    11. public void buyTicket() {
    12. lock.lock(); // 获取锁
    13. try {
    14. if (tickets > 0) { // 还有票
    15. try {
    16. Thread.sleep(10); // 休眠10ms,模拟出并发效果
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. System.out.println(Thread.currentThread().getName() + "购买了第" +
    21. tickets-- + "张票");
    22. } else {
    23. System.out.println("票已经卖完了," + Thread.currentThread().getName() +
    24. "抢票失败");
    25. }
    26. } finally {
    27. lock.unlock(); // 释放锁
    28. }
    29. }
    30. public static void main(String[] args) {
    31. ReentrantLockDemo ticketSystem = new ReentrantLockDemo();
    32. for (int i = 1; i <= 10; i++) {
    33. Thread thread = new Thread(() -> {
    34. ticketSystem.buyTicket(); // 抢票
    35. }, "线程" + i);
    36. // 启动线程
    37. thread.start();
    38. }
    39. try {
    40. Thread.sleep(3000);
    41. } catch (InterruptedException e) {
    42. throw new RuntimeException(e);
    43. }
    44. System.out.println("剩余票数:" + tickets);
    45. }
    46. }

            

           1.2 公平锁和非公平锁

            ReentrantLock支持公平锁和非公平锁两种模式:

            公平锁:线程在获取锁时,按照等待的先后顺序获取锁。

            非公平锁:线程在获取锁时,不按照等待的先后顺序获取锁,而是随机获取锁。ReentrantLock默认是非公平锁
            比如买票的时候就有可能出现插队的场景,允许插队就是非公平锁,如下图:

            1.3 结合Condition实现生产者消费者模式

    java.util.concurrent类库中提供Condition类来实现线程之间的协调。调用Condition.await() 方法使
    线程等待,其他线程调用Condition.signal() 或 Condition.signalAll() 方法唤醒等待的线程。
    注意: 调用Condition的await()和signal()方法,都必须在lock保护之内。
    案例:基于ReentrantLock和Condition实现一个简单队列
    1. package com.laoyang.Thread;
    2. import java.util.Random;
    3. import java.util.concurrent.locks.Condition;
    4. import java.util.concurrent.locks.ReentrantLock;
    5. /**
    6. * @author:Kevin
    7. * @create: 2023-10-11 18:29
    8. * @Description: 模拟生产消费场景
    9. */
    10. public class ReentrantLockDemo3 {
    11. public static void main(String[] args) {
    12. // 创建队列
    13. Queue queue = new Queue(5);
    14. //启动生产者线程
    15. new Thread(new Producer(queue)).start();
    16. //启动消费者线程
    17. new Thread(new Customer(queue)).start();
    18. }
    19. public static class Queue {
    20. private Object[] items;
    21. int size = 0;
    22. int takeIndex;
    23. int putIndex;
    24. private ReentrantLock lock;
    25. public Condition notEmpty; //消费者线程阻塞唤醒条件,队列为空阻塞,生产者生产完唤醒
    26. public Condition notFull; //生产者线程阻塞唤醒条件,队列满了阻塞,消费者消费完唤醒
    27. public Queue(int capacity) {
    28. this.items = new Object[capacity];
    29. lock = new ReentrantLock();
    30. notEmpty = lock.newCondition();
    31. notFull = lock.newCondition();
    32. }
    33. public void put(Object value) throws Exception {
    34. //加锁
    35. lock.lock();
    36. try {
    37. while (size == items.length)
    38. // 队列满了让生产者等待
    39. notFull.await();
    40. items[putIndex] = value;
    41. if (++putIndex == items.length)
    42. putIndex = 0;
    43. size++;
    44. notEmpty.signal(); // 生产完唤醒消费者
    45. } finally {
    46. System.out.println("producer生产:" + value);
    47. //解锁
    48. lock.unlock();
    49. }
    50. }
    51. public Object take() throws Exception {
    52. lock.lock();
    53. try {
    54. // 队列空了就让消费者等待
    55. while (size == 0)
    56. notEmpty.await();
    57. Object value = items[takeIndex];
    58. items[takeIndex] = null;
    59. if (++takeIndex == items.length)
    60. takeIndex = 0;
    61. size--;
    62. notFull.signal(); //消费完唤醒生产者生产
    63. return value;
    64. } finally {
    65. lock.unlock();
    66. }
    67. }
    68. }
    69. static class Producer implements Runnable {
    70. private Queue queue;
    71. public Producer(Queue queue) {
    72. this.queue = queue;
    73. }
    74. @Override
    75. public void run() {
    76. try {
    77. // 隔1秒轮询生产一次
    78. while (true) {
    79. Thread.sleep(1000);
    80. queue.put(new Random().nextInt(1000));
    81. }
    82. } catch (Exception e) {
    83. e.printStackTrace();
    84. }
    85. }
    86. }
    87. /**
    88. * 101 * 消费者
    89. * 102
    90. */
    91. static class Customer implements Runnable {
    92. private Queue queue;
    93. public Customer(Queue queue) {
    94. this.queue = queue;
    95. }
    96. @Override
    97. public void run() {
    98. try {
    99. // 隔2秒轮询消费一次
    100. while (true) {
    101. Thread.sleep(2000);
    102. System.out.println("consumer消费:" + queue.take());
    103. }
    104. } catch (Exception e) {
    105. e.printStackTrace();
    106. }
    107. }
    108. }
    109. }
    1.3 应用场景总结
            
    ReentrantLock具体应用场景如下:
            1. 解决多线程竞争资源的问题,例如多个线程同时对同一个数据库进行写操作,可以使用ReentrantLock保证每次
    只有一个线程能够写入。
            2. 实现多线程任务的顺序执行,例如在一个线程执行完某个任务后,再让另一个线程执行任务。
            3. 实现多线程等待/通知机制,例如在某个线程执行完某个任务后,通知其他线程继续执行任务。

            二. Semaphore 

            Semaphore(信号量)是一种用于多线程编程的同步工具,用于控制同时访问某个资源的线程数量

            Semaphore维护了一个计数器,线程可以通过调用acquire()方法来获取Semaphore中的许可证,当计数器为0时,调用acquire()的线程将被阻塞,直到有其他线程释放许可证;线程可以通过调用release()方法来释放Semaphore中的许可证,这会使Semaphore中的计数器增加,从而允许更多的线程访问共享资源。

    2.1 常用方法

    acquire()表示阻塞并获取许可
    tryAcquire()方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
    release()表示释放许可

    2.2 Semaphore实现服务接口限流

    1. package com.laoyang.Thread;
    2. import java.util.concurrent.Executor;
    3. import java.util.concurrent.Executors;
    4. import java.util.concurrent.Semaphore;
    5. /**
    6. * @author:Kevin
    7. * @create: 2023-10-11 18:48
    8. * @Description: 模拟限流操作
    9. */
    10. public class SemaphoreDemo {
    11. //信号量
    12. private static Semaphore semaphore = new Semaphore(2);
    13. private static Executor executor = Executors.newFixedThreadPool(10);
    14. public static void main(String[] args) {
    15. for(int i=0;i<10;i++){
    16. executor.execute(()->getProductInfo2());
    17. }
    18. }
    19. public static String getProductInfo2() {
    20. if(!semaphore.tryAcquire()){
    21. System.out.println("请求被流控了");
    22. return "请求被流控了";
    23. }
    24. try {
    25. System.out.println("请求服务");
    26. Thread.sleep(2000);
    27. } catch (InterruptedException e) {
    28. throw new RuntimeException(e);
    29. }finally {
    30. semaphore.release();
    31. }
    32. return "返回商品详情信息";
    33. }
    34. }

    2.3 应用场景总结

            以下是一些使用Semaphore的常见场景:
            1. 限流:Semaphore可以用于限制对共享资源的并发访问数量,以控制系统的流量。
            2. 资源池:Semaphore可以用于实现资源池,以维护一组有限的共享资源。

            

            三. CountDownLatch 

            CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

            CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值 (count),由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。

            3.1 常用方法

    void await ()
    调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
    boolean await ( long timeout , TimeUnit unit )
    await() 类似,若等待 timeout 时长后, count 值还是没有变为 0 ,不再等待,继续执行
    void countDown ()
    会将 count 1 ,直至为 0

            3.2 CountDownLatch使用

            模拟实现百米赛跑

    1. package com.laoyang.Thread;
    2. import java.util.concurrent.CountDownLatch;
    3. /**
    4. * @author:Kevin
    5. * @create: 2023-10-11 18:59
    6. * @Description: 模拟实现百米赛跑
    7. */
    8. public class CountDownLatchDemo {
    9. // begin 代表裁判 初始为 1
    10. private static CountDownLatch begin = new CountDownLatch(1);
    11. // end 代表玩家 初始为 8
    12. private static CountDownLatch end = new CountDownLatch(8);
    13. public static void main(String[] args) throws InterruptedException {
    14. for (int i = 1; i <= 8; i++) {
    15. new Thread(new Runnable() {
    16. @Override
    17. public void run() {
    18. // 预备状态
    19. System.out.println("参赛者" + Thread.currentThread().getName() + "已经准备好了");
    20. // 等待裁判吹哨
    21. try {
    22. begin.await();
    23. } catch (InterruptedException e) {
    24. throw new RuntimeException(e);
    25. }
    26. // 开始跑步
    27. System.out.println("参赛者" + Thread.currentThread().getName() + "开始跑步");
    28. try {
    29. Thread.sleep(1000);
    30. } catch (InterruptedException e) {
    31. throw new RuntimeException(e);
    32. }
    33. // 跑步结束, 跑完了
    34. System.out.println("参赛者" + Thread.currentThread().getName() + "到达终点");
    35. // 跑到终点, 计数器就减一
    36. end.countDown();
    37. }
    38. }).start();
    39. }
    40. // 等待 5s 就开始吹哨
    41. Thread.sleep(5000);
    42. System.out.println("开始比赛");
    43. // 裁判吹哨, 计数器减一
    44. begin.countDown();
    45. // 等待所有玩家到达终点
    46. end.await();
    47. System.out.println("比赛结束");
    48. }
    49. }

            多任务完成后合并汇总

            很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check。

    1. package com.laoyang.Thread;
    2. import java.util.concurrent.CountDownLatch;
    3. import java.util.concurrent.ThreadLocalRandom;
    4. /**
    5. * @author:Kevin
    6. * @create: 2023-10-11 19:10
    7. * @Description: 多任务完成后合并汇总
    8. */
    9. public class CountDownLatchDemo2 {
    10. public static void main(String[] args) throws Exception {
    11. CountDownLatch countDownLatch = new CountDownLatch(5);
    12. for (int i = 0; i < 5; i++) {
    13. final int index = i;
    14. new Thread(() -> {
    15. try {
    16. Thread.sleep(1000 + ThreadLocalRandom.current().nextInt(2000));
    17. System.out.println("任务" + index + "执行完成");
    18. countDownLatch.countDown();
    19. } catch (InterruptedException e) {
    20. e.printStackTrace();
    21. }
    22. }).start();
    23. }
    24. // 主线程在阻塞,当计数器为0,就唤醒主线程往下执行
    25. countDownLatch.await();
    26. System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
    27. }
    28. }

    3.3 应用场景总结

    以下是使用CountDownLatch的常见场景:
             1. 并行任务同步:CountDownLatch可以用于协调多个并行任务的完成情况,确保所有任务都完成后再继续执行下
    一步操作。
            2. 多任务汇总:CountDownLatch可以用于统计多个线程的完成情况,以确定所有线程都已完成工作。
            3. 资源初始化:CountDownLatch可以用于等待资源的初始化完成,以便在资源初始化完成后开始使用。

              四. CyclicBarrier

            CyclicBarrier(回环栅栏或循环屏障),是 Java 并发库中的一个同步工具,通过它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。

            4.1 常用方法

    int await ()
    指定数量的线程全部调用 await() 方法时,这些线程不再阻塞
    int await ( long timeout , TimeUnit unit )
    BrokenBarrierException 表示栅栏已经被破坏,破坏的原因可能是其中一个线程 await() 时被中断
    或者超时
    void reset ()
    循环 通过 reset() 方法可以进行重置

            4.2 CyclicBarrier使用

            利用CyclicBarrier的计数器能够重置,屏障可以重复使用的特性,可以支持类似“人满发车”的场景

    1. package com.laoyang.Thread;
    2. import java.util.concurrent.*;
    3. /**
    4. * @author:Kevin
    5. * @create: 2023-10-11 19:20
    6. * @Description:模拟人满发车
    7. */
    8. public class CyclicBarrierDemo {
    9. public static void main(String[] args) {
    10. ExecutorService executorService = Executors.newFixedThreadPool(5);
    11. CyclicBarrier cyclicBarrier = new CyclicBarrier(5,
    12. () -> System.out.println("人齐了,准备发车"));
    13. for (int i = 0; i < 10; i++) {
    14. final int id = i + 1;
    15. executorService.submit(new Runnable() {
    16. @Override
    17. public void run() {
    18. try {
    19. System.out.println(id + "号马上就到");
    20. int sleepMills = ThreadLocalRandom.current().nextInt(2000);
    21. Thread.sleep(sleepMills);
    22. System.out.println(id + "号到了,上车");
    23. cyclicBarrier.await();
    24. } catch (InterruptedException e) {
    25. e.printStackTrace();
    26. } catch (BrokenBarrierException e) {
    27. e.printStackTrace();
    28. }
    29. }
    30. });
    31. }
    32. }
    33. }

    4.3 应用场景总结

    以下是一些常见的 CyclicBarrier 应用场景:
            1. 多线程任务:CyclicBarrier 可以用于将复杂的任务分配给多个线程执行,并在所有线程完成工作后触发后续操
    作。
            2. 数据处理:CyclicBarrier 可以用于协调多个线程间的数据处理,在所有线程处理完数据后触发后续操作。

    4.4 CyclicBarrier 与 CountDownLatch 区别

            1. CountDownLatch 是一次性的,CyclicBarrier 是可循环利用的
            2. CountDownLatch 参与的线程的职责是不一样的,有的在倒计时,有的在等待倒计时结束。CyclicBarrier 参与的线程职责是一样的。

            五. Exchanger 

            Exchanger是一个用于线程间协作的工具类,用于两个线程间交换数据。具体交换数据是通过exchange方法来实现的,如果一个线程先执行exchange方法,那么它会同步等待另一个线程也执行 exchange方法,这个时候两个线程就都达到了同步点,两个线程就可以交换数据。

            5.1 Exchanger使用

            模拟交易场景

    用一个简单的例子来看下Exchanger的具体使用。两方做交易,如果一方先到要等另一方也到了才能交易,交易就是执行exchange方法交换数据。
    1. package com.laoyang.Thread;
    2. import java.util.concurrent.Exchanger;
    3. /**
    4. * @author:Kevin
    5. * @create: 2023-10-11 19:25
    6. * @Description: 模拟交换
    7. */
    8. public class ExchangerDemo {
    9. private static Exchanger exchanger = new Exchanger();
    10. static String goods = "电脑";
    11. static String money = "$4000";
    12. public static void main(String[] args) throws InterruptedException {
    13. System.out.println("准备交易,一手交钱一手交货...");
    14. // 卖家
    15. new Thread(new Runnable() {
    16. @Override
    17. public void run() {
    18. System.out.println("卖家到了,已经准备好货:" + goods);
    19. try {
    20. String money = (String) exchanger.exchange(goods);
    21. System.out.println("卖家收到钱:" + money);
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. }
    26. }).start();
    27. Thread.sleep(3000);
    28. // 买家
    29. new Thread(new Runnable() {
    30. @Override
    31. public void run() {
    32. try {
    33. System.out.println("买家到了,已经准备好钱:" + money);
    34. String goods = (String) exchanger.exchange(money);
    35. System.out.println("买家收到货:" + goods);
    36. } catch (Exception e) {
    37. e.printStackTrace();
    38. }
    39. }
    40. }).start();
    41. }
    42. }

            5.2 应用场景总结

            Exchanger 可以用于各种应用场景,具体取决于具体的 Exchanger 实现。常见的场景包括:

            1. 数据交换:在多线程环境中,两个线程可以通过 Exchanger 进行数据交换。

            2. 数据采集:在数据采集系统中,可以使用 Exchanger 在采集线程和处理线程间进行数据交换。

            六. Phaser  

            Phaser(阶段协同器)是一个Java实现的并发工具类,用于协调多个线程的执行。它提供了一些方便的方法来管理多个阶段的执行,可以让程序员灵活地控制线程的执行顺序和阶段性的执行。Phaser可以被视为CyclicBarrier和CountDownLatch的进化版,它能够自适应地调整并发线程数,可以动态地增加或减少参与线程的数量。所以Phaser特别适合使用在重复执行或者重用的情况。

  • 相关阅读:
    网络工程师入门必懂华为认证体系,附系统学习路线分享
    第二章 Scala变量和数据类型
    上传本地包到私有maven仓库
    朴素贝叶斯法
    Linux 大页内存 Huge Pages 虚拟内存
    C 语言程序的执行流程
    cocoeval 解析
    Canvas 低代码工具,多人自研开发全流程优化|ModelWhale 版本更新
    蓝桥杯真题:纯质数
    Meter接口测试使用教程哪里找?
  • 原文地址:https://blog.csdn.net/qq_67801847/article/details/133775775