生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
在多线程编程中阻塞队列是一种常用于实现生产者消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
BlockQueue.hpp
#pragma once
#include
#include
#include
namespace ns_blockqueue
{
const int default_cap = 5;
template <class T>
class BlockQueue
{
private:
std::queue<T> _bq; //阻塞队列
int _cap; //队列的元素上限
pthread_mutex_t _mtx; //保护临界资源的锁
//1. 当生产满了的时候,就应该不要生产了(不要竞争锁了),应该让消费者来消费
//2. 当消费空了的时候,就不应该消费(不要竞争锁了),应该让生产者来进行生产
pthread_cond_t _is_full; //_bq满了,消费者在该条件变量下等待
pthread_cond_t _is_empty; //_bq空,生产者在该条件变量下等待
private:
bool IsFull()
{
return _bq.size() == _cap;
}
bool IsEmpty()
{
return _bq.size() == 0;
}
void LockQueue()
{
pthread_mutex_lock(&_mtx);
}
void UnlockQueue()
{
pthread_mutex_unlock(&_mtx);
}
void ProducterWait()
{
//pthread_cond_wait
//1.调用的时候,首先会自动释放_mtx,然后再挂起自己
//2.返回的时候,首先会自动竞争锁,获取到锁之后,才能返回
pthread_cond_wait(&_is_empty, &_mtx);
}
void ConsumerWait()
{
pthread_cond_wait(&_is_full, &_mtx);
}
void WakeupConsumer()
{
pthread_cond_signal(&_is_full);
}
void WakeupProducter()
{
pthread_cond_signal(&_is_empty);
}
public:
BlockQueue(int cap = default_cap) : _cap(cap)
{
pthread_mutex_init(&_mtx, nullptr);
pthread_cond_init(&_is_empty, nullptr);
pthread_cond_init(&_is_full, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&_mtx);
pthread_cond_destroy(&_is_empty);
pthread_cond_destroy(&_is_full);
}
public:
void Push(const T &in)
{
LockQueue();
//临界区
//if(IsFull) //bug
//进行条件检测的时候,需要使用循环方式
//来保证退出循环一定是因为条件不满足导致的
while (IsFull())
{
//等待的,把线程挂起,我们当前是持有锁的
ProducterWait();
}
//向队列中放数据,生产函数
_bq.push(in);
// if (bq.size() > _cap / 2) WakeupConsumer();
UnlockQueue();
WakeupConsumer();
}
void Pop(T *out)
{
LockQueue();
//从队列拿数据
while (IsEmpty())
{
ConsumerWait();
}
*out = _bq.front();
_bq.pop();
// if (_bq.size() < _cap/2) WakeupProducter();
UnlockQueue();
WakeupProducter();
}
};
}
CpTest.cc
#include "BlockQueue.hpp"
#include "Task.hpp"
#include
#include
#include
using namespace ns_blockqueue;
using namespace ns_task;
void *consumer(void *arg)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)arg;
Task t;
while (true)
{
bq->Pop(&t);
t();
}
}
void *producter(void *arg)
{
BlockQueue<Task> *bq = (BlockQueue<Task>*)arg;
std::string ops = "+-*/%";
while (true)
{
int x = rand() % 20 + 1;
int y = rand() % 10 + 1;
char op = ops[rand() % 5];
Task t(x, y, op);
std::cout << "Producter produce a task: " << x << op << y << "=?" << std::endl;
bq->Push(t);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
BlockQueue<Task> *bq = new BlockQueue<Task>();
pthread_t c1, c2, c3, c4;
pthread_t p;
pthread_create(&c1, nullptr, consumer, (void*)bq);
pthread_create(&c2, nullptr, consumer, (void*)bq);
pthread_create(&c3, nullptr, consumer, (void*)bq);
pthread_create(&c4, nullptr, consumer, (void*)bq);
pthread_create(&p, nullptr, producter, (void*)bq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(c4, nullptr);
pthread_join(p, nullptr);
return 0;
}
Task.hpp
#pragma once
#include
#include
namespace ns_task
{
class Task
{
private:
int _x;
int _y;
char _op;
public:
Task() {}
Task(int x, int y, char op)
: _x(x)
, _y(y)
, _op(op)
{}
int Run()
{
int res = 0;
switch(_op)
{
case '+':
res = _x + _y;
break;
case '-':
res = _x - _y;
break;
case '*':
res = _x * _y;
break;
case '/':
res = _x / _y;
break;
case '%':
res = _x % _y;
break;
default:
std::cout << "no such op" << std::endl;
break;
}
std::cout << pthread_self() << "is dealing with " << _x << _op << _y << "=" << res << std::endl;
return res;
}
int operator()()
{
return Run();
}
~Task()
{}
};
}
上一个生产者-消费者的例子是基于queue的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序(POSIX信号量)。
ring_queue.hpp
#pragma once
#include
#include
#include
namespace ns_ring_queue
{
const int g_cap_default = 10;
template<class T>
class RingQueue
{
private:
std::vector<T> _rq;
int _cap;
//生产者关心空位置资源
sem_t _blank_sem;
//消费者关心数据位置资源
sem_t _data_sem;
int _c_step;
int _p_step;
public:
RingQueue(int cap = g_cap_default)
: _rq(cap)
, _cap(cap)
{
sem_init(&_blank_sem, 0, cap);
sem_init(&_data_sem, 0, 0);
_c_step = _p_step = 0;
}
~RingQueue()
{
sem_destroy(&_blank_sem);
sem_destroy(&_data_sem);
}
public:
void Push(T &in)
{
//生产者
sem_wait(&_blank_sem);
_rq[_p_step] = in;
sem_post(&_data_sem);
_p_step++;
_p_step %= _cap;
}
void Pop(T *out)
{
//消费者
sem_wait(&_data_sem);
*out = _rq[_c_step];
sem_post(&_blank_sem);
_c_step++;
_c_step %= _cap;
}
};
}
ring_cp.cc
#include "ring_queue.hpp"
#include
#include
#include
#include
using namespace ns_ring_queue;
void *consumer(void *arg)
{
RingQueue<int> *rq = (RingQueue<int>*)arg;
while (true)
{
int x;
rq->Pop(&x);
std::cout << "消费数据: " << x << std::endl;
}
}
void *producter(void *arg)
{
RingQueue<int> *rq = (RingQueue<int>*)arg;
while (true)
{
int x = rand() % 20 + 1;
std::cout << "生产数据: " << x << std::endl;
rq->Push(x);
sleep(1);
}
}
int main()
{
srand((long long)time(nullptr));
RingQueue<int> *rq = new RingQueue<int>();
pthread_t c, p;
pthread_create(&c, nullptr, consumer, (void*)rq);
pthread_create(&p, nullptr, producter, (void*)rq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}