• Linux Day17 生产者消费者


    一、生产者消费者问题概述

    生产者 / 消费者问题,也被称作有限缓冲问题。两个或者更多的线程共享同一个缓冲 区,其中一个或多个线程作为 生产者 会不断地向缓冲区中添加数据,另一个或者多个线程作为 消费者 从缓冲区中取走数据。生产者 / 消费者模型关注的是以下几点:
    1. 生产者和消费者必须互斥的使用缓冲区,即生产者添加数据的时候,消费者不能取走数据,同样消费者在取走数据的时候,生产者不能添加数据。
    2.缓冲区空时,消费者不能读取数据
    3.缓冲区满时,生产者不能添加数据

    二、生产者消费者模型优点:

    1. 解耦:因为多了一个缓冲区,所以生产者和消费者并不直接相互调用,这样生产者和消费者的代码 发生变化,都不会对对方产生影响。这样其实就是把生产者和消费者之间的强耦合解开,变成了生 产者和缓冲区,消费者和缓冲区之间的弱耦合
    2. 支持并发:如果消费者直接从生产者拿数据,则消费者需要等待生产者生产数据,同样生产者需等待消费者消费数据。而有了生产者 / 消费者模型,生产者和消费者可以是两个独立的并发主体。 生产者把制造出来的数据添加到缓冲区,就可以再去生产下一个数据了。而消费者也是一样的,从
    缓冲区中读取数据,不需要等待生产者。这样,生产者和消费者就可以并发的执行。
    3. 支持忙闲不均:如果消费者直接从生产者这里拿数据,而生产者生产数据很慢,消费者消费数据很快,或者生产者生产数据很多,消费者消费数据很慢。都会造成占用 CPU 的时间片白白浪费。生产 / 消费者模型中,生产者只需要将生产的数据添加到缓冲区,缓冲区满了就不生产了。消费者从 缓冲区中读取数据,缓冲区空了就不消费了,使得生产者 / 消费者的处理能力达到一个动态的平
    衡。

    三、生产者消费者模型实现

    假定缓冲池中有 N 个缓冲区,一个缓冲区只能存储一个 int 类型的数据。定义互斥锁 mutex 实现对缓
    冲区的互斥访问;计数信号量product 用来表示空闲缓冲区的数量,其初值为 N ;计数信号量consume 用来表 示有数据的缓冲区的数量,其初值为 0
    一下为画图表示

    如图所示,有三个生产者,两个消费者(注意该图所指向仅表示写入数据,并不代表写入同一个空间,读取亦是),设置两个信号量用来控制生产者消费者的运行,首先注意不能先加锁,如果缓冲区满了,此时s1=0,p操作就会阻塞,以至于加锁后无法解锁,所以应该先p操作再加锁,加完锁后就可以开始写入数据,写完后解锁,然后给消费者的信号量+1,这样消费者就可以读取操作了。注意生产者消费者模型并不是生产者将数据生产完后,消费者才能读,是一边生产一边读取。

    四、代码实现

    根据三的图,我们可以用代码来实现该操作

    4.1 信号量+互斥锁

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #define BUFF_SIZE 10
    8. sem_t product_sem;
    9. sem_t consume_sem;
    10. pthread_mutex_t mutex;
    11. int buff[BUFF_SIZE];
    12. int in=0;
    13. int out=0;
    14. void*pro_fun(void*arg)//生产者
    15. {
    16. for(int i=0;i<20;i++)
    17. {
    18. sem_wait(&product_sem);
    19. pthread_mutex_lock(&mutex);
    20. buff[in]=rand()%100;
    21. printf("生产者在%d处产生数据%d\n",in,buff[in]);
    22. in=(++in)%10;
    23. pthread_mutex_unlock(&mutex);
    24. sem_post(&consume_sem);
    25. }
    26. }
    27. void*con_fun(void*arg)//消费者
    28. {
    29. for(int i=0;i<30;i++)
    30. {
    31. sem_wait(&consume_sem);
    32. pthread_mutex_lock(&mutex);
    33. printf("----------消费者在%d处消费数据%d\n",out,buff[out]);
    34. out=(++out)%10;
    35. pthread_mutex_unlock(&mutex);
    36. sem_post(&product_sem);
    37. }
    38. }
    39. int main()
    40. {
    41. sem_init(&product_sem,0,BUFF_SIZE);
    42. sem_init(&consume_sem,0,0);
    43. pthread_mutex_init(&mutex,NULL);
    44. srand((time(NULL)));
    45. pthread_t pro_id[3];
    46. for(int i=0;i<3;i++)
    47. {
    48. pthread_create(&pro_id[i],NULL,pro_fun,NULL);
    49. }
    50. pthread_t con_id[2];
    51. for(int i=0;i<2;i++)
    52. {
    53. pthread_create(&con_id[i],NULL,con_fun,NULL);
    54. }
    55. for(int i=0;i<3;i++)
    56. {
    57. pthread_join(pro_id[i],NULL);
    58. }
    59. for(int i=0;i<2;i++)
    60. {
    61. pthread_join(con_id[i],NULL);
    62. }
    63. sem_destroy(&product_sem);
    64. sem_destroy(&consume_sem);
    65. pthread_mutex_destroy(&mutex);
    66. exit(0);
    67. }

     

    该方法可以精确记录空闲和数据的个数。

    4.2 条件变量

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #define BUFF_SIZE 10
    8. pthread_mutex_t mutex;
    9. pthread_cond_t cond;
    10. int buff[BUFF_SIZE];
    11. int in = 0;
    12. int out = 0;
    13. void* pro_fun(void* arg) {
    14. for (int i = 0; i < 20; i++) {
    15. pthread_mutex_lock(&mutex);
    16. while ((in + 1) % BUFF_SIZE == out) {
    17. pthread_cond_wait(&cond, &mutex);
    18. }//如果生产者生产数据值满了,通知消费者
    19. buff[in] = rand() % 100;
    20. printf("生产者在%d处产生数据%d\n", in, buff[in]);
    21. in = (in + 1) % BUFF_SIZE;
    22. pthread_cond_signal(&cond);
    23. pthread_mutex_unlock(&mutex);
    24. }
    25. return NULL;
    26. }
    27. void* con_fun(void* arg) {
    28. for (int i = 0; i < 30; i++) {
    29. pthread_mutex_lock(&mutex);
    30. while (in == out) {
    31. pthread_cond_wait(&cond, &mutex);
    32. }//如果消费者消费完,通知生产者生产数据
    33. printf("----------消费者在%d处消费数据%d\n", out, buff[out]);
    34. out = (out + 1) % BUFF_SIZE;
    35. pthread_cond_signal(&cond);
    36. pthread_mutex_unlock(&mutex);
    37. }
    38. return NULL;
    39. }
    40. int main() {
    41. pthread_cond_init(&cond, NULL);
    42. pthread_mutex_init(&mutex, NULL);
    43. srand(time(NULL));
    44. pthread_t pro_id[3];
    45. for (int i = 0; i < 3; i++) {
    46. pthread_create(&pro_id[i], NULL, pro_fun, NULL);
    47. }
    48. pthread_t con_id[2];
    49. for (int i = 0; i < 2; i++) {
    50. pthread_create(&con_id[i], NULL, con_fun, NULL);
    51. }
    52. for (int i = 0; i < 3; i++) {
    53. pthread_join(pro_id[i], NULL);
    54. }
    55. for (int i = 0; i < 2; i++) {
    56. pthread_join(con_id[i], NULL);
    57. }
    58. pthread_cond_destroy(&cond);
    59. pthread_mutex_destroy(&mutex);
    60. exit(0);
    61. }

    如果对条件变量和信号量不明白的uu可以看一下上几章的内容。后面就要开启网络编程咧。

  • 相关阅读:
    jbase打印完善
    用Python下载漫画,每天掌握一个实用知识
    Kotlin 数据类生成多个构造函数
    选择 REST ,还是 GraphQL
    Vue3前端实现一个本地消息队列(MQ), 让消息延迟消费或者做缓存
    指定显卡运行python脚本
    电商前台项目(一):项目前的初始化及搭建
    STM32 PA15/JTDI 用作普通IO,烧录口不能使用问题解决
    java计算机毕业设计郑工校园二手交易平台网站源程序+mysql+系统+lw文档+远程调试
    Playcanvas后处理-辉光bloom
  • 原文地址:https://blog.csdn.net/hello_world_juns/article/details/133023649