生产者消费者问题是一个经典的问题,用于多进程同步,即各个线程之间的同步。在生产者消费者问题中,有一个生产者负责生产某种物品,还有一个消费者负责消费生产者生产的产品。生产者和消费者共享一个固定大小的内存缓冲区。生产者的工作是生产数据,将其放入缓冲区,然后再次开始生成数据。而消费者的工作是从缓冲区中消费数据。
生产者和消费者之间的关系可以简单的描述为:
生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产数据之后不用等待消费者处理,直接放入阻塞队列中,消费者也不必找生产者要数据,而是直接通过阻塞来取数据,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。这个阻塞队列就是用来给生产者和消费者解耦的。
在上述对生产者和消费者模型的描述中,该模型中可能会出现的问题:
解决上述问题的一种常见方法就是使用互斥锁和条件变量。生产者在向缓冲区中放入数据之前,需要获取互斥锁,并检查缓冲区是否已满。如果缓冲区已满,生产者会等待条件变量,直到有空间可用。消费者在从缓冲区中取出数据之前,也需要获取互斥锁,并检查缓冲区是否为空。如果缓冲区为空,消费者会等待条件变量,直到有数据可用。
生产者消费者模型的优点:
解耦
:生产者和消费者是独立的实体,彼此之间通过缓冲区进行交互。这种解耦使生产者和消费者可以独自进行操作,而不需要依赖对方的操作或状态。提高了代码的可维护性。
支持并发
:生产者消费者模型能够支持多个生产者和多个消费者并发运行。这种并发性能够提高系统的吞吐量,提高整体性能。
支持忙闲不均
:生产者和消费者能够有效处理它们之间的忙闲不均。当生产者的生产速度超过消费者的消费速度时,多余的数据或资源可以暂时存储在缓冲区中,以供消费者以后使用。而当消费者的消费速度超过生产者的生产速度时,缓冲区可以暂时保存消费者暂未消费的数据或资源。这种能力使得系统能够平衡生产者和消费者之间的速度差异,提高了系统的稳定性和性能。
生产者消费者模型是一个经典的多线程同步与互斥的场景。其特点如下:
三种关系:生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(同步互斥关系)。
两种角色:生产者和消费者。
一个交易场所:通常为内存中的一段缓冲区,用于存储生产者生产的数据和消费者消耗的数据。
当我们编写生产者消费者模型时,需要对以上三点进行维护。
🎯 生产者和生产者、消费者和消费者,生产者和消费者之间存在互斥关系的原因就是因为它们都需要访问共享的临界资源,即缓冲区。因多个执行流同时访问缓冲区资源可能导致数据竞争和不一致等问题,为了保证数据的正确性,需要使用互斥锁对临界区进行保护。
🎯生产者和消费者之间存在同步关系是为了确保生产者和消费者在合适的时间进行操作,避免不必要的等待和资源浪费。为确保生产者和消费者能够顺序操作,需要满足以下两个条件:
通过同步关系,生产者和消费者可以在合适时间进行操作,避免生产者生产的数据丢失或覆盖,以及消费者的错误操作问题。
同步关系可以通过条件变量、信号量等机制实现。
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列为满时,往队列中存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)。
总的来说,阻塞队列在以下场景中发挥重要作用:
下面我们使用STL库中的 queue 来对实现一个单生产者单消费者的模型:
#pragma once
#include
#include
#include
#include
#include
using namespace std;
const uint32_t gDefaultCap = 5; // 阻塞队列默认容量
template <class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&consumerCond_, nullptr);
pthread_cond_init(&producerCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&consumerCond_);
pthread_cond_destroy(&producerCond_);
}
// 生产者接口
void push(const T &in) // const &:纯输入
{
lockQueue();
while (isFull()) // isFull就是我们在临界区设定的条件
{
// before:当等待的时候,会自动释放mutex_
proBlockWait(); // 阻塞等待,等待被唤醒。被唤醒!=条件被满足
// after:当被唤醒时,是在临界区中醒来的
}
// 条件满足,可以生产数据
pushCore(in);
unlockQueue();
wakeupCon(); // 唤醒消费者
}
// 消费者接口
T pop()
{
lockQueue();
while (isEmpty())
{
conBlockWait();
}
// 条件满足,可以消费数据
T tmp = popCore();
unlockQueue();
wakeupPro(); // 唤醒生产者
return tmp;
}
private:
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
bool isEmpty() { return bq_.empty(); }
bool isFull() { return bq_.size() == cap_; }
// 生产者一定是在临界区中的;
// 1.在阻塞线程的时候,会自动释放mutex_锁
// 2.当阻塞结束返回的时候,pthread_cond_wait会自动重新获得mutex_,然后才返回
void proBlockWait() { pthread_cond_wait(&producerCond_, &mutex_); }
void conBlockWait() { pthread_cond_wait(&consumerCond_, &mutex_); }
void wakeupCon() { pthread_cond_signal(&consumerCond_); }
void wakeupPro() { pthread_cond_signal(&producerCond_); }
void pushCore(const T &in) { bq_.push(in); }
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
uint32_t cap_; // 容量
queue<T> bq_; // blockqueue
pthread_mutex_t mutex_; // 保护阻塞队列的互斥锁
pthread_cond_t consumerCond_; // 让消费者等待的条件变量
pthread_cond_t producerCond_; // 让生产者等待的条件变量
};
说明:
当判断生产者何消费者条件是否满足是应该用 while,而不是 if 。为什么?
对上述的单生产者单消费者模型进行测试,让生产者生产数据,消费者消费生产者生成的数据:
#include "blockqueue.hpp"
#include
void *consumer(void *args)
{
BlockQueue<int> *bqp = static_cast<BlockQueue<int> *>(args);
while (true)
{
int data = bqp->pop();
cout << "consumer 消费了一个数据:" << data << endl;
}
}
void *producer(void *args)
{
BlockQueue<int> *bqp = static_cast<BlockQueue<int> *>(args);
while (true)
{
// 1.制作数据
int data = rand() % 10;
// 2.生产数据
bqp->push(data);
cout << "producer 生产数据完成:" << data << endl;
sleep(1);
}
}
int main()
{
BlockQueue<int> bq;
bq.push(7);
cout << bq.pop() << endl;
return 0;
}
测试结果如下:
基于计算型任务的生产者消费者模型
在实现上述的生产者消费者模型时,我们将其中存入队列的数据进行了模板化,因此可以在 block queue 中放入其它类型的数据或者特定任务。
下面实现一个基于简单计算任务的生产者消费者模型,在这里我们定义一个任务类(Task),该类用于处理用户给出的运算任务:
#pragma once
#include
#include
class Task
{
public:
Task(int one = 0, int two = 0, char op = '0')
: elemOne_(one), elemTwo_(two), operator_(op)
{
}
int operator()() { return run(); }
int run()
{
int result = 0;
switch (operator_)
{
case '+':
result = elemOne_ + elemTwo_;
break;
case '-':
result = elemOne_ - elemTwo_;
break;
case '*':
result = elemOne_ * elemTwo_;
break;
case '/':
if (elemTwo_ == 0)
{
std::cout << "div zero,about " << std::endl;
result = -1;
}
else
result = elemOne_ / elemTwo_;
break;
case '%':
if (elemTwo_ == 0)
{
std::cout << "mod zero,about " << std::endl;
result = -1;
}
else
result = elemOne_ % elemTwo_;
break;
default:
std::cout << "非法操作:" << operator_ << std::endl;
break;
}
return result;
}
int get(int *e1, int *e2, char *op)
{
*e1 = elemOne_;
*e2 = elemTwo_;
*op = operator_;
}
private:
int elemOne_;
int elemTwo_;
char operator_;
};
将 Task 对象放入生产者消费者模型中的队列,消费者从中拿取任务并进行相应的计算,如下:
#include "blockqueue.hpp"
#include "task.hpp"
#include
const std::string ops = "+-*/%";
void *producer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
// 1.制作任务
int one = rand() % 50;
int two = rand() % 50;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
// 2.生产数据
bq->push(t);
cout << "producer task : " << one << " " << op << " " << two << " = ?" << endl;
sleep(1);
}
}
void *consumer(void *args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while (true)
{
Task t = bq->pop();
int result = t(); // 调用仿函数处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer calculate result : " << one << " " << op << " " << two << " = " << result << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
BlockQueue<Task> bq;
pthread_t c, p;
pthread_create(&p, nullptr, producer, &bq);
pthread_create(&c, nullptr, consumer, &bq);
pthread_join(c, nullptr);
pthread_join(p, nullptr);
return 0;
}
运行代码结果如下:
POSIX 信号量和 System V 信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源的目的。然而,POSIX 信号量还可以用于线程间的同步,而 System V 信号量主要用于进程间的同步。
POSIX信号量是一种在多线程环境中同步和互斥访问共享资源的机制。它们可以通过在不同线程之间共享信号量来实现线程间的同步和互斥操作。每个线程都可以通过等待和通知操作来控制对共享资源的访问,从而避免竞争条件和数据访问冲突。
信号量的 PV 操作:
通过使用 P 操作和 V 操作,可以实现对临界资源的控制和同步。当一个进程或线程需要访问临界资源时,它会执行 P 操作来申请资源的使用权限。若资源可用,P 操作成功,计数器减少,进程或线程可以继续访问临界资源。若资源不可以,P 操作会阻塞进程或线程,直到资源可用为止。
当进程或线程完成对临界资源的访问时,它会执行 V 操作来释放资源的使用权限。V 操作会增加计数器的值,使得其它等待的进程或线程可以继续申请资源的使用权限。
🎯由于信号量的 PV 操作中涉及到对信号量值的额修改,如果这些操作不是原子的,可能会导致多个进程或线程同时修改信号量的值,从而引发数据不一致的问题。而信号量本身也是临界资源,因此 PV 操作必须是原子操作。
初始化信号量
sem_init()
函数用于初始化信号量。它的函数原型如下:
#include
int sem_init(sem_t *sem, int pshared, unsigned int value);
如果函数调用成功则返回 0,错误则返回 -1。
销毁信号量
sem_destroy()
函数用于销毁一个已经初始化过的信号量。它的函数原型如下:
int sem_destroy(sem_t *sem);
函数调用成功则返回 0,失败则返回 -1。
注意:只有在所有的线程或进程都不需要使用该信号量时,才应该调用该函数来销毁信号量。否则可能会出现未定义的行为。
等待信号量
sem_wait()
函数用于等待一个信号量,会将信号量的值减 1。它的函数原型如下:
int sem_wait(sem_t *sem); // P()
发布信号量
sem_post()
用于发布一个信号量,表示资源使用完毕,可以归还资源了。将信号量的值加1。它的函数原型如下:
int sem_post(sem_t *sem); // V()
sem_post() 函数会将信号量的值加 1,如果有其它线程或进程正在等待该信号量,则会唤醒其中的一个线程或进程。
环形队列是一个有用的数据结构,它在缓冲区填满后可以重复使用。这种数据结构像一个环,前后相连,当数据达到末端时,下一个数据将回到队列的开始。
环形队列采用数组模拟,用模运算来模拟环状特性:
环形结构起始状态和结束状态都是不一样的,不好判断为空或者为满,所以可以通过加计数器或标记为来判断满或者空。另外也可以预留一个空的位置,作为满的状态。但是我们现在有信号量这个计数器,就可以很简单的进行多线程间的同步过程。
以下是基于环形队列实现的生产者消费者模型:
#pragma once
#include
#include
#include
#include
using namespace std;
const int gCapacity = 5;
template <class T>
class RingQueue
{
public:
RingQueue(int cap = gCapacity) : ringqueue_(cap), pIndex_(0), cIndex_(0)
{
// 生产者信号量初始值为队列容量
sem_init(&roomSem_, 0, ringqueue_.size());
// 消费者信号量初始值为0
sem_init(&dataSem_, 0, 0);
pthread_mutex_init(&pMutex_, nullptr);
pthread_mutex_init(&cIndex_, nullptr);
}
~RingQueue()
{
sem_destroy(&roomSem_);
sem_destroy(&dataSem_);
pthread_mutex_destroy(&pMutex_);
pthread_mutex_destroy(&cMutex_);
}
void push(const T &in)
{
// 申请一个空间,如果没有空间则等待
sem_wait(&roomSem_);
pthread_mutex_lock(&pMutex_);
// 写入数据
ringqueue_[pIndex_] = in;
pIndex_++;
pIndex_ %= ringqueue_.size();
pthread_mutex_unlock(&pMutex_);
// 通知消费者有数据可以读取
sem_post(&dataSem_);
}
T pop()
{
// 申请一个数据,若没有数据则等待
sem_wait(&dataSem_);
pthread_mutex_lock(&cMutex_);
// 读取数据
T temp = ringqueue_[cIndex_];
cIndex_++;
cIndex_ %= ringqueue_.size();
pthread_mutex_unlock(&cMutex_);
// 通知生产者有空间可写
sem_post(&roomSem_);
return temp;
}
private:
vector<T> ringqueue_; // 环形队列
sem_t roomSem_; // 衡量空间计数器,producer,用于控制生产者
sem_t dataSem_; // 衡量数据计数器,consumer,用于控制消费者
uint32_t pIndex_; // 当前生产者写入的位置,如果是多线程,pIndex_也是临界资源
uint32_t cIndex_; // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源
pthread_mutex_t pMutex_; // 生产者的互斥锁
pthread_mutex_t cMutex_; // 消费者的互斥锁
};
说明:
接下来使用下列代码对生产者消费者模型进行测试:
#include "RingQueue.hpp"
#include
#include
void *producer(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while (true)
{
int data = rand() % 10;
rqp->push(data);
cout << "pthread[" << pthread_self() << "] 生产了一个数据:" << data << endl;
sleep(1);
}
}
void *consumer(void *args)
{
RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);
while (true)
{
int data = rqp->pop();
cout << "pthread[" << pthread_self() << "] 消费了一个数据:" << data << endl;
}
}
int main()
{
srand((unsigned long)time(nullptr) ^ getpid());
RingQueue<int> rq;
pthread_t c1, c2, c3, p1, p2, p3;
pthread_create(&p1, nullptr, producer, &rq);
pthread_create(&p2, nullptr, producer, &rq);
pthread_create(&p3, nullptr, producer, &rq);
pthread_create(&c1, nullptr, consumer, &rq);
pthread_create(&c2, nullptr, consumer, &rq);
pthread_create(&c3, nullptr, consumer, &rq);
pthread_join(c1, nullptr);
pthread_join(c2, nullptr);
pthread_join(c3, nullptr);
pthread_join(p1, nullptr);
pthread_join(p2, nullptr);
pthread_join(p3, nullptr);
return 0;
}
测试结果如下: