• 基于阻塞队列的生产消费模型


    目录

    一、线程同步

    1.生产消费模型(或生产者消费者模型)

    2.认识同步

    (1)生产消费模型中的同步

    (2)生产者消费者模型的特点

    二、条件变量

    1.认识条件变量

    2.条件变量的使用

    3.代码改造

    三、基于阻塞队列的生产消费模型

    1.阻塞队列类

    (1)阻塞队列

    (2)实现生产者的生产函数

    (3)实现消费者的消费函数

    2.pthread_cond_wait为什么要传入锁

    3.生产者和消费者线程的执行函数

    (1)执行函数

    (2)试运行

    4.部分细节处理

    (1)伪唤醒问题

    (2)解锁与唤醒的顺序

    5.处理任务的生产消费模型

    (1)代码改造

    (2)运行

    6.生产消费模型为何高效

    四、双阻塞队列的生产消费模型

    1.编写类代码

    2.增加处理保存任务的函数

    3.更改线程函数

    4.更改main函数


    一、线程同步

    1.生产消费模型(或生产者消费者模型

    我们肯定有在超市买东西的经历,比如买水。超市的瓶装水是供应商提供的,所以供应商是生产者;超市从供应商进货,超市就是一个交易场所;我们从超市买水,我们就是消费者。

    这些概念也可以转换到线程中,我们将读取数据的线程叫做消费者线程(消费者),将产生数据的线程叫做生产者线程(供应商),将共享的特定数据结构叫做缓冲区(交易场所)。

    超市中售卖的瓶装水品牌很多,所以供应商肯定不止一个,这些不同牌子瓶装水的生产者之间的关系就是竞争关系。所以,多线程中各个生产者线程之间是互斥关系,同一时刻只有一个生产者线程能访问缓冲区。

    对于消费者线程也是一样的,一个消费者买到了水,其他人可能就买不到了。所以消费者线程和消费者线程之间也是互斥关系,同一时刻也只有一个消费者线程能访问缓冲区。

    超市也总要有补货的时间,如果当前仓库里有货,但是工作人员还没有将货物摆在货架上,那到来的顾客就以为这里没有水。

    为了避免出问题,应当在超市补货时阻止消费者进入。由于缓冲区数据又被错误覆盖的风险,所以最好在消费者线程访问缓冲区时,不允许生产者线程访问缓冲区,反之亦然。

    消费者线程和生产者线程之间也是互斥关系,在同一时间内只有一个线程可以访问缓冲区。

    2.认识同步

    (1)生产消费模型中的同步

    在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而有效避免饥饿问题,叫做同步。

    首先什么叫饥饿状态?

    我以多线程抢票代码为例,如果每个线程抢完票后都没有进行其他处理动作时,第一个申请到锁的线程更容易申请到锁。最终,大部分票都被一个线程抢走。而其他线程竞争能力弱,缺乏调度,这些线程就处于饥饿状态。

    那什么是同步?

    而同步就是让所有线程按照一定顺序来抢票,尽可能做到人人有份,避免线程饥饿问题产生。

    再次回到超市买水,供应商不能没完没了地向超市供货,一方面超市一直关着,消费者无法消费,另一方面,超市又不是四次元口袋,总是要装满的。

    同样,消费者也不能没完没了地买水,一方面超市不关门,供货商不能进货,另一方面,瓶装水又肯定会卖完。

    所以最好生产者先供货,货架摆满了就不进货了。消费者来买,当水卖完了再让供应商进货,让消费者和生产者协同起来。

    所以,消费者线程和生产者线程之间也是同步关系。生产者线程和消费者线程需要按照一定顺序去访问缓冲区。

    所以,我们可以将生产消费模型总结为321原则:3种关系、2种角色、1个交易场所。

    • 3种关系:生产者和生产者(互斥关系),消费者和消费者(互斥关系),生产者和消费者(互斥和同步关系)
    • 2种角色:生产者和消费者
    • 1个交易场所:一段特定结构的缓冲区

    生产消费模型的运行本质就是321原则。

    (2)生产者消费者模型的特点

    对供货商而言,只需要给超市供大量的货即可,不用关心消费者什么时候来买。

    对消费者而言,只需要直接去超市买方便面就行,不用等待方便面的生产运输。

    对超市而言,只需要在水卖完时,告诉供货商进货,进完货后告诉消费者来买。

    生产消费模型中消费者和生产者各自只需要关心自己所做的事情,生产与消费线程线程之间完全独立,在计算机科学的角度,我们称其实现了消费者线程和生产者线程之间的解耦。

    我们大部分人在周一到周五都是上班上学,所以这些时间去超市买水的人会相对少,这个时候超市就可以适时多进货。而周末大家都放假了,去超市买水的人变多,因为之前进货也很多了,就不需要进货了。

    就像上面所说的策略,生产消费模型解决了生产者线程和消费者线程忙闲不均的问题。

    而超市作为交易场所,能够储存更多的物品,同样缓冲区也能储存更多的数据。如果消费者直接去找供货商,供货商一般都不会零售。纵使能够零售,直接去找生产者还要等待生成者完成商品生产,消耗时间成本高,效率低。

    生产者消费者模型提高了了生产者线程和消费者线程的执行效率。

    二、条件变量

    1.认识条件变量

    条件变量是用来描述某种临界资源是否就绪的一种数据化描述。

    比如说存在一个共享的容器,生产者线程负责生产数据到容器内,消费者线程负责从容器中中读取数据。消费者线程发现容器为空时,就不应当去竞争锁,而是阻塞等待,直到生产者线程将数据生成到容器中。

    要想让消费者线程等待,那就必须使用条件变量标识容器的状态,那么就需要用到条件变量。

    那条件变量到底是什么呢?

    假设超市的架子进货一次只放一瓶水,只有这瓶水被买走后,供货商才会进货。

    此时又有很多消费者来买水,只有竞争能力强的消费者才能买到水,甚至他们会不停地买。竞争能力弱的消费者,买不到水。放在线程中也是一样的,竞争能力弱的消费者线程始终抢不到锁,产生了饥饿问题。

    为了解决这个问题,超市的工作人员设置了一个柜台,所有消费者都在这里排队,有一瓶水摆上货架,工作人员就允许一个消费者进去买,没有水所有人就需要在外面等待。而如果消费者想买第二瓶,就只能重新排队。而这个柜台和工作人员就相当于条件变量。

    多线程互斥访问临界资源时,为了让这些线程按一定顺序访问。通常会将这些线程都放在条件变量的等待队列中,当其他线程让条件变量符合线程的唤醒条件时,队列中的第一个线程就会去访问临界资源。

    2.条件变量的使用

    条件变量同样是一个类(pthread_cond_t),由POSIX线程库维护,使用的是POSIX标准。它也可以构造对象pthread_cond_t cond,cond就是条件变量的对象。

    以下是条件变量的一些成员函数和使用代码:

    int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);

    头文件:pthread.h

    功能:初始化条件变量。

    参数:pthread_cond_t *restrict cond表示需要被初始化的条件变量的地址,const pthread_condattr_t *restrict attr表示条件变量的属性,一般都为nullptr。

    返回值:取消成功返回0,取消失败返回错误码。

    int pthread_cond_destroy(pthread_cond_t *cond);

    头文件:pthread.h

    功能:销毁互斥条件变量。

    参数:pthread_cond_t *cond表示需要被销毁的条件变量的地址。

    返回值:销毁成功返回0,失败返回错误码。

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    如果是全局或static修饰的条件变量,使用上面语句初始化。

    int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

    头文件:pthread.h

    功能:将调用该接口的线程放入传入的条件变量等待队列中。

    参数:pthread_cond_t *restrict cond创建的条件变量地址。pthread_mutex_t *restrict mutex互斥锁的地址(为什么传锁以后会解释)。

    返回值:放入等待队列成功返回0,失败返回错误码。

    int pthread_cond_signal(pthread_cond_t *cond);

    头文件:pthread.h

    功能:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的一个线程。

    参数:pthread_cond_t *cond表示需要唤醒的线程所在的等待队列的条件变量地址。

    返回值:唤醒成功返回0,失败返回错误码。

    int pthread_cond_broadcast(pthread_cond_t *cond);

    头文件:pthread.h

    功能:由另一个线程(通常是主线程)唤醒指定条件变量等待队列中的所有线程。

    参数:pthread_cond_t *cond表示需要唤醒的线程所在的等待队列的条件变量地址。

    返回值:唤醒成功返回0,失败返回错误码。

    3.代码改造

    我们使用条件变量使所有进程可以以一定顺序抢票。

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. using namespace std;
    7. #define NUM 5
    8. pthread_mutex_t mutx = PTHREAD_MUTEX_INITIALIZER;//构建一个全局锁
    9. pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//构建一个全局条件变量
    10. int tickets = 10;
    11. class pthread_data
    12. {
    13. public:
    14. pthread_t tid;
    15. char buffer[64];
    16. };
    17. void* start_routine(void* args)
    18. {
    19. pthread_data* p = (pthread_data*)args;
    20. string s;
    21. s += p->buffer;
    22. s += "Remaining tickets:";
    23. while(1)
    24. {
    25. pthread_mutex_lock(&mutx);//加锁
    26. pthread_cond_wait(&cond, &mutx);//线程进入等待队列
    27. if(tickets > 0)
    28. {
    29. --tickets;
    30. pthread_mutex_unlock(&mutx);//解锁
    31. printf("%s%d\n", s.c_str(), tickets);//不修改临界资源,可以不包含在内
    32. }
    33. else
    34. {
    35. pthread_mutex_unlock(&mutx);//解锁
    36. break;
    37. }
    38. }
    39. pthread_exit(nullptr);
    40. }
    41. int main()
    42. {
    43. vector vpd;
    44. //创建多个线程
    45. for(int i = 0; i
    46. {
    47. pthread_data* pd = new pthread_data;
    48. snprintf(pd->buffer, sizeof(pd->buffer), "thread%d buy ticket:",i+1);
    49. pthread_create(&(pd->tid), nullptr, start_routine, (void*)pd);
    50. vpd.push_back(pd);
    51. }
    52. //主线程唤醒其他线程
    53. for(;;)
    54. {
    55. sleep(1);
    56. pthread_cond_signal(&cond);
    57. printf("main thread wake up a thread\n");
    58. }
    59. //线程回收
    60. for(int i = 0; i
    61. {
    62. pthread_join(vpd[i]->tid, nullptr);
    63. delete vpd[i];
    64. }
    65. return 0;
    66. }

    条件变量、票数和锁都是全局变量,每个线程申请锁成功就进入条件变量等待队列。主线程每个一秒钟唤醒一个等待的线程抢票。

    运行结果:

    可以发现线程按12345的顺序循环抢票。

    使用pthread_cond_broadcast()接口可以一次唤醒条件变量等待队列中的的所有线程,每隔一秒唤醒一次。

    运行结果:

    仍然是按照一定顺序抢票,只是进行抢票的线程是5个同时进行。

    三、基于阻塞队列的生产消费模型

    既然讲了这么半天的生产消费模型,那我们不妨实现一个。

    1.阻塞队列类

    首先需要搭建模型的框架,也就是实现包括生产者线程、消费者线程,还有一个储存数据的阻塞队列(缓冲区)的简单执行代码。

    (1)阻塞队列

    阻塞队列的实现有以下注意事项:

    阻塞队列可使用C++STL中的queue实现。

    由于阻塞队列是公共资源,所以必须保证它是线程安全的。生产者线程和消费者线程需要互斥访问,其实也只需要一把互斥锁就能实现生产者和消费者间的互斥。以后的生产者和生产者,消费者和消费者之间的互斥也是这样实现的。

    只有阻塞队列中有数据消费者才能读取,消费者读取时,生产者不能生产,必须在等待队列中。

    阻塞队列中没有数据或者数据未填满时,生产者才能生产,消费者在生产的时候,消费者不能读取,必须在等待队列中。

    1. template
    2. class Blockqueue
    3. {
    4. public:
    5. //构造函数
    6. Blockqueue(size_t capcity = MAX_NUM)
    7. :_capcity(capcity)
    8. {
    9. pthread_mutex_init(&_mutx, nullptr);
    10. pthread_cond_init(&_pcond, nullptr);
    11. pthread_cond_init(&_ccond, nullptr);
    12. }
    13. //析构函数
    14. ~Blockqueue()
    15. {
    16. pthread_mutex_destroy(&_mutx);
    17. pthread_cond_destroy(&_pcond);
    18. pthread_cond_destroy(&_ccond);
    19. }
    20. //生产数据
    21. void push(const T& data);
    22. //消费数据
    23. void pop(T* data);
    24. private:
    25. //检测队列是否装满
    26. size_t Isfull() const
    27. {
    28. return (_q.size() == _capcity);
    29. }
    30. std::queue _q;
    31. pthread_mutex_t _mutx;
    32. pthread_cond_t _pcond;
    33. pthread_cond_t _ccond;
    34. size_t _capcity;
    35. };

    为了保持生产者和消费者的互斥,我们对生产者和消费者各使用一个条件变量,用一个锁控制对阻塞队列的访问。

    需要给阻塞队列储存的数据量设置一个上限,生产者线程不能无限制地生产数据。

    构造函数中初始化锁和条件变量,在析构函数中释放互斥锁和条件变量。阻塞队列的容量设置一个合适的缺省值。

    (2)实现生产者的生产函数

    生产数据我们一般使用push作为函数名。

    1. void push(const T& data)
    2. {
    3. //下面的判断就开始使用共享资源,需要加锁
    4. pthread_mutex_lock(&_mutx);
    5. //如果当前队列是满的,那就需要将生产者线程加入等待队列挂起
    6. if(Isfull())
    7. {
    8. pthread_cond_wait(&_pcond, &_mutx)
    9. }
    10. _q.push(data);
    11. //唤醒消费者线程消费
    12. pthread_cond_signal(&_ccond, &_mutx);
    13. //解锁
    14. pthread_mutex_unlock(&_mutx);
    15. }

    生产者线程调用生产数据接口时,先申请锁进入临界区。

    当阻塞队列满时,在生产者条件变量将线程放入等待队列中挂起。

    当阻塞队列不满时,生产者生产数据到阻塞队列,由于消费者线程全部在等待,所以需要唤醒消费者线程消费数据,否则生产者会一直生产至满。

    (3)实现消费者的消费函数

    消费数据我们一般使用pop作为函数名。

    1. void pop(T* data)
    2. {
    3. //下面的判断就开始使用共享资源,需要加锁
    4. pthread_mutex_lock(&_mutx);
    5. //如果当前队列是空的,那就需要将消费者线程加入等待队列挂起
    6. if(_q.empty())
    7. {
    8. pthread_cond_wait(&_ccond, &_mutx)
    9. }
    10. //将数据输出到data中并删除
    11. *data = _q.front();
    12. _q.pop();
    13. //唤醒生产者线程生产
    14. pthread_cond_signal(&_pcond, &_mutx);
    15. //解锁
    16. pthread_mutex_unlock(&_mutx);
    17. }

    消费者线程同样先申请到锁后进入临界区。

    当阻塞队列为空时,没有数据可以消费,消费者挂起等待。

    当阻塞队列为不为空时,消费者消费数据,由生产者线程全部在等待,所以需要唤醒生产者线程消费数据,否则消费者会一直消费至空。

    2.pthread_cond_wait为什么要传入锁

    使用pthread_cond_wait接口时,必须传如一个锁,这一点我们没有解释。

    线程在条件变量的等待队列中排队等待,其目的就是要拿到要访问临界资源的那把锁,申请到锁,线程就可以进入临界区。

    如果一个线程拿到了锁,而又发现自己不满足条件需要挂起等待。按照之前的知识,该线程应该继续拿着锁进入条件变量的等待队列。即使其他线程被唤醒了,因为申请不到到锁,无法访问共享资源,只能被挂起。

    为了解决这个问题,pthread_cond_wiat的实现大致分三个步骤:挂起该线程->释放锁->记录锁。

    也就是说,只要是持有锁的线程进入等待队列,就自动释放自己持有的锁,而释放锁也是原子性操作,不会引起线程安全问题。

    在最后,接口还会记录当前进程挂起时释放的锁,这也就解释了为什么唤醒线程的时候,pthread_cond_signal(pthread_cond_t* cond)只有一个参数,被唤醒线程根据记录就知道它该去申请哪把锁。

    我们最终得到以下结论:

    • 参数传递的这个锁必须是正在使用的锁。
    • 调用pthread_cond_wait函数的进程,会以原子性的方式,将锁释放,并将自己挂起。
    • 线程在被唤醒的时候,会自动重新获取挂起时传入的锁。

    3.生产者和消费者线程的执行函数

    (1)执行函数

    生产者不断生成随机数再将数据插入阻塞队列。消费者不断将随机数再从阻塞队列中拿出来。

    1. //生产者
    2. void* Produce(void* args)
    3. {
    4. Blockqueue<int>* bq = (Blockqueue<int>*)args;
    5. while(1)
    6. {
    7. sleep(1);
    8. int data = rand()%10;
    9. bq->push(data);
    10. printf("生产数据完成,数据为:%d\n", data);
    11. }
    12. return nullptr;
    13. }
    14. //消费者
    15. void* Consume(void* args)
    16. {
    17. Blockqueue<int>* bq = (Blockqueue<int>*)args;
    18. while(1)
    19. {
    20. sleep(1);
    21. int data = 0;
    22. bq->pop(&data);
    23. printf("消费数据完成,数据为:%d\n", data);
    24. }
    25. return nullptr;
    26. }

    (2)试运行

    我们只创建一个生产者线程和一个消费者线程,并且让生产者每一秒生产一个数据,所以最终代码如下:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. using namespace std;
    7. #define MAX_NUM 10
    8. template<class T>
    9. class Blockqueue
    10. {
    11. public:
    12. //构造函数
    13. Blockqueue(size_t capcity = MAX_NUM)
    14. :_capcity(capcity)
    15. {
    16. pthread_mutex_init(&_mutx, nullptr);
    17. pthread_cond_init(&_pcond, nullptr);
    18. pthread_cond_init(&_ccond, nullptr);
    19. }
    20. //析构函数
    21. ~Blockqueue()
    22. {
    23. pthread_mutex_destroy(&_mutx);
    24. pthread_cond_destroy(&_pcond);
    25. pthread_cond_destroy(&_ccond);
    26. }
    27. //生产数据
    28. void push(const T& data)
    29. {
    30. //下面的判断就开始使用共享资源,需要加锁
    31. pthread_mutex_lock(&_mutx);
    32. //如果当前队列是满的,那就需要将生产者线程加入等待队列挂起
    33. if(Isfull())
    34. {
    35. pthread_cond_wait(&_pcond, &_mutx);
    36. }
    37. _q.push(data);
    38. //唤醒消费者线程消费
    39. pthread_cond_signal(&_ccond);
    40. //解锁
    41. pthread_mutex_unlock(&_mutx);
    42. }
    43. //消费数据
    44. void pop(T* data)
    45. {
    46. //下面的判断就开始使用共享资源,需要加锁
    47. pthread_mutex_lock(&_mutx);
    48. //如果当前队列是空的,那就需要将消费者线程加入等待队列挂起
    49. if(_q.empty())
    50. {
    51. pthread_cond_wait(&_ccond, &_mutx);
    52. }
    53. //将数据输出到data中并删除
    54. *data = _q.front();
    55. _q.pop();
    56. //唤醒生产者线程生产
    57. pthread_cond_signal(&_pcond);
    58. //解锁
    59. pthread_mutex_unlock(&_mutx);
    60. }
    61. private:
    62. //检测队列是否装满
    63. size_t Isfull() const
    64. {
    65. return (_q.size() == _capcity);
    66. }
    67. std::queue _q;
    68. pthread_mutex_t _mutx;
    69. pthread_cond_t _pcond;
    70. pthread_cond_t _ccond;
    71. size_t _capcity;
    72. };
    73. //生产者
    74. void* Produce(void* args)
    75. {
    76. Blockqueue<int>* bq = (Blockqueue<int>*)args;
    77. while(1)
    78. {
    79. sleep(1);
    80. int data = rand()%10;
    81. bq->push(data);
    82. printf("生产数据完成,数据为:%d\n", data);
    83. }
    84. return nullptr;
    85. }
    86. //消费者
    87. void* Consume(void* args)
    88. {
    89. Blockqueue<int>* bq = (Blockqueue<int>*)args;
    90. while(1)
    91. {
    92. //sleep(1);
    93. int data = 0;
    94. bq->pop(&data);
    95. printf("消费数据完成,数据为:%d\n", data);
    96. }
    97. return nullptr;
    98. }
    99. int main()
    100. {
    101. srand((unsigned int)time(nullptr));
    102. Blockqueue<int>* bq = new Blockqueue<int>;
    103. pthread_t tids[2];
    104. pthread_create(&tids[0], nullptr, Produce, (void*)bq);
    105. pthread_create(&tids[1], nullptr, Consume, (void*)bq);
    106. pthread_join(tids[0], nullptr);
    107. pthread_join(tids[1], nullptr);
    108. return 0;
    109. }

    运行结果:

    4.部分细节处理

    对于一个消费者和一个生产者的模型而言,上面的代码的确足够了。但是生产消费模型的生产者和消费者都应当有多个,此时我们就需要对其进行修改。

    (1)伪唤醒问题

    如果现在有多个生产者线程在进行数据生产。当阻塞队列满了以后,所有生产者线程都会在条件变量的等待队列中等待。

    第一种情况,某个生产者线程调用挂起接口失败。

    pthread_cond_wait即使调用失败,它也只会返回错误码,并不能阻断执行流继续向下执行。所以,即使出错,生产者还是会生成数据到阻塞队列中。

    第二种情况,一个消费者线程一次唤醒所有生产者线程。

    比如,现在阻塞队列满了,消费者线程消费了一个数据并且唤醒了所有的生产者线程,那么很多个生产者向阻塞队列的一个空位置生成数据,同样会出现上述问题。

    上面这种情况被叫做伪唤醒,再生产者和消费者线程中都存在这个问题。所以需要让执行流只要不满足队列满或空的条件就循环执行pthread_cond_wait。执行成功了,该线程就被挂起了,执行流不再运行;执行失败了,执行流会一直调用pthread_cond_wait,执行流也不会向下操作数据。

    生产数据

    消费数据

    (2)解锁与唤醒的顺序

    在生产者生产完数据后,它需要做两件事:唤醒消费者线程和归还锁。

    对于消费者进程也是一样的:唤醒生产者线程和归还锁。

    由于唤醒线程是不对共享资源进行操作的,所以对于唤醒线程和解锁的顺序谁先谁后都可以。只是更建议先唤醒再解锁。

    5.处理任务的生产消费模型

    (1)代码改造

    我们使用生产消费模型可不是用来保存随机数的,而是用它处理任务的。

    我们可以写一个保存计算方法的任务类,从而实现一个随机数计算器。

    任务类

    1. //任务类
    2. class Task
    3. {
    4. typedef std::function<int(int,int,char)> func_t;
    5. public:
    6. //默认构造
    7. Task()
    8. {}
    9. //构造函数
    10. Task(int a, int b, char op, func_t func)
    11. :_a(a)
    12. ,_b(b)
    13. ,_op(op)
    14. ,_func(func)
    15. {}
    16. //仿函数
    17. string operator()()
    18. {
    19. int result = _func(_a, _b, _op);
    20. char buffer[64];
    21. snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _a, _op, _b, result);
    22. string s(buffer);
    23. return s;
    24. }
    25. //显示任务
    26. string show_task()
    27. {
    28. char buffer[64];
    29. snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _a, _op, _b);
    30. string s(buffer);
    31. return s;
    32. }
    33. private:
    34. func_t _func;
    35. int _a;
    36. int _b;
    37. char _op;
    38. };

    设置处理任务的函数,修改生产者和消费者线程执行的函数。

    1. //计算器函数
    2. const string ops = "+-*/%";
    3. int calculate(int a, int b, char op)
    4. {
    5. int result = 0;
    6. switch(op)
    7. {
    8. case '+':
    9. result = a + b;
    10. break;
    11. case '-':
    12. result = a - b;
    13. break;
    14. case '*':
    15. result = a * b;
    16. break;
    17. case '/':
    18. {
    19. if(b == 0)
    20. cerr << "除数不能为0\n";
    21. else
    22. result = a / b;
    23. }
    24. break;
    25. case '%':
    26. {
    27. if(b == 0)
    28. cerr << "取模的数字不能为0\n";
    29. else
    30. result = a % b;
    31. }
    32. break;
    33. default:
    34. break;
    35. }
    36. return result;
    37. }
    38. //生产者
    39. void* Produce(void* args)
    40. {
    41. Blockqueue* bq = (Blockqueue*)args;
    42. while(1)
    43. {
    44. sleep(1);
    45. int a = rand()%10;
    46. int b = rand()%10;
    47. int opnum = rand()%ops.size();
    48. Task data(a, b, ops[opnum], calculate);
    49. string s = "数据生产完成,需要计算:";
    50. bq->push(data);
    51. s += data.show_task().c_str();
    52. cout << s;
    53. }
    54. return nullptr;
    55. }

    (2)运行

    在主线程中多创建几个线程就可以运行了。

    总代码如下:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. using namespace std;
    8. #define MAX_NUM 10
    9. //任务类
    10. class Task
    11. {
    12. typedef std::function<int(int,int,char)> func_t;
    13. public:
    14. //默认构造
    15. Task()
    16. {}
    17. //构造函数
    18. Task(int a, int b, char op, func_t func)
    19. :_a(a)
    20. ,_b(b)
    21. ,_op(op)
    22. ,_func(func)
    23. {}
    24. //仿函数
    25. string operator()()
    26. {
    27. int result = _func(_a, _b, _op);
    28. char buffer[64];
    29. snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _a, _op, _b, result);
    30. string s(buffer);
    31. return s;
    32. }
    33. //显示任务
    34. string show_task()
    35. {
    36. char buffer[64];
    37. snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _a, _op, _b);
    38. string s(buffer);
    39. return s;
    40. }
    41. private:
    42. func_t _func;
    43. int _a;
    44. int _b;
    45. char _op;
    46. };
    47. template<class T>
    48. class Blockqueue
    49. {
    50. public:
    51. //构造函数
    52. Blockqueue(size_t capcity = MAX_NUM)
    53. :_capcity(capcity)
    54. {
    55. pthread_mutex_init(&_mutx, nullptr);
    56. pthread_cond_init(&_pcond, nullptr);
    57. pthread_cond_init(&_ccond, nullptr);
    58. }
    59. //析构函数
    60. ~Blockqueue()
    61. {
    62. pthread_mutex_destroy(&_mutx);
    63. pthread_cond_destroy(&_pcond);
    64. pthread_cond_destroy(&_ccond);
    65. }
    66. //生产数据
    67. void push(const T& data)
    68. {
    69. //下面的判断就开始使用共享资源,需要加锁
    70. pthread_mutex_lock(&_mutx);
    71. //如果当前队列是满的,那就需要将生产者线程加入等待队列挂起
    72. while(Isfull())
    73. {
    74. pthread_cond_wait(&_pcond, &_mutx);
    75. }
    76. _q.push(data);
    77. //唤醒消费者线程消费
    78. pthread_cond_signal(&_ccond);
    79. //解锁
    80. pthread_mutex_unlock(&_mutx);
    81. }
    82. //消费数据
    83. void pop(T* data)
    84. {
    85. //下面的判断就开始使用共享资源,需要加锁
    86. pthread_mutex_lock(&_mutx);
    87. //如果当前队列是空的,那就需要将消费者线程加入等待队列挂起
    88. while(_q.empty())
    89. {
    90. pthread_cond_wait(&_ccond, &_mutx);
    91. }
    92. //将数据输出到data中并删除
    93. *data = _q.front();
    94. _q.pop();
    95. //唤醒生产者线程生产
    96. pthread_cond_signal(&_pcond);
    97. //解锁
    98. pthread_mutex_unlock(&_mutx);
    99. }
    100. private:
    101. //检测队列是否装满
    102. size_t Isfull() const
    103. {
    104. return (_q.size() == _capcity);
    105. }
    106. std::queue _q;
    107. pthread_mutex_t _mutx;
    108. pthread_cond_t _pcond;
    109. pthread_cond_t _ccond;
    110. size_t _capcity;
    111. };
    112. //计算器函数
    113. const string ops = "+-*/%";
    114. int calculate(int a, int b, char op)
    115. {
    116. int result = 0;
    117. switch(op)
    118. {
    119. case '+':
    120. result = a + b;
    121. break;
    122. case '-':
    123. result = a - b;
    124. break;
    125. case '*':
    126. result = a * b;
    127. break;
    128. case '/':
    129. {
    130. if(b == 0)
    131. cerr << "除数不能为0\n";
    132. else
    133. result = a / b;
    134. }
    135. break;
    136. case '%':
    137. {
    138. if(b == 0)
    139. cerr << "取模的数字不能为0\n";
    140. else
    141. result = a % b;
    142. }
    143. break;
    144. default:
    145. break;
    146. }
    147. return result;
    148. }
    149. //生产者
    150. void* Produce(void* args)
    151. {
    152. Blockqueue* bq = (Blockqueue*)args;
    153. while(1)
    154. {
    155. sleep(1);
    156. int a = rand()%10;
    157. int b = rand()%10;
    158. int opnum = rand()%ops.size();
    159. Task data(a, b, ops[opnum], calculate);
    160. string s = "数据生产完成,需要计算:";
    161. bq->push(data);
    162. s += data.show_task().c_str();
    163. cout << s;
    164. }
    165. return nullptr;
    166. }
    167. //消费者
    168. void* Consume(void* args)
    169. {
    170. Blockqueue* bq = (Blockqueue*)args;
    171. while(1)
    172. {
    173. //sleep(1);
    174. Task data;
    175. string s = "数据消费完成,计算结果为:";
    176. bq->pop(&data);
    177. string result = data();
    178. s += result;
    179. cout << s;
    180. }
    181. return nullptr;
    182. }
    183. #define NUM_PRODUCE 3
    184. #define NUM_CONSUME 3
    185. int main()
    186. {
    187. srand((unsigned int)time(nullptr));
    188. Blockqueue* bq = new Blockqueue;
    189. pthread_t ptids[NUM_PRODUCE];
    190. pthread_t ctids[NUM_CONSUME];
    191. //创建多个生产者线程
    192. for(int i = 0; i
    193. {
    194. pthread_create(&ptids[i], nullptr, Produce, (void*)bq);
    195. }
    196. //创建多个消费者线程
    197. for(int i = 0; i
    198. {
    199. pthread_create(&ctids[i], nullptr, Consume, (void*)bq);
    200. }
    201. //回收所有线程
    202. for(int i = 0; i
    203. {
    204. pthread_join(ptids[i], nullptr);
    205. }
    206. for(int i = 0; i
    207. {
    208. pthread_join(ctids[i], nullptr);
    209. }
    210. return 0;
    211. }

    运行结果:

    有一个地方我一直没说,像cout 所以我们在打印信息时尽量使用单句printf或者cout

    6.生产消费模型为何高效

    该模型中,多个生产者线程向阻塞队列生成数据,多个消费者线程也从阻塞队列中消费数据。

    各生产消费者线程之间互斥关系,各线程对于阻塞队列的访问是串行的。同一时间访问阻塞队列的线程只有一个,拿这样又何来高效呢?

    注意观察程序的运行逻辑,我们能发现只有临界区的代码是串行的,其他代码所有线程都是并发执行的。这些非临界区的代码通常耗时长,而它们是并发的,所以该模型的效率就变得很高。

    结论:生产消费模型的高效不体现在对临界资源的访问上,而是体现在对非临界区代码的并发执行上。

    四、双阻塞队列的生产消费模型

    我么们可以使用上面的生产者消费者模型,将消费者处理完的计算任务保存成日志,并储存到磁盘上。

    所以该模型有两个阻塞队列,一个阻塞队列用于保存计算任务,另一个阻塞队列用于保存保存任务。原来的生产者线程还是生产者,原来的消费者作为中间的线程,既是消费者也是生产者,保存线程是消费者。

    1.编写类代码

    我们需要构建四个类:计算任务类、保存任务类、阻塞队列类和多队列集合类。

    • 计算任务类就是之前的Task类,我们其他代码不用动,对它改个CalTask的名字就可以。
    • 保存任务类与计算任务类很相似,需要传入一个string表示需要打印的信息,还有一个函数对象表示保存数据的具体执行函数。
    • 阻塞队列类也不用改
    • 多队列集合类是为了线程执行函数的void* args传参设计的,包含了两个阻塞队列指针。
    1. //计算任务类
    2. class CalTask
    3. {
    4. typedef std::function<int(int,int,char)> func_t;
    5. public:
    6. //默认构造
    7. CalTask()
    8. {}
    9. //构造函数
    10. CalTask(int a, int b, char op, func_t func)
    11. :_a(a)
    12. ,_b(b)
    13. ,_op(op)
    14. ,_func(func)
    15. {}
    16. //仿函数
    17. string operator()()
    18. {
    19. int result = _func(_a, _b, _op);
    20. char buffer[64];
    21. snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _a, _op, _b, result);
    22. string s(buffer);
    23. return s;
    24. }
    25. //显示任务
    26. string show_task()
    27. {
    28. char buffer[64];
    29. snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _a, _op, _b);
    30. string s(buffer);
    31. return s;
    32. }
    33. private:
    34. func_t _func;
    35. int _a;
    36. int _b;
    37. char _op;
    38. };
    39. //保存任务类
    40. class SaveTask
    41. {
    42. typedef function<void(const string&)> func_t;
    43. public:
    44. //默认构造
    45. SaveTask()
    46. {}
    47. //构造函数
    48. SaveTask(string message, func_t func)
    49. :_message(message)
    50. ,_func(func)
    51. {}
    52. //仿函数
    53. void operator()()
    54. {
    55. _func(_message);
    56. }
    57. private:
    58. string _message;
    59. func_t _func;
    60. };
    61. //阻塞队列类
    62. template<class T>
    63. class Blockqueue
    64. {
    65. public:
    66. //构造函数
    67. Blockqueue(size_t capcity = MAX_NUM)
    68. :_capcity(capcity)
    69. {
    70. pthread_mutex_init(&_mutx, nullptr);
    71. pthread_cond_init(&_pcond, nullptr);
    72. pthread_cond_init(&_ccond, nullptr);
    73. }
    74. //析构函数
    75. ~Blockqueue()
    76. {
    77. pthread_mutex_destroy(&_mutx);
    78. pthread_cond_destroy(&_pcond);
    79. pthread_cond_destroy(&_ccond);
    80. }
    81. //生产数据
    82. void push(const T& data)
    83. {
    84. //下面的判断就开始使用共享资源,需要加锁
    85. pthread_mutex_lock(&_mutx);
    86. //如果当前队列是满的,那就需要将生产者线程加入等待队列挂起
    87. while(Isfull())
    88. {
    89. pthread_cond_wait(&_pcond, &_mutx);
    90. }
    91. _q.push(data);
    92. //唤醒消费者线程消费
    93. pthread_cond_signal(&_ccond);
    94. //解锁
    95. pthread_mutex_unlock(&_mutx);
    96. }
    97. //消费数据
    98. void pop(T* data)
    99. {
    100. //下面的判断就开始使用共享资源,需要加锁
    101. pthread_mutex_lock(&_mutx);
    102. //如果当前队列是空的,那就需要将消费者线程加入等待队列挂起
    103. while(_q.empty())
    104. {
    105. pthread_cond_wait(&_ccond, &_mutx);
    106. }
    107. //将数据输出到data中并删除
    108. *data = _q.front();
    109. _q.pop();
    110. //唤醒生产者线程生产
    111. pthread_cond_signal(&_pcond);
    112. //解锁
    113. pthread_mutex_unlock(&_mutx);
    114. }
    115. private:
    116. //检测队列是否装满
    117. size_t Isfull() const
    118. {
    119. return (_q.size() == _capcity);
    120. }
    121. std::queue _q;
    122. pthread_mutex_t _mutx;
    123. pthread_cond_t _pcond;
    124. pthread_cond_t _ccond;
    125. size_t _capcity;
    126. };
    127. //多队列集合类
    128. template<class A,class B>
    129. class Blockqueues
    130. {
    131. public:
    132. Blockqueue* _q1;

    2.增加处理保存任务的函数

    我们之前对计算任务有一个处理函数calculate,那处理保存任务也同样需要一个函数Savedata。

    1. //保存函数
    2. void Savedata(const string& message)
    3. {
    4. //需要保存信息的文件
    5. char buffer[64] = "log.txt";
    6. //按追加方式打开文件
    7. FILE* fp = fopen(buffer, "a");
    8. if(fp == nullptr)
    9. {
    10. cerr << "文件打开失败" << endl;
    11. return;
    12. }
    13. fprintf(fp, "%s", message.c_str());
    14. fclose(fp);
    15. }

    3.更改线程函数

    生产者线程此时依旧生产,只不过生产的位置是第一个阻塞队列。

    消费者线程此时也成为了后一个模型的生产者,那就需要添加向第二个队列生产的代码。

    保存者线程此时是消费者,从第二个队列中取任务执行。

    1. //生产者
    2. void* Produce(void* args)
    3. {
    4. Blockqueues* bqs = (Blockqueues*)args;
    5. //生产数据
    6. while(1)
    7. {
    8. sleep(1);
    9. int a = rand()%10;
    10. int b = rand()%10;
    11. int opnum = rand()%ops.size();
    12. CalTask data(a, b, ops[opnum], calculate);
    13. string s = "数据生产完成,需要计算:";
    14. bqs->_q1->push(data);
    15. s += data.show_task().c_str();
    16. cout << s;
    17. }
    18. return nullptr;
    19. }
    20. //消费者
    21. void* Consume(void* args)
    22. {
    23. Blockqueues* bqs = (Blockqueues*)args;
    24. while(1)
    25. {
    26. //消费数据
    27. //sleep(1);
    28. CalTask data;
    29. string s1 = "数据消费完成,计算结果为:";
    30. bqs->_q1->pop(&data);
    31. string result = data();
    32. s1 += result;
    33. cout << s1;
    34. //生成待保存的数据
    35. string s2 = "数据保存任务推送完毕\n";
    36. SaveTask task = SaveTask(result, Savedata);
    37. bqs->_q2->push(task);
    38. cout << s2;
    39. }
    40. return nullptr;
    41. }
    42. void* Save(void* args)
    43. {
    44. Blockqueues* bqs = (Blockqueues*)args;
    45. while(1)
    46. {
    47. //sleep(1);
    48. SaveTask data;
    49. string s = "数据保存完成\n";
    50. bqs->_q2->pop(&data);
    51. data();
    52. cout << s;
    53. }
    54. return nullptr;
    55. }

    4.更改main函数

    最后我们在main函数中创建两个队列,创建三种线程最后加上回收代码就可以了。

    所以总代码如下:

    produce_consume.h

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #define MAX_NUM 10
    9. using namespace std;
    10. //计算任务类
    11. class CalTask
    12. {
    13. typedef std::function<int(int,int,char)> func_t;
    14. public:
    15. //默认构造
    16. CalTask()
    17. {}
    18. //构造函数
    19. CalTask(int a, int b, char op, func_t func)
    20. :_a(a)
    21. ,_b(b)
    22. ,_op(op)
    23. ,_func(func)
    24. {}
    25. //仿函数
    26. string operator()()
    27. {
    28. int result = _func(_a, _b, _op);
    29. char buffer[64];
    30. snprintf(buffer, sizeof(buffer), "%d %c %d = %d\n", _a, _op, _b, result);
    31. string s(buffer);
    32. return s;
    33. }
    34. //显示任务
    35. string show_task()
    36. {
    37. char buffer[64];
    38. snprintf(buffer, sizeof(buffer), "%d %c %d = ?\n", _a, _op, _b);
    39. string s(buffer);
    40. return s;
    41. }
    42. private:
    43. func_t _func;
    44. int _a;
    45. int _b;
    46. char _op;
    47. };
    48. //保存任务类
    49. class SaveTask
    50. {
    51. typedef function<void(const string&)> func_t;
    52. public:
    53. //默认构造
    54. SaveTask()
    55. {}
    56. //构造函数
    57. SaveTask(string message, func_t func)
    58. :_message(message)
    59. ,_func(func)
    60. {}
    61. //仿函数
    62. void operator()()
    63. {
    64. _func(_message);
    65. }
    66. private:
    67. string _message;
    68. func_t _func;
    69. };
    70. //阻塞队列类
    71. template<class T>
    72. class Blockqueue
    73. {
    74. public:
    75. //构造函数
    76. Blockqueue(size_t capcity = MAX_NUM)
    77. :_capcity(capcity)
    78. {
    79. pthread_mutex_init(&_mutx, nullptr);
    80. pthread_cond_init(&_pcond, nullptr);
    81. pthread_cond_init(&_ccond, nullptr);
    82. }
    83. //析构函数
    84. ~Blockqueue()
    85. {
    86. pthread_mutex_destroy(&_mutx);
    87. pthread_cond_destroy(&_pcond);
    88. pthread_cond_destroy(&_ccond);
    89. }
    90. //生产数据
    91. void push(const T& data)
    92. {
    93. //下面的判断就开始使用共享资源,需要加锁
    94. pthread_mutex_lock(&_mutx);
    95. //如果当前队列是满的,那就需要将生产者线程加入等待队列挂起
    96. while(Isfull())
    97. {
    98. pthread_cond_wait(&_pcond, &_mutx);
    99. }
    100. _q.push(data);
    101. //唤醒消费者线程消费
    102. pthread_cond_signal(&_ccond);
    103. //解锁
    104. pthread_mutex_unlock(&_mutx);
    105. }
    106. //消费数据
    107. void pop(T* data)
    108. {
    109. //下面的判断就开始使用共享资源,需要加锁
    110. pthread_mutex_lock(&_mutx);
    111. //如果当前队列是空的,那就需要将消费者线程加入等待队列挂起
    112. while(_q.empty())
    113. {
    114. pthread_cond_wait(&_ccond, &_mutx);
    115. }
    116. //将数据输出到data中并删除
    117. *data = _q.front();
    118. _q.pop();
    119. //唤醒生产者线程生产
    120. pthread_cond_signal(&_pcond);
    121. //解锁
    122. pthread_mutex_unlock(&_mutx);
    123. }
    124. private:
    125. //检测队列是否装满
    126. size_t Isfull() const
    127. {
    128. return (_q.size() == _capcity);
    129. }
    130. std::queue _q;
    131. pthread_mutex_t _mutx;
    132. pthread_cond_t _pcond;
    133. pthread_cond_t _ccond;
    134. size_t _capcity;
    135. };
    136. //多队列集合类
    137. template<class A,class B>
    138. class Blockqueues
    139. {
    140. public:
    141. Blockqueue* _q1;

    produce_consume.cc

    1. #include"produce_consume.h"
    2. using namespace std;
    3. //计算器函数
    4. const string ops = "+-*/%";
    5. int calculate(int a, int b, char op)
    6. {
    7. int result = 0;
    8. switch(op)
    9. {
    10. case '+':
    11. result = a + b;
    12. break;
    13. case '-':
    14. result = a - b;
    15. break;
    16. case '*':
    17. result = a * b;
    18. break;
    19. case '/':
    20. {
    21. if(b == 0)
    22. cerr << "除数不能为0\n";
    23. else
    24. result = a / b;
    25. }
    26. break;
    27. case '%':
    28. {
    29. if(b == 0)
    30. cerr << "取模的数字不能为0\n";
    31. else
    32. result = a % b;
    33. }
    34. break;
    35. default:
    36. break;
    37. }
    38. return result;
    39. }
    40. //保存函数
    41. void Savedata(const string& message)
    42. {
    43. //需要保存信息的文件
    44. char buffer[64] = "log.txt";
    45. //按追加方式打开文件
    46. FILE* fp = fopen(buffer, "a");
    47. if(fp == nullptr)
    48. {
    49. cerr << "文件打开失败" << endl;
    50. return;
    51. }
    52. fprintf(fp, "%s", message.c_str());
    53. fclose(fp);
    54. }
    55. //生产者
    56. void* Produce(void* args)
    57. {
    58. Blockqueues* bqs = (Blockqueues*)args;
    59. //生产数据
    60. while(1)
    61. {
    62. sleep(1);
    63. int a = rand()%10;
    64. int b = rand()%10;
    65. int opnum = rand()%ops.size();
    66. CalTask data(a, b, ops[opnum], calculate);
    67. string s = "数据生产完成,需要计算:";
    68. bqs->_q1->push(data);
    69. s += data.show_task().c_str();
    70. cout << s;
    71. }
    72. return nullptr;
    73. }
    74. //消费者
    75. void* Consume(void* args)
    76. {
    77. Blockqueues* bqs = (Blockqueues*)args;
    78. while(1)
    79. {
    80. //消费数据
    81. //sleep(1);
    82. CalTask data;
    83. string s1 = "数据消费完成,计算结果为:";
    84. bqs->_q1->pop(&data);
    85. string result = data();
    86. s1 += result;
    87. cout << s1;
    88. //生成待保存的数据
    89. string s2 = "数据保存任务推送完毕\n";
    90. SaveTask task = SaveTask(result, Savedata);
    91. bqs->_q2->push(task);
    92. cout << s2;
    93. }
    94. return nullptr;
    95. }
    96. void* Save(void* args)
    97. {
    98. Blockqueues* bqs = (Blockqueues*)args;
    99. while(1)
    100. {
    101. //sleep(1);
    102. SaveTask data;
    103. string s = "数据保存完成\n";
    104. bqs->_q2->pop(&data);
    105. data();
    106. cout << s;
    107. }
    108. return nullptr;
    109. }
    110. #define NUM_PRODUCE 3
    111. #define NUM_CONSUME 3
    112. #define NUM_SAVE 3
    113. int main()
    114. {
    115. srand((unsigned int)time(nullptr));
    116. Blockqueues* bqs = new Blockqueues();
    117. bqs->_q1 = new Blockqueue();
    118. bqs->_q2 = new Blockqueue();
    119. pthread_t ptids[NUM_PRODUCE];
    120. pthread_t ctids[NUM_CONSUME];
    121. pthread_t stids[NUM_SAVE];
    122. //创建多个生产者线程
    123. for(int i = 0; i
    124. {
    125. pthread_create(&ptids[i], nullptr, Produce, (void*)bqs);
    126. }
    127. //创建多个消费者线程
    128. for(int i = 0; i
    129. {
    130. pthread_create(&ctids[i], nullptr, Consume, (void*)bqs);
    131. }
    132. //创建多个保存者线程
    133. for(int i = 0; i
    134. {
    135. pthread_create(&stids[i], nullptr, Save, (void*)bqs);
    136. }
    137. //回收所有线程
    138. for(int i = 0; i
    139. {
    140. pthread_join(ptids[i], nullptr);
    141. }
    142. for(int i = 0; i
    143. {
    144. pthread_join(ctids[i], nullptr);
    145. }
    146. for(int i = 0; i
    147. {
    148. pthread_join(stids[i], nullptr);
    149. }
    150. return 0;
    151. }

    运行结果:

    目录中确实多了一个log.txt,也能看出线程的具体执行轨迹。

    基于阻塞队列的生产者消费者模型是线程同步与互斥的充分应用,现实中很多场景都可以应用,是线程中的一大杀器。

  • 相关阅读:
    合合信息大模型加速器亮相WAIC大会:文档解析与文本识别新突破
    多线程的概念(多线程的代码实现)
    练习敲代码速度/提高打字速度
    10大数据恢复软件可以帮助您恢复电脑数据
    基于springboot外委员工后台管理系统毕业设计源码101157
    云上的云服务器、裸金属以及容器可以为 Lakehouse 提供海量的计算资源
    基于Java web的校园滴滴代驾管理系统 毕业设计-附源码260839
    linux内核中修改TCP MSS值
    批量调整视频饱和度和色度,提升你的视频剪辑效率!
    无需公网IP,教学系统如何实现远程一站式管理维护?
  • 原文地址:https://blog.csdn.net/qq_65285898/article/details/132654397