• 一个简单而优雅的C++线程池


    实现技术

    1. 条件变量
    2. unique_lock
    3. thread

    代码

    /*
     * @Author       : Y. F. Zhang
     * @Date         : 2022-11-09
     * @copyleft Apache 2.0
     */ 
    
    #ifndef THREADPOOLV2_H
    #define THREADPOOLV2_H
    
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    class ThreadPoolV2 {
    private:
        struct Pool {
            std::queue<std::function<void()>> tasks_;
            std::mutex mtx_;
            std::condition_variable cond_;
            std::atomic<bool> isClosed_;
            std::atomic<int> doTaskNum_;
        };
        std::shared_ptr<Pool> pool_;
    public:
        explicit ThreadPoolV2(int threadNum) : pool_(std::make_shared<Pool>()) {
            for (int i = 0; i < threadNum; ++i) {
                std::thread(
                    [pool = pool_] () {
                        while (true) { 
                            if (pool->isClosed_.load()) break;
                            std::unique_lock<std::mutex> lk(pool->mtx_);
                            // The consumer implemented by conditional variable uses "while" instead of "if" to prevent false wake-up
                            while (pool->tasks_.empty()) { 
                                pool->cond_.wait(lk);
                            }
                            auto task = std::move(pool->tasks_.front());
                            pool->tasks_.pop();
                            lk.unlock(); // before excute task, unlock to allow other threads waken
                            pool->doTaskNum_.fetch_add(1);
                            task();
                            pool->doTaskNum_.fetch_sub(1);
                        }
                    }
                ).detach();
            }
        }
    
        ~ThreadPoolV2() {
            shutdown();
        }
    
        void shutdown() {
            pool_->isClosed_.store(true);
            pool_->cond_.notify_all();
        }
        
        template<class F> // 完美转发需要用模板
        void addTask(F&& t) {
            {
                std::unique_lock<std::mutex> lk(pool_->mtx_);
                pool_->tasks_.push(std::forward<F>(t));
            }
            pool_->cond_.notify_one();
        }
    
        int getDoTaskThreadNum() {
            return pool_->doTaskNum_.load();
        }
    };
    
    #endif
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    这里使用比较经典的条件变量式的“生产者-消费者”模型实现方式。值得注意的是:

    while (pool->tasks_.empty()) { 
        pool->cond_.wait(lk);
    }
    
    • 1
    • 2
    • 3

    这里使用while,而不是if,借此可以避免“虚假唤醒”问题。其次,生产者(见下面一段代码)notify_one()时,如果没有线程wait在条件变量上(线程池较忙),此时也无妨。因为当某个线程执行完task后再次开始循环会检车任务队列是否为空,若不为空直接跳过wait开始执行task。

    再看:

    auto task = std::move(pool->tasks_.front());
    pool->tasks_.pop();
    lk.unlock();
    pool->doTaskNum_.fetch_add(1);
    task();
    pool->doTaskNum_.fetch_sub(1);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里会在执行task之前手动unlock,因为task可能往往比较耗时,而task之间是解耦的,不需要在执行task时线程互斥,这样实际上并没有发挥多线程并发执行任务的功能。因此在这里unlock可以使被notify但还没拿到锁的线程拿到锁去取它的task。

    再者:

    template<class F> // 完美转发需要用模板
    void addTask(F&& t) {
        {
            std::unique_lock<std::mutex> lk(pool_->mtx_);
            pool_->tasks_.push(std::forward<F>(t));
        }
        pool_->cond_.notify_one();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这里使用完美转发,完美转发需要配合模板推导规则,因此这里采用函数模板

    测试代码

    #include "threadpoolv2.h"
    #include 
    #include 
    #include 
    #include 
    
    using namespace std;
    
    void task1() {
        cout << "task1 done!" << endl;
    }
    
    void task2() {
        cout << "task2 done!" << endl;
    }
    
    void task3() {
        cout << "task3 done!" << endl;
    }
    
    void task4() {
        cout << "task4 done!" << endl;
    }
    
    int main(int argc, char** argv) {
        auto p = new ThreadPoolV2(4);  
        thread([p](){
            for(int i = 0; i < 5; ++i) {
                p->addTask(bind(task1));
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }).detach();
        thread([p](){
            for(int i = 0; i < 5; ++i) {
                p->addTask(bind(task2));
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }).detach();
        thread([p](){
            for(int i = 0; i < 5; ++i) {
                p->addTask(bind(task3));
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }).detach();
        thread([p](){
            for(int i = 0; i < 5; ++i) {
                p->addTask(bind(task4));
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }).detach();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        return 0;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    2023.3.10重写

    #ifndef THREAD_POOL_H
    #define THREAD_POOL_H
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #define THREAD_NNUMBER 4
    #define MAX_TASK_NUMBER 1024
    typedef std::function<void(void)> Task;
    class ThreadPool {
    public:
        static ThreadPool* getInstance() {
            if (threadPool_ == nullptr) {
                std::unique_lock<std::mutex> lk(singleMutex_);
                if (threadPool_ == nullptr) {
                    threadPool_ = new ThreadPool(THREAD_NNUMBER);
                }
            }
            return threadPool_;
        }
    
        void addTask(Task t) {
            std::unique_lock<std::mutex> lk(mtx_);
            while (queue_.size() >= MAX_TASK_NUMBER) {
                condProductor_.wait(lk);
            }
            queue_.push(t);
            lk.unlock();
            condConsumer_.notify_one();
        }
    
        void stop() {
            std::unique_lock<std::mutex> lk(mtx_);
            run_.store(false);
            condConsumer_.notify_all();
        }
    
    private:
        ThreadPool(uint32_t threadNum){
            threadNum_ = threadNum;
            run_.store(true);
            for (int i = 0; i < threadNum_; ++i) {
                threadVec_.push_back(
                    std::thread([this]() {
                        while (run_.load()) {
                            std::unique_lock<std::mutex> lk(mtx_);
                            while (queue_.empty()) {
                                condConsumer_.wait(lk);
                                if (!run_.load()) {
                                    break;
                                }
                            }
                            Task task = queue_.front();
                            queue_.pop();
                            condProductor_.notify_one();
                            lk.unlock();
                            task();
                        }
                    })
                );
                threadVec_.back().detach(); 
            }
        };
        std::queue<Task> queue_;
        std::vector<std::thread> threadVec_;
        static ThreadPool* threadPool_;
        std::mutex mtx_;
        
        static std::mutex singleMutex_;
        std::condition_variable condConsumer_;
        std::condition_variable condProductor_;
        uint32_t threadNum_;
        std::atomic<bool> run_;
    };
    
    ThreadPool* ThreadPool::threadPool_ = nullptr;
    std::mutex ThreadPool::singleMutex_;
    
    
    #endif
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
  • 相关阅读:
    Rust 语法
    ESP32S3在VScode中使用USB口调试
    Vue3学习(二十二)- 保存文档内容
    /bin/bash: Resource temporarily unavailable
    并查集-合并集合
    分开的两个程序使用共同的mysql,一端更新了表,另一端怎么及时更新缓存,使用mybatis
    92. 反转链表 II【链表】(链表内指定区间反转)
    iOS 15.5 被曝“偷跑”流量?苹果:建议恢复出厂设置
    猴子也能学会的jQuery第五期——jQuery样式操作
    17:00面试,17:09就出来了 ,问的实在是太...
  • 原文地址:https://blog.csdn.net/weixin_43145941/article/details/127771319