• JavaEE 初阶篇-生产者与消费者模型(线程通信)


    🔥博客主页: 【小扳_-CSDN博客】
    ❤感谢大家点赞👍收藏⭐评论✍
     

    文章目录

            1.0 生产者与消费者模型概述

            2.0 在生产者与消费者模型中涉及的关键概念

            2.1 缓冲区

            2.2 生产者

            2.3 消费者

            2.4 同步机制

            2.5 线程间通信

            3.0 实现生产者与消费者模型的完整代码


            1.0 生产者与消费者模型概述

            消费者与生产者模型是计算机科学中一个经典的并发编程问题,描述了多个生产者和消费者之间如何共享有限缓冲区的情况。在该模型中,生产者负责生产物品并将其放入共享的缓冲区,而消费者则负责从缓冲区取出物品进行消费。生产者与消费者之间必须进行有效的同步和协调,以避免生产者在缓冲区满时继续生产物品,或消费者在缓冲区为空时尝试消费物品,从而导致竞争条件和数据不一致的问题。

            2.0 在生产者与消费者模型中涉及的关键概念

            缓冲区、生产者、消费者、同步机制和线程间通信。

            2.1 缓冲区

            用于存储生产者生产的物品,以便消费者可以从中取出。缓冲区通常是一个有限的队列或缓冲区,可以存储一定数量的物品。

            实现缓冲区可以用到数组、链表实现。目前用的是循环数组实现缓冲区的功能。可以自定义数组大小,默认大小为 10 。

            循环数组的实现思路,定义三个变量:当前存储的个数 size ,头队列的索引也是取出数据的索引:head 和 尾队列的索引也是放入数据的索引处:tail 。

    代码如下:

    1. public class Desk {
    2. private final String[] arr;
    3. private int size = 0;
    4. private int head = 0;
    5. private int tail = 0;
    6. //有参构造器
    7. public Desk(int size) {
    8. this.arr = new String[size];
    9. }
    10. //无参构造器,默认大小为10
    11. public Desk(){
    12. this.arr = new String[10];
    13. }
    14. }

            定义了有参和无参两个构造器。将 size 、head 、tail 初始化都为 0 。

            2.2 生产者

            负责向缓冲区中生产物品并放入到其中。生产者在生产物品之前通常会检查缓冲区是否已满,如果已满则需要等待直到有空间可用。

            实现生产者,就是实现一个 put 方法,先判断数组中的 size 与 数组大小关系,若 size >= arr.length 时,先唤醒其他全部线程,然后当前线程则进入等待状态;若 size < arr.length 时,将数据放入到索引为 tail 处的数组位置,接着 tail++ ,tail 加完之后需要判断是否越界了,如果越界了,则需要进行将 tail 重新置为 0 。再来 size++ 操作,最后在再唤醒其他线程,当前线程也就可以进行等待状态了。

    代码如下:

    1. //放入数据
    2. public synchronized void put(String data) throws InterruptedException {
    3. String name = Thread.currentThread().getName();
    4. String putData = name + ",放入一个数据:" + data;
    5. if ( ! (size >= arr.length) ){
    6. arr[tail] = putData;
    7. tail++;
    8. if (tail >= arr.length){
    9. tail = 0;
    10. }
    11. size++;
    12. System.out.println(name + "成功放入数据:" + data +
    13. ",当前数据个数为:" + size + "个");
    14. Thread.sleep(1000);
    15. this.notifyAll();
    16. this.wait();
    17. }else {
    18. this.notifyAll();
    19. this.wait();
    20. }
    21. }

             为了方便观察,用到了 Thread.sleep() 方法。

            2.3 消费者

            负责从缓冲区中取出物品并进行消费。消费者再消费物品之前常会检查缓冲区是否为空,如果为空则需要等待直到有物品可取。

            消费者的实现也是一个 take() 方法,先判断 siez == 0 ,若成立,则先唤醒其他线程,当前线程则进入等待;若不成立,则获取数组中索引位置为 head 的数据,接着 head++ 处理,紧接着判断 head >= arr.length ,若成立,将 head = 0 处理。再接着 size-- ,最后唤醒其他线程,当前线程则进入等待状态。

    代码如下:

    1. //取出数据
    2. public synchronized void take() throws InterruptedException {
    3. String name = Thread.currentThread().getName();
    4. if ( !(size == 0)){
    5. String ret = arr[head];
    6. head++;
    7. if (head >= arr.length){
    8. head = 0;
    9. }
    10. size--;
    11. System.out.println(name + "读取到了:" + ret +
    12. ",当前还剩数据个数为:" + size + "个");
    13. Thread.sleep(1000);
    14. this.notifyAll();
    15. this.wait();
    16. }else {
    17. this.notifyAll();
    18. this.wait();
    19. }
    20. }

            同样,这里也用到了 Thread.sleep() 方法,主要是为了方便观察。

            2.4 同步机制

            用于实现生产者与消费者之间的同步协调。常用的同步机制包括互斥锁、条件变量、信号变量等,以确保生产者和消费者之间的操作发生竞争条件。

            实现中就是用到了 synchronized() 这个关键字。这确保了在多线程环境下,同一时刻只有一个线程可以访问 put() 和 take() 方法中的关键代码块,从而保证了线程安全性。

            2.5 线程间通信

            生产者与消费者通常运行再不同的线程中,它们之间需要通过线程间通信机制进行协作。常用的线程间通信方式包括 wait-notify 机制等。

            在循环中调用 wait() 方法,以避免虚假唤醒问题。在同步块中调用 notifyAll() 方法,以确保线程安全性。

            3.0 实现生产者与消费者模型的完整代码

    1. public class ProducerConsumer {
    2. public static void main(String[] args) {
    3. Desk desk = new Desk(1);
    4. //生产者线程1
    5. Thread thread1 = new Thread(()->{
    6. try {
    7. while (true) {
    8. desk.put("华为电脑");
    9. }
    10. } catch (InterruptedException e) {
    11. throw new RuntimeException(e);
    12. }
    13. },"生产者1");
    14. thread1.start();
    15. //生产者线程2
    16. Thread thread2 = new Thread(()->{
    17. try {
    18. while (true) {
    19. desk.put("小米su7");
    20. }
    21. } catch (InterruptedException e) {
    22. throw new RuntimeException(e);
    23. }
    24. },"生产者2");
    25. thread2.start();
    26. //生产者线程3
    27. Thread thread3 = new Thread(()->{
    28. try {
    29. while (true) {
    30. desk.put("大疆无人机");
    31. }
    32. } catch (InterruptedException e) {
    33. throw new RuntimeException(e);
    34. }
    35. },"生产者3");
    36. thread3.start();
    37. //消费者1
    38. Thread thread4 = new Thread(()->{
    39. try {
    40. while (true) {
    41. desk.take();
    42. }
    43. } catch (InterruptedException e) {
    44. throw new RuntimeException(e);
    45. }
    46. },"消费者1");
    47. thread4.start();
    48. //消费者2
    49. Thread thread5 = new Thread(()->{
    50. try {
    51. while (true) {
    52. desk.take();
    53. }
    54. } catch (InterruptedException e) {
    55. throw new RuntimeException(e);
    56. }
    57. },"消费者2");
    58. thread5.start();
    59. }
    60. }
    1. public class Desk {
    2. private final String[] arr;
    3. private int size = 0;
    4. private int head = 0;
    5. private int tail = 0;
    6. //有参构造器
    7. public Desk(int size) {
    8. this.arr = new String[size];
    9. }
    10. //无参构造器,默认大小为10
    11. public Desk(){
    12. this.arr = new String[10];
    13. }
    14. //放入数据
    15. public synchronized void put(String data) throws InterruptedException {
    16. String name = Thread.currentThread().getName();
    17. String putData = name + ",放入一个数据:" + data;
    18. if ( ! (size >= arr.length) ){
    19. arr[tail] = putData;
    20. tail++;
    21. if (tail >= arr.length){
    22. tail = 0;
    23. }
    24. size++;
    25. System.out.println(name + "成功放入数据:" + data + ",当前数据个数为:" + size + "个");
    26. Thread.sleep(1000);
    27. this.notifyAll();
    28. this.wait();
    29. }else {
    30. this.notifyAll();
    31. this.wait();
    32. }
    33. }
    34. //取出数据
    35. public synchronized void take() throws InterruptedException {
    36. String name = Thread.currentThread().getName();
    37. if ( !(size == 0)){
    38. String ret = arr[head];
    39. head++;
    40. if (head >= arr.length){
    41. head = 0;
    42. }
    43. size--;
    44. System.out.println(name + "读取到了:" + ret + ",当前还剩数据个数为:" + size + "个");
    45. Thread.sleep(1000);
    46. this.notifyAll();
    47. this.wait();
    48. }else {
    49. this.notifyAll();
    50. this.wait();
    51. }
    52. }
    53. }

    一部分的运行结果:

            通过合理设计和实现生产者与消费者模型,可以有效地避免竞争条件和数据不一致的问题,实现多个生产者和消费者之间的协同工作。在实际应用中,消费者与生产者模型被广泛应用于操作系统、并发编程和分布式系统等领域,是并发编程中重要的基础知识之一。

  • 相关阅读:
    ES新特性与TypeScript、JS性能优化
    springMVC的简单数据绑定
    实测办公场景下,国产远程控制软件的表现力如何?(技术解析)
    【推荐算法】召回模型总结
    区块链、以太坊,以太坊和智能合约
    Linux·驱动
    【解题报告】CF练一下题 | 难度CF2500左右
    [Asp.Net Core]C#解析Markdown文档
    从指定 URL 读取图像并以 OpenCV 格式返回的函数(从指定 URL 读取图像并使其可由 OpenCV 处理。)
    TCP通信-实现 1 发 1 收
  • 原文地址:https://blog.csdn.net/Tingfeng__/article/details/137435710