• Linux —— 生产消费者模型


    目录

    一、生产者消费者模型

    1. 生产者消费者模型的概念

    2. 生产者消费者模型的特点

    3. 生产者消费者模型的优点

    二、基于BlockQueue的生产者消费者模型

    1.基于阻塞队列的生产者消费者模型概述

    2.模拟实现基于阻塞队列的生产者消费者模型

    1.单生产者与单消费者

    2.多生产者与多消费者


    一、生产者消费者模型

    1. 生产者消费者模型的概念

            在现实生活中,当我们缺少某些生活用品时,就会到超市去购买。当你到超市时,你的身份就是消费者,那么这些商品又是哪里来的呢,自然是供应商,那么它们就是生产者,而超市在生产者与消费者之间,就充当了一个交易场所。正是这样的方式才使得人类的交易变得高效,生产者只需要向超市供应商品,消费者只需要去超市购买商品;

            生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。

    2. 生产者消费者模型的特点

    我们将其总结为321原则:3种关系、2个角色和1个场所

    3种关系:

    • 生产者VS生产者 --- 两者是互斥关系
    • 消费者VS消费者 --- 两者是互斥关系
    • 生产者VS消费者 --- 两者是同步+互斥关系

    2个角色:生产者和消费者

    1个场所:通常指的是内存中的一个缓冲区,用于数据交互

    我们编写代码来实现生产消费者模型,本质上就是要维护这3种关系;

    为什么3种关系中两两都存在着互斥关系?

            当一个生产者向缓冲区里写入数据时,自然不希望有其他的生产者进入缓冲区,所以要在缓冲区用互斥锁保护起来。同样,一个消费者读取数据也要避免其他的消费者干扰,所以在一个消费者读取缓冲区数据时用互斥锁保护起来

    为什么生产者和消费者存在同步关系?

            缓冲区也是有大小的,当生产者向缓冲区写入数据并且写满了,此时就不能写了,就需要消费者来消费,反之消费者也一样;所以让生产者在缓冲区满时休眠,等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。

    互斥本质上是保证数据的安全,同步本质上是保证线程之间协调工作;

    3. 生产者消费者模型的优点

    •  解耦
    • 支持并发
    • 支持忙闲不均

            这里最难理解的是解耦,例如:我们曾经写过Add函数,在没有学习多线程之前,当我们生产数据并调用这个函数时,必须让这个函数执行完毕返回后,才可以进行后续代码的运作。本质上就是一个人在完成两件事(生产数据和消费数据);其实这样的耦合性非常高

            对应到生产者消费者模型中,是将这件事分配给两个线程去做,主线程(对应到生产者)只负责生产数据,不关心你怎么处理数据;新线程(对应到消费者)只负责处理数据,不关心生产数据;其耦合性就降低了,因为两者之间加上了一个缓冲区,将两者的耦合性由高变低的过程就称为解耦;

    二、基于BlockQueue的生产者消费者模型

    1.基于阻塞队列的生产者消费者模型概述

            处于生产者与消费者之间的缓冲区可以有多种实现方式,比较经典的就如阻塞队列(Blocking Queue)。如下图当队列为空时,消费者将会被阻塞,直到队列中存放了数据;当队列满时,生产者就会被阻塞,直到有数据取出; 

    2.模拟实现基于阻塞队列的生产者消费者模型

    1.单生产者与单消费者

    首先我们先以单生产者和单消费者为例:这个BlockQueue我们就用C++STL中的queue即可。

    1. //BlockQueue.hpp
    2. #pragma once
    3. #include
    4. #include
    5. #include
    6. using namespace std;
    7. namespace ns_blockqqueue
    8. {
    9. const int default_cap = 5;//假设队列存放5个数据
    10. template<class T>
    11. class BlockQueue
    12. {
    13. private:
    14. queue bq_; //阻塞队列
    15. int cap_; //队列的容量上限
    16. pthread_mutex_t mtx_; //保护临界资源的锁
    17. pthread_cond_t is_full_; //bq_ 是满的,让消费者来消费,让消费者在该条件变量下等待
    18. pthread_cond_t is_empty_; //bq_ 是空的,让生产者来生产,让生产者在该条件变量下等待
    19. private:
    20. bool IsFull(){ return bq_.size() == cap_; } //判断队列是否已满
    21. bool Isempty(){ return bq_.size() == 0; } //判断队列是否为空
    22. void LockQueue(){ pthread_mutex_lock(&mtx_); } //加锁操作
    23. void UnlockQueue(){ pthread_mutex_unlock(&mtx_); } //解锁操作
    24. void ProducterWait(){ pthread_cond_wait(&is_empty_, &mtx_);} //让生产者在队列是否为空的条件变量下等待,并释放锁
    25. void ConsumerWait() { pthread_cond_wait(&is_full_, &mtx_);} //让消费者在队列是否为满的条件变量下等待,并释放锁
    26. void WakeupConsumer() { pthread_cond_signal(&is_full_); } //唤醒在队列是否为满的条件变量下的消费者
    27. void WakeupProducter(){ pthread_cond_signal(&is_empty_); } //唤醒在队列是否为空的条件变量下的生产者
    28. public:
    29. BlockQueue(int cap = default_cap):cap_(cap)
    30. {
    31. pthread_mutex_init(&mtx_, nullptr);
    32. pthread_cond_init(&is_empty_, nullptr);
    33. pthread_cond_init(&is_full_, nullptr);
    34. }
    35. ~BlockQueue()
    36. {
    37. pthread_mutex_destroy(&mtx_);
    38. pthread_cond_destroy(&is_empty_);
    39. pthread_cond_destroy(&is_full_);
    40. }
    41. public:
    42. void Push(const T& in) //生产函数
    43. {
    44. LockQueue(); //向队列中放数据前,先申请锁,进入临界资源
    45. while(IsFull())//判断队列是否为满
    46. {
    47. ProducterWait();//满足条件后,就需要挂起等待,并释放锁
    48. }
    49. bq_.push(in); //不满足则向队列中放数据
    50. WakeupConsumer(); //唤醒在empty条件变量下等待的消费者
    51. UnlockQueue(); //释放锁,离开临界资源
    52. }
    53. void Pop(T* out) //消费函数
    54. {
    55. LockQueue(); //从队列中拿数据前,先申请锁,进入临界资源
    56. while(Isempty()) //判断队列是否为空
    57. {
    58. ConsumerWait(); //满足条件后,就需要挂起等待,并释放锁
    59. }
    60. *out = bq_.front(); //不满足则从队列拿放数据
    61. bq_.pop(); //因为是队列,你要pop掉,确保能拿到下一个
    62. WakeupProducter();//唤醒在full条件变量下等待的消费者
    63. UnlockQueue(); //释放锁,离开临界资源
    64. }
    65. };
    66. }

    说明:

    1. 我们实现的是单生产与单消费的模型,因此不需要维护生产者和生产者、消费者与消费者之间的的关系。
    2. 设置BlockQueue的存储数据上限(方便演示),不能让生产者一直生产下去,具体的实现根据业务的需求。
    3. BlockQueue是被生产者和消费者共同访问的,也就是临界资源,因此我们需要用互斥锁将临界资源进行保护。
    4. 当生产者向阻塞队列中放数据时,若此时阻塞队列满了,那生产者要先通知消费者开始消费数据,然后挂起等待。当消费者从阻塞队列中拿数据时,若此时阻塞队列为空了,那消费者先通知生产者开始生产数据,然后挂起等待。
    5. 我们需要2个条件变量,1.当阻塞队列满时,生产者需要在此条件变量下等待。2.当阻塞队列为空时,消费者在此条件变量下等待。
    6. 生产者和消费者都要进入临界区就要判断条件是否满足,若条件不满足就要被挂起,此时它们中的1个是拿着锁在等待的,为了避免死锁的问题,在调用pthread_cond_wait时要传入互斥锁,当被挂起时释放手中的锁资源。
    7. 当生产者生产完一个数据后,意味着阻塞队列当中至少有一个数据,而此时可能有消费者线程正在empty条件变量下进行等待,因此当生产者生产完数据后需要唤醒在empty条件变量下等待的消费者线程,反之消费者也一样。

            在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据。 

    1. #include "BlockQueue.hpp"
    2. #include
    3. #include
    4. #include
    5. using namespace ns_blockqqueue;
    6. void* consumer(void* args)
    7. {
    8. BlockQueue<int>* bq = (BlockQueue<int>*)args;
    9. while(true)
    10. {
    11. sleep(1);
    12. //消费数据
    13. int data = 0;
    14. bq->Pop(&data);
    15. cout << "消费者消费了一个数据:" << data << endl;
    16. }
    17. }
    18. void* produter(void* args)
    19. {
    20. BlockQueue<int>* bq = (BlockQueue<int>*)args;
    21. while(true)
    22. {
    23. sleep(1);
    24. //制造数据
    25. int data = rand() % 20 + 1;
    26. cout << "生产者生产数据:" << data << endl;
    27. bq->Push(data);
    28. }
    29. }
    30. int main()
    31. {
    32. srand((long long)time(nullptr));
    33. BlockQueue<int>* bq = new BlockQueue<int>();
    34. pthread_t c, p;
    35. pthread_create(&c, nullptr, consumer,(void*)bq);//传入阻塞队列
    36. pthread_create(&p, nullptr, produter,(void*)bq);//传入阻塞队列
    37. pthread_join(c, nullptr);
    38. pthread_join(p, nullptr);
    39. return 0;
    40. }

            为了让生产者和消费者同时看到这个阻塞队列,我们可以在创建生产者线程和消费者线程时将阻塞队列传入

            由于代码中生产者是每隔一秒生产一个数据,而消费者是每隔一秒消费一个数据,因此运行代码后我们可以看到生产者和消费者的执行步调保持一致。

    我们可以让消费者消费慢一些:

     我们可以让生产者生产慢一些:

     可以发现生产者5秒后生产一个数据,瞬间就被消费者消费了

            单单让生产者生产数据,消费者消费数据,显然意义不是很大,我们可以通过生产者生产任务,消费者去执行任务;

    1. //Task.hpp
    2. #pragma once
    3. #include
    4. #include
    5. using namespace std;
    6. namespace ns_task
    7. {
    8. class Task
    9. {
    10. private:
    11. int x_;
    12. int y_;
    13. char op_;//用来表示:+ 、- 、* 、/ 、%
    14. public:
    15. Task(){}
    16. Task(int x, int y, char op):x_(x), y_(y), op_(op){}
    17. int Run()
    18. {
    19. int res = 0;
    20. switch(op_)
    21. {
    22. case '+':
    23. res = x_ + y_;
    24. break;
    25. case '-':
    26. res = x_ - y_;
    27. break;
    28. case '*':
    29. res = x_ * y_;
    30. break;
    31. case '/':
    32. res = x_ / y_;
    33. break;
    34. case '%':
    35. res = x_ % y_;
    36. break;
    37. default:
    38. cout << "bug" << endl;
    39. break;
    40. }
    41. cout << "当前任务正在被:" << pthread_self() << "处理:" \
    42. << x_ << op_ << y_ << "=" << res << endl;
    43. return res;
    44. }
    45. int operator()()
    46. {
    47. return Run();
    48. }
    49. ~Task(){}
    50. };
    51. }

     运行结果如下:

    2.多生产者与多消费者

            我们刚刚的代码是能够满足多生产和多消费模型的,虽然我们只要一把锁,但是已经足够了,足以维护那3种关系了。我们只需要创建线程就可以了

    1. int main()
    2. {
    3. srand((long long)time(nullptr));
    4. BlockQueue* bq = new BlockQueue();
    5. pthread_t c, p;
    6. pthread_t c1, c2, c3, c4;
    7. pthread_create(&c, nullptr, consumer,(void*)bq);
    8. pthread_create(&c1, nullptr, consumer,(void*)bq);
    9. pthread_create(&c2, nullptr, consumer,(void*)bq);
    10. pthread_create(&c3, nullptr, consumer,(void*)bq);
    11. pthread_create(&c4, nullptr, consumer,(void*)bq);
    12. pthread_create(&p, nullptr, produter,(void*)bq);
    13. pthread_join(c, nullptr);
    14. pthread_join(c1, nullptr);
    15. pthread_join(c2, nullptr);
    16. pthread_join(c3, nullptr);
    17. pthread_join(c4, nullptr);
    18. pthread_join(p, nullptr);
    19. return 0;
    20. }

     运行结果:

  • 相关阅读:
    防暴力破解服务器ssh登入次数限制
    timezone 时区
    yolov5
    微信小程序一对多个页面间传递数据进行通信,事件触发的实现方法
    语法基础(函数)
    基于ATMega328微控制器的APRS Kiss TNC: 从原理到C语言实现的完整指南
    6张思维导图,搞定项目管理!(PMP项目管理可用)
    ROS 学习应用篇(六)参数的使用与编程
    STC 51单片机58——旋转LED
    RTSP/Onvif安防平台EasyNVR接入EasyNVS,出现报错“Login error, i/o deadline reached”的解决方法
  • 原文地址:https://blog.csdn.net/sjsjnsjnn/article/details/126283778