• 多线程基础(三)JUC并发包:Lock锁、CountDownLath、CyclicBarrier、Semaphore、LockSupport


    一、Lock锁

    1. 可重入锁 ReentrantLock

    我们可以使用 ReentrantLock 来替代 Synchronized锁,实现方法为:

    1. Lock lock = new ReentrantLock();
    2. void m2 (){
    3. try{
    4. // 加锁
    5. lock.lock();
    6. for(int i=0;i<10000;i++){
    7. count.incrementAndGet(); // 相当于线程安全的 count++
    8. }
    9. }catch (Exception e){
    10. e.printStackTrace();
    11. }finally {
    12. // 释放锁
    13. lock.unlock();
    14. }
    15. }

    使用Synchronized是自动解锁的。但是使用lock锁,必须使用try-catch-finally包裹,在try中加锁,在finally中释放锁。

    2. ReadWriteLock 读写锁

    允许同一时刻多个读线程访问,但所有写线程均被阻塞。读写分离,并发性提升。

    java中实现类的读写锁为 ReentrantReadWriteLock 类。

    1. import java.util.Random;
    2. import java.util.concurrent.locks.Lock;
    3. import java.util.concurrent.locks.ReadWriteLock;
    4. import java.util.concurrent.locks.ReentrantLock;
    5. import java.util.concurrent.locks.ReentrantReadWriteLock;
    6. public class ReadWriteLockTest {
    7. static Lock lock = new ReentrantLock();
    8. private static int value;
    9. static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    10. static Lock readLock = readWriteLock.readLock();
    11. static Lock writeLock = readWriteLock.writeLock();
    12. public static void read (Lock lock){
    13. try {
    14. lock.lock();
    15. // 模拟读操作
    16. Thread.sleep(1000);
    17. System.out.println("read over!");
    18. } catch (InterruptedException e) {
    19. e.printStackTrace();
    20. } finally {
    21. lock.unlock();
    22. }
    23. }
    24. public static void write (Lock lock,int v){
    25. try {
    26. lock.lock();
    27. // 模拟写操作
    28. Thread.sleep(1000);
    29. value = v;
    30. System.out.println("write over!");
    31. } catch (InterruptedException e) {
    32. e.printStackTrace();
    33. } finally {
    34. lock.unlock();
    35. }
    36. }
    37. public static void main(String[] args) {
    38. // 使用 互斥锁(排他锁) lock,需要一个一个执行线程
    39. Runnable readR = () -> read(lock);
    40. Runnable writeR = () -> write(lock,new Random().nextInt());
    41. for(int i=0;i<18;i++) new Thread(readR).start();
    42. for(int i=0;i<2;i++) new Thread(writeR).start();
    43. }
    44. }

    这里我们使用 互斥锁 lock 来去模拟读写操作时会发现,每个线程一个一个执行,只有当前线程执行完毕,下一个线程抢到资源才继续执行。

    如果我们使用读写锁进行模拟操作,会发现,读操作几乎是在一瞬间全部执行完毕,没有等待。

    1. public static void main(String[] args) {
    2. // 使用 读写锁 readWriteLock,所有的读写操作并发执行,读写不需要等待
    3. Runnable readLockR = () -> read(readLock);
    4. Runnable writeLockR = () -> write(writeLock,new Random().nextInt());
    5. for(int i=0;i<18;i++) new Thread(readLockR).start();
    6. for(int i=0;i<2;i++) new Thread(writeLockR).start();
    7. }

    3. Lock提供了Synchronized 不具备的特性:

    (1)我们可以使用 tryLock 进行尝试锁定,不管锁定与否,方法都将继续执行可以根据 tryLock 的返回值来判定是否锁定,也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的释放。

    (2)使用 lock.lockInterruptibly(); 方法,可以将当前获得锁的线程中断,抛出异常并释放锁,需要配合 ti.interrupt(); 方法使用。

    1. Thread ti = new Thread(() -> {
    2. try {
    3. lock.lockInterruptibly();
    4. index.compareAndSet(10, 11);
    5. index.compareAndSet(11, 10);
    6. System.out.println(Thread.currentThread().getName()+": 10->11->10");
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. });
    11. ti.interrupt();

    (3)初始化一个公平锁

    static Lock lock = new ReentrantLock(true);

    参数为 true表示为公平锁。当前所有线程在等待队列中,这个时候来了一个新的线程,如果是公平锁,则它会去查看是否有队列在等待,如果有,则让这个新的线程也加入队列等待。如果不是公平锁,这个线程会去抢占资源。

    二 、CountDownLath

    CountDownLath:允许其他多个线程等待当前latch线程,只有当前线程的latch计数器为0时,才放行,让其他线程执行。

    latch.countDown():每执行一次 latch.countDown,计算器减一,直到为0。

    latch.await(): latch.await相当于一道门,只有 latch 的计数器为0的时候才放行。

    1. import java.util.concurrent.CountDownLatch;
    2. public class CountDownLathTest {
    3. private static void usingCountDownLatch() {
    4. Thread[] threads = new Thread[100];
    5. // 定义一个CountDownLatch,给定计数器为 threads.length
    6. CountDownLatch latch = new CountDownLatch(threads.length);
    7. for (int i = 0; i < threads.length; i++) {
    8. threads[i] = new Thread(() -> {
    9. int result = 0;
    10. for (int j = 0; j < 100; j++) result += j;
    11. // 每执行一次 latch.countDown,计算器减一,直到为0
    12. latch.countDown();
    13. System.out.println(Thread.currentThread().getName());
    14. });
    15. }
    16. for (int i = 0; i < threads.length; i++) {
    17. threads[i].start();
    18. }
    19. try {
    20. // latch.await相当于一道门,只有 latch 的计数器为0的时候才放行
    21. latch.await();
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. }
    25. System.out.println("latch计数器为0,放行了") ;
    26. }
    27. public static void main(String[] args) {
    28. usingCountDownLatch();
    29. }
    30. }

    CountDownLath 的作用和 join() 作用一样,都是让其他线程等待,先执行指定线程。

    1. private static void usingJoin() {
    2. Thread[] threads = new Thread[100];
    3. for (int i = 0; i < threads.length; i++) {
    4. threads[i] = new Thread(() -> {
    5. int result = 0;
    6. for (int j = 0; j < 100; j++) result += j;
    7. System.out.println(Thread.currentThread().getName());
    8. });
    9. }
    10. for (int i = 0; i < threads.length; i++) {
    11. threads[i].start();
    12. }
    13. for (int i = 0; i < threads.length; i++) {
    14. try {
    15. // 将线程循环放入,确保执行顺序
    16. threads[i].join();
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. }
    21. System.out.println("join顺序线程执行完毕,到我了") ;
    22. }

    三、CyclicBarrier

    CyclicBarrier:可循环使用屏障,定义一个指定瓶颈的CyclicBarrier,当调用 await方法到达这个瓶颈后,就去执行相应的瓶颈方法。

    cyclicBarrier.await():如果没有达到计数器瓶颈,就等待

    cyclicBarrier.reset():使计数器重置

    1. public static void main(String[] args) {
    2. // CyclicBarrier 第二个参数作用是,当达到瓶颈 20 后,去做一些事情
    3. CyclicBarrier cyclicBarrier = new CyclicBarrier(20, new Runnable() {
    4. @Override
    5. public void run() {
    6. System.out.println("达到了瓶颈,重置");
    7. }
    8. });
    9. for (int i=0;i<100;i++){
    10. int finalI = i;
    11. new Thread(() -> {
    12. try {
    13. // 每执行一次cyclicBarrier.await(),相当与 cyclicBarrier的计数器+1,直到等于20,执行瓶颈方法
    14. cyclicBarrier.await();
    15. } catch (BrokenBarrierException e) {
    16. e.printStackTrace();
    17. } catch (InterruptedException e) {
    18. e.printStackTrace();
    19. }
    20. }).start();
    21. }
    22. }

    CyclicBarrier使用场景:

            1. 限流,当请求数量达到一定数量时,进行限流。

            2. 复杂操作数据库、网络、文件

            3. 并发执行线程-操作线程-操作

     四、Semaphore 信号量

    Semaphore :用来控制同时访问资源的线程数量。

    可以指定是否为公平锁。

    公平锁:获取锁时,如果获取的顺序符合请求的绝对时间顺序,则为公平锁,FIFO。

    1. // 允许 n 个线程同时执行
    2. Semaphore semaphore = new Semaphore(1);
    3. // 可以指定是否为公平锁
    4. Semaphore semaphore2 = new Semaphore(2,true);
    5. new Thread(() -> {
    6. try {
    7. semaphore.acquire();
    8. System.out.println("T1 running ...");
    9. Thread.sleep(200);
    10. System.out.println("T1 running ...");
    11. semaphore.release();
    12. } catch (InterruptedException e) {
    13. e.printStackTrace();
    14. }
    15. }).start();
    16. new Thread(() -> {
    17. try {
    18. semaphore.acquire();
    19. System.out.println("T2 running ...");
    20. Thread.sleep(200);
    21. System.out.println("T2 running ...");
    22. semaphore.release();
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }).start();
    27. }

    T1 running ...
    T1 running ...
    T2 running ...
    T2 running ...

    如果我们把  new Semaphore(1); 改为 new Semaphore(2); 执行结果:

    T1 running ...
    T2 running ...
    T1 running ...
    T2 running ...

    可以看到,为1时,T2只能等 T1执行完毕,才开始执行。为2时,T1、T2可以同时执行。

    主要使用场景为限流,类比高速收费站,5个车道2个收费站。

     五、Exchanger 交换器

     Exchanger:用于线程通信(线程交换数据)的工具类,提供一个同步点,当第一个线程执行了 exchanger()方法,它会一直等待下一个线程也执行 exchanger()方法,然后进行交换数据。可以设置最大等待时间。

    1. import java.util.concurrent.Exchanger;
    2. public class ExchangerTest {
    3. static Exchanger exchanger = new Exchanger<>();
    4. public static void main(String[] args) {
    5. new Thread(() ->{
    6. String s = "T1";
    7. try {
    8. // 设置多大等待时间
    9. // s = exchanger.exchange(s,2000, TimeUnit.SECONDS);
    10. s = exchanger.exchange(s);
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. System.out.println(Thread.currentThread().getName()+" "+s);
    15. },"t1").start();
    16. new Thread(() ->{
    17. String s = "T2";
    18. try {
    19. s = exchanger.exchange(s);
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. }
    23. System.out.println(Thread.currentThread().getName()+" "+s);
    24. },"t2").start();
    25. }
    26. }

    t2 T1
    t1 T2

    使用场景如:游戏中俩个人交换装备。 

     六、LockSupport 工具

     LockSupport:定义了一组静态方法,提供了最基本的线程阻塞和唤醒功能。

    1. import java.util.concurrent.TimeUnit;
    2. import java.util.concurrent.locks.LockSupport;
    3. public class LockSupportTest {
    4. public static void main(String[] args) {
    5. Thread t = new Thread(() ->{
    6. for(int i =0;i<10;i++){
    7. System.out.println(i);
    8. if(i == 5){
    9. // 阻塞当前线程
    10. LockSupport.park();
    11. }
    12. try {
    13. TimeUnit.SECONDS.sleep(1);
    14. } catch (InterruptedException e) {
    15. e.printStackTrace();
    16. }
    17. }
    18. });
    19. t.start();
    20. // 唤醒指定线程
    21. // LockSupport.unpark(t);
    22. }
    23. }

    0
    1
    2
    3
    4
    5

     由于 LockSupport.park(); 当循环到5的时候,阻塞当前线程。

    将 LockSupport.unpark(t); 打开后,唤醒线程,完整打印。

    0
    1
    2
    3
    4
    5
    6
    7
    8
    9

    当我们查看 part()方法的实现后发现,这个方法是 Unsafe 类实现的。还记得之前 CAS 中说到的这个类吗?

            所有以 AtomXXX开头的类,底层都是使用cas方法,并是通过 Unsafe类实现的。

    Unsafe类的出现等于c/c++的指针,给 java语言赋予了原来 C/C++实现的指针方法。

    1. /**
    2. * Disables the current thread for thread scheduling purposes unless the
    3. * permit is available.
    4. *
    5. *

      If the permit is available then it is consumed and the call

    6. * returns immediately; otherwise the current thread becomes disabled
    7. * for thread scheduling purposes and lies dormant until one of three
    8. * things happens:
    9. *
    10. *
      • *
      • *
      • Some other thread invokes {@link #unpark unpark} with the
    11. * current thread as the target; or
    12. *
    13. *
    14. Some other thread {@linkplain Thread#interrupt interrupts}
  • * the current thread; or
  • *
  • *
  • The call spuriously (that is, for no reason) returns.
  • *
  • *
  • *

    This method does not report which of these caused the

  • * method to return. Callers should re-check the conditions which caused
  • * the thread to park in the first place. Callers may also determine,
  • * for example, the interrupt status of the thread upon return.
  • */
  • public static void park() {
  • UNSAFE.park(false, 0L);
  • }
  • 翻译:

    出于线程调度目的禁用当前线程,除非允许可用。
    如果许可证可用,则将其使用,呼叫立即返回;否则,当前线程将出于线程调度目的而被禁用并处于休眠状态,直到发生以下三种情况之一:
    其他一些线程以当前线程作为目标进行调用 unpark ;或者
    其他线程 中断 当前线程;或
    虚假调用(即无缘无故)返回。
    此方法 不会 报告哪些导致该方法返回。调用方应重新检查导致线程首先停放的条件。例如,调用方还可以确定线程在返回时的中断状态

  • 相关阅读:
    1440_TC275 DataSheet阅读笔记1
    《Effective C++》条款21
    【Vue 开发实战】生态篇 # 19:Vue Router的使用场景
    ATC‘22顶会论文RunD:高密高并发的轻量级 Serverless 安全容器运行时 | 龙蜥技术
    xgboost配置GPU
    外贸人必备的跨境电商常见专有名词!
    什么是SNMP监控
    目标检测算法——YOLOv5/YOLOv7改进之结合无参注意力SimAM(涨点神器)
    大一学生《Web前端网课作业》基于HTML+CSS自我介绍网页设计与制作
    Linux之iostat溯源diskstats
  • 原文地址:https://blog.csdn.net/m0_47743175/article/details/130795809