在很多情况下,线程需要检查某一条件满足之后,才会继续运行,例如:父线程需要检查子线程是否执行完毕,这种等待如何实现呢?
#include
#include
using namespace std;
void* child(void* args)
{
cout << "child" << endl;
//how to indicate we are down?
return nullptr;
}
int main()
{
cout << "parent begin" << endl;
pthread_t t;
pthread_create(&t, nullptr, child, nullptr);
cout << "parent end" << endl;
return 0;
}
我们期望这样的输出:
parent begin
child
parent end
我们可以尝试使用一个共享变量:
volatile int done = 0;
void* child(void* args)
{
cout << "child" << endl;
done = 1;
return nullptr;
}
int main()
{
cout << "parent begin" << endl;
pthread_t t;
pthread_create(&t, nullptr, child, nullptr);
while(done == 0)
;
cout << "parent end" << endl;
return 0;
}
这种解决方案一般能工作,但是效率低下,因为主线程会自旋检查,浪费CPU时间,我们希望有某种方式让父线程休眠,直到等待的条件满足(这里的条件是子线程执行完毕).
线程可以使用条件变量,来等待一个条件为真,条件变量是一个显式队列,当执行某些状态不满足时,线程可以把自己加入队列,等待该条件. 另外某个线程,当它改变了上述状态时,就可以唤醒一个或多个等待线程(通过在该条件上发信号),让它们继续执行.
要声明这样的条件变量,只要像这样写:pthread_cond_t c;
,这里声明c
是一个条件变量,条件变量有两种相关操作:wait()
和signal()
. 线程要睡眠的时候,调用wait()
,当线程想要唤醒等待在某个条件变量上的睡眠线程时,调用signal().
接下来我们再使用条件变量来实现父线程等待子线程:
#include
#include
using namespace std;
int done = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t c = PTHREAD_COND_INITIALIZER;
void thread_exit()
{
pthread_mutex_lock(&mutex);
done = 1;
//唤醒在此条件变量上等待的线程
pthread_cond_signal(&c);
pthread_mutex_unlock(&mutex);
}
void* child(void* args)
{
cout << "child" << endl;
thread_exit();
return nullptr;
}
void thread_join()
{
pthread_mutex_lock(&mutex);
while(done == 0)
pthread_cond_wait(&c, &mutex); //条件不满足, 主线程睡眠
pthread_mutex_unlock(&mutex);
}
int main()
{
cout << "parent begin" << endl;
pthread_t t;
pthread_create(&t, nullptr, child, nullptr);
thread_join();
cout << "parent end" << endl;
return 0;
}
运行结果:
显然它达到了预期.
你可能注意一点,wait()
调用有一个参数,它是互斥量. 它假定在wait()
调用时,这个互斥量是已上锁状态. wait的职责是释放锁,并让调用线程休眠(原子地).当线程被唤醒时(即另外某个线程发信号给它后),它必须重新获取锁,再返回调用者,这是为了避免线程在陷入休眠时,产生一些竞态条件.
在这个例子中,我们假设线程在发信号和等待时都不加锁,会发生什么问题?
void thread_exit()
{
done = 1;
//唤醒在此条件变量上等待的线程
pthread_cond_signal(&c);
}
void thread_join()
{
while(done == 0)
pthread_cond_wait(&c, &mutex); //条件不满足, 主线程睡眠
}
这里的问题是一个微妙的竞态条件,具体来说,如果主线程调用thread_join()
,然后检查完done的值为0,然后试图睡眠,但在调用wait
进入睡眠之前,主线程被中断. 子线程修改变量done
为1,发出信号,同样没有等待线程,父线程再次运行时,就会长眠不醒.
尽管不是所有情况都严格需要,但有效且简单的做法,还是在使用条件变量发送信号时持有锁,虽然上面的例子是必须加锁的情况,但也有一些情况可以不加锁,但为了简单,请在调用signal
时持有锁.
这个提示的反面,即调用wait
时持有锁,这是wait的语义强制要求的,因为wait调用总是假设你调用它时已经持有锁、调用者睡眠之前会释放锁以及返回之前重新持有锁.
生产者/消费者问题,也叫作有界缓冲区问题.
假设有一个或多个生产者线程和一个或多个消费者线程,生产者把生成的数据项放入缓冲区;消费者从缓冲区中取走数据项,以某种方式消费.
很多实际的系统中都会有这种场景,例如:在多线程的网络服务器中,一个生产者将HTTP请求放入工作队列(即有界缓冲区),消费线程从队列中取走请求并处理.
我们在使用管道连接不同程序的输入输出时,也会使用有界缓冲区,例如grep foo | file.txt | wc -l
,这个例子并发的执行两个进程,grep
进程程从file.txt
中查找包括"foo"
的行,写到标准输出;Unix shell把输出重定向到管道(通过pipe系统调用创建),管道的另一端是wc
进程的标准输入,wc
统计完行数后打印结果,因此grep
进程是生产者,wc
进程是消费者,它们之间是内核中的有界缓冲区.
因为有界缓冲区是共享资源,所以我们必须通过同步机制来访问它,以免产生竞态条件.
首先需要一个共享缓冲区,让生产者放入数据,消费者取出数据,我们先拿一个整数作为缓冲区,两个内部函数将值放入缓冲区,从缓冲区中取值.
//一个整数缓冲区
int buffer;
int count = 0;
void put(int value)
{
assert(count == 0);
//向缓冲区中放数据
count = 1;
buffer = value;
}
int get()
{
assert(count == 1);
//取出缓冲区中的数据
count = 0;
return buffer;
}
put()
函数会假设缓冲区是空的,把一个值存放在缓冲区,然后把count
设置为1
表示缓冲区满了,get()
函数正好相反,把缓冲区清空后并返回该值.现在的这个共享缓冲区只能存放一条数据,但是不用担心,稍后我们会使用队列保存更多数据项.
将数据放入缓冲区和从缓冲区中取走数据,这项工作将由两种类型的线程完成,其中一类我们称之为生产者线程,另一类我们称之为消费者线程.
我们可以使用代码模拟一个生产者和与之对应的消费者:
void* producer(void* arg)
{
int i;
int loops = (int)arg;
for(int i = 0; i < loops; ++i)
put(i);
}
void* consumer(void* arg)
{
int i;
//消费者不断从缓冲区中拿数据
while(1)
{
int tmp = get();
cout << tmp << endl;
}
}
但这是有问题的,假设只有一个生产者和一个消费者,显然,get()
和put()
函数之中会有临界区,因为put()
更新缓冲区,get()
读取缓冲区.但是,给代码加锁没有用,我们还需要别的东西,即条件变量.
void* producer(void* arg)
{
int i = (int)arg;
//临界区
while(1)
{
pthread_mutex_lock(&mutex);
if(count == 1)
pthread_cond_wait(&cond, &mutex);
put(i);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void* consumer(void* arg)
{
//消费者不断从缓冲区中拿数据
while(1)
{
pthread_mutex_lock(&mutex);
if(count == 0)
pthread_cond_wait(&cond, &mutex);
int tmp = get();
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
cout << tmp << endl;
}
}
让我们来看看生产者和消费者之间的信号逻辑,当生产者想要填充缓冲区时,它等待缓冲区变空,消费者具有完全相同的逻辑,但等待不同的条件-变满.
当只有一个生产者和一个消费者时,上述代码能够正常运行,但如果有超过一个线程,就会引发两个严重的问题:
假设有两个消费者(Tc1和Tc2
),一个生产者(Tp
). 首先,一个消费者开始执行,它获得锁(mutex
),检查缓冲区是否可以消费,然后等待(会释放锁),接着生产者(Tp
)运行,它获得锁,检查缓冲区是否已满,发现没满就给缓冲区添加一个数字,然后生产者发出信号,说缓冲区已满,这让第一个消费者(Tc1
),不再睡眠在条件变量上,进入就绪队列,Tc1
现在可以运行,生产者继续执行,直到发现缓冲区已满后睡眠.
这时问题就发生了:另一个消费者(Tc2
)抢先执行,消费了缓冲区中的值,现在假设Tc1
先运行,在从wait
返回之前,它获得了锁,然后返回,调用了get()
,但缓冲区已无法消费,断言触发!代码不能像预期那样工作.
问题产生的原因很简单:在Tc1
被生产者唤醒后,但在它运行之前,缓冲区中的状态发生了改变(由于Tc2
抢先执行),所以最终的解决办法:使用while
语句代替if.
void* producer(void* arg)
{
int i = (int)arg;
//临界区
while(1)
{
pthread_mutex_lock(&mutex);
while(count == 1)
pthread_cond_wait(&cond, &mutex);
put(i);
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
}
}
void* consumer(void* arg)
{
//消费者不断从缓冲区中拿数据
while(1)
{
pthread_mutex_lock(&mutex);
while(count == 0)
pthread_cond_wait(&cond, &mutex);
int tmp = get();
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
cout << tmp << endl;
}
}
把if改成while,当消费者Tc1
被唤醒后,立刻再次检查共享变量,如果缓冲区此时为空,消费者就会继续睡眠.
假设两个消费者(Tc1,Tc2
)都先运行,都睡眠了. 生产者开始运行,在缓冲区中放入一个值,唤醒了一个消费者(假定是Tc1
),并开始睡眠,现在是一个消费者马上要运行(Tc1
),两个线程(Tc2,Tp
)都等待在同一条件变量上,消费者Tc1
醒过来并从wait
调用返回,重新检查条件,发现缓冲区是满的,消费了这个值,这个消费者然后在该条件上发信号,唤醒一个在睡眠的线程,缓冲区为空,它应该唤醒生产者,但是,如果它唤醒了Tc2
,3
个线程就都睡眠了.
解决方案很简单:使用两个条件变量,而不是一个,以便正确的发出信号,在系统状态改变时,哪类线程应该唤醒:
void* producer(void* arg)
{
int i = (int)arg;
//临界区
while(1)
{
pthread_mutex_lock(&mutex);
while(count == 1)
pthread_cond_wait(&Fill, &mutex);
put(i);
pthread_cond_signal(&Empty);
pthread_mutex_unlock(&mutex);
}
}
void* consumer(void* arg)
{
//消费者不断从缓冲区中拿数据
while(1)
{
pthread_mutex_lock(&mutex);
while(count == 0)
pthread_cond_wait(&Empty, &mutex);
int tmp = get();
pthread_cond_signal(&Fill);
pthread_mutex_unlock(&mutex);
cout << tmp << endl;
}
}
在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构,其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出.(以上操作都是基于不同的线程来讲,线程在对阻塞队列进行操作时会被阻塞).
最终,我们实现一个基于阻塞队列的生产者消费者模型:
#pragma once
#include
#include
#include
#include
using namespace std;
const uint32_t gDefaultCap = 5;
template<class T>
class BlockQueue
{
public:
BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&conCond_, nullptr);
pthread_cond_init(&proCond_, nullptr);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&conCond_);
pthread_cond_destroy(&proCond_);
}
public:
//生产接口
void push(const T& in)
{
lockQueue();
while(isFull())
{
//如果满了, 生产者睡眠等待 被唤醒 != 条件满足
proBlockWait();
}
pushCore(in);
wakeupCon();
unlockQueue();
}
//消费接口
T pop()
{
lockQueue();
while(isEmpty())
{
conBlockWait();
}
T tmp = popCore();
wakeupPro();
unlockQueue();
return tmp;
}
private:
void lockQueue()
{
pthread_mutex_lock(&mutex_);
}
void unlockQueue()
{
pthread_mutex_unlock(&mutex_);
}
//阻塞队列是否为空
bool isEmpty()
{
return bq_.empty();
}
bool isFull()
{
return bq_.size() == cap_;
}
//让生产者阻塞等待
void proBlockWait()
{
pthread_cond_wait(&proCond_, &mutex_);
}
void conBlockWait()
{
pthread_cond_wait(&conCond_, &mutex_);
}
void wakeupPro()
{
pthread_cond_signal(&proCond_);
}
void wakeupCon()
{
pthread_cond_signal(&conCond_);
}
void pushCore(const T& in)
{
bq_.push(in);
}
T popCore()
{
T tmp = bq_.front();
bq_.pop();
return tmp;
}
private:
uint32_t cap_; //容量
queue<T> bq_; //阻塞队列
pthread_mutex_t mutex_; //保护阻塞队列的互斥锁
pthread_cond_t conCond_; //让消费者等待的条件变量
pthread_cond_t proCond_; //让生产者等待的条件变量
};
//生产者和消费者线程
const string ops = "+-*/";
void* consumer(void* args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task>*>(args);
while(true)
{
Task t = bq->pop();
int result = t(); //处理任务
int one, two;
char op;
t.get(&one, &two, &op);
cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;
}
}
void* productor(void* args)
{
BlockQueue<Task>* bq = static_cast<BlockQueue<Task>* >(args);
while(true)
{
int one = rand() % 50;
int two = rand() % 20;
char op = ops[rand() % ops.size()];
Task t(one, two, op);
//2. 生产
bq->push(t);
cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;
sleep(1);
}
}
测试基于阻塞队列的生产者消费者模型: