生产者消费者模式是通过一个容器来解决生产者和消费者之间强耦合的关系。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者进行消费,而是先存到阻塞队列中。消费者不直接找生产者要数据,而是从阻塞队列中拿数据。阻塞队列就相当于一个缓冲区,将生产者和消费者之间进行解耦。
如果有多个生产者或者多个消费者那么他们之间的关系如下:
下面来简单实现一下生产者和消费者模型
这里是使用阻塞队列来作为生产者和消费者的“交易场所”。因此需要先创建一个类。由于其可能存放的各种类型的数据或者结构,因此需要使用模板来定义。
该类中需要包含以下内容:
const int default_cap = 5;
template<class T>
class BlockQueue
{
private:
std::queue<T> bq_;//阻塞队列
int cap_;//队列的元素上限
pthread_mutex_t mtx_;//保护临界资源的锁
//1、当生产满了的时候,就应该不要生产(不要竞争锁了),应该让消费者进行消费
//2、当消费空了,就不应该消费了(竞争锁),应该让生产者进行生产
pthread_cond_t is_full_;//bq满了,消费者在条件变量下等待
pthread_cond_t is_empty_;//bq空的,生产者在该条件变量下等待
public:
BlockQueue(const int cap = default_cap)
:cap_(cap)
{
pthread_mutex_init(&mtx_,nullptr);
pthread_cond_init(&is_empty_,nullptr);
pthread_cond_init(&is_full_,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&is_empty_);
pthread_cond_destroy(&is_full_);
}
};
}
生产者消费者模型中,需要知道缓冲区中的情况,是否为空或者是否为满,从而使用信号量对生产者和消费者进行控制,这里判断为空为满比较简单,将这里的队列大小和容量进行比较即可,代码实现如下:
bool IsFull()
{
return bq_.size() == cap_;
}
bool IsEmpty()
{
return bq_.size() == 0;
}
这里的阻塞队列可以有多个线程进行访问,因此需要保证其线程安全问题,因此需要对该临界资源进行加锁。该类中已经初始化了一把锁,因此只需要对其使用加锁解锁的函数操作即可,代码实现如下:
void LockQueue()
{
pthread_mutex_lock(&mtx_);
}
void UnlockQueue()
{
pthread_mutex_unlock(&mtx_);
}
信号量这里是用来实现同步的操作,让生产者和消费者有序的进行临界资源的访问,从而能够有效的避免了生产者在队列已经满的情况下还在继续生产,或者消费者在队列已经为空的情况下还在继续消费,因此可以通过两个信号量来进行空/满的操作,让生产者在队列满的情况下暂停生产将锁释放先让消费者进行消费,或者让消费者在队列为空的情况下先将锁释放让生产者先进行生产。函数实现如下:
void ProductorWait()
{
//1、调用该函数的时候,会先自动释放锁,然后再挂起自己
//2、返回的时候,首先会自动竞争锁,获得锁,才能返回
pthread_cond_wait(&is_empty_,&mtx_);
}
void ConsumerWait()
{
pthread_cond_wait(&is_full_,&mtx_);
}
void WakeupConsumer()
{
pthread_cond_signal(&is_full_);
}
void WakeupProductor()
{
pthread_cond_signal(&is_empty_);
}
生产和消费实际上就是往队列里面入数据和出数据的功能,生产者主要是向队列里面入数据,消费者主要是从队列里面出数据,这样可以使用如下代码实现:
入数据:该函数为了临界资源的安全,先进行加锁解锁操作。在临界区中首先要进行判断此时队列是否为满,如果为满则需要进行等待,先释放锁再将自己先暂时挂起,这里需要使用循环来判断,从而能保证退出循环一定是因为条件不满足导致的即队列不为满了。
后面正常使用stl容器队列的插入操作,将数据入队列。最后可以进行设置合适时候来唤醒消费者进行消费,这里是设置为当生产的数据大于容量的一半的时候唤醒消费者。
void Push(const T& in)
{
LockQueue();
//需要进行条件检测的时候,这里需要使用循环方式
//来保证退出循环一定是因为条件不满足导致的
while(IsFull())
{
ProductorWait();
}
//向队列中放数据,生产函数
bq_.push(in);
if(bq_.size()>cap_/2)
{
WakeupConsumer();
}
UnlockQueue();
}
出数据:这里出数据和上面的入数据流程类似,先对临界区进行加锁,使用循环判断是否为空,为空则先释放锁进行等待。然后使用输出型参数out来拿到出队列的值,最后设置合适时间唤醒生产者进行生产,这里设置为队列中数据个数小于一半时进行唤醒生产者。
void Pop(T* out)
{
LockQueue();
//从队列中拿数据,消费函数
while(IsEmpty())
{
//无法消费
ConsumerWait();
}
*out=bq_.front();
bq_.pop();
if(bq_.size()<cap_/2)
{
WakeupProductor();
}
UnlockQueue();
}
如上便是生产者消费者模型的阻塞队列的基本实现,如下是整体代码:
namespace ns_blockqueue
{
const int default_cap = 5;
template<class T>
class BlockQueue
{
private:
std::queue<T> bq_;//阻塞队列
int cap_;//队列的元素上限
pthread_mutex_t mtx_;//保护临界资源的锁
//1、当生产满了的时候,就应该不要生产(不要竞争锁了),应该让消费者进行消费
//2、当消费空了,就不应该消费了(竞争锁),应该让生产者进行生产
pthread_cond_t is_full_;//bq满了,消费者在条件变量下等待
pthread_cond_t is_empty_;//bq空的,生产者在该条件变量下等待
public:
BlockQueue(const int cap = default_cap)
:cap_(cap)
{
pthread_mutex_init(&mtx_,nullptr);
pthread_cond_init(&is_empty_,nullptr);
pthread_cond_init(&is_full_,nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mtx_);
pthread_cond_destroy(&is_empty_);
pthread_cond_destroy(&is_full_);
}
private:
bool IsFull()
{
return bq_.size() == cap_;
}
bool IsEmpty()
{
return bq_.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&mtx_);
}
void UnlockQueue()
{
pthread_mutex_unlock(&mtx_);
}
void ProductorWait()
{
//1、调用该函数的时候,会先自动释放锁,然后再挂起自己
//2、返回的时候,首先会自动竞争锁,获得锁,才能返回
pthread_cond_wait(&is_empty_,&mtx_);
}
void ConsumerWait()
{
pthread_cond_wait(&is_full_,&mtx_);
}
void WakeupConsumer()
{
pthread_cond_signal(&is_full_);
}
void WakeupProductor()
{
pthread_cond_signal(&is_empty_);
}
public:
//const& :输入
//*:输出
//&:输入输出
void Push(const T& in)
{
LockQueue();
//需要进行条件检测的时候,这里需要使用循环方式
//来保证退出循环一定是因为条件不满足导致的
while(IsFull())
{
ProductorWait();
//return;
}
//向队列中放数据,生产函数
bq_.push(in);
if(bq_.size()>cap_/2)
{
WakeupConsumer();
}
UnlockQueue();
}
void Pop(T* out)
{
LockQueue();
//从队列中拿数据,消费函数
while(IsEmpty())
{
//无法消费
ConsumerWait();
}
*out=bq_.front();
bq_.pop();
if(bq_.size()<cap_/2)
{
WakeupProductor();
}
UnlockQueue();
}
};
}
生产者消费者模式的测试这里是创建多个消费者和一个生产者进行相关测试,其中给阻塞队列中传入的可以是正常的数据类型让他们进行相关数据传输,也可以传入相关任务,让生产者产生任务,消费者去执行任务,这里测试后一种情况:
这里先创建了一个生产者线程和五个消费者线程
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq=new BlockQueue<Task>();
pthread_t c,p;
pthread_t c1,c2,c3,c4;
pthread_create(&c,nullptr,consumer,(void*)bq);
pthread_create(&c1,nullptr,consumer,(void*)bq);
pthread_create(&c2,nullptr,consumer,(void*)bq);
pthread_create(&c3,nullptr,consumer,(void*)bq);
pthread_create(&c4,nullptr,consumer,(void*)bq);
pthread_create(&p,nullptr,productor,(void*)bq);
pthread_join(c,nullptr);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
pthread_join(c4,nullptr);
pthread_join(p,nullptr);
return 0;
}
其中给他们传入的是任务也是一种结构类型,该类型是用来进行简单的±*/%操作,任务代码如下:
namespace ns_task
{
class Task
{
private:
int x_;
int y_;
char op_;//+-*/%
public:
Task(){}
Task(int x ,int y ,char op)
:x_(x)
,y_(y)
,op_(op)
{
}
int Run()
{
int res = 0;
switch (op_)
{
case '+':
res = x_ + y_;
break;
case '-':
res = x_ - y_;
break;
case '*':
res = x_ * y_;
break;
case '/':
res = x_ / y_;
break;
case '%':
res = x_ % y_;
break;
default:
std::cout<<"bug"<<std::endl;
break;
}
std::cout<<"当前任务正在被:"<< pthread_self()<<"处理:"<< x_ << op_ << y_ << "="<< res <<std::endl;
return res;
}
int operator()()
{
return Run();
}
~Task(){}
};
}
生产者主要是用来进行任务的发布,即这里是产生两个数运算的数字以及运算符,代码实现如下:
void* productor(void* args)
{
BlockQueue<Task> *bq =(BlockQueue<Task>*)args;
std::string ops = "+-*/%";
while(true)
{
int x = rand()%20 + 1;//1-20
int y = rand()%20 + 1;//1-20
char op = ops[rand()%5];
Task t(x,y,op);
std::cout<<"生产者派发了一个任务:"<< x << op << y << "=?" <<std::endl;
bq->Push(t);
}
}
消费者主要是进行数据的处理以及计算出结构,这里是通过Task类中的仿函数来实现的,消费者函数实现如下:
void* consumer(void* args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
sleep(2);
while(true)
{
Task t;
bq->Pop(&t);//这里完成了任务消费的第一步
t(); //这里完成了任务处理的第二步
}
}
运行结果如下:
整体代码实现如下:
void* consumer(void* args)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)args;
sleep(2);
while(true)
{
Task t;
bq->Pop(&t);//这里完成了任务消费的第一步
t(); //这里完成了任务处理的第二步
}
}
void* productor(void* args)
{
BlockQueue<Task> *bq =(BlockQueue<Task>*)args;
std::string ops = "+-*/%";
while(true)
{
int x = rand()%20 + 1;//1-20
int y = rand()%20 + 1;//1-20
char op = ops[rand()%5];
Task t(x,y,op);
std::cout<<"生产者派发了一个任务:"<< x << op << y << "=?" <<std::endl;
bq->Push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq=new BlockQueue<Task>();
pthread_t c,p;
pthread_t c1,c2,c3,c4;
pthread_create(&c,nullptr,consumer,(void*)bq);
pthread_create(&c1,nullptr,consumer,(void*)bq);
pthread_create(&c2,nullptr,consumer,(void*)bq);
pthread_create(&c3,nullptr,consumer,(void*)bq);
pthread_create(&c4,nullptr,consumer,(void*)bq);
pthread_create(&p,nullptr,productor,(void*)bq);
pthread_join(c,nullptr);
pthread_join(c1,nullptr);
pthread_join(c2,nullptr);
pthread_join(c3,nullptr);
pthread_join(c4,nullptr);
pthread_join(p,nullptr);
return 0;
}