• C++实现一个线程池


    一、为什么使用线程池

    大家都知道C++支持多线程开发,也就是支持多个任务并行运行,我们也知道线程的生命周期中包括创建、就绪、运行、阻塞、销毁等阶段,所以如果要执行的任务很多,每个任务都需要一个线程的话,那么频繁的创建、销毁线程会比较耗性能。

    有了线程池就不用创建更多的线程来完成任务,它可以:

    降低资源的消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。

    提高相应速度,当任务到达的时候,任务可以不需要等到线程创建就能立刻执行。

    提高线程的可管理性,线程是稀缺资源,使用线程池可以统一的分配、调优和监控。

    二、线程池的原理

    通俗的讲,线程池就是一个线程集合,里面已经提前创建好了若干个线程,当需要线程的时候到线程集合里获取一个即可,这样省去了创建线程的时间,当然也省去了系统回收线程的时间,当线程池里的线程都被使用了后,只能阻塞等待了,等待获取线程池后被释放的线程。

    线程池主要包括任务队列,线程队列,线程池类,线程池管理类和任务类等组成部分。当线程池提交一个任务到线程池后,执行流程如下:

    线程池先判断核心线程池里面的线程是否都在执行任务。如果不是都在执行任务,则创建一个新的工作线程来执行任务。如果核心线程池中的线程都在执行任务,则判断工作队列是否已满。如果工作队列没有满,则将新提交的任务存储到这个工作队列中,如果工作队列满了,线程池则判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理 ,也就是拒接策略。
    一句话:管理一个任务队列,一个线程队列,然后每次去一个任务分配给一个线程去做,循环往复。

    三、代码实现

    有什么问题?线程池一般是要复用线程,所以如果是取一个task分配给某一个thread,执行完之后再重新分配,在语言层面这是基本不能实现的:C++的thread都是执行一个固定的task函数,执行完之后线程也就结束了。所以该如何实现task和thread的分配呢?

    让每一个thread创建后,就去执行调度函数:循环获取task,然后执行。

    这个循环该什么时候停止呢?

    很简单,当线程池停止使用时,循环停止。

    这样一来,就保证了thread函数的唯一性,而且复用线程执行task。

    总结一下,我们的线程池的主要组成部分有二:

    • 任务队列(Task Queue)
    • 线程池(Thread Pool)

    1. #ifndef THREAD_POOL_H
    2. #define THREAD_POOL_H
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. #include
    12. class ThreadPool {
    13. public:
    14. ThreadPool(size_t); //构造函数
    15. template<class F, class... Args> //类模板
    16. auto enqueue(F&& f, Args&&... args)->std::future<decltype(func(args...))>;//任务入队
    17. ~ThreadPool(); //析构函数
    18. private:
    19. std::vector< std::thread > workers; //线程队列,每个元素为一个Thread对象
    20. std::queue< std::function<void()> > tasks; //任务队列,每个元素为一个函数对象
    21. std::mutex queue_mutex; //互斥量
    22. std::condition_variable condition; //条件变量
    23. bool stop; //停止
    24. };
    25. // 构造函数,把线程插入线程队列,插入时调用embrace_back(),用匿名函数lambda初始化Thread对象
    26. inline ThreadPool::ThreadPool(size_t threads) : stop(false){
    27. for(size_t i = 0; i
    28. workers.emplace_back(
    29. [this]
    30. {
    31. for(;;)
    32. {
    33. // task是一个函数类型,从任务队列接收任务
    34. std::function<void()> task;
    35. {
    36. //给互斥量加锁,锁对象生命周期结束后自动解锁
    37. std::unique_lock lock(this->queue_mutex);
    38. //(1)当匿名函数返回false时才阻塞线程,阻塞时自动释放锁。
    39. //(2)当匿名函数返回true且受到通知时解阻塞,然后加锁。
    40. this->condition.wait(lock,[this]{ return this->stop || !this->tasks.empty(); });
    41. if(this->stop && this->tasks.empty())
    42. return;
    43. //从任务队列取出一个任务
    44. task = std::move(this->tasks.front());
    45. this->tasks.pop();
    46. } // 自动解锁
    47. task(); // 执行这个任务
    48. }
    49. }
    50. );
    51. }
    52. // 添加新的任务到任务队列
    53. template<class F, class... Args>
    54. auto ThreadPool::enqueue(F&& f, Args&&... args)->std::future<decltype(func(args...))>
    55. {
    56. // 获取函数返回值类型
    57. using return_type = decltype(func(args...));
    58. // 创建一个指向任务的智能指针
    59. auto task = std::make_shared< std::packaged_task<return_type()> >(
    60. std::bind(std::forward(f), std::forward(args)...)
    61. );
    62. std::future res = task->get_future();
    63. {
    64. std::unique_lock lock(queue_mutex); //加锁
    65. if(stop)
    66. throw std::runtime_error("enqueue on stopped ThreadPool");
    67. tasks.emplace([task](){ (*task)(); }); //把任务加入队列
    68. } //自动解锁
    69. condition.notify_one(); //通知条件变量,唤醒一个线程
    70. return res;
    71. }
    72. // 析构函数,删除所有线程
    73. inline ThreadPool::~ThreadPool()
    74. {
    75. {
    76. std::unique_lock lock(queue_mutex);
    77. stop = true;
    78. }
    79. condition.notify_all();
    80. for(std::thread &worker: workers)
    81. worker.join();
    82. }
    83. #endif

    使用

    1. #include
    2. #include
    3. #include "ThreadPool.h"
    4. void func()
    5. {
    6. std::this_thread::sleep_for(std::chrono::milliseconds(100));
    7. std::cout<<"worker thread ID:"<get_id()<
    8. }
    9. int main()
    10. {
    11. ThreadPool pool(4);
    12. while(1)
    13. {
    14. pool.enqueue(func);
    15. }
    16. }

    打印

    另一种实现,

    其中线程池类 ThreadPool 维护了一个任务队列 tasks_,并且可以动态添加任务到队列中。线程池管理类 ThreadPoolManager 提供了获取线程池实例的接口,使用时只需要通过 ThreadPoolManager::getThreadPool() 获取线程池实例,然后调用 enqueue 方法添加任务即可 

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include
    6. #include
    7. using namespace std;
    8. // 任务类
    9. class Task {
    10. public:
    11. Task(function<void()> f) : func(f) {}
    12. void operator()() { func(); }
    13. private:
    14. function<void()> func;
    15. };
    16. // 线程池类
    17. class ThreadPool {
    18. public:
    19. ThreadPool(int size) : stop(false) {
    20. for (int i = 0; i < size; ++i) {
    21. threads.emplace_back([this] {
    22. while (true) {
    23. function<void()> task;
    24. {
    25. unique_lock lock(mutex_);
    26. condition_.wait(lock, [this] { return stop || !tasks_.empty(); });
    27. if (stop && tasks_.empty()) return;
    28. task = move(tasks_.front());
    29. tasks_.pop();
    30. }
    31. task();
    32. }
    33. });
    34. }
    35. }
    36. ~ThreadPool() {
    37. {
    38. unique_lock lock(mutex_);
    39. stop = true;
    40. }
    41. condition_.notify_all();
    42. for (auto& thread : threads) {
    43. thread.join();
    44. }
    45. }
    46. template<typename F>
    47. void enqueue(F&& f) {
    48. {
    49. unique_lock lock(mutex_);
    50. tasks_.emplace(forward(f));
    51. }
    52. condition_.notify_one();
    53. }
    54. private:
    55. vector threads;
    56. queuevoid()>> tasks_;
    57. mutex mutex_;
    58. condition_variable condition_;
    59. bool stop;
    60. };
    61. // 线程池管理类
    62. class ThreadPoolManager {
    63. public:
    64. static ThreadPool& getThreadPool() {
    65. static ThreadPool threadPool(thread::hardware_concurrency());
    66. return threadPool;
    67. }
    68. };
    69. int main() {
    70. ThreadPool& threadPool = ThreadPoolManager::getThreadPool();
    71. for (int i = 0; i < 10; ++i) {
    72. threadPool.enqueue(Task([] { cout << "Hello, World!" << endl; }));
    73. }
    74. return 0;
    75. }

    参考:

    基于C++11实现线程池 - 知乎

    C++实现线程池_蓬莱道人的博客-CSDN博客_c++ 线程池

    C++实现线程池_晓枫寒叶的博客-CSDN博客_c++ 线程池
    C++17future类+可变参模板实现线程池_刚入门的代码spa技师的博客-CSDN博客_c++17 可变参

  • 相关阅读:
    UDS入门至精通系列:Service 27
    C++程序员入门怎么学?
    虚拟机安装 | 远程连接服务器
    DOM--页面渲染流程
    【Spring Boot 】JPA 的基本使用
    二次开发真的很难吗?低代码平台有话说
    在 CelebA 数据集上训练的 PyTorch 中的基本变分自动编码器
    Ps:选区的布尔运算
    1.AUTOSAR的架构及方法论
    gitlab 通过变量连接自建K8S
  • 原文地址:https://blog.csdn.net/sinat_31608641/article/details/126911649