生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。
生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
生产者消费者模型是多线程同步与互斥的一个经典场景,其特点如下:
生产者和生产者、消费者和消费者、生产者和消费者,它们之间为什么会存在互斥关系?
因为生产者与消费者之间的容器会被多个访问流进行访问,所以我们就需要将该临界资源使用互斥锁保护起来,防止线程安全问题的发生,因此所有的生产者和消费者都会竞争式的申请锁,生产者和生产者、消费者和消费者、生产者和消费者之间都存在互斥关系。
生产者和消费者之间为什么会存在同步关系?
如果生产者一直生产,当空间被填满以后,生产者就会停止生产,消费者也是一样,如果消费者一直消费,空间中数据被消耗完了,消费者也会停止消费。
虽然这样不会造成任何数据不一致的问题,但是这样会引起另一方的饥饿问题,是非常低效的。我们应该让生产者和消费者访问该容器时具有一定的顺序性,比如让生产者先生产,然后再让消费者进行消费。
注意: 互斥关系保证的是数据的正确性,而同步关系是为了让多线程之间协同起来。
如果我们在主函数中调用某一函数,那么我们必须等该函数体执行完后才继续执行主函数的后续代码,因此函数调用本质上是一种紧耦合。
对应到生产者消费者模型中,函数传参实际上就是生产者生产的过程,而执行函数体实际上就是消费者消费的过程,但生产者只负责生产数据,消费者只负责消费数据,在消费者消费期间生产者可以同时进行生产,因此生产者消费者模型本质是一种松耦合。
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
阻塞队列的特点在于:
知识联系: 看到以上阻塞队列的描述,我们很容易想到的就是管道,而阻塞队列最典型的应用场景实际上就是管道的实现。
我们先以单生产者,单消费者为例:
其中的BlockQueue就是生产者消费者模型当中的交易场所,我们可以用C++STL库当中的queue进行实现,下面我们进行一个简单的封装:
#pragma once
#include
#include
#include
#include
#include
#define NUM 5
template <class T>
class BlockQueue
{
private:
// 判断队列是否满了
bool isQueueFull()
{
return _bq.size() == _capacity;
}
// 判断队列是否为空
bool isQueueEmpty()
{
return _bq.size() == 0;
}
public:
// 构造函数
BlockQueue(int capacity = NUM) : _capacity(capacity)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_full, nullptr);
pthread_cond_init(&_empty, nullptr);
}
// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mtx);
while (isQueueFull())
{
// 如果生产者生产过程中数据满了,就阻塞等待
pthread_cond_wait(&_full, &_mtx);
}
_bq.push(in);
// 解锁
pthread_mutex_unlock(&_mtx);
// 唤醒消费者
pthread_cond_signal(&_empty);
}
// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{
// 加锁
pthread_mutex_lock(&_mtx);
while (isQueueEmpty())
{
// 如果消费者消费过程中数据空了,就阻塞等待
pthread_cond_wait(&_empty, &_mtx);
}
out = _bq.front();
_bq.pop();
// 解锁
pthread_mutex_unlock(&_mtx);
// 唤醒生产者
pthread_cond_signal(&_full);
}
// 析构函数
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_full);
pthread_cond_destroy(&_empty);
}
private:
// 阻塞队列
std::queue<T> _bq;
// 阻塞队列最大容器个数
int _capacity;
// 通过互斥锁来保证队列安全
pthread_mutex_t _mtx;
// 用来表示_bq是否为满的条件
pthread_cond_t _full;
// 用来表示_bq是否为空的条件
pthread_cond_t _empty;
};
上述代码中需要注意以下几点:
_full
和_empty
描述阻塞队列的状态,进而才可以判断何时运行,何时等待;pthread_cond_wait
除了会传入一个条件变量以外还会传入一个互斥锁,我们会发现,我们是在临界区中进行等待的,我们此时还处于持有锁状态,pthread_cond_wait
第二个参数意义就在于成功调用wait之后,传入的锁,会被自动释放,当被唤醒的时候,就会自动获取线程锁;判断是否满足生产消费条件时不能用if,而应该用while:
pthread_cond_wait
函数是让当前执行流进行等待的函数,是函数就意味着有可能调用失败,调用失败后该执行流就会继续往后执行。pthread_cond_broadcast
函数唤醒消费者,就会一次性唤醒多个消费者,但待消费的数据只有一个,此时其他消费者就被伪唤醒了。在主函数中我们就只需要创建一个生产者线程和一个消费者线程,让生产者线程不断生产数据,让消费者线程不断消费数据:
#include "BlockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<int>* bqueue = (BlockQueue<int>*)args;
while(true)
{
int a;
bqueue->pop(a);
std::cout << "consumer:" << a << std::endl;
sleep(1);
}
return nullptr;
}
void* productor(void* args)
{
BlockQueue<int>* bqueue = (BlockQueue<int>*)args;
int a = 1;
while(true)
{
bqueue->push(a);
std::cout << "productor:" << a << std::endl;
a++;
sleep(1);
}
return nullptr;
}
int main()
{
pthread_t c, p;
BlockQueue<int>* bq = new BlockQueue<int>();
//创建生产者消费者线程
pthread_create(&c, nullptr, consumer, bq);
pthread_create(&p, nullptr, productor, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
}
当生产者与消费者步调一致时,我们会发现生产者生产一个数据,消费者就会消费一个数据:
当生产者生产的快,消费者消费的慢时,阻塞队列满了就会导致生产者阻塞等待,只有当消费者被唤醒以后消费掉一个数据,此时生产者才会被唤醒继续生产数据:
当生产者生产的慢,消费者消费的快,因为最开始阻塞队列中并没有数据,所以消费者就会阻塞等待,当生产者生产一个数据以后,消费者就会被唤醒消费一个数据,然后生产者继续被唤醒生产数据,消费者消费数据,步调保持一致:
当我们满足某一条件时再唤醒对应的生产者或消费者,比如当阻塞队列当中存储的数据大于队列容量的一半时,再唤醒消费者线程进行消费;当阻塞队列当中存储的数据小于队列容器的一半时,再唤醒生产者线程进行生产。
// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{
// 加锁
pthread_mutex_lock(&_mtx);
while (isQueueFull())
{
// 如果生产者生产过程中数据满了,就阻塞等待
pthread_cond_wait(&_full, &_mtx);
}
_bq.push(in);
if (_bq.size() > _capacity / 2)
// 唤醒消费者
pthread_cond_signal(&_empty);
// 解锁
pthread_mutex_unlock(&_mtx);
}
// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{
// 加锁
pthread_mutex_lock(&_mtx);
while (isQueueEmpty())
{
// 如果消费者消费过程中数据空了,就阻塞等待
pthread_cond_wait(&_empty, &_mtx);
}
out = _bq.front();
_bq.pop();
if (_bq.size() <= _capacity / 2)
// 唤醒生产者
pthread_cond_signal(&_full);
// 解锁
pthread_mutex_unlock(&_mtx);
}
我们仍然让生产者生产的快,消费者消费的慢。运行代码后生产者还是一瞬间将阻塞队列打满后进行等待,但此时不是消费者消费一个数据就唤醒生产者线程,而是当阻塞队列当中的数据小于队列容器的一半时,才会唤醒生产者线程进行生产。
基于计算任务的生产者消费者模型
当然,实际使用生产者消费者模型时可不是简单的让生产者生产一个数字让消费者进行打印而已,我们这样做只是为了测试代码的正确性。
由于我们将BlockingQueue当中存储的数据进行了模板化,此时就可以让BlockingQueue当中存储其他类型的数据。
例如,我们想要实现一个基于计算任务的生产者消费者模型,此时我们只需要定义一个Task类,这个类当中需要包含一个func_t成员函数:
#pragma once
#include
#include
typedef std::function<int(int, int)> func_t;
class Task
{
public:
Task()
{
}
Task(int x, int y, func_t func) : _x(x), _y(y), _func(func)
{
}
~Task()
{
}
int operator()()
{
return _func(_x, _y);
}
public:
int _x;
int _y;
func_t _func;
};
同时我们也可以将锁进行一个封装,采用RAII形式的加锁解锁风格,创建锁对象自动调用构造函数加锁,除了作用域自动调用析构函数解锁。
#pragma once
#include
#include
class Mutex
{
public:
Mutex(pthread_mutex_t *mtx) : _mtx(mtx)
{
}
void lock()
{
std::cout << "需要进行加锁" << std::endl;
pthread_mutex_lock(_mtx);
}
void unlock()
{
std::cout << "需要进行解锁" << std::endl;
pthread_mutex_unlock(_mtx);
}
~Mutex()
{
}
private:
pthread_mutex_t *_mtx;
};
class LockGuard
{
public:
LockGuard(Mutex mtx) :_mtx(mtx)
{
_mtx.lock();
}
~LockGuard()
{
_mtx.unlock();
}
private:
Mutex _mtx;
};
此时我们的BlockQueue.hpp
中插入和获取数据代码就可以优化为:
// 向阻塞队列中插入数据(生产者)
void push(const T &in)
{
LockGuard lockguard(&_mtx);
while (isQueueFull())
{
// 如果生产者生产过程中数据满了,就阻塞等待
pthread_cond_wait(&_full, &_mtx);
}
_bq.push(in);
// 唤醒消费者
pthread_cond_signal(&_empty);
}
// 向阻塞队列中获取数据(消费者)
void pop(T &out)
{
LockGuard lockguard(&_mtx);
while (isQueueEmpty())
{
// 如果消费者消费过程中数据空了,就阻塞等待
pthread_cond_wait(&_empty, &_mtx);
}
out = _bq.front();
_bq.pop();
// 唤醒生产者
pthread_cond_signal(&_full);
}
运行代码,当生产者向阻塞队列中写入一个数据后,随即消费者就会被唤醒,获取数据,也就是进行计算操作:
同样,我们也可以创建多个线程进行计算: