生产者-消费者模型
是一种常见的多线程编程模式,用于解决生产者和消费者之间协作的问题。在该模型中,生产者负责生产数据,并将数据放入共享的缓冲区;消费者则从缓冲区中取出数据并进行消费。
下面我们来看一个现实生活中的例子:消费者——超市——供货商
的例子来理解一下生产者消费者模型。
现实生活中消费者要求比较零散,供货商生产能力很强,但是考虑到成本问题,所以就需要超市这种零售行业,超市的存在使得生产者和消费者的效率变得更高。同时,它的存在也允许了生产者和消费者的步调不一致。
在计算机中,生产者和消费者代表线程,超市可以看作是 特定的缓冲区,生产者把自己的数据交给特定的缓冲区,再由消费者把数据取走,这种工作模式即生产者——消费者模型
。
注意:
💕 生产者和消费者之间的三种关系:
💕 两种角色: 生产者线程和消费者线程
💕 一个交易场所: 一段特定结构的缓冲区
在多线程编程中 阻塞队列(Blocking Queue)
是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
💕 阻塞队列(blockQueue.hpp
)的实现:
#pragma once
#include
#include
#include
using namespace std;
const int gcap = 5;
template <class T>
class BlockQueue
{
public:
BlockQueue(const int cap = gcap):_cap(cap)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_consumerCond, nullptr);
pthread_cond_init(&_productorCond, nullptr);
}
bool isFull(){ return _q.size() == _cap; }
bool isEmpty(){ return _q.empty(); }
void push(const T& in)
{
pthread_mutex_lock(&_mutex); // 我们只能在临界区内部,判断临界资源是否就绪!注定了我们在当前一定是持有锁的!
while(isFull())
{
pthread_cond_wait(&_productorCond, &_mutex);
}
_q.push(in);
pthread_cond_signal(&_consumerCond);
pthread_mutex_unlock(&_mutex);
}
void pop(T* out)
{
pthread_mutex_lock(&_mutex);
while(isEmpty())
{
pthread_cond_wait(&_consumerCond, &_mutex);
}
*out = _q.front();
_q.pop();
pthread_cond_signal(&_productorCond);
pthread_mutex_unlock(&_mutex);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_consumerCond);
pthread_cond_destroy(&_productorCond);
}
private:
queue<T> _q;
int _cap;
pthread_mutex_t _mutex;
pthread_cond_t _consumerCond;
pthread_cond_t _productorCond;
};
将阻塞队列看作一个整体,共享资源都存在队列中,为了防止多线程并发访问共享资源存在问题,我们将互斥锁
引入阻塞队列类内部。
同时,为了让生产者线程和消费者线程访问临界资源阻塞队列时能够按照一定的顺序,若队列中没有数据存在,则不让消费者消费,若队列中数据满了,不让生产者进行生产数据。所以需要引入条件变量
。
同时,为了保证生产者和消费者互相等待,当生产者生产数据后,就可以唤醒正在等待的消费者线程,当消费者消费数据后,可以唤醒正在等待的生产者线程。我们需要引入两个条件变量:consumercond 和 productorcond
。
注意:
假设单生产多消费模型,当消费者pop数据后队列节省出一个空间,但是却使用了pthread_cond_broadcast
函数唤醒了多个生产者线程,导致了多个生产者线程 生产出了多个数据,那么队列的容量将会不够。
💕 main.cc
#include
#include
#include
#include "task.hpp"
#include "blockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
// sleep(1);
int data = 0;
// 将数据从阻塞队列中取出
bq->pop(&data);
// 结合某种业务逻辑, 处理数据
cout << "consumer data: " << data << endl;
}
}
void* productor(void* args)
{
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
while(true)
{
sleep(1);
// 随机数获取函数获取随机数并指定其范围
int data = rand() % 10 + 1;
// 完成生产任务——将数据推送到阻塞队列中
bq->push(data);
cout << "productor data: " << data << endl;
}
}
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
BlockQueue<int>* bq = new BlockQueue<int>();
pthread_t p, c;
pthread_create(&p, nullptr, productor, bq);
pthread_create(&c, nullptr, consumer, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
生产者生产快消费者消费慢:
可以看到生产者先生产一大批数据,稳定后,消费者先消费一个,生产者再生产一个。
生产者生产慢消费者消费快:
此时就是消费者再等生产者了,生产一个消费一个,而且消费的都是最新生产出来的数据。
💕 task.hpp
#pragma once
#include
#include
using namespace std;
class Task
{
public:
Task()
{}
Task(int x, int y, char op):_x(x), _y(y), _op(op), _result(0), _exitcode(0)
{}
void operator()()
{
switch (_op)
{
case '+':
_result = _x + _y;
break;
case '-':
_result = _x - _y;
break;
case '*':
_result = _x * _y;
break;
case '/':
{
if(_y == 0)
_exitcode = -1;
else
_result = _x / _y;
}
break;
case '%':
{
if(_y == 0)
_exitcode = -1;
else
_result = _x % _y;
}
break;
default:
break;
}
}
string formatArge()
{
return to_string(_x) + _op + to_string(_y) + "=";
}
string formatRes()
{
return to_string(_result) + "(" + to_string(_exitcode) + ")";
}
~Task()
{}
private:
int _x;
int _y;
char _op;
int _result;
int _exitcode;
};
💕 main.cc
#include
#include
#include
#include "task.hpp"
#include "blockQueue.hpp"
void* consumer(void* args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t;
bq->pop(&t);
t();
cout << pthread_self() << " | consumer data: " << t.formatArge() << t.formatRes() << endl;
}
}
void* productor(void* args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(args);
string opers = "+-*/%";
while(true)
{
sleep(1);
// 随机数获取函数获取随机数并指定其范围
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
char op = opers[rand() % opers.size()];
Task t(x, y, op);
bq->push(t);
cout << pthread_self() << " | productor data: " << t.formatArge() << "?" << endl;
}
}
int main()
{
// 单生产和单消费
srand((uint64_t)time(nullptr) ^ getpid());
BlockQueue<Task>* bq = new BlockQueue<Task>();
pthread_t p, c;
pthread_create(&p, nullptr, productor, bq);
pthread_create(&c, nullptr, consumer, bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete bq;
return 0;
}
💕 main.cc
int main()
{
srand((uint64_t)time(nullptr) ^ getpid());
BlockQueue<Task>* bq = new BlockQueue<Task>();
// 多生产和多消费
pthread_t p[2], c[3];
pthread_create(&p[0], nullptr, productor, bq);
pthread_create(&p[1], nullptr, productor, bq);
pthread_create(&c[0], nullptr, consumer, bq);
pthread_create(&c[1], nullptr, consumer, bq);
pthread_create(&c[2], nullptr, consumer, bq);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
pthread_join(c[2], nullptr);
delete bq;
return 0;
}
从上面多生产多消费
的代码我们可以看到,大量的生产者消费者线程都在竞争同一把锁?也就是一次只能放一把锁去阻塞队列里,那么这种效率岂不是非常慢吗?
传统的线程运作方式是让大部分线程阻塞在临界区之外,而生产者消费者模型是将任务工序分开,一组线程分为生产者,另一组分为消费者。充分利用了生产者的阻塞时间,用以提前准备好生产资源;同时也利用了消费者计算耗时的问题,让消费者线程将更多的时间花在计算上,而不是抢不到锁造成线程“干等”。
生产者消费者模型可以在生产前和消费后的线程并行执行,减少线程阻塞时间。
POSIX信号量
和SystemV信号量
作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。信号量本质是一个计数器,用于描述临界资源的数目。
二元信号量
: 计数器维护的value只有0和1两种可能,1表示可以访问资源,0表示不可以访问资源,以此来实现互斥,所以也称为互斥信号量(互斥锁)。
每一个线程,在访问对应的资源的时候,现申请信号量,申请成功,表明该线程允许使用该资源,申请不成功,目前无法使用该资源!
信号量的工作机制:类似于我们看电影买票,是一种资源的预定机制。
信号量已经是资源的计数器了,申请信号量成功,本身就表明资源可用,申请信号量失败本身表明资源不可用——本质就是把判断转化成信号量的申请行为
!
💕 初始化信号量
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数:
pshared
: 0表示线程间共享,非零表示进程间共享
value
:信号量初始值
💕 销毁信号量
int sem_destroy(sem_t *sem);
💕 等待信号量
int sem_wait(sem_t *sem);
P
操作,等待信号量,会将信号量的值-1
💕 发布信号量
int sem_post(sem_t *sem);
V
操作,发布信号量,表示资源使用完毕,可以归还资源了,并将信号量的值加1
基于环形队列的生产消费模型
是一种常见的并发编程模型,用于解决生产者和消费者之间的协作问题。在该模型中,生产者将数据放入一个共享的环形队列中,而消费者则从队列中取出数据进行处理。
💕 构建CP问题
- 生产者向tail中
push
数据,消费者向head中pop
数据- 生产者和消费者关心的资源不一样,生产者关心的是
空间
,消费者关心的是数据
- 环形队列只要访问的是不同的区域,生产和消费的行为可以同时进行。
- 生产者和消费者访问同一个区域时怎么处理?
- 当队列中没有资源的时候,也就是队列为空时,此时应该让生产者先运行。
- 当队列为满的时候,生产者和消费者指向同一个位置时,此时应该让消费者先运行。
💕 申请和释放资源
生产者申请空间资源,释放数据资源
对于生产者来说,生产者每次申请数据前都需要申请space_sem,如果space_sem不为0,则申请信号量成功,否则申请信号量失败,生产者则需要在space_sem的等待队列中进行阻塞等待,直到环形队列中有新的空间资源,才能被唤醒。
当生产者生产数据后,应该释放data_sem,这里我们需要注意的是,生产者生产前是对space_sem进行P操作,但是生产结束后并不是对space_sem进行V操作,而是对data_sem进行V操作。
消费者申请数据资源,释放空间资源
对于消费者来说,消费者每次消费数据前都需要申请data_sem,如果data_sem不为0,则申请信号量成功,如果data_sem为0,则申请信号量失败,消费者需要在data_sem的等待队列中进行阻塞等待,直到环形队列中有新的数据资源,才能被唤醒。
当消费者消费完数据后,需要释放data_sem,同时消费结束后对space_sem进行V操作。
💕 两个规则
规则一:生产者和消费者不能对同一个位置进行访问。
如果生产者和消费者访问的是环形队列中的同一个位置,那么就相当于生产者和消费者同时对一块临界资源进行访问,这样就会导致数据不一致的问题。当然,如果访问的是不同的位置,那么生产者和消费者就可以并发的进行生产和消费数据。
通过信号量九三保证了当生产者和消费者指向环形队列中的同一个位置时,生产和消费的串行化过程,同时也保证了当生产者和消费者执行的不是同一个位置时,生产者和消费者可以并发的进行生产和消费,以提高效率。
规则二:生产者不能将消费者套圈,消费者不能超过生产者。
随机数任务
💕 RingQueue.hpp
#include
#include
#include
using namespace std;
const int N = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
public:
RingQueue(int cap = N):_cap(cap), _ring(cap)
{
// 初始化信号量
sem_init(&_space_sem, 0, _cap);
sem_init(&_data_sem, 0, 0);
_p_step = _c_step = 0;
}
// 生产
void push(const T& in)
{
P(_space_sem); // 申请信号量
_ring[_p_step++] = in;
_p_step %= _cap;
V(_data_sem); // 释放信号量
}
// 消费
void pop(T* out)
{
P(_data_sem);
*out = _ring[_c_step++];
_c_step %= _cap;
V(_space_sem);
}
~RingQueue()
{
// 销毁信号量
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
}
private:
vector<T> _ring;
int _cap; // 环形队列容量
sem_t _data_sem; // 数据信号量——只有消费者关心
sem_t _space_sem; // 空间信号量——只有生产者关心
int _p_step; // 生产者下标
int _c_step; // 消费者下标
};
💕 Main.cc
#include "RingQueue.hpp"
#include
#include
#include
#include
#include
#include
const string ops = "+-*/%";
// 单生产单消费
void* consumerRoutine(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
int data = 0;
rq->pop(&data);
cout << "consumer done: " << data << endl;
}
}
void* productorRoutine(void* args)
{
RingQueue<int>* rq = static_cast<RingQueue<int>*>(args);
while(true)
{
sleep(1);
int data = rand() % 10 + 1;
rq->push(data);
cout << "productor done: " << data << endl;
}
}
int main()
{
srand(time(nullptr)^getpid());
RingQueue<int> *rq = new RingQueue<int>();
// 单生产单消费
pthread_t p, c;
pthread_create(&p, nullptr, consumerRoutine, rq);
pthread_create(&c, nullptr, productorRoutine, rq);
pthread_join(p, nullptr);
pthread_join(c, nullptr);
return 0;
}
计算器任务
💕 Main.cc
#include "RingQueue.hpp"
#include
#include
#include
#include
#include
#include
void* consumerRoutine(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatArge() << t.formatRes() << endl;
}
}
void* productorRoutine(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % ops.size()];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArge() << "?" << endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t c, p;
pthread_create(&c, nullptr, consumerRoutine, rq);
pthread_create(&p, nullptr, productorRoutine, rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
delete rq;
return 0;
}
对于多生产多消费的这种情况,因为生产者和消费者不止是一个,所以就会存在生产者和生产者之间的竞争关系、消费者和消费者之间的竞争关系。因此,在这种情况下,我们就需要对临界资源进行加锁保护。
_p_pos 和 c_pos 的更新需要再加锁和解锁之间。如果它们的更新不在加锁和解锁之间,将可能会出现这样的情况:线程 A 释放了锁并没来得及将下标进行更新,然后线程 B 就获得了锁并执行到更新下标的地方,这样就有可能会出现数据不一致的问题!
💕 RingQueue.hpp
#include
#include
#include
using namespace std;
const int N = 5;
template <class T>
class RingQueue
{
private:
void P(sem_t &s)
{
sem_wait(&s);
}
void V(sem_t &s)
{
sem_post(&s);
}
void Lock(pthread_mutex_t& m)
{
pthread_mutex_lock(&m);
}
void Unlock(pthread_mutex_t& m)
{
pthread_mutex_unlock(&m);
}
public:
RingQueue(int cap = N):_cap(cap), _ring(cap)
{
// 初始化信号量
sem_init(&_space_sem, 0, _cap);
sem_init(&_data_sem, 0, 0);
_p_step = _c_step = 0;
pthread_mutex_init(&_c_mutex, nullptr);
pthread_mutex_init(&_p_mutex, nullptr);
}
// 生产
void push(const T& in)
{
P(_space_sem); // 申请信号量
Lock(_p_mutex);
_ring[_p_step++] = in;
_p_step %= _cap;
Unlock(_p_mutex);
V(_data_sem); // 释放信号量
}
// 消费
void pop(T* out)
{
P(_data_sem);
Lock(_c_mutex);
*out = _ring[_c_step++];
_c_step %= _cap;
Unlock(_c_mutex);
V(_space_sem);
}
~RingQueue()
{
// 销毁信号量
sem_destroy(&_data_sem);
sem_destroy(&_space_sem);
pthread_mutex_destroy(&_c_mutex);
pthread_mutex_destroy(&_p_mutex);
}
private:
vector<T> _ring;
int _cap; // 环形队列容量
sem_t _data_sem; // 数据信号量——只有消费者关心
sem_t _space_sem; // 空间信号量——只有生产者关心
int _p_step; // 生产者下标
int _c_step; // 消费者下标
pthread_mutex_t _c_mutex;
pthread_mutex_t _p_mutex;
};
这里比较推荐先申请信号量,如果先申请锁,意味着生产线程只要持有锁了,其他线程再也没有机会进入后续的代码逻辑了,意味着其他线程在当前线程持有锁的情况下,不能够进行信号量的申请,不能够先分配好资源。效率会比较低一些。
如果先申请信号量,就能保证生产线程在持有锁期间,其他线程也可以进行信号量的申请。也就是资源的分配,因此可以大大提高效率。
💕 Main.cc
#include "RingQueue.hpp"
#include "Task.hpp"
#include
#include
#include
#include
#include
#include
const string ops = "+-*/%";
void* consumerRoutine(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
Task t;
rq->pop(&t);
t();
cout << "consumer done, 处理完成的任务是: " << t.formatArge() << t.formatRes() << endl;
}
}
void* productorRoutine(void* args)
{
RingQueue<Task>* rq = static_cast<RingQueue<Task>*>(args);
while(true)
{
sleep(1);
int x = rand() % 100;
int y = rand() % 100;
char op = ops[(x + y) % ops.size()];
Task t(x, y, op);
rq->push(t);
cout << "productor done, 生产的任务是: " << t.formatArge() << "?" << endl;
}
}
int main()
{
srand(time(nullptr) ^ getpid());
RingQueue<Task>* rq = new RingQueue<Task>();
pthread_t c[3], p[2];
for(int i = 0; i < 3; i++)
pthread_create(c + i, nullptr, consumerRoutine, rq);
for(int i = 0; i < 2; i++)
pthread_create(p + i, nullptr, productorRoutine, rq);
for(int i = 0; i < 3; i++)
pthread_join(c[i], nullptr);
for(int i = 0; i < 2; i++)
pthread_join(p[i], nullptr);
delete rq;
return 0;
}
多生产多消费的意义:
生产的本质是将私有的任务或数据放入到公共空间中,消费的本质是将公共空间中的任务或数据变成私有。生产和消费并不是简单地将任务或数据放入到交易场所或从交易场所中取出,还需要考虑数据或任务放入到交易场所前和拿到任务或数据之后的处理,这两个过程是最耗费时间的。所以,多生产多消费的意义就在于能够并发地生产和处理任务。
信号量本质是一个计数器,那这个计数器的意义是什么呢?计数器的意义就是不用进入临界区,就可以得知临界资源的情况,甚至可以减少临界区内部的判断!而在基于阻塞队列的生产者和消费者模型中,需要申请锁,然后进行临界资源是否满足的判断再进行访问,最后释放锁,需要进行判断的原因就是我们并不清楚临界资源的情况!而信号量要提前预设资源的情况,在进行 PV 操作的过程中,我们在临界区外部就能得知临界资源的情况。