• Day125.JUC:线程间通信(Conditon)、并发容器类(CopyOnWrite)、JUC强大辅助类、Callable


    目录

    一、线程间通信

    线程间通信改造成Lock版  Condition

    定制化调用通信 Condition

    二、并发容器类 (解决集合安全问题)

    CopyOnWrite 写时拷贝技术

    三、JUC 并发辅助工具类

    1、CountDownLatch (倒计时线程控制)

    2、CyclicBarrier (循环栅栏)

    3、Semaphore (信号量)

    四、Callable接口、

    FutureTask 未来任务


    一、线程间通信

    线程按顺序执行

    面试题:两个线程,一个线程打印1-52,另一个打印字母A-Z打印顺序为12A34B...5152Z,要求用线程间通信

    1、简化问题:两个线程操作一个初始值为0的变量,实现一个线程对变量增加1,一个线程对变量减少1,交替10轮。

    2、实现方案-线程间通信模型

    • 生产者+消费者

    • 通知等待唤醒机制 (wait、notify)

    3、多线程编程模板中线程   操作  资源类判断、干活、通知高内聚低耦合

    1. // 实现一个线程对该变量加1,一个线程对该变量减1, 交替 10 轮!
    2. class ShareDataOne{
    3. private Integer number = 0;
    4. //加一
    5. public synchronized void incre() {
    6. try {
    7. //1.判断
    8. if (number!=0){
    9. this.wait();
    10. }
    11. //2.干活
    12. number++;
    13. System.out.println(Thread.currentThread().getName()+":"+number);
    14. //3.通知
    15. this.notifyAll();
    16. } catch (InterruptedException e) {
    17. e.printStackTrace();
    18. }
    19. }
    20. //减一
    21. public synchronized void decre(){
    22. try {
    23. //1.判断
    24. if (number!=1){
    25. this.wait();
    26. }
    27. //2.干活
    28. number--;
    29. System.out.println(Thread.currentThread().getName()+":"+number);
    30. //3.通知
    31. this.notifyAll();
    32. } catch (Exception e) {
    33. e.printStackTrace();
    34. }
    35. }
    36. }
    37. /*
    38. * 现在两个线程,
    39. * 可以操作初始值为零的一个变量,
    40. * 实现一个线程对该变量加1,一个线程对该变量减1,
    41. * 交替,来10轮。
    42. *
    43. * 笔记:Java里面如何进行工程级别的多线程编写
    44. * 1 多线程变成模板(套路)-----上
    45. * 1.1 线程 操作 资源类
    46. * 1.2 高内聚 低耦合
    47. * 2 多线程变成模板(套路)-----中
    48. * 2.1 判断
    49. * 2.2 干活
    50. * 2.3 通知
    51. */
    52. public class NotifyWaitDemo {
    53. public static void main(String[] args) {
    54. ShareDataOne shareDataOne = new ShareDataOne();
    55. new Thread(()->{
    56. for (int i = 0; i < 10; i++) {
    57. shareDataOne.incre();
    58. }
    59. },"AAA").start();
    60. new Thread(()->{
    61. for (int i = 0; i < 10; i++) {
    62. shareDataOne.decre();
    63. }
    64. },"BBB").start();
    65. }
    66. }

    换成4个线程会导致错误,虚假唤醒问题

    原因:在java多线程判断时,不能用if,程序出事出在了判断上面。

    注意,消费者被唤醒后是从wait()方法(被阻塞的地方)后面执行,而不是重新从同步块开头,没有进行条件判断。



    线程间通信改造成Lock版  Condition

    对标实现

     

    1. class ShareDataOne{
    2. private Integer number = 0;
    3. Lock lock = new ReentrantLock(); // 初始化lock锁
    4. Condition cd = lock.newCondition(); // 初始化condition对象
    5. //加一
    6. public void incre() {
    7. lock.lock();
    8. try {
    9. //1.判断
    10. while (number!=0){
    11. //this.wait();
    12. cd.await(); //睡眠
    13. }
    14. //2.干活
    15. number++;
    16. System.out.println(Thread.currentThread().getName() + ":" + number);
    17. //3.通知
    18. //this.notifyAll();
    19. cd.signalAll(); //唤醒
    20. } catch (InterruptedException e) {
    21. e.printStackTrace();
    22. } finally {
    23. lock.unlock();
    24. }
    25. }
    26. //减一
    27. public void decre(){
    28. lock.lock();
    29. try {
    30. //1.判断
    31. while (number!=1){
    32. //this.wait();
    33. cd.await(); //睡眠
    34. }
    35. //2.干活
    36. number--;
    37. System.out.println(Thread.currentThread().getName() + ":" + number);
    38. //3.通知
    39. //this.notifyAll();
    40. cd.signalAll(); //唤醒
    41. } catch (InterruptedException e) {
    42. e.printStackTrace();
    43. } finally {
    44. lock.unlock();
    45. }
    46. }
    47. }

    定制化调用通信 Condition

    案例:

    多线程之间按顺序调用,实现A->B->C。三个线程启动,要求如下:

    AA打印5次,BB打印10次,CC打印15次;

    接着

    AA打印5次,BB打印10次,CC打印15次;

    。。。打印10轮

    分析实现方式:

    • 有一个锁Lock,3把钥匙Condition
    • 有顺序通知(切换线程),需要有标识位
    • 判断标志位
    • 输出线程名 + 内容
    • 修改标识符,通知下一个
    1. /*多线程之间按顺序调用,实现A->B->C。三个线程启动,要求如下:
    2. AA打印5次,BB打印10次,CC打印15次;
    3. 接着
    4. AA打印5次,BB打印10次,CC打印15次;
    5. 。。。打印10轮*/
    6. // 资源类:
    7. class ShareDataTwo{
    8. // 声明一些变量
    9. private int flag = 1; // 【flag=1 AA flag=2 BB flag=3 CC】
    10. private Lock lock = new ReentrantLock();
    11. private Condition c1 = lock.newCondition();
    12. private Condition c2 = lock.newCondition();
    13. private Condition c3 = lock.newCondition();
    14. // 定义打印方法!
    15. public void print5(int total){
    16. lock.lock(); //上锁
    17. try {
    18. while (flag!=1){ //判断
    19. c1.await();
    20. }
    21. //干活:循环打印
    22. for (int i = 0; i <= 5; i++) {
    23. System.out.println(Thread.currentThread().getName() +
    24. "\t"+i+"\t第"+total+"轮");
    25. }
    26. //通知,修改标识位
    27. flag=2;
    28. c2.signal();
    29. } catch (InterruptedException e) {
    30. e.printStackTrace();
    31. } finally {
    32. lock.unlock(); //解锁
    33. }
    34. }
    35. public void print10(int total){
    36. lock.lock(); //上锁
    37. try {
    38. while (flag!=2){ //判断
    39. c2.await();
    40. }
    41. //干活:循环打印
    42. for (int i = 0; i <= 10; i++) {
    43. System.out.println(Thread.currentThread().getName() +
    44. "\t"+i+"\t第"+total+"轮");
    45. }
    46. //通知,修改标识位
    47. flag=3;
    48. c3.signal();
    49. } catch (InterruptedException e) {
    50. e.printStackTrace();
    51. } finally {
    52. lock.unlock(); //解锁
    53. }
    54. }
    55. public void print15(int total){
    56. lock.lock(); //上锁
    57. try {
    58. while (flag!=3){ //判断
    59. c3.await();
    60. }
    61. //干活:循环打印
    62. for (int i = 0; i <= 15; i++) {
    63. System.out.println(Thread.currentThread().getName() +
    64. "\t"+i+"\t第"+total+"轮");
    65. }
    66. //通知,修改标识位
    67. flag=1;
    68. c1.signal();
    69. } catch (InterruptedException e) {
    70. e.printStackTrace();
    71. } finally {
    72. lock.unlock();//解锁
    73. }
    74. }
    75. }
    76. public class ThreadOrderAccess {
    77. public static void main(String[] args) {
    78. // 线程操作资源类
    79. ShareDataTwo shareDataTwo = new ShareDataTwo();
    80. // 线程:
    81. new Thread(()->{
    82. for (int i = 0; i < 10; i++) {
    83. shareDataTwo.print5(i+1);
    84. }
    85. },"AA").start();
    86. new Thread(()->{
    87. for (int i = 0; i < 10; i++) {
    88. shareDataTwo.print10(i+1);
    89. }
    90. },"BB").start();
    91. new Thread(()->{
    92. for (int i = 0; i < 10; i++) {
    93. shareDataTwo.print15(i+1);
    94. }
    95. },"CC").start();
    96. }
    97. }

    面试题:

    1. //面试题:两个线程,一个线程打印1-52,另一个打印字母A-Z打印顺序为12A34B...5152Z,要求用线程间通信
    2. class ShareDataThree{
    3. private int flag = 1;
    4. private Integer number = 1;
    5. private char letter = 'A';
    6. private Lock lock = new ReentrantLock();
    7. Condition c1 = lock.newCondition();
    8. Condition c2 = lock.newCondition();
    9. public void printNum(){
    10. lock.lock(); //上锁
    11. try {
    12. //判断
    13. while (flag!=1){
    14. c1.await(); //睡眠
    15. }
    16. //干活
    17. System.out.print(number++);
    18. System.out.print(number++);
    19. //通知 修改标识,唤醒
    20. flag = 2;
    21. c2.signal();
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. } finally {
    25. lock.unlock(); //解锁
    26. }
    27. }
    28. public void printLetter(){
    29. lock.lock(); //上锁
    30. try {
    31. //判断
    32. while (flag!=2){
    33. c2.await(); //睡眠
    34. }
    35. //干活
    36. System.out.print(letter++ + " ");
    37. //通知 修改标识,唤醒
    38. flag = 1;
    39. c1.signal();
    40. } catch (InterruptedException e) {
    41. e.printStackTrace();
    42. } finally {
    43. lock.unlock(); //解锁
    44. }
    45. }
    46. }
    47. public class Test {
    48. public static void main(String[] args) {
    49. ShareDataThree shareDataThree = new ShareDataThree();
    50. new Thread(()->{
    51. for (int i = 0; i < 26; i++) {
    52. shareDataThree.printNum();
    53. }
    54. },"num").start();
    55. new Thread(()->{
    56. for (int i = 0; i < 26; i++) {
    57. shareDataThree.printLetter();
    58. }
    59. },"let").start();
    60. }
    61. }

    二、并发容器类 (解决集合安全问题)

    面试题:请举例说明集合类是不安全的。

    方法一:代码验证

    1. public class NotSafeDemo {
    2. public static void main(String[] args) {
    3. //List list = new ArrayList<>();
    4. //new Vector(); 效率太低
    5. //Collections.synchronizedList(new ArrayList<>()); 效率太低
    6. List list = new CopyOnWriteArrayList<>();//JUC
    7. for (int i = 0; i < 20; i++) {
    8. new Thread(()->{
    9. list.add(UUID.randomUUID().toString().substring(0,8));
    10. System.out.println(list);//读取遍历数据时数据被修改,迭代器报错
    11. },String.valueOf(i)).start();
    12. }
    13. }
    14. }
    15. 出现并发修改异常 

      方法二:看源码

      Redis 和 Memcached 区别?

      CopyOnWrite 写时拷贝技术

      所有读的线程可以并发去读(共享锁),所有写的线程可以单独去写(独占锁)。

      CopyOnWrite容器(简称COW容器)即写时拷贝的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器

      CopyOnWriteArraySet、CopyOnWriteArrayList; ConcurrentHashMap

      优点:保证高效的读取(共享),安全的写入(独占) (用于读多写少的并发场景:搜索框黑名单)

      缺点:占用内存难问题,只能保证最终一致;

      源码片段:

      扩展:HashSet是HashMap 的Kay部分

      三、JUC 并发辅助工具类

      JUC的多线程辅助类非常多,这里我们介绍三个:

      • CountDownLatch(倒计数器)
      • CyclicBarrier(循环栅栏)
      • Semaphore(信号量)

      1、CountDownLatch (倒计时线程控制)

      CountDownLatch是一个非常实用的多线程控制工具类,应用非常广泛。

      例如:在手机上安装一个应用程序,假如需要5个子进程检查服务授权,那么主进程会维护一个计数器,初始计数就是5。用户每同意一个授权该计数器减1,当计数减为0时,主进程才启动,否则就只有阻塞等待了。

      CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓,似乎有一点“三二一,芝麻开门”的感觉。CountDownLatch的作用也是如此。

      1. new CountDownLatch(int count) //实例化一个倒计数器,count指定初始计数
      2. countDown() // 每调用一次,计数减一
      3. await() //等待,当计数减到0时,阻塞线程(可以是一个,也可以是多个)并行执行

      案例:6个同学陆续离开教室后值班同学才可以关门。  

      1. //案例:6个同学陆续离开教室后班长才可以关门。
      2. public class CountDownLatchDemo {
      3. public static void main(String[] args) throws InterruptedException {
      4. CountDownLatch cd = new CountDownLatch(6);
      5. for (int i = 1; i <= 6; i++) {
      6. new Thread(()->{
      7. try {
      8. // 每个同学墨迹几秒钟
      9. TimeUnit.SECONDS.sleep(new Random().nextInt(5));
      10. System.out.println(Thread.currentThread().getName() + " 同学出门了");
      11. //计数器减一
      12. cd.countDown();
      13. } catch (InterruptedException e) {
      14. e.printStackTrace();
      15. }
      16. },String.valueOf(i)).start();
      17. }
      18. //计数器不为0一致阻塞
      19. cd.await();
      20. System.out.println("班长锁门了");
      21. }
      22. }

      面试:CountDownLatch 与 join 方法的区别

      调用一个子线程的 join()方法后,该线程会一直被阻塞直到该线程运行完毕。而 CountDownLatch 则使用计数器允许子线程运行完毕或者运行中时候递减计数,也就是 CountDownLatch 可以在子线程运行任何时候让 await 方法返回而不一定必须等到线程结束;另外使用线程池来管理线程时候一般都是直接添加 Runnable 到线程池这时候就没有办法在调用线程的 join 方法了,countDownLatch 相比 Join 方法让我们对线程同步有更灵活的控制。

      练习:秦灭六国,一统华夏。(模仿课堂案例,练习枚举类的使用)

      2、CyclicBarrier (循环栅栏)

      CyclicBarrier 的中文意思是“循环栅栏”,可循环利用的屏障。该命令只在每个屏障点运行一次。若在所有参与线程之前更新共享状态,此屏障操作很有用

      常用方法:

      • CyclicBarrier(int parties, Runnable barrierAction) 创建一个CyclicBarrier实例,parties指定参与相互等待的线程数,barrierAction一个可选的Runnable命令,该命令只在每个屏障点运行一次,可以在执行后续业务之前共享状态。该操作由最后一个进入屏障点的线程执行
      • CyclicBarrier(int parties) 创建一个CyclicBarrier实例,parties指定参与相互等待的线程数。
      • await() 该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态,直到所有线程都到达屏障点,当前线程才会被唤醒。

      注意:所有的"过关了"都是由最后到达await方法的线程执行打印的

      1. public class CyclicBarrierDemo {
      2. //集齐七颗龙珠召唤神龙
      3. private static void test() {
      4. //完成的线程数,要做的事
      5. CyclicBarrier cb = new CyclicBarrier(7,()->{
      6. System.out.println("集齐七颗龙珠召唤神龙");
      7. });
      8. for (int i = 1; i <= 7; i++) {
      9. new Thread(()->{
      10. try {
      11. System.out.println(Thread.currentThread().getName()+"星龙珠被收集");
      12. //完成任务阻塞
      13. cb.await();
      14. } catch (InterruptedException e) {
      15. e.printStackTrace();
      16. } catch (BrokenBarrierException e) {
      17. e.printStackTrace();
      18. }
      19. },String.valueOf(i)).start();
      20. }
      21. }
      22. public static void main(String[] args) {
      23. //test();
      24. //组队打Boss
      25. CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
      26. System.out.println(Thread.currentThread().getName() + " 过关了");
      27. });
      28. for (int i = 0; i < 3; i++) {
      29. new Thread(()->{
      30. try {
      31. System.out.println(Thread.currentThread().getName() + " 开始第一关");
      32. TimeUnit.SECONDS.sleep(new Random().nextInt(4));
      33. System.out.println(Thread.currentThread().getName() + " 开始打boss");
      34. cyclicBarrier.await();
      35. System.out.println(Thread.currentThread().getName() + " 开始第二关");
      36. TimeUnit.SECONDS.sleep(new Random().nextInt(4));
      37. System.out.println(Thread.currentThread().getName() + " 开始打boss");
      38. cyclicBarrier.await();
      39. System.out.println(Thread.currentThread().getName() + " 开始第三关");
      40. TimeUnit.SECONDS.sleep(new Random().nextInt(4));
      41. System.out.println(Thread.currentThread().getName() + " 开始打boss");
      42. cyclicBarrier.await();
      43. } catch (Exception e) {
      44. e.printStackTrace();
      45. }
      46. }, String.valueOf(i)).start();
      47. }
      48. }
      49. }

      面试:CyclicBarrier和CountDownLatch的区别?

      CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,可以使用多次,所以CyclicBarrier能够处理更为复杂的场景;CountDownLatch允许一个或多个线程等待一组事件的产生,而CyclicBarrier用于等待其他线程运行到栅栏位置

      CyclicBarrier可以处理更复杂的业务。

      3、Semaphore (信号量)

      Semaphore翻译成字面意思为 信号量,Semaphore可以控制同时访问的线程个数。非常适合需求量大,而资源又很紧张的情况。比如给定一个资源数目有限的资源池,假设资源数目为N,每一个线程均可获取一个资源,但是当资源分配完毕时,后来线程需要阻塞等待,直到前面已持有资源的线程释放资源之后才能继续。

      信号量主要用于两个目的:
      1. 多个共享资源的互斥使用。
      2. 用于并发线程数的控制。保护一个关键部分不要一次输入超过N个线程。

      类比场景:停车场、YY软件

      1. public Semaphore(int permits) // 构造方法,permits指资源数目(信号量)
      2. public void acquire() throws InterruptedException // 占用资源,当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。
      3. public void release() // (释放)实际上会将信号量的值加1,然后唤醒等待的线程。

      6辆汽车抢占3个停车位:

      1. //6辆汽车抢占3个停车位
      2. public class SemaphoreDemo {
      3. public static void main(String[] args) {
      4. // 初始化信号量,3个车位
      5. Semaphore sp = new Semaphore(3);
      6. // 6个线程,模拟6辆车
      7. for (int i = 0; i < 6; i++) {
      8. new Thread(()->{
      9. try {
      10. // 抢占一个停车位
      11. sp.acquire();
      12. System.out.println(Thread.currentThread().getName() + " 抢到了一个停车位!!");
      13. // 停一会儿车
      14. TimeUnit.SECONDS.sleep(new Random().nextInt(10));
      15. System.out.println(Thread.currentThread().getName() + " 离开停车位!!");
      16. // 开走,释放一个停车位
      17. sp.release();
      18. } catch (InterruptedException e) {
      19. e.printStackTrace();
      20. }
      21. }, String.valueOf(i)).start();
      22. }
      23. }
      24. }

      四、Callable接口、

      Callable 是 Runnable 的增强

      Thread类、Runnable接口使得多线程编程简单直接。

      从java5开始,提供了Callable接口,是Runable接口的增强版。用Call()方法作为线程的执行体,增强了之前的run()方法。因为call方法可以有返回值,也可以声明抛出异常

      Callable 和 Runnable 区别:1.有返回值(泛型); 2.可以抛异常; 3.方法名不同

      1. //Callable 和 Runnable 区别
      2. public class MyRunnable implements Runnable{
      3. @Override
      4. public void run() {
      5. System.out.println("MyRunnable");
      6. }
      7. }
      8. //1.有返回值(泛型) 2.可以抛异常 3.方法名不同
      9. class MyCallable implements Callable{
      10. @Override
      11. public Integer call() throws Exception {
      12. System.out.println(Thread.currentThread().getName()+"Callable");
      13. return null;
      14. }
      15. }

      使用Callable (查看API文档)

      java1.5之后才有的juc,所以Thread方法并无法兼容

      1. public class CallableDemo{
      2. public static void main(String[] args) {
      3. //1.直接替换 不行
      4. //new Thread(new MyCallable(),"zhang3").start();
      5. //2.通过中间类 FutureTask
      6. FutureTask futureTask1 = new FutureTask<>(new MyCallable());
      7. new Thread(futureTask1).start();
      8. }
      9. }

      FutureTask 未来任务

      FutureTask:未来的任务,用它就干一件事,异步调用。通常用它解决耗时任务,挂起堵塞问题。 在主线流程最后获取结果。

      建议:1. 为了防止主线业务阻塞,在整个主线业务流程之后去get结果

      2. 只计算一次,FutureTask会复用之前计算过的结果

      去API文档:查找中间类

      注意:

      • 为了防止主线程阻塞,建议get方法放到最后
      • 只计算一次,FutureTask会复用之前计算过得结果
       Vget()
                如有必要,等待计算完成,然后获取其结果。
       Vget(long timeout, TimeUnit unit)
                如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
       booleanisDone()
                如果任务已完成,则返回 true。
       booleancancel(boolean mayInterruptIfRunning)
                试图取消对此任务的执行。

      实例:

      1. public class CallableDemo{
      2. public static void main(String[] args) throws ExecutionException, InterruptedException {
      3. //1.直接替换 不行
      4. //new Thread(new MyCallable(),"zhang3").start();
      5. //2.通过中间类 FutureTask未来任务
      6. FutureTask futureTask1 = new FutureTask<>(new MyCallable());
      7. FutureTask futureTask2 = new FutureTask<>(()->{
      8. System.out.println(Thread.currentThread().getName()+"Callable");
      9. return 1024;
      10. });
      11. new Thread(futureTask1,"zhang3").start();
      12. //new Thread(futureTask2,"li4").start();
      13. //isDone: 判断任务是否完成
      14. while (!futureTask1.isDone()) {
      15. System.out.println("wait...");
      16. }
      17. //为了防止主线程阻塞,建议get方法放到最后
      18. System.out.println(futureTask1.get());
      19. //System.out.println(futureTask2.get());
      20. }
      21. }

      面试题:callable接口与runnable接口的区别?

      相同点:都是接口,都可以编写多线程程序,都采用Thread.start()启动线程

      不同点:

      1. 具体方法不同:一个是run,一个是call
      2. Runnable没有返回值;Callable可以返回执行结果,是个泛型
      3. Callable接口的call()方法允许抛出异常;Runnable的run()方法异常只能在内部消化,不能往上继续抛
      4. 它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果 (FutureTask)。

      面试题:获得多线程的方法几种?

      传统的是继承thread类和实现runnable接口

      java5以后又有实现callable接口和java的线程池获得

    16. 相关阅读:
      (最优化理论与方法)第三章优化建模-第一节:优化建模和常见建模技术
      单例模式优缺点
      ABAP:ME28/ME2L/ME2N标准报表字段增强统一出口
      docker基础命令以及常用命令
      如何通过python爬股票接口获取证券交易日?
      langchain入门指南和实战
      RabbitMQ之集群方案原理
      回收羽绒羽毛检测
      Coumarin 343 X azide/carboxylic acid/NHS ester,香豆素343 X 叠氮化物/羧基羧酸/琥珀酰亚胺活化酯
      【Socket】两种高效事件处理模式&并发模式
    17. 原文地址:https://blog.csdn.net/a111042555/article/details/126328321