目录
这就类似于一个超市(缓冲区)通过供货商(生产者)进货,放到自己的超市中,之后顾客(消费者)就可以从超市中买东西。
通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者之间不会直接通信,仅通过阻塞队列来通信,所以生产者生产完数据之后不用等待消费者处理,直接放到阻塞队列中,消费者不直接找生产者要数据,直接从阻塞队列中取。
阻塞队列就相当于一个缓冲区。这个阻塞队列就是用来给生产者和消费者解耦的。
不难看出,生产者消费者模型是多线程同步与互斥的一个场景。
- 三种关系:生产者和生产者(互斥),消费者和消费者(互斥),消费者和生产者(互斥、同步)
- 两种关系:生产者和消费者
- 一个交易场所:通常是内存中的缓冲区
生产者和生产者要确保同一时间只有一个执行流在放数据,消费者和消费者也要保证同一时间只有一个执行流在拿数据,生产者和消费者不仅要保证同一时间只有一个执行流在拿数据或者放数据,还要保证按照生产者放,消费者拿的顺序执行。
其实生产者和消费者现在看来就是多个线程实现的。当生产者把缓冲区放满的时候就要唤醒消费者拿,当消费者把数据拿完的时候也要唤醒生产者继续生生产。
生产者消费者模型的优点:
- 解耦:通过函数调用的方式。
- 并发:消费者在处理数据的时候,生产者可以一直放数据,不会像管道一样只有放了才能拿,拿了才能再放。而且以后也不会是简单的定义一个数据,然后发一个数据,也会有复杂的场景。
原来我们在说管道的时候,不也就是一端放数据,一端读数据吗,并且还能实现互斥和同步。现在我们就用一个阻塞队列来实现。
- 当队列为空的时候,消费者线程就会被阻塞,生产者生成数据。
- 当队列满了的时候,生产者线程就会被阻塞,消费者拿走数据。
我们先来看一下一个生产者和一个消费者的场景。
// Task.hpp #pragma once #include #include #include class Task { public: typedef std::function<int (int, int)> func_t; // 一个函数对象,返回值是int,两个参数也是int Task(){} Task(int x, int y, func_t func) :_x(x) ,_y(y) ,_func(func) {} int operator()() { return _func(_x, _y); } public: int _x; int _y; func_t _func; };
// BlockQueue.hpp #pragma once #include #include #include #include #include using namespace std; const int gDefaultCap = 5; // 限制缓冲区队列里只能放5个任务 template <class T> // 泛型编程 class BlockQueue { private: bool isQueueEmpty() { return _bq.size() == 0; } bool isQueueFull() { return _bq.size() == _capacity; } public: BlockQueue(size_t capacity = gDefaultCap) :_capacity(capacity) { pthread_mutex_init(&_mtx, nullptr); pthread_cond_init(&_Empty, nullptr); pthread_cond_init(&_Full, nullptr); } void push(const T& in) { pthread_mutex_lock(&_mtx); // 1. 先检测当前临界资源是否满足条件 // 【注意】:如果pthread_cond_wait失败了怎么办呢,又没有接受返回值判断 // 函数调用失败就不会阻塞,这里使用的queue容器,如果使用固定长度的数组就会越界 // 或者这个线程因为一些原因,但不应该被唤醒,这叫做伪唤醒,也会出问题 // 所以一定要使用while判断条件,只要队列中有数据就阻塞 while (isQueueFull()) pthread_cond_wait(&_Full, &_mtx); // 满了的时候就不要放了 // 2. 访问临界资源 _bq.push(in); // 放好了数据就要让消费者再拿空 pthread_cond_signal(&_Empty); pthread_mutex_unlock(&_mtx); } void pop(T* out) { pthread_mutex_lock(&_mtx); while (isQueueEmpty()) pthread_cond_wait(&_Empty, &_mtx); // 空了的时候就不要拿了 *out = _bq.front(); _bq.pop(); // 拿走了数据就要让生产者再放满 pthread_cond_signal(&_Full); pthread_mutex_unlock(&_mtx); } ~BlockQueue() { pthread_mutex_destroy(&_mtx); pthread_cond_destroy(&_Empty); pthread_cond_destroy(&_Full); } private: queue_bq; // 阻塞队列 size_t _capacity; // 容量上限 pthread_mutex_t _mtx; // 互斥锁,保证队列安全 pthread_cond_t _Empty; // 条件变量,用来表示bq是否为空 pthread_cond_t _Full; // 表示bq是否满了 };
#include "BlockQueue.hpp" #include "Task.hpp" #include void* consumer(void* args) { BlockQueue* bqueue = (BlockQueue *)args; // 消费 while (true) { // 获取任务 Task t; bqueue->pop(&t); // 完成任务 cout << pthread_self() << ", 消费者: " << t._x << " + " << t._y << " = " << t() << endl; sleep(1); } return nullptr; } int Add(int x, int y) { return x + y; } void* productor(void* args) { BlockQueue* bqueue = (BlockQueue *)args; // 生产 while (true) { // 制作任务 int x = rand() % 10 + 1; usleep(rand()%1000); // 不要让两个数一样 int y = rand() % 10 + 1; Task t(x, y, Add); bqueue->push(t); cout << pthread_self() << ", 生产者: " << t._x << " + " << t._y << " = ? " << endl; sleep(1); } return nullptr; } int main() { srand((unsigned)time(nullptr) ^ getpid()); BlockQueue* bqueue = new BlockQueue (); // 生产者 消费者 pthread_t c, p; // 创建 pthread_create(&c, nullptr, consumer, (void*)bqueue); pthread_create(&p, nullptr, productor, (void*)bqueue); // 等待 pthread_join(c, nullptr); pthread_join(p, nullptr); return 0; }
再来看一下多生产者和消费者的场景。代码没有太大变化。
#define TNUM 5 int main() { srand((unsigned)time(nullptr) ^ getpid()); BlockQueue* bqueue = new BlockQueue (); // 生产者 消费者 pthread_t c[TNUM], p[TNUM]; // 创建 for (int i = 0; i < TNUM; i++) { pthread_create(c + i, nullptr, consumer, (void*)bqueue); pthread_create(p + i, nullptr, productor, (void*)bqueue); } // 等待 for (int i = 0; i < TNUM; i++) { pthread_join(c[i], nullptr); pthread_join(p[i], nullptr); } return 0; }
所以整个生产者消费者模型的流程应该是这样的:
- 先创建一个阻塞队列、生产者线程、消费者线程。
- 生产者线程拿到阻塞队列,开始制作任务,再把任务push到阻塞队列中。在push的时候调用lock加锁,然后循环判断阻塞队列中的任务个数是不是满了,如果没满就push到阻塞队列,唤醒消费者线程pop,调用unlock解锁;如果满了就等消费者线程pop。
- 消费者线程拿到阻塞队列,开始获取任务,把任务从阻塞队列中拿出来。在pop的时候调用lock加锁,然后循环判断阻塞队列中是不是没有任务,如果有就拿到队列头,再pop,唤醒生产者线程push,调用unlock解锁;如果空了就等待生产者线程push。
这里唤醒线程和解锁的顺序,谁前谁后都可以,并不会影响进程运行。
在智能指针的章节也提到过RAII是什么,RAII是利用对象生命周期来控制资源的技术。
每次访问临界资源的时候都要加锁,出了临界区就要解锁,那我们就可以通过一个对象完成这个工作。
对象在进入临界区的时候创建,调用构造函数进行加锁,出了临界区就出了作用域,调用析构函数解锁。
// lockGuard.hpp #pragma once #include #include class Mutex { public: Mutex(pthread_mutex_t* mtx) :_pmtx(mtx) {} void lock() { pthread_mutex_lock(_pmtx); } void unlock() { pthread_mutex_unlock(_pmtx); } ~Mutex() {} private: pthread_mutex_t* _pmtx; }; // RAII风格的加锁方式 class lockGuard { public: lockGuard(pthread_mutex_t* mtx) :_mtx(mtx) { _mtx.lock(); } ~lockGuard() { _mtx.unlock(); } private: Mutex _mtx; };修改push和pop部分的函数。
// ... void push(const T& in) { lockGuard lockguard(&_mtx); //可以自动调用构造和析构函数来加锁和解锁 while (isQueueFull()) pthread_cond_wait(&_Full, &_mtx); // 满了的时候就不要放了 _bq.push(in); pthread_cond_signal(&_Empty); } void pop(T* out) { lockGuard lockguard(&_mtx); while (isQueueEmpty()) pthread_cond_wait(&_Empty, &_mtx); // 空了的时候就不要拿了 *out = _bq.front(); _bq.pop(); pthread_cond_signal(&_Full); } // ...