上篇文章我们介绍了线程池, 比较复杂, 本文会在线程池的基础上增加中断线程, 完善线程的较高级操作.
线程的中断需要让线程主动而安全的停止, 通过信号的传入结束线程, 将其停止运行.
可中断的线程可以通过将 thread 包装到一个有中断标识的类中, 通过另一个中断检查或等待中断检查函数判断中断标识是否设置, 如果设置, 则线程弹出中断异常, 中断此线程.
#ifndef THREADPOOL
#define THREADPOOL
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace TS
{
这是线程中断异常, 继承自异常类, 会在后续的可中断线程类中用于异常抛出和捕获.
// 线程中断异常
struct threadInterrupted : std::exception
{
[[nodiscard]] auto what() const noexcept -> const char * override
{
return "thread interrupted.";
}
};
以下是用于中断标识类的可锁类, 用于中断标识类的 wait 函数
// 中断标识
struct interruptFlag;
// 自定义锁
template <typename Lockable>
struct customLock
{
customLock(interruptFlag *self_, std::condition_variable_any &cond,
Lockable &lock_);
~customLock();
void unlock();
void lock();
private:
// 中断标识指针
interruptFlag *self;
// 成员锁
Lockable &theLock;
};
以下是中断标识类的具体实现, 中断标识是一个原子布尔类对象, 当中断标识被设置, 线程中一旦触发 interruptionPoint() 函数则会立即抛出异常并中断.
或者线程中有等待逻辑, 在中断设置时发出通知, 被 interruptibleWait() 等待中断函数接受, 抛出异常并中断.
// 中断函数,
// 若线程局部中断标识被设置,
// 则抛出线程中断异常
void interruptionPoint();
// 中断标识
struct interruptFlag
{
// 默认构造
interruptFlag() = default;
// 设置中断标识
void set()
{
// 中断标识设置为 true
atomicFlag.store(true, std::memory_order_relaxed);
// 设置清除锁上锁
const std::lock_guard<std::mutex> lock(setClearMutex);
// 如果线程条件变量指针不为空, 通知所有线程
if (threadCond != nullptr)
{
threadCond->notify_all();
}
// 否则如果线程任意条件变量指针不为空, 通知所有线程
else if (threadCondAny != nullptr)
{
threadCondAny->notify_all();
}
}
// 等待
template <typename Lockable>
void wait(std::condition_variable_any &condVarAny, Lockable &lock)
{
// 初始化自定义锁
// 将条件变量 condVarAny 传递给本中断标识
// 本中断标识的设置锁上锁 setClearMutex.lock()
// 析构时清空本中断标识的条件变量指针
// 并解锁设置锁 setClearMutex.unlock()
customLock<Lockable> cusLock(this, condVarAny, lock);
// 若线程局部中断标识被设置, 则抛出线程中断异常
interruptionPoint();
// 条件变量等待
// 未通知时解锁参数中的 lock 和本中断标识的设置锁
// 通知后将以上两锁加锁
// 这是一个巧妙的结构,
// 等待唤醒和上边检查中断异常之间是可以用 set() 设置中断标识的,
// 但是 set() 想通知所有线程, 则是阻塞的, 因为 setClearMutex 已经锁定
// 只有当下面 wait() 执行释放 cusLock 时, setClearMutex 解锁
// 然后 notify_all 才会进行, 唤醒不会被遗漏
condVarAny.wait(cusLock);
// 若线程局部中断标识被设置, 则抛出线程中断异常
interruptionPoint();
}
// 中断标识是否设置
// 返回 atomicFlag 值
[[nodiscard]] auto isSet() const -> bool
{
return atomicFlag.load(std::memory_order_relaxed);
}
// 设置条件变量
// 给线程条件变量指针赋值
void setConditionVariable(std::condition_variable &condVar)
{
std::lock_guard<std::mutex> const lock(setClearMutex);
threadCond = &condVar;
}
// 将线程条件变量指针置空
void clearConditionVariable()
{
std::lock_guard<std::mutex> const lock(setClearMutex);
threadCond = nullptr;
}
private:
// 原子标识
std::atomic<bool> atomicFlag;
// 线程条件变量指针
std::condition_variable *threadCond = nullptr;
// 线程任意条件变量指针
std::condition_variable_any *threadCondAny = nullptr;
// 互斥锁设置清除锁
std::mutex setClearMutex;
// 友元类
template <typename Lockable>
friend struct customLock;
};
// 自定义锁初始化,
// 获取中断标识指针, 条件变量,
// 中断标识的清理锁加锁,
// 获取中断标识的线程任意条件变量指针
template <typename Lockable>
customLock<Lockable>::customLock(interruptFlag *self_,
std::condition_variable_any &cond,
Lockable &lock_)
: self(self_)
, theLock(lock_)
{
// 中断标识的清理锁加锁
self->setClearMutex.lock();
// 获取中断标识的线程任意条件变量指针
self->threadCondAny = &cond;
}
// 自定义锁析构,
// 中断标识的线程任意条件变量指针置空,
// 中断标识的清理锁释放锁
template <typename Lockable>
customLock<Lockable>::~customLock()
{
self->threadCondAny = nullptr;
self->setClearMutex.unlock();
}
// 自定义锁解锁,
// 锁成员解锁,
// 中断标识的清理锁解锁,
template <typename Lockable>
void customLock<Lockable>::unlock()
{
theLock.unlock();
self->setClearMutex.unlock();
}
// 自定义锁加锁,
// 锁成员加锁, 中断标识清理锁加锁
template <typename Lockable>
void customLock<Lockable>::lock()
{
std::lock(self->setClearMutex, theLock);
}
以下是可中断线程类的具体实现, 依靠一个线程局部的中断标识确保本线程的中断操作.
除了中断函数, 其余的都与普通线程类无异.
构造可中断线程类时, 需要传入一个状态函数 explicit interruptibleThread(FunctionType func), 通常使用 lambda 进行封装, 因为没有放参数的地方. 对于 lambda 不熟悉的同学, 一定要好好学学.
作为参数的状态函数, 通常会是一个循环结构, 包含 interruptionPoint() 检查中断函数或某个 interruptibleWait() 函数, 当线程设置中断标识后, 抛出异常, 中断线程.
// 线程局部中断标识
thread_local interruptFlag thisThreadInterruptFlag;
// 清除局部线程中断标识的条件变量
// 析构函数的一种运用
struct clearCondVarOnDestruct
{
~clearCondVarOnDestruct()
{
thisThreadInterruptFlag.clearConditionVariable();
}
};
// 可中断线程类
struct interruptibleThread
{
// 默认构造函数
interruptibleThread() = default;
// 构造函数
template <typename FunctionType>
explicit interruptibleThread(FunctionType func)
{
// 初始化中断标识指针 promise
std::promise<interruptFlag *> promFlagPtr;
// 初始化内部线程
internalThread = std::thread([func, &promFlagPtr]() {
// 给 通过 lambda 捕获的 promFlagPtr 赋值局部线程中断标识的指针
promFlagPtr.set_value(&thisThreadInterruptFlag);
// 试图运行 func
try
{
func();
}
// 捕获线程中断异常
catch (const threadInterrupted &thrIntrpt)
{}
});
// 通过 future 传递局部线程中断标识指针给 intrptFlag
intrptFlag = promFlagPtr.get_future().get();
}
// 不可拷贝构造
interruptibleThread(const interruptibleThread &) = delete;
// 不可拷贝赋值
auto operator=(const interruptibleThread &)
-> interruptibleThread & = delete;
// 移动构造
interruptibleThread(interruptibleThread &&rhs) noexcept
: internalThread(std::move(rhs.internalThread))
, intrptFlag(rhs.intrptFlag)
{
rhs.intrptFlag = nullptr;
}
// 移动赋值
auto operator=(interruptibleThread &&rhs) noexcept -> interruptibleThread &
{
internalThread = std::move(rhs.internalThread);
intrptFlag = rhs.intrptFlag;
rhs.intrptFlag = nullptr;
return *this;
}
void join()
{
internalThread.join();
}
void detatch()
{
internalThread.detach();
}
[[nodiscard]] auto joinable() const -> bool
{
return internalThread.joinable();
}
// 中断
void interrupt()
{
// 如果中断标识指针不为空
// 设置中断标识
if (intrptFlag != nullptr)
{
intrptFlag->set();
}
}
private:
// 内部线程
std::thread internalThread;
// 中断标识指针
interruptFlag *intrptFlag = nullptr;
};
// 中断函数,
// 若线程局部中断标识被设置,
// 则抛出线程中断异常
inline void interruptionPoint()
{
// 若线程局部中断标识被设置,
// 则抛出线程中断异常
if (thisThreadInterruptFlag.isSet())
{
throw threadInterrupted();
}
}
// 中断等待
// 封装条件变量的等待, 同时兼有中断功能
// 不足: 过多的伪唤醒
inline void interruptibleWait(std::condition_variable &condVar,
std::unique_lock<std::mutex> &lock)
{
// 判断是否中断
interruptionPoint();
// 设置局部线程中断标识的条件变量指针
// 这样, 当中断标识设置 set() 时, 会通知所有阻塞的条件变量
thisThreadInterruptFlag.setConditionVariable(condVar);
// 析构时清理上面设置的变量指针
// 为了防止下面 wait_for 抛异常, 无法清空中断标识的条件变量指针
const clearCondVarOnDestruct guard;
// 判断是否中断
interruptionPoint();
// 等待唤醒, 如未通知, 1毫秒后唤醒
// 为了防止线程中断标识在判断是否中断和 wait 之间设置 set(),
// 导致通知错过, 无法唤醒
// 此处用 wait_for
condVar.wait_for(lock, std::chrono::milliseconds(1));
// 判断是否中断
// 无中断则继续
interruptionPoint();
}
// 有前置条件的中断等待
// 不足: 前置条件的检查次数过多
template <typename Predicate>
void interruptibleWait(std::condition_variable &condVar,
std::unique_lock<std::mutex> &lock, Predicate pred)
{
// 判断是否中断
interruptionPoint();
// 局部线程中断标识设置条件变量
thisThreadInterruptFlag.setConditionVariable(condVar);
// 析构时清除上面设置的条件变量指针
clearCondVarOnDestruct const guard;
// 当未设置局部线程中断标识, 且未达到前置条件, 进行循环
while (!thisThreadInterruptFlag.isSet() && !pred())
{
// 等待唤醒, 如未通知, 1毫秒后唤醒
condVar.wait_for(lock, std::chrono::milliseconds(1));
}
// 判断是否中断
interruptionPoint();
}
// 通过局部线程中断标识的 wait() 函数实现的中断等待
template <typename Lockable>
void interruptibleWait(std::condition_variable_any &conVar, Lockable &lock)
{
thisThreadInterruptFlag.wait(conVar, lock);
}
// 通过等待 future 完成实现中断等待
template <typename T>
void interruptibleWait(std::future<T> &theFuture)
{
// 如果线程中断标识未设置, 则循环
while (!thisThreadInterruptFlag.isSet())
{
// 如果 theFuture 等待 1 毫秒后 ready 则中断循环
if (theFuture.wait_for(std::chrono::milliseconds(1)) ==
std::future_status::ready)
{
break;
}
}
// 检查是否中断
interruptionPoint();
}
以上是可中断线程及其相关中断函数, 以下是在线程池中使用可中断线程类, 可以根据线程编号中断某个具体线程, 在完成所有任务后, 线程进入等待, 然后线程中断.
另外注意, 线程池中任务队列没有阻塞机制, 也就是说由于任务是一个一个弹出的, 线程池只能被当前执行的任务所阻塞, 一旦线程池析构, 将抛弃所有任务队列中未完成任务, 所以使用线程池, 你需要有一个等待机制, 比如任务数是否为0, 这个应该不难实现.
目前的线程池以经非常复杂了, 但依然有可改善空间, 比如引入无锁队列, 至于是否能提高效率, 还需要具体测试.
template <typename T>
struct threadSafeQueue
{
threadSafeQueue() = default;
threadSafeQueue(const threadSafeQueue &rhs)
{
std::lock_guard<std::mutex> makeLock(rhs.mutx);
dataQueue = rhs.dataQueue;
}
auto operator=(const threadSafeQueue &rhs) -> threadSafeQueue & = delete;
void push(const T &newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(newVal);
dataCond.notify_one();
}
void push(T &&newVal)
{
std::lock_guard<std::mutex> makeLock(mutx);
dataQueue.push(std::move(newVal));
dataCond.notify_one();
}
// 通过返回 true false 判断赋值是否正确
auto tryPop(T &val) -> bool
{
std::lock_guard<std::mutex> const makeLock(mutx);
if (dataQueue.empty())
{
return false;
}
val = std::move(dataQueue.front());
dataQueue.pop();
return true;
}
// 通过返回 nullptr 或有效 shared_ptr 判定是否正确赋值
auto tryPop() -> std::shared_ptr<T>
{
std::lock_guard<std::mutex> makeLock(mutx);
if (dataQueue.empty())
{
// 返回 nullptr
return std::shared_ptr<T>(nullptr);
}
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
void waitAndPop(T &val)
{
std::unique_lock<std::mutex> makeLock(mutx);
// 如果不为空,加锁,执行下面的程序
// 如果为空,等待传来消息,再判断是否为空,为空则继续等
// 不为空则则锁住,继续下面的程序
// 就是没有通知,也会判断是否为空,为空等,不空锁,向下执行
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
val = dataQueue.front();
dataQueue.pop();
}
// 如果一直为空,没有程序产生队列数据,会一直阻塞,可能会让程序不可结束
auto waitAndPop() -> std::shared_ptr<T>
{
std::unique_lock<std::mutex> makeLock(mutx);
dataCond.wait(makeLock, [this] { return !dataQueue.empty(); });
std::shared_ptr<T> res(std::make_shared<T>(dataQueue.front()));
dataQueue.pop();
return res;
}
auto empty() const -> bool
{
std::lock_guard<std::mutex> makeLock(mutx);
return dataQueue.empty();
}
private:
mutable std::mutex mutx;
std::queue<T> dataQueue;
std::condition_variable dataCond;
};
// 函数类型擦除类基类
struct implBase
{
virtual void call() = 0;
virtual ~implBase() = default;
};
// 函数类型擦除类
template <typename FuncType>
struct implType : implBase
{
explicit implType(FuncType &&rhs)
: func(std::move(rhs))
{}
void call() override
{
func();
}
private:
FuncType func;
};
// 函数包装类
struct functionWrapper
{
functionWrapper() = default;
// 禁用拷贝构造
functionWrapper(const functionWrapper &) = delete;
// 禁用引用拷贝构造
functionWrapper(functionWrapper &) = delete;
// 禁用拷贝赋值
auto operator=(const functionWrapper &) -> functionWrapper & = delete;
// 移动构造
functionWrapper(functionWrapper &&rhs) noexcept
: impl(std::move(rhs.impl))
{}
// 移动拷贝
auto operator=(functionWrapper &&rhs) noexcept -> functionWrapper &
{
impl = std::move(rhs.impl);
return *this;
}
// 通过可执行类对象构造
template <typename FuncType>
explicit functionWrapper(FuncType &&func)
: impl(new implType<FuncType>(std::forward<FuncType>(func)))
{}
// 执行
void operator()()
{
try
{
impl->call();
}
catch (...)
{
throw;
}
}
private:
// 通过智能指针封装, 无需担心资源回收
std::unique_ptr<implBase> impl;
};
// 工作转移队列
struct workStealQueue
{
using dataType = functionWrapper;
workStealQueue() = default;
// 不可拷贝构造
workStealQueue(const workStealQueue &rhs) = delete;
// 不可拷贝赋值
auto operator=(const workStealQueue &rhs) -> workStealQueue & = delete;
// 推入队列
void push(dataType data)
{
const std::lock_guard<std::mutex> lock(theMutex);
theQueue.push_front(std::move(data));
}
// 队列是否为空
auto empty() const -> bool
{
const std::lock_guard<std::mutex> lock(theMutex);
return theQueue.empty();
}
// 试图弹出数据
auto tryPop(dataType &result) -> bool
{
const std::lock_guard<std::mutex> lock(theMutex);
if (theQueue.empty())
{
return false;
}
result = std::move(theQueue.front());
theQueue.pop_front();
return true;
}
// 试图转移数据
auto trySteal(dataType &result) -> bool
{
const std::lock_guard<std::mutex> lock(theMutex);
if (theQueue.empty())
{
return false;
}
result = std::move(theQueue.back());
theQueue.pop_back();
return true;
}
private:
// 双向队列
std::deque<dataType> theQueue;
// 互斥锁, 为了用于 const 函数, 要加 mutable
mutable std::mutex theMutex;
};
// 线程池
struct threadPool
{
// 构造函数, 线程异常安全
threadPool()
: done(false)
{
const unsigned threadCount = std::thread::hardware_concurrency();
poolSize = threadCount;
try
{
queues.reserve(threadCount);
for (unsigned i = 0; i < threadCount; ++i)
{
// 初始化工作转移队列并推入 queues
queues.push_back(std::make_unique<workStealQueue>());
}
threads.reserve(threadCount);
for (unsigned i = 0; i < threadCount; ++i)
{
// 初始化工作线程并推入线程数组
// threads.emplace_back(&threadPool::workerThread, this, i);
threads.emplace_back([this, i]() { workerThread(i); });
}
}
catch (...)
{
done = true;
throw;
}
}
// 析构函数
~threadPool()
{
done = true;
// 当所有任务结束, 通知所有线程, 以便顺利解除阻塞
condVar.notify_all();
for (auto &thread : threads)
{
if (thread.joinable())
{
thread.join();
}
}
}
// 将函数提交到工作队列, 返回 future
template <typename FunctionType>
auto submit(FunctionType func)
-> std::future<typename std::invoke_result<FunctionType>::type>
{
// invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
// 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
using resultType = typename std::invoke_result<FunctionType>::type;
// 封装函数
std::packaged_task<resultType()> task(func);
// 获取 future
std::future<resultType> result(task.get_future());
if (localWorkQueue != nullptr)
{
// 封装 task, 推入工作队列
// packaged_task 类型只可移动不可拷贝
localWorkQueue->push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
}
else
{
// 当没有局部工作队列, 则将任务放入总队列
poolWorkQueue.push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
}
return result;
}
// 将函数均匀的提交到工作队列, 返回 future
template <typename FunctionType>
auto submitAll(FunctionType func)
-> std::future<typename std::invoke_result<FunctionType>::type>
{
// invoke_result 获取结果类型, 其类型是函数类型而非函数本身,
// 区别是 FunctionType 不带括号, 如有参数用逗号分隔写在后面
using resultType = typename std::invoke_result<FunctionType>::type;
// 封装函数
std::packaged_task<resultType()> task(func);
// 获取 future
std::future<resultType> result(task.get_future());
// 封装 task, 推入工作队列
// packaged_task 类型只可移动不可拷贝
queues[taskCnt++ % poolSize]->push(functionWrapper(std::move(task)));
// 通知以解除阻塞
condVar.notify_one();
return result;
}
// 任务运行
auto runPendingTask() -> bool
{
functionWrapper task;
// 从局部队列, 或全局队列, 或其它线程队列获取任务
if (popTaskFromLocalQueue(task) || popTaskFromPoolQueue(task) ||
popTaskFromOtherThreadQueue(task))
{
// 成功则执行任务返回 true
task();
return true;
}
// 如有其它程序需要cpu资源, 让渡
std::this_thread::yield();
// 无任务返回 false
return false;
}
// 中断线程
void interrupt(unsigned threadId)
{
if (threadId < poolSize)
{
threads[threadId].interrupt();
}
}
// 获取线程池中线程数
auto size() const -> unsigned
{
return poolSize;
}
private:
// 工作线程
void workerThread(unsigned rhsMyIndex)
{
// 初始化局部索引
myIndex = rhsMyIndex;
// 初始化局部工作队列
localWorkQueue = queues[myIndex].get();
std::mutex mtx;
std::unique_lock<std::mutex> uLock(mtx);
while (!done)
{
// 线程中断点
// interruptionPoint();
// 如果没有任务执行则等待, 否则继续循环
if (!runPendingTask())
{
// 等待
// condVar.wait_for(uLock, std::chrono::seconds(1));
// condVar.wait(uLock);
interruptibleWait(condVar, uLock);
}
}
}
// 从局部队列弹出任务
static auto popTaskFromLocalQueue(functionWrapper &task) -> bool
{
return (localWorkQueue != nullptr) && localWorkQueue->tryPop(task);
}
// 从全局队列弹出任务
auto popTaskFromPoolQueue(functionWrapper &task) -> bool
{
return poolWorkQueue.tryPop(task);
}
// 从其它线程的局部队列弹出任务
auto popTaskFromOtherThreadQueue(functionWrapper &task) -> bool
{
// 遍历线程池中其它线程的局部队列
for (unsigned i = 0; i < queues.size(); ++i)
{
const unsigned index = (myIndex + i + 1) % queues.size();
// 从其它线程的局部队列弹出任务
if (queues[index]->trySteal(task))
{
return true;
}
}
return false;
}
// 所有任务完成标识
std::atomic<bool> done;
std::atomic<size_t> taskCnt = 0;
// 工作队列
TS::threadSafeQueue<functionWrapper> poolWorkQueue;
// 转移队列数组
std::vector<std::unique_ptr<workStealQueue>> queues;
// 局部静态任务转移队列指针
static thread_local workStealQueue *localWorkQueue;
// 局部静态索引
static thread_local unsigned myIndex;
// 线程数组
std::vector<interruptibleThread> threads;
unsigned poolSize = 0;
// 条件变量
std::condition_variable_any condVar;
};
// 对于静态成员, 必须在类内声明, 类外定义, 一般放在 .cpp 文件中, 防止二次定义
thread_local workStealQueue *threadPool::localWorkQueue;
thread_local unsigned threadPool::myIndex;
} // namespace TS
#endif
以下是一个运行示例, 向线程池压入任务, 并中断线程池中的大部分线程.
你可能会发现, 在任务未结束时, 线程是无法中断的, 当当前任务完成, 再次压入任务, 则只有1个线程运行, 也就是其它线程已经中断, 无法复用.
#include
#include
#include
auto main() -> int
{
TS::threadPool tpl;
int const num = 100;
for (int i = 0; i != num; ++i)
{
tpl.submitAll([i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(10 * i));
std::cout << i << ' ';
});
}
for (int i = 0; i != 15; ++i)
{
tpl.interrupt(i);
}
char chr;
std::cout << "input: ";
std::cin >> chr;
for (int i = 0; i != num; ++i)
{
tpl.submitAll([i]() {
std::this_thread::sleep_for(std::chrono::milliseconds(10 * i));
std::cout << i << ' ';
});
}
std::cin >> chr;
return 0;
线程中断部分已经讲解完成, 代码稍显复杂, 可中断线程类是可以不必配合线程池使用的, 但基本的运用方法和线程池差不多.