• 【多线程案例】阻塞队列,实现生产者消费者模型


    阻塞队列(BlockingQueue

    阻塞队列是多线程代码中比较常用的一种数据结构。是一种特殊的队列,带有阻塞特性。

    为何说是一种特殊的队列?

    1.线程安全

    2.带有阻塞特性

    • 如果队列为空,继续出队列,就会发生阻塞。阻塞到其他线程往队列里添加元素为止。
    • 如果队列为满,继续入队列,就会发生阻塞。阻塞到其他线程从队列中取走元素为止。
    • 意义:可以用来实现"生产者消费者模型"。生产者消费者模型通俗的来讲,生产者负责生产东西,并将东西放到阻塞队列中,然后消费者就会从阻塞队列中获取内容,如果生产者生产的慢,消费者就得等待 (即消费者从空的队列中获取元素就得等待);相反如果生产者生产的快,生产者就可以休息速度慢下来 (即生产者从满的队列中添加元素就会阻塞等待)。

    在java中标准库中针对zuseduilie提供两种实现方式:

    基于数组:

    BlockingQueue queue = new ArrayBlockingQueue<>(5);

    基于链表:

    BlockingQueue queue = new LinkedBlockingDeque<>();

    方法:

    put()      入队列 带有阻塞性质

    take()    出队列 带有阻塞性质

    实现基于数组(循环队列)的阻塞队列:

    1. //实现阻塞队列
    2. class myBlockingQueue{
    3. //锁对象
    4. private Object object = new Object();
    5. //队列采用循环队列 数组
    6. private String[] data = new String[1000];
    7. //队头元素位置 加volatile防止内存可见性问题
    8. private volatile int head = 0;
    9. //队尾元素位置
    10. private volatile int tail = 0;
    11. //有效长度
    12. private volatile int size = 0;
    13. //带有阻塞性质的入队操作put
    14. public void put(String str) throws InterruptedException {
    15. synchronized(object) {
    16. //队列满时
    17. if(size==data.length) {
    18. //阻塞等待 等待另一个线程调用notify方法唤醒
    19. object.wait();
    20. }
    21. //队列不满 入队列
    22. data[tail] = str;
    23. tail++;
    24. size++;
    25. object.notify();
    26. //由于数组循环使用 也防止索引出界
    27. if(tail==data.length) {
    28. tail = 0;
    29. }
    30. }
    31. }
    32. //带有阻塞性质的出队列操作
    33. public String take() throws InterruptedException {
    34. synchronized(object) {
    35. //队列为空
    36. if(size==0) {
    37. //阻塞等待
    38. object.wait();
    39. }
    40. //队列不为空
    41. String tmp = data[head];
    42. head++;
    43. if(head==data.length) {
    44. head = 0;
    45. }
    46. size--;
    47. //唤醒
    48. object.notify();
    49. return tmp;
    50. }
    51. }
    52. }

    代码实现中的一些细节:

    • 指向队头,队尾元素,size在代码中可能会出现内存可见性问题,要加volatile。
    • 入队,出队方法都存在着可能会影响线程安全的读,修改操作,最好给整个方法加锁。
    • 虽然在一个方法中有wait和notify,但是一个队列满队列和空队列不会同时出现。并且使用wait进行阻塞等待时,是由另一个线程中的notify唤醒的。
    • 抛异常可以是方法后跟throws,也可以是try...catch...,但这里应该使用throws,原因是try...catch...执行后程序不会停止,还是继续向下执行,对应代码就是入队操作判断队列满时,如果使用try...catch...,程序出现异常后,向下再接着执行,是会覆盖掉队列中其他未执行的内容的,而使用throws若程序出现异常,会抛出异常后整个方法就结束了,interrupt唤醒了wait。
    • 使用wait的时候,往往都是使用while作为条件判定的方式,java源码解释也是推荐while。目的就是为了让wait唤醒之后还能再确认一次,是否条件仍然满足。
    • 一个队列,空和满只能同时出现一种,take和put只有一边能阻塞。如果put阻塞了,其他线程继续调用put也都会阻塞,只有靠take唤醒,如果take阻塞了,其他线程继续调用take也都会阻塞,只能靠put唤醒。

    实现生产者消费者模型

    生产者 - 消费者模型( Producer-consumer problem) 是一个非常经典的多线程并发协作的模型,在分布式系统里非常常见。

    这个模型由两类线程和一个缓冲区组成来组成

    • 生产者线程:生产数据,并把数据放在这个队列里面
    • 缓冲区:存放生产者的数据的地方即阻塞队列
    • 消费者线程:从队列里面取数据,消费数据

    运行流程

    • 生产者和消费者在同一时间段内共用同一个存储空间
    • 生产者往存储空间中添加产品
    • 消费者从存储空间中取走产品
    • 当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
       

    实现"生产者消费者模型"好处:

    (1)解耦合

            两个模块之间联系越紧密,耦合就越高。尤其对于分布式系统来说,十分有意义。

    (2)削峰填谷

            峰:指短时间内请求多。

            比如服务器和客户端之间的请求与响应,当用户量请求增大时,服务器也会受牵连,甚至于            将服务器弄崩溃给挂了,耦合性较高,如果两者之间用一种数据结构如队列存储请求,就不论客户            端用户量请求有多大时,服务器仍然可以按照自己的速度去处理请求。

    消息队列:当把阻塞队列封装成单独的服务器程序,部署到特定的机器上,这个时候就把这个队列称为"消息队列"。

    实现生产者消费者模型代码:

    1. //实现阻塞队列
    2. class myBlockingQueue{
    3. //锁对象
    4. private Object object = new Object();
    5. //队列采用循环队列 数组
    6. private String[] data = new String[1000];
    7. //头指针 加volatile防止内存可见性问题
    8. private volatile int head = 0;
    9. //尾指针
    10. private volatile int tail = 0;
    11. //有效长度
    12. private volatile int size = 0;
    13. //带有阻塞性质的入队操作put
    14. public void put(String str) throws InterruptedException {
    15. synchronized(object) {
    16. //队列满时
    17. while (size==data.length) {
    18. //阻塞等待 等待另一个线程调用notify方法唤醒
    19. object.wait();
    20. }
    21. //队列不满 入队列
    22. data[tail] = str;
    23. tail++;
    24. size++;
    25. object.notify();
    26. //由于数组循环使用 也防止索引出界
    27. if(tail==data.length) {
    28. tail = 0;
    29. }
    30. }
    31. }
    32. //带有阻塞性质的出队列操作
    33. public String take() throws InterruptedException {
    34. synchronized(object) {
    35. //队列为空
    36. while (size==0) {
    37. //阻塞等待
    38. object.wait();
    39. }
    40. //队列不为空
    41. String tmp = data[head];
    42. head++;
    43. if(head==data.length) {
    44. head = 0;
    45. }
    46. size--;
    47. //唤醒
    48. object.notify();
    49. return tmp;
    50. }
    51. }
    52. }
    53. //借助阻塞队列 实现生产者消费者模型
    54. public class test {
    55. public static void main(String[] args) {
    56. MyBlockingQueue queue = new MyBlockingQueue();
    57. //生产者模型
    58. Thread t1 = new Thread(()->{
    59. int num = 1;
    60. while(true) {
    61. try {
    62. queue.put(num);
    63. System.out.println("生产者生产"+num);
    64. num++;
    65. //Thread.sleep(1000); //生产者有节奏生产
    66. } catch (InterruptedException e) {
    67. e.printStackTrace();
    68. }
    69. }
    70. });
    71. //消费者模型
    72. Thread t2 =new Thread(()->{
    73. while(true) {
    74. try {
    75. int tmp = queue.take();
    76. System.out.println("消费者消费"+tmp);
    77. Thread.sleep(1000); //消费者有节奏消费
    78. } catch (InterruptedException e) {
    79. e.printStackTrace();
    80. }
    81. }
    82. });
    83. t1.start();
    84. t2.start();
    85. }
    86. }

  • 相关阅读:
    Orphaned pod found - but volume paths are still present on disk的处理
    深度解析布谷鸟过滤器(上篇)
    MySQL数据库——主从复制及读写分离
    [附源码]JAVA毕业设计科大学生党员之家设计(系统+LW)
    淘宝扭蛋机一番赏小程序的玩法介绍
    C#小白(基础篇)1 (变量、赋值运算符、常量、+号的作用、占位符的使用)
    李沐动手学习深度学习——4.4练习
    uni-app父子组件传递数据(更新中)
    rancher2.6.4配置管理k8s,docker安装
    java 容器
  • 原文地址:https://blog.csdn.net/m0_73381672/article/details/133690633