• 【C++并发编程】(九)线程池


    文章目录

    (九)线程池

    线程池是一种用于管理和复用多个线程的技术,可以提高多线程程序的性能和资源利用率。它的基本概念是将一组预先创建的线程放入一个池中,需要执行任务时,从池中获取空闲线程来处理任务,任务完成后,线程不会被销毁,而是返回池中等待下一个任务。

    线程池的核心组成部分包括:

    1. 任务队列:任务是线程池执行的基本工作单元。通常任务是可调用的对象,例如函数,函数指针、函数对象、lambda表达式等。 任务队列是一个线程安全的队列,用于存储等待执行的任务。当线程池中的工作线程空闲时,它们会从任务队列中取出任务并执行。

    2. 工作线程:工作线程负责执行分配给线程池的任务。这些线程在线程池初始化时被创建,并在整个线程池的生命周期中保持运行。

    3. 线程管理: 负责管理工作线程的创建和销毁。线程池在初始化时创建一定数量的工作线程,并在销毁时确保所有工作线程都正确结束。

    4. 任务提交接口: 提供一个接口,用于将任务提交到线程池中。这个接口可以接收各种可调用对象,并将它们添加到任务队列中。

    此外,线程池还需要使用同步机制(例如互斥锁和条件变量)来保护任务队列的访问,确保线程安全。

    下面使用C++实现一个简单的线程池:

    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    #include 
    
    // 线程池类
    class ThreadPool {
    private:
    
        // 1.工作线程(存储工作线程的容器)  
        std::vector<std::thread> workers;
        // 2 任务队列(用于存储待执行任务的队列)
        std::queue<std::function<void()>> tasks;
    
        std::mutex queue_mutex;  // 互斥锁用于保护任务队列的并发访问
        std::condition_variable condition;  // 条件变量用于线程间同步
        bool stop;   // 停止标志,用于控制线程池的关闭 
    public:
    	// 3 线程管理
        // 构造函数,初始化线程池并创建指定数量的工作线程
        ThreadPool(size_t threads);
        // 析构函数,停止所有工作线程
        ~ThreadPool();
    
        // 4 任务提交接口,接受可调用对象和其参数,返回一个 future 对象
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args)-> std::future<std::invoke_result_t<F, Args...>>;
        
    };
    
    ThreadPool::ThreadPool(size_t threads): stop(false) {
        for (size_t i = 0; i < threads; ++i) {
            // ,创建了指定数量的工作线程,每个线程都会运行一个 lambda 表达式
            workers.emplace_back([this] {
                while(true) {
                    std::function<void()> task;
                    {
                        // 锁定互斥锁
                        std::unique_lock<std::mutex> lock(queue_mutex);
                        //  如果任务队列为空或线程池未停止,线程会等待条件变量通知
                        condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                        // 如果线程池停止且任务队列为空,退出循环
                        if (this->stop && this->tasks.empty())
                            return;
                        // 取出队列中的任务
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // 执行任务
                    task();
                }
            });
        }
    }
    
    
    ThreadPool::~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true; // 设置停止标志
        }
        // 通知所有等待的任务线程
        condition.notify_all();
        // 等待所有工作线程结束
        for (std::thread &worker : workers)
            worker.join();
    }
    
    
    template<class F, class... Args>
    auto ThreadPool::enqueue(F&& f, Args&&... args)-> std::future<std::invoke_result_t<F, Args...>> {
        using return_type = std::invoke_result_t<F, Args...>;
        // 创建一个 packaged_task 对象,用于封装任务和捕获返回值
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        // 获取 future 对象,它将与 packaged_task 对象关联
        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            // 如果线程池已经停止,抛出异常
            if (stop)  throw std::runtime_error("enqueue on stopped ThreadPool");
            // 将任务添加到任务队列
            tasks.emplace([task]() { (*task)(); });
        }
        // 通知一个工作线程有新任务
        condition.notify_one();
        // 返回 future 对象,外部可以通过它获取任务结果
        return res;
    }
    
    
    int main() {
        // 创建一个包含4个线程的线程池
        ThreadPool pool(4);
    
        // 提交第一个任务,带有参数,返回值为 int; result1为std::future类型
        auto result1 = pool.enqueue([](int answer) { return answer; }, 42);
        // 提交第二个任务,无参数,返回值为 std::string; result2为std::future类型,
        auto result2 = pool.enqueue([]()  {
            std::this_thread::sleep_for(std::chrono::seconds(1));
            return std::string("Hello from thread");
        });
    
        // 获取并输出任务结果
        std::cout << "Result 1: " << result1.get() << std::endl; // Result 1: 42
    
        std::cout << "Result 2: " << result2.get() << std::endl;// Result 2: Hello from thread
    
        return 0;
    }
    
    1. 任务队列std::queue> tasks 是一个队列,用于存储等待执行的任务。

    2. 工作线程: 在构造函数中,创建了指定数量的工作线程,每个线程都会运行一个 lambda 表达式。在这个表达式中, 工作线程在无限循环中等待新任务: 每个工作线程在等待时,会尝试获取任务队列的互斥锁。 如果任务队列为空或线程池未停止,线程会等待条件变量通知。一旦有任务添加到任务队列中,或者接收到停止信号,线程会被唤醒,并执行后续的指令。

    3. 线程管理:工作线程在 ThreadPool::ThreadPool 构造函数中创建,所有工作线程被加入到 workers 向量中进行管理。每一个工作线程在 ThreadPool::~ThreadPool 析构函数中安全退出。 析构函数将 stop 标志设为 true,并通知所有等待的任务线程检查条件。 工作线程在收到停止信号且任务队列为空时退出循环并终止。

    4. 任务提交接口 template auto enqueue(F&& f, Args&&... args)-> std::future>; 是一个模板方法,用于提交任务。 该方法接受一个可调用对象 f 和参数 args,并返回一个 std::future 对象。当调用 enqueue 函数提交任务时: 使用 std::bindstd::packaged_task 将任务封装起来,然后获取与任务关联的 std::future 对象,以便在将来获取任务的返回值。 最后将封装好的任务添加到任务队列中并通知一个工作线程有新任务可执行。

    stop标志和队列的pushpopfront等操作在多线程环境中不是原子的,当多个线程同时尝试访问和修改stop标志或同一个任务队列时,可能会引发数据竞争和未定义的行为。互斥锁确保了在任何时候只有一个线程可以修改stop标志和队列。条件变量 condition 用于在任务队列为空时阻塞线程,并在有新任务时通知线程。

  • 相关阅读:
    JAVASE零基础到高级教程(1)------ 集成开发环境安装使用
    spring seccurity OAuth 2.0授权服务器工作流程
    vue3中通过ref获取子组件实例,取值为undefined
    Linux 网络巨型帧设置方法
    python家庭个人理财记账收支系统django558
    【windows】Windows电脑怎么卸载服务/删除服务?
    为了直播焊接,我准备了这些装备
    openssl命令行:RSA的用法-- 终极版
    【leetCode:剑指 Offer】20. 表示数值的字符串
    【EMQX】2.1.5 EMQ X 消息服务器功能列表
  • 原文地址:https://blog.csdn.net/weixin_44378835/article/details/138532250