/*
* @Author : Y. F. Zhang
* @Date : 2022-11-09
* @copyleft Apache 2.0
*/
#ifndef THREADPOOLV2_H
#define THREADPOOLV2_H
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
class ThreadPoolV2 {
private:
    struct Pool {
        std::queue<std::function<void()>> tasks_;
        std::mutex mtx_;
        std::condition_variable cond_;
        std::atomic<bool> isClosed_{false}; // initialized so workers never read an indeterminate value
        std::atomic<int> doTaskNum_{0};     // number of workers currently executing a task
    };
    std::shared_ptr<Pool> pool_;

public:
    explicit ThreadPoolV2(int threadNum) : pool_(std::make_shared<Pool>()) {
        for (int i = 0; i < threadNum; ++i) {
            std::thread(
                [pool = pool_]() {
                    while (true) {
                        std::unique_lock<std::mutex> lk(pool->mtx_);
                        // The condition-variable consumer uses "while" instead of "if" to guard against
                        // spurious wake-ups; it also re-checks isClosed_ so shutdown() can wake idle workers.
                        while (pool->tasks_.empty() && !pool->isClosed_.load()) {
                            pool->cond_.wait(lk);
                        }
                        if (pool->isClosed_.load()) break;
                        auto task = std::move(pool->tasks_.front());
                        pool->tasks_.pop();
                        lk.unlock(); // unlock before executing the task so other woken threads can acquire the lock
                        pool->doTaskNum_.fetch_add(1);
                        task();
                        pool->doTaskNum_.fetch_sub(1);
                    }
                }
            ).detach();
        }
    }
    ~ThreadPoolV2() {
        shutdown();
    }

    void shutdown() {
        {
            // Set the flag while holding the lock so a worker cannot check the predicate,
            // miss this notification, and then block forever in wait().
            std::lock_guard<std::mutex> lk(pool_->mtx_);
            pool_->isClosed_.store(true);
        }
        pool_->cond_.notify_all();
    }
    template<class F> // a function template is required for perfect forwarding
    void addTask(F&& t) {
        {
            std::unique_lock<std::mutex> lk(pool_->mtx_);
            pool_->tasks_.push(std::forward<F>(t));
        }
        pool_->cond_.notify_one();
    }
    int getDoTaskThreadNum() {
        return pool_->doTaskNum_.load();
    }
};
#endif
This uses the classic condition-variable implementation of the producer-consumer pattern. Note the consumer's wait loop:

    while (pool->tasks_.empty() && !pool->isClosed_.load()) {
        pool->cond_.wait(lk);
    }

A while loop is used here instead of an if, which guards against spurious wake-ups. Another point: when the producer (see the code further below) calls notify_one() while no thread happens to be waiting on the condition variable (because the pool is busy), nothing is lost. Once a worker finishes its current task and starts the loop again, it checks whether the task queue is empty; if it is not, it skips the wait entirely and executes the next task.
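For comparison, here is a minimal standalone sketch of the same consumer logic written with the predicate overload of std::condition_variable::wait, which folds the while loop into the call. It is an illustration only, not part of the pool above; the Channel type and its push/close/pop functions are made up for the example.

#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical single-queue "channel" showing the wait-with-predicate idiom.
struct Channel {
    std::queue<int> items_;
    std::mutex mtx_;
    std::condition_variable cond_;
    bool closed_ = false;

    void push(int v) {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            items_.push(v);
        }
        cond_.notify_one();
    }

    void close() {
        {
            std::lock_guard<std::mutex> lk(mtx_);
            closed_ = true;
        }
        cond_.notify_all();
    }

    // Blocks until an item is available or the channel is closed;
    // returns false once the channel is closed and drained.
    bool pop(int& out) {
        std::unique_lock<std::mutex> lk(mtx_);
        // Equivalent to: while (items_.empty() && !closed_) cond_.wait(lk);
        cond_.wait(lk, [this] { return !items_.empty() || closed_; });
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop();
        return true;
    }
};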
Next, consider:

    auto task = std::move(pool->tasks_.front());
    pool->tasks_.pop();
    lk.unlock();
    pool->doTaskNum_.fetch_add(1);
    task();
    pool->doTaskNum_.fetch_sub(1);

The mutex is unlocked manually before the task is executed. Tasks tend to be time-consuming and are independent of one another, so there is no need for mutual exclusion while a task runs; keeping the lock held across task() would effectively serialize the workers and defeat the purpose of running tasks on multiple threads. Unlocking here lets a thread that has been notified but has not yet acquired the lock grab the mutex and fetch its own task.
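As a small illustration of the effect, the sketch below (assuming the threadpoolv2.h header from above; slowTask is just a made-up example task) submits several slow tasks and then polls getDoTaskThreadNum(). Because the worker releases the mutex before calling task(), several tasks run at once and the reported count typically rises above 1; if the lock were held across task(), the count could never exceed 1.

#include "threadpoolv2.h"
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    ThreadPoolV2 pool(4);
    // A deliberately slow task, used only to keep the workers busy for a while.
    auto slowTask = [] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    };
    for (int i = 0; i < 8; ++i) {
        pool.addTask(slowTask);
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(10));
    // With the unlock-before-task() design this typically prints a value greater than 1.
    std::cout << "busy workers: " << pool.getDoTaskThreadNum() << std::endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    return 0;
}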
Finally:

    template<class F> // a function template is required for perfect forwarding
    void addTask(F&& t) {
        {
            std::unique_lock<std::mutex> lk(pool_->mtx_);
            pool_->tasks_.push(std::forward<F>(t));
        }
        pool_->cond_.notify_one();
    }

Perfect forwarding is used here. It relies on the template argument deduction rules for forwarding references (F&& together with reference collapsing), which is why addTask is written as a function template rather than taking a concrete parameter type.
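A minimal sketch, independent of the pool, of what the forwarding reference buys: F&& binds to both lvalues and rvalues, and std::forward preserves the value category of the argument, so an rvalue callable can be moved into the queue rather than copied. The Probe type and the consume overloads below are made up purely to make the value category visible; addTask here only mirrors the shape of ThreadPoolV2::addTask.

#include <iostream>
#include <utility>

struct Probe {};

// Overloads that reveal which value category the forwarded argument has.
void consume(const Probe&) { std::cout << "lvalue -> would be copied\n"; }
void consume(Probe&&)      { std::cout << "rvalue -> would be moved\n"; }

// Same shape as ThreadPoolV2::addTask: a forwarding reference plus std::forward.
template <class F>
void addTask(F&& t) {
    consume(std::forward<F>(t));
}

int main() {
    Probe p;
    addTask(p);            // F deduces to Probe& -> forwarded as an lvalue
    addTask(Probe{});      // F deduces to Probe  -> forwarded as an rvalue
    addTask(std::move(p)); // also forwarded as an rvalue
    return 0;
}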
#include "threadpoolv2.h"
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>
using namespace std;
void task1() {
    cout << "task1 done!" << endl;
}
void task2() {
    cout << "task2 done!" << endl;
}
void task3() {
    cout << "task3 done!" << endl;
}
void task4() {
    cout << "task4 done!" << endl;
}
int main(int argc, char** argv) {
    // The pool is intentionally never deleted here; each worker keeps the shared
    // Pool state alive through its own shared_ptr copy.
    auto p = new ThreadPoolV2(4);
    thread([p]() {
        for (int i = 0; i < 5; ++i) {
            p->addTask(bind(task1));
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }).detach();
    thread([p]() {
        for (int i = 0; i < 5; ++i) {
            p->addTask(bind(task2));
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }).detach();
    thread([p]() {
        for (int i = 0; i < 5; ++i) {
            p->addTask(bind(task3));
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }).detach();
    thread([p]() {
        for (int i = 0; i < 5; ++i) {
            p->addTask(bind(task4));
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }).detach();
    // Give the detached producers and workers time to finish before main returns.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    return 0;
}
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
#define THREAD_NUMBER 4
#define MAX_TASK_NUMBER 1024
typedef std::function<void(void)> Task;
class ThreadPool {
public:
    static ThreadPool* getInstance() {
        // Double-checked locking: only take the lock when the instance has not been created yet.
        if (threadPool_ == nullptr) {
            std::unique_lock<std::mutex> lk(singleMutex_);
            if (threadPool_ == nullptr) {
                threadPool_ = new ThreadPool(THREAD_NUMBER);
            }
        }
        return threadPool_;
    }
    void addTask(Task t) {
        std::unique_lock<std::mutex> lk(mtx_);
        // Bounded queue: producers block while the queue is full.
        while (queue_.size() >= MAX_TASK_NUMBER) {
            condProductor_.wait(lk);
        }
        queue_.push(t);
        lk.unlock();
        condConsumer_.notify_one();
    }

    void stop() {
        std::unique_lock<std::mutex> lk(mtx_);
        run_.store(false);
        condConsumer_.notify_all();
    }
private:
    ThreadPool(uint32_t threadNum) {
        threadNum_ = threadNum;
        run_.store(true);
        for (uint32_t i = 0; i < threadNum_; ++i) {
            threadVec_.push_back(
                std::thread([this]() {
                    while (run_.load()) {
                        std::unique_lock<std::mutex> lk(mtx_);
                        while (queue_.empty() && run_.load()) {
                            condConsumer_.wait(lk);
                        }
                        // Woken by stop() with nothing left in the queue: exit instead of popping an empty queue.
                        if (queue_.empty()) {
                            break;
                        }
                        Task task = queue_.front();
                        queue_.pop();
                        condProductor_.notify_one(); // a slot has been freed for blocked producers
                        lk.unlock();
                        task();
                    }
                })
            );
            threadVec_.back().detach();
        }
    }
    std::queue<Task> queue_;
    std::vector<std::thread> threadVec_;
    static ThreadPool* threadPool_;
    std::mutex mtx_;
    static std::mutex singleMutex_;
    std::condition_variable condConsumer_;
    std::condition_variable condProductor_;
    uint32_t threadNum_;
    std::atomic<bool> run_;
};
ThreadPool* ThreadPool::threadPool_ = nullptr;
std::mutex ThreadPool::singleMutex_;
#endif
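A minimal usage sketch of the singleton pool above (assuming the header is saved as thread_pool.h; the file name is an assumption):

#include "thread_pool.h"
#include <chrono>
#include <iostream>
#include <thread>

int main() {
    // Every caller gets the same lazily created instance.
    ThreadPool* pool = ThreadPool::getInstance();
    for (int i = 0; i < 8; ++i) {
        pool->addTask([i] { std::cout << "task " << i << " done\n"; });
    }
    // Give the detached workers time to drain the queue, then stop them.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    pool->stop();
    return 0;
}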