多线程逻辑是乱序的,然而很多时候,我们却需要一定的逻辑顺序,但这种顺序又不简单是单线程的一贯顺序,此时我们需要通过一些手段,来使得多线程变得有序。
前几篇文章讲述了数据保护,用互斥锁确实可以达到某个线程等待另一个线程的目的,但这不是它的使用场景,我们将引入条件变量和 std::future.
条件变量的语义是当条件成立则上锁进行下一步操作,否则等待。由此,我们可以令多线程有逻辑顺序的执行。
C++的头文件库
我们用条件变量实现一个简单的 生产–使用 逻辑的程序,
void dataPreparationThread() 函数负责生产数据,通过加锁 dataQueue 安全的传输数据,并在每产生一个数据后通知 void dataProcessingThread() 函数的条件变量 dataCond。
后者会取得锁,并停止阻滞,从 dataQueue 进行安全的数据转移。随后处理数据。
#include
#include
#include
#include
#include
struct dataChunk
{
explicit dataChunk(int aa)
: a(aa)
{}
void show() const
{
std::cout << a;
}
auto equalNum(int num) const -> bool
{
return a == num;
}
private:
int a = 0;
};
std::mutex mut;
std::queue<dataChunk> dataQueue;
std::condition_variable dataCond;
auto prepareData() -> dataChunk
{
static int dataNum = 0;
return dataChunk{dataNum++};
}
auto moreDataToPrepare() -> bool
{
static int times = 10;
return times-- > 0;
}
void dataPreparationThread()
{
while (moreDataToPrepare())
{
const dataChunk data = prepareData();
{
std::lock_guard<std::mutex> lk(mut);
dataQueue.push(data);
}
dataCond.notify_one();
}
}
void process(const dataChunk &data)
{
data.show();
}
auto isLastChunk(const dataChunk &data) -> bool
{
return data.equalNum(8);
}
void dataProcessingThread()
{
while (true)
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [] { return !dataQueue.empty(); });
dataChunk data = dataQueue.front();
dataQueue.pop();
lk.unlock();
process(data);
if (isLastChunk(data))
{
break;
}
}
}
auto main() -> int
{}
条件变量的 wait() 函数的等待过程,是持锁,判断条件,条件成立,继续持锁,并向下执行,否则放开锁,继续等待直到条件成立。由于需要持锁,放开锁,所以需要 std::unique_lock 配合。
此例中wait() 函数的实参是一个锁和一个返回bool值的 lambda 函数,非常方便,当然我们也可以传入更复杂的判断函数。
另外,由于wait() 函数存在伪唤醒现象,即未经 dataCond.notify_one() 通知就自行持锁,判断条件这一过程,而且根据标准,伪唤醒频率不确定,所以一般不给 wait() 传入含有副作用的判读函数。
c++标准库有自己的队列实现,但并非多线程安全,好在改造为线程安全的并不困难。
首先,禁止复制
只留下入队,出队,判断是否为空。
新增等待出队,利用条件变量
#include
#include
#include
#include
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
threadSafeQueue(const threadSafeQueue &rhs)
{
std::lock_guard<std::mutex> lk(rhs.mut);
dataQueue = rhs.dataQueue;
}
auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
void push(T newVal)
{
std::lock_guard<std::mutex> lk(mut);
dataQueue.push(newVal);
dataCond.notify_one();
}
//通过返回 true false 判断赋值是否正确
auto tryPop(T &val) -> bool
{
std::lock_guard<std::mutex> lk(mut);
if (dataQueue.empty())
{
return false;
}
val = dataQueue.front();
dataQueue.pop();
return true;
}
//通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> lk(mut);
if (dataQueue.empty())
{
//返回 nullptr
return std::shared_ptr<T>(nullptr);
}
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
void waitAndPop(T &val)
{
std::unique_lock<std::mutex> lk(mut);
//如果不为空,加锁,执行下面的程序
//如果为空,等待传来消息,再判断是否为空,为空则继续等
//不为空则则锁住,继续下面的程序
//就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
val = dataQueue.front();
dataQueue.pop();
}
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> lk(mut);
dataCond.wait(lk, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> lk(mut);
return dataQueue.empty();
}
private:
//互斥锁必须可变状态,用 mutable 修饰
mutable std::mutex mut;
std::queue<T> dataQueue;
//条件变量
std::condition_variable dataCond;
};
auto main() -> int
{}
条件变量的引入,令多线程编程的逻辑顺序可以容易操控,同时要注意条件变量的伪唤醒。
通过以前文章的锁,本文的条件变量,很容易实现符合多线程逻辑的队列容器,读者可根据自己需要进行改良。