我们已经介绍了 C++ 标准库有关并发编程的常用接口, 以及一些并发安全的数据结构, 算法, 接下来将要介绍的是有关线程的组织方法, 线程池.
线程池的运用主要是减少由于不停的产生和消灭线程而带来的开销, 让固定数量的线程 ( 多核 CPU 支持的最大线程 ) 一直等待任务, 将任务封装后推入队列, 线程则通过工作队列的任务出队获取任务并执行.
以下实现是一个简单的线程池, 通过共有的工作队列, 多线程轮询获取任务, 并且用 future 设置任务等待.
代码需要线程安全的队列, 前面文章介绍过, 不过需要注意, 入队和出队需要移动语义配合, 需要修改一点代码.
另外, 本例使用了 invoke_result 获取结果类型(参考书中用的是 result_of ), 因为 C++17 中 result_of 被舍弃, 并在C++20 中删除.
#include "ThreadSafeQueue_2.h"
#include
#include
#include
#include
#include
#include
#include
// 函数类型擦除类基类
struct implBase
{
virtual void call() = 0;
virtual ~implBase() = default;
};
// 函数类型擦除类
template <typename FuncType>
struct implType : implBase
{
explicit implType(FuncType &&rhs)
: func(std::move(rhs))
{}
void call() override
{
func();
}
private:
FuncType func;
};
// 函数包装类
struct functionWrapper
{
functionWrapper() = default;
// 禁用拷贝构造
functionWrapper(const functionWrapper &) = delete;
// 禁用引用拷贝构造
functionWrapper(functionWrapper &) = delete;
// 禁用拷贝赋值
auto operator=(const functionWrapper &) -> functionWrapper & = delete;
// 移动构造
functionWrapper(functionWrapper &&rhs) noexcept
: impl(std::move(rhs.impl))
{}
// 移动拷贝
auto operator=(functionWrapper &&rhs) noexcept -> functionWrapper &
{
impl = std::move(rhs.impl);
return *this;
}
// 通过可执行类对象构造
template <typename FuncType>
explicit functionWrapper(FuncType &&func)
: impl(new implType<FuncType>(std::forward<FuncType>(func)))
{}
// 执行
void operator()()
{
impl->call();
}
private:
// 通过智能指针封装, 无需担心资源回收
std::unique_ptr<implBase> impl;
};
// 线程池
struct threadPool
{
threadPool()
{
done = false;
const unsigned threadCount = std::thread::hardware_concurrency();
try
{
threads.reserve(threadCount);
for (int i = 0; i < threadCount; ++i)
{
threads.emplace_back(&threadPool::workerThread, this);
}
}
catch (...)
{
done = true;
throw;
}
}
~threadPool()
{
done = true;
for (auto &thread : threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
// 将函数提交到工作队列, 返回 future
template <typename FunctionType>
auto submit(FunctionType func)
-> std::future<typename std::invoke_result<FunctionType>::type>
{
// invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
// 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
using resultType = typename std::invoke_result<FunctionType>::type;
// 封装函数
std::packaged_task<resultType()> task(std::move(func));
// 获取 future
std::future<resultType> result(task.get_future());
// 封装 task, 推入工作队列
// packaged_task 类型只可移动不可拷贝
workQueue.push(functionWrapper(std::move(task)));
return result;
}
private:
// 工作线程
void workerThread()
{
functionWrapper task;
while (!done)
{
if (workQueue.tryPop(task))
{
task();
}
else
{
std::this_thread::yield();
}
}
}
// 所有任务完成标识
std::atomic<bool> done;
// 工作队列
TS::threadSafeQueue<functionWrapper> workQueue;
// 线程数组
std::vector<std::thread> threads;
};
auto main() -> int
{
std::atomic<int> test(0);
threadPool tp;
auto result1 = tp.submit([&test]() {
std::cout << 5 << ' ';
test.store(9, std::memory_order_release);
});
result1.get();
auto result2 = tp.submit([]() {
for (int i = 0; i != 5; ++i)
{
std::cout << i << ' ';
}
});
result2.get();
auto result3 = tp.submit([&test]() {
std::cout << test.load(std::memory_order_acquire) << ' ';
});
result3.get();
return 0;
}
与前面的并行累加不同, 此次约束线程数量的条件只有数据块大小, 剩下的交给线程池处理.
在实际使用中, 并发效率依赖于是否能够取得足够合适的数据块, 太小会引起不必要的 future 传递开销, 太大可能不能充分利用所有线程.
#include "threadPool.h"
#include
#include
// 累加仿函数
template <typename Iterator, typename T>
struct accumulateBlock
{
auto operator()(Iterator first, Iterator last) -> T
{
return std::accumulate(first, last, T());
}
};
template <typename Iterator, typename T>
auto parallelAccumulate(Iterator first, Iterator last, T init) -> T
{
const unsigned long length = std::distance(first, last);
if (!length)
{
return init;
}
const unsigned long blockSize = 25;
const unsigned long numBlocks = (length + blockSize - 1) / blockSize;
std::vector<std::future<T>> futures(numBlocks - 1);
TS::threadPool pool;
Iterator blockStart = first;
for (int i = 0; i != (numBlocks - 1); ++i)
{
Iterator blockEnd = blockStart;
std::advance(blockEnd, blockSize);
futures[i] = pool.submit([=]() {
return accumulateBlock<Iterator, T>()(blockStart, blockEnd);
});
blockStart = blockEnd;
}
T lastResult = accumulateBlock<Iterator, T>()(blockStart, last);
T result = init;
for (int i = 0; i != (numBlocks - 1); ++i)
{
result += futures[i].get();
}
result += lastResult;
return result;
}
auto main() -> int
{
std::vector<int> vi(100);
for (int i = 0; i != 100; ++i)
{
vi[i] = i;
}
int result = parallelAccumulate(vi.begin(), vi.end(), 0);
std::cout << result << std::endl;
TS::threadPool tp;
std::this_thread::sleep_for(std::chrono::seconds(10));
return 0;
}
以上线程池实现并不完善, 比如任务的执行是在一个死循环之中,
当然, 如果线程池无执行任务, 而有其它非线程池的执行任务, 线程池会让渡 CPU 时间给其它任务,
但如果没有其它任务, 则空转会将 CPU 拉满, 如果有其它监测主机状态的程序, 则无法正确监测, 这个还需要改改.
现在这个线程池通过引入条件变量进行通知, 在没有任务时进行等待, 当任务队列 push 到一个任务, 进行通知, 停止线程阻塞, 进入循环, 运行任务程序.
#ifndef THREADPOOL
#define THREADPOOL
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace TS
{
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
threadSafeQueue(const threadSafeQueue &rhs)
{
std::lock_guard<std::mutex> makeLock(rhs.mutx);
dataQueue = rhs.dataQueue;
}
auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
void push(const T &newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(newVal);
dataCond.notify_one();
}
void push(T &&newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(std::move(newVal));
dataCond.notify_one();
}
// 通过返回 true false 判断赋值是否正确
auto tryPop(T &val) -> bool
{
std::lock_guard<std::mutex> const makeLock(mutx);
if (dataQueue.empty())
{
return false;
}
val = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
// 通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> makeLock(mutx);
if (dataQueue.empty())
{
// 返回 nullptr
return std::shared_ptr<T>(nullptr);
}
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
void waitAndPop(T &val)
{
std::unique_lock<std::mutex> makeLock(mutx);
// 如果不为空,加锁,执行下面的程序
// 如果为空,等待传来消息,再判断是否为空,为空则继续等
// 不为空则则锁住,继续下面的程序
// 就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
val = dataQueue.front();
dataQueue.pop();
}
// 如果一直为空,没有程序产生队列数据,会一直阻塞,可能会让程序不可结束
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> makeLock(mutx);
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> const makeLock(mutx);
return dataQueue.empty();
}
private:
mutable std::mutex mutx;
std::queue<T> dataQueue;
std::condition_variable dataCond;
};
// 函数类型擦除类基类
struct implBase
{
virtual void call() = 0;
virtual ~implBase() = default;
};
// 函数类型擦除类
template <typename FuncType>
struct implType : implBase
{
explicit implType(FuncType &&rhs)
: func(std::move(rhs))
{}
void call() override
{
func();
}
private:
FuncType func;
};
// 函数包装类
struct functionWrapper
{
functionWrapper() = default;
// 禁用拷贝构造
functionWrapper(const functionWrapper &) = delete;
// 禁用引用拷贝构造
functionWrapper(functionWrapper &) = delete;
// 禁用拷贝赋值
auto operator=(const functionWrapper &) -> functionWrapper & = delete;
// 移动构造
functionWrapper(functionWrapper &&rhs) noexcept
: impl(std::move(rhs.impl))
{}
// 移动拷贝
auto operator=(functionWrapper &&rhs) noexcept -> functionWrapper &
{
impl = std::move(rhs.impl);
return *this;
}
// 通过可执行类对象构造
template <typename FuncType>
explicit functionWrapper(FuncType &&func)
: impl(new implType<FuncType>(std::forward<FuncType>(func)))
{}
// 执行
void operator()()
{
impl->call();
}
private:
// 通过智能指针封装, 无需担心资源回收
std::unique_ptr<implBase> impl;
};
// 线程池
struct threadPool
{
threadPool()
{
done = false;
const unsigned threadCount = std::thread::hardware_concurrency();
try
{
threads.reserve(threadCount);
for (int i = 0; i < threadCount; ++i)
{
threads.emplace_back(&threadPool::workerThread, this);
}
}
catch (...)
{
done = true;
throw;
}
}
~threadPool()
{
done = true;
condVar.notify_all();
for (auto &thread : threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
// 将函数提交到工作队列, 返回 future
template <typename FunctionType>
auto submit(FunctionType func)
-> std::future<typename std::invoke_result<FunctionType>::type>
{
// invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
// 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
using resultType = typename std::invoke_result<FunctionType>::type;
// 封装函数
std::packaged_task<resultType()> task(std::move(func));
// 获取 future
std::future<resultType> result(task.get_future());
// 封装 task, 推入工作队列
// packaged_task 类型只可移动不可拷贝
workQueue.push(functionWrapper(std::move(task)));
condVar.notify_one();
return result;
}
private:
// 工作线程
void workerThread()
{
functionWrapper task;
std::mutex mut;
std::unique_lock<std::mutex> uLock(mut);
while (!done)
{
if (workQueue.tryPop(task))
{
task();
}
else
{
std::this_thread::yield();
condVar.wait(uLock);
}
}
}
// 所有任务完成标识
std::atomic<bool> done;
// 工作队列
TS::threadSafeQueue<functionWrapper> workQueue;
// 线程数组
std::vector<std::thread> threads;
// 条件变量
std::condition_variable condVar;
};
} // namespace TS
#endif
当所有线程池都要从一个公有队列竞争任务, 必然产生竞争, 竞争导致低效. 那么就让每一个线程池都有自己的任务队列.
同样, 为了防止不必要的空转, 也要用条件变量进行等待和开启任务.
#ifndef THREADPOOL
#define THREADPOOL
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace TS
{
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
threadSafeQueue(const threadSafeQueue &rhs)
{
std::lock_guard<std::mutex> makeLock(rhs.mutx);
dataQueue = rhs.dataQueue;
}
auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
void push(const T &newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(newVal);
dataCond.notify_one();
}
void push(T &&newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(std::move(newVal));
dataCond.notify_one();
}
// 通过返回 true false 判断赋值是否正确
auto tryPop(T &val) -> bool
{
std::lock_guard<std::mutex> const makeLock(mutx);
if (dataQueue.empty())
{
return false;
}
val = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
// 通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> makeLock(mutx);
if (dataQueue.empty())
{
// 返回 nullptr
return std::shared_ptr<T>(nullptr);
}
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
void waitAndPop(T &val)
{
std::unique_lock<std::mutex> makeLock(mutx);
// 如果不为空,加锁,执行下面的程序
// 如果为空,等待传来消息,再判断是否为空,为空则继续等
// 不为空则则锁住,继续下面的程序
// 就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
val = dataQueue.front();
dataQueue.pop();
}
// 如果一直为空,没有程序产生队列数据,会一直阻塞,可能会让程序不可结束
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> makeLock(mutx);
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> makeLock(mutx);
return dataQueue.empty();
}
private:
mutable std::mutex mutx;
std::queue<T> dataQueue;
std::condition_variable dataCond;
};
// 函数类型擦除类基类
struct implBase
{
virtual void call() = 0;
virtual ~implBase() = default;
};
// 函数类型擦除类
template <typename FuncType>
struct implType : implBase
{
explicit implType(FuncType &&rhs)
: func(std::move(rhs))
{}
void call() override
{
func();
}
private:
FuncType func;
};
// 函数包装类
struct functionWrapper
{
functionWrapper() = default;
// 禁用拷贝构造
functionWrapper(const functionWrapper &) = delete;
// 禁用引用拷贝构造
functionWrapper(functionWrapper &) = delete;
// 禁用拷贝赋值
auto operator=(const functionWrapper &) -> functionWrapper & = delete;
// 移动构造
functionWrapper(functionWrapper &&rhs) noexcept
: impl(std::move(rhs.impl))
{}
// 移动拷贝
auto operator=(functionWrapper &&rhs) noexcept -> functionWrapper &
{
impl = std::move(rhs.impl);
return *this;
}
// 通过可执行类对象构造
template <typename FuncType>
explicit functionWrapper(FuncType &&func)
: impl(new implType<FuncType>(std::forward<FuncType>(func)))
{}
// 执行
void operator()()
{
try
{
impl->call();
}
catch (...)
{
throw;
}
}
private:
// 通过智能指针封装, 无需担心资源回收
std::unique_ptr<implBase> impl;
};
// 线程池
struct threadPool
{
// 构造函数, 线程安全
threadPool()
{
done = false;
const unsigned threadCount = std::thread::hardware_concurrency();
try
{
threads.reserve(threadCount);
for (int i = 0; i < threadCount; ++i)
{
threads.emplace_back(&threadPool::workerThread, this);
}
}
catch (...)
{
done = true;
throw;
}
}
// 析构函数
~threadPool()
{
done = true;
// 当所有任务结束, 通知所有线程, 以便顺利解除阻塞
condVar.notify_all();
for (auto &thread : threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
// 将函数提交到工作队列, 返回 future
template <typename FunctionType>
auto submit(FunctionType func)
-> std::future<typename std::invoke_result<FunctionType>::type>
{
// invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
// 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
using resultType = typename std::invoke_result<FunctionType>::type;
// 封装函数
std::packaged_task<resultType()> task(func);
// 获取 future
std::future<resultType> result(task.get_future());
if (localWorkQueue)
{
// 封装 task, 推入工作队列
// packaged_task 类型只可移动不可拷贝
localWorkQueue->push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
}
else
{
// 当没有局部工作队列, 则将任务放入总队列
poolWorkQueue.push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
}
return result;
}
// 任务运行
auto runPendingTask() -> bool
{
functionWrapper task;
// 如果局部队列有任务,则先行分配
if (localWorkQueue && !localWorkQueue->empty())
{
task = std::move(localWorkQueue->front());
localWorkQueue->pop();
task();
return true;
}
// 否则从公共队列分配任务
if (poolWorkQueue.tryPop(task))
{
task();
return true;
}
// 如有其它程序需要cpu资源, 让渡
std::this_thread::yield();
// 无任务返回 false
return false;
}
private:
// 工作线程
void workerThread()
{
// 初始化局部工作队列
localWorkQueue = std::make_unique<localQueueType>();
std::mutex mtx;
std::unique_lock<std::mutex> uLock(mtx);
while (!done)
{
// 如果没有任务执行则等待, 否则继续循环
if (!runPendingTask())
{
// 等待
condVar.wait(uLock);
}
}
}
// 所有任务完成标识
std::atomic<bool> done;
// 工作队列
TS::threadSafeQueue<functionWrapper> poolWorkQueue;
using localQueueType = std::queue<functionWrapper>;
// 局部静态任务队列智能指针
thread_local static std::unique_ptr<localQueueType> localWorkQueue;
// 线程数组
std::vector<std::thread> threads;
// 条件变量
std::condition_variable condVar;
};
// 对于静态成员, 必须在类内声明, 类外定义
thread_local std::unique_ptr<threadPool::localQueueType>
threadPool::localWorkQueue;
} // namespace TS
#endif
以下是利用上面的线程池的快速排序算法.
#include "threadPool_6.h"
#include
#include
#include
// 排序类
template <typename T>
struct sorter
{
// 排序
auto doSort(std::list<T> &chunkData) -> std::list<T>
{
if (chunkData.empty())
{
return chunkData;
}
// 新建结果链表
std::list<T> result;
// 将被排序链表头节点贴到结果头节点处
result.splice(result.begin(), chunkData, chunkData.begin());
// 设置分割值
const T &partitionVal = *result.begin();
// 分割点, 将待排序链表分为大于分割值和小于分割值两部分,将分隔点返回
auto dividePoint =
std::partition(chunkData.begin(), chunkData.end(),
[&](const T &val) { return val < partitionVal; });
// 新建小端数据块
std::list<T> newLowerChunk;
// 将小于分割点的部分贴到小端数据块后
newLowerChunk.splice(newLowerChunk.end(), chunkData, chunkData.begin(),
dividePoint);
// 获取新小端部分的future, 将排序新小数据块任务加入工作队列
std::future<std::list<T>> newLower = pool.submit(
[&newLowerChunk, this]() { return doSort(newLowerChunk); });
// 当新小端 future 没有 ready 则线程池继续执行任务
while (newLower.wait_for(std::chrono::seconds(0)) !=
std::future_status::ready)
{
pool.runPendingTask();
}
// 新大端部分, 通过排序待排序数据块返回
std::list<T> newHigher(doSort(chunkData));
// 将大端链表贴入结果链表
result.splice(result.end(), newHigher);
// 将小端部分贴入结果链表
result.splice(result.begin(), newLower.get());
return result;
}
private:
TS::threadPool pool;
};
template <typename T>
auto parallelQuickSort(std::list<T> input) -> std::list<T>
{
if (input.empty())
{
return input;
}
sorter<T> sort;
return sort.doSort(input);
}
auto main() -> int
{
std::list<int> listInt;
std::vector<int> vecInt;
vecInt.reserve(10000000);
for (int i = 0; i != 10000000; ++i)
{
vecInt.push_back(i);
}
std::shuffle(vecInt.begin(), vecInt.end(),
std::default_random_engine(time(nullptr)));
std::cout << std::endl;
for (const auto &i : vecInt)
{
listInt.push_back(i);
}
auto rest = parallelQuickSort(listInt);
return 0;
}
随着我们的迭代, 线程池是越来越复杂了, 上一个实现, 已经有了线程专属的工作队列, 如果任务分配均匀, 则已经可以使用.
但任务能均匀的分配么, 一般来说, 比较困难, 那么就会出现各个线程 “闲的闲死, 忙的忙死”, 为了更为高效的使用每个线程, 我们引入线程间可转移工作的队列, 代码见下方.
#ifndef THREADPOOL
#define THREADPOOL
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace TS
{
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
threadSafeQueue(const threadSafeQueue &rhs)
{
std::lock_guard<std::mutex> makeLock(rhs.mutx);
dataQueue = rhs.dataQueue;
}
auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
void push(const T &newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(newVal);
dataCond.notify_one();
}
void push(T &&newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(std::move(newVal));
dataCond.notify_one();
}
// 通过返回 true false 判断赋值是否正确
auto tryPop(T &val) -> bool
{
std::lock_guard<std::mutex> const makeLock(mutx);
if (dataQueue.empty())
{
return false;
}
val = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
// 通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> makeLock(mutx);
if (dataQueue.empty())
{
// 返回 nullptr
return std::shared_ptr<T>(nullptr);
}
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
void waitAndPop(T &val)
{
std::unique_lock<std::mutex> makeLock(mutx);
// 如果不为空,加锁,执行下面的程序
// 如果为空,等待传来消息,再判断是否为空,为空则继续等
// 不为空则则锁住,继续下面的程序
// 就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
val = dataQueue.front();
dataQueue.pop();
}
// 如果一直为空,没有程序产生队列数据,会一直阻塞,可能会让程序不可结束
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> makeLock(mutx);
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> makeLock(mutx);
return dataQueue.empty();
}
private:
mutable std::mutex mutx;
std::queue<T> dataQueue;
std::condition_variable dataCond;
};
// 函数类型擦除类基类
struct implBase
{
virtual void call() = 0;
virtual ~implBase() = default;
};
// 函数类型擦除类
template <typename FuncType>
struct implType : implBase
{
explicit implType(FuncType &&rhs)
: func(std::move(rhs))
{}
void call() override
{
func();
}
private:
FuncType func;
};
// 函数包装类
struct functionWrapper
{
functionWrapper() = default;
// 禁用拷贝构造
functionWrapper(const functionWrapper &) = delete;
// 禁用引用拷贝构造
functionWrapper(functionWrapper &) = delete;
// 禁用拷贝赋值
auto operator=(const functionWrapper &) -> functionWrapper & = delete;
// 移动构造
functionWrapper(functionWrapper &&rhs) noexcept
: impl(std::move(rhs.impl))
{}
// 移动拷贝
auto operator=(functionWrapper &&rhs) noexcept -> functionWrapper &
{
impl = std::move(rhs.impl);
return *this;
}
// 通过可执行类对象构造
template <typename FuncType>
explicit functionWrapper(FuncType &&func)
: impl(new implType<FuncType>(std::forward<FuncType>(func)))
{}
// 执行
void operator()()
{
try
{
impl->call();
}
catch (...)
{
throw;
}
}
private:
// 通过智能指针封装, 无需担心资源回收
std::unique_ptr<implBase> impl;
};
// 工作转移队列
struct workStealQueue
{
using dataType = functionWrapper;
workStealQueue() = default;
// 不可拷贝构造
workStealQueue(const workStealQueue &rhs) = delete;
// 不可拷贝赋值
auto operator=(const workStealQueue &rhs) -> workStealQueue & = delete;
// 推入队列
void push(dataType data)
{
const std::lock_guard<std::mutex> lock(theMutex);
theQueue.push_front(std::move(data));
}
// 队列是否为空
auto empty() const -> bool
{
const std::lock_guard<std::mutex> lock(theMutex);
return theQueue.empty();
}
// 试图弹出数据
auto tryPop(dataType &result) -> bool
{
const std::lock_guard<std::mutex> lock(theMutex);
if (theQueue.empty())
{
return false;
}
result = std::move(theQueue.front());
theQueue.pop_front();
return true;
}
// 试图转移数据
auto trySteal(dataType &result) -> bool
{
const std::lock_guard<std::mutex> lock(theMutex);
if (theQueue.empty())
{
return false;
}
result = std::move(theQueue.back());
theQueue.pop_back();
return true;
}
private:
// 双向队列
std::deque<dataType> theQueue;
// 互斥锁, 为了用于 const 函数, 要加 mutable
mutable std::mutex theMutex;
};
// 线程池
struct threadPool
{
// 构造函数, 线程异常安全
threadPool()
: done(false)
{
const unsigned threadCount = std::thread::hardware_concurrency();
try
{
threads.reserve(threadCount);
queues.reserve(threadCount);
for (unsigned i = 0; i < threadCount; ++i)
{
// 初始化工作转移队列并推入 queues
queues.push_back(std::make_unique<workStealQueue>());
// 初始化工作线程并推入线程数组
threads.emplace_back(&threadPool::workerThread, this, i);
}
}
catch (...)
{
done = true;
throw;
}
}
// 析构函数
~threadPool()
{
done = true;
// 当所有任务结束, 通知所有线程, 以便顺利解除阻塞
condVar.notify_all();
for (auto &thread : threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
// 将函数提交到工作队列, 返回 future
template <typename FunctionType>
auto submit(FunctionType func)
-> std::future<typename std::invoke_result<FunctionType>::type>
{
// invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
// 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
using resultType = typename std::invoke_result<FunctionType>::type;
// 封装函数
std::packaged_task<resultType()> task(func);
// 获取 future
std::future<resultType> result(task.get_future());
if (localWorkQueue)
{
// 封装 task, 推入工作队列
// packaged_task 类型只可移动不可拷贝
localWorkQueue->push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
}
else
{
// 当没有局部工作队列, 则将任务放入总队列
poolWorkQueue.push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
}
return result;
}
// 任务运行
auto runPendingTask() -> bool
{
functionWrapper task;
// 从局部队列, 或全局队列, 或其它线程队列获取任务
if (popTaskFromLocalQueue(task) || popTaskFromPoolQueue(task) ||
popTaskFromOtherThreadQueue(task))
{
// 成功则执行任务返回 true
task();
return true;
}
// 如有其它程序需要cpu资源, 让渡
std::this_thread::yield();
// 无任务返回 false
return false;
}
private:
// 工作线程
void workerThread(unsigned rhsMyIndex)
{
// 初始化局部索引
myIndex = rhsMyIndex;
// 初始化局部工作队列
localWorkQueue = queues[myIndex].get();
std::mutex mtx;
std::unique_lock<std::mutex> uLock(mtx);
while (!done)
{
// 如果没有任务执行则等待, 否则继续循环
if (!runPendingTask())
{
// 等待
condVar.wait(uLock);
}
}
}
// 从局部队列弹出任务
static auto popTaskFromLocalQueue(functionWrapper &task) -> bool
{
return (localWorkQueue != nullptr) && localWorkQueue->tryPop(task);
}
// 从全局队列弹出任务
auto popTaskFromPoolQueue(functionWrapper &task) -> bool
{
return poolWorkQueue.tryPop(task);
}
// 从其它线程的局部队列弹出任务
auto popTaskFromOtherThreadQueue(functionWrapper &task) -> bool
{
// 遍历线程池中其它线程的局部队列
for (unsigned i = 0; i < queues.size(); ++i)
{
const unsigned index = (myIndex + i + 1) % queues.size();
// 从其它线程的局部队列弹出任务
if (queues[index]->trySteal(task))
{
return true;
}
}
return false;
}
// 所有任务完成标识
std::atomic<bool> done;
// 工作队列
TS::threadSafeQueue<functionWrapper> poolWorkQueue;
// 转移队列数组
std::vector<std::unique_ptr<workStealQueue>> queues;
// 局部静态任务转移队列指针
static thread_local workStealQueue *localWorkQueue;
// 局部静态索引
static thread_local unsigned myIndex;
// 线程数组
std::vector<std::thread> threads;
// 条件变量
std::condition_variable condVar;
};
// 对于静态成员, 必须在类内声明, 类外定义, 一般放在 .cpp 文件中, 防止二次定义
thread_local workStealQueue *threadPool::localWorkQueue;
thread_local unsigned threadPool::myIndex;
} // namespace TS
#endif
线程池的思想能够较为容易的避免不必要的线程开销, 对于并发编程, 是一个较为常见的技法, 需要掌握.
线程池需要注意的几个地方,
首先是异常安全, 我们要保证异常可以汇总到主线程中,
其次是避免数据竞争, 我们通过为每个线程添加局部工作队列避免所有线程争抢全局队列的数据, 同时为了保证高效, 对于未完成的任务, 可以转移给其它线程处理.
为了通用性, 我们用继承的方法包装函数, 我们包装的是函数而不是函数的结果, 所以在使用的时候, 我们的函数是有状态的, 换句话说, 我们包装的时候要把参数一同打包, 却不能用参数列表, 否则就是包装函数结果, 所以线程池的任务推入基本是如下形式:
std::future<std::list<T>> newLower = pool.submit(
[&newLowerChunk, this]() { return doSort(newLowerChunk); });
通过 lambda 将状态, 也就是要用的参数 newLowerChunk 捕获, 转移给真正需要使用此参数的函数, 如 doSort() 并返回.