• c++的ThreadPool,OpenHarmony源码实现版赏析和使用


    前言

    c++11虽然加入了线程库thread,然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现。比如备受期待的网络库至今标准库里还没有支持,常用acl或asio替代。鸿蒙OpenHarmony源码中的网络栈模块部分,也是十分漂亮的实现,值得学习研究。

    c++的ThreadPool实现,网上有很多个版本,文章的末尾就有两种不同的实现。然而经过对比发现,还是OpenHarmony源码的实现最优雅。代码简练,且直观易懂。写的真漂亮!只是使用起来稍麻烦些,比如不支持lambda的写法。后续可基于此改造,使其支持lambda函数的调用。

    关于线程池

    简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态。当有新的任务进来,从线程池中取出一个空闲的线程处理任务然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用。当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行。

    线程池优点

    线程本来就是可重用的资源,不需要每次使用时都进行初始化。因此可以采用有限的线程个数处理无限的任务。既可以提高速度和效率,又降低线程频繁创建的开销。比如要异步干的活,就没必要等待。丢到线程池里处理,结果在回调中处理。频繁执行的异步任务,若每次都创建线程势必造成不小的开销。

    源码位置

    OpenHarmony,智能终端设备操作系统的框架和平台

    该网络模块的github地址:communication_netstack: 网络协议栈

    harmonyos\communication_netstack-master\utils\common_utils\include\thread_pool.h

    网络协议栈模块作为电话子系统可裁剪部件,主要分为HTTP和socket模块。

    网络协议栈模块的源码结构: 

    1. /foundation/communication/netstack
    2. ├─figures # 架构图
    3. ├─frameworks # API实现
    4. │ └─js # JS API实现
    5. │ ├─builtin # 小型系统JS API实现
    6. │ └─napi # 标准系统JS API实现
    7. │ ├─http # http API
    8. │ ├─socket # socket API
    9. │ └─websocket # websocket API
    10. ├─interfaces # JS 接口定义
    11. ├─test # 测试
    12. └─utils # 工具

     socket接口架构图 

    ThreadPool源码

    1. /*
    2. * Copyright (c) 2022 Huawei Device Co., Ltd.
    3. * Licensed under the Apache License, Version 2.0 (the "License");
    4. * you may not use this file except in compliance with the License.
    5. * You may obtain a copy of the License at
    6. *
    7. * http://www.apache.org/licenses/LICENSE-2.0
    8. *
    9. * Unless required by applicable law or agreed to in writing, software
    10. * distributed under the License is distributed on an "AS IS" BASIS,
    11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12. * See the License for the specific language governing permissions and
    13. * limitations under the License.
    14. */
    15. #ifndef NETSTACK_THREAD_POOL
    16. #define NETSTACK_THREAD_POOL
    17. #include
    18. #include
    19. #include
    20. #include
    21. #include
    22. namespace OHOS::NetStack {
    23. template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
    24. public:
    25. /**
    26. * disallow default constructor
    27. */
    28. ThreadPool() = delete;
    29. /**
    30. * disallow copy and move
    31. */
    32. ThreadPool(const ThreadPool &) = delete;
    33. /**
    34. * disallow copy and move
    35. */
    36. ThreadPool &operator=(const ThreadPool &) = delete;
    37. /**
    38. * disallow copy and move
    39. */
    40. ThreadPool(ThreadPool &&) = delete;
    41. /**
    42. * disallow copy and move
    43. */
    44. ThreadPool &operator=(ThreadPool &&) = delete;
    45. /**
    46. * make DEFAULT_THREAD_NUM threads
    47. * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
    48. */
    49. explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
    50. {
    51. for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
    52. std::thread([this] { RunTask(); }).detach();
    53. }
    54. }
    55. /**
    56. * if ~ThreadPool, terminate all thread
    57. */
    58. ~ThreadPool()
    59. {
    60. // set needRun_ = false, and notify all the thread to wake and terminate
    61. needRun_ = false;
    62. while (runningNum_ > 0) {
    63. needRunCondition_.notify_all();
    64. }
    65. }
    66. /**
    67. * push it to taskQueue_ and notify a thread to run it
    68. * @param task new task to Execute
    69. */
    70. void Push(const Task &task)
    71. {
    72. PushTask(task);
    73. if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
    74. std::thread([this] { RunTask(); }).detach();
    75. }
    76. needRunCondition_.notify_all();
    77. }
    78. private:
    79. bool IsQueueEmpty()
    80. {
    81. std::lock_guard guard(mutex_);
    82. return taskQueue_.empty();
    83. }
    84. bool GetTask(Task &task)
    85. {
    86. std::lock_guard guard(mutex_);
    87. // if taskQueue_ is empty, means timeout
    88. if (taskQueue_.empty()) {
    89. return false;
    90. }
    91. // if run to this line, means that taskQueue_ is not empty
    92. task = taskQueue_.top();
    93. taskQueue_.pop();
    94. return true;
    95. }
    96. void PushTask(const Task &task)
    97. {
    98. std::lock_guard guard(mutex_);
    99. taskQueue_.push(task);
    100. }
    101. class NumWrapper {
    102. public:
    103. NumWrapper() = delete;
    104. explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
    105. {
    106. ++num_;
    107. }
    108. ~NumWrapper()
    109. {
    110. --num_;
    111. }
    112. private:
    113. std::atomic<uint32_t> &num_;
    114. };
    115. void Sleep()
    116. {
    117. std::mutex needRunMutex;
    118. std::unique_lock lock(needRunMutex);
    119. /**
    120. * if the thread is waiting, it is idle
    121. * if wake up, this thread is not idle:
    122. * 1 this thread should return
    123. * 2 this thread should run task
    124. * 3 this thread should go to next loop
    125. */
    126. NumWrapper idleWrapper(idleThreadNum_);
    127. (void)idleWrapper;
    128. needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
    129. [this] { return !needRun_ || !IsQueueEmpty(); });
    130. }
    131. void RunTask()
    132. {
    133. NumWrapper runningWrapper(runningNum_);
    134. (void)runningWrapper;
    135. while (needRun_) {
    136. Task task;
    137. if (GetTask(task)) {
    138. task.Execute();
    139. continue;
    140. }
    141. Sleep();
    142. if (!needRun_) {
    143. return;
    144. }
    145. if (GetTask(task)) {
    146. task.Execute();
    147. continue;
    148. }
    149. if (runningNum_ > DEFAULT_THREAD_NUM) {
    150. return;
    151. }
    152. }
    153. }
    154. private:
    155. /**
    156. * other thread put a task to the taskQueue_
    157. */
    158. std::mutex mutex_;
    159. std::priority_queue taskQueue_;
    160. /**
    161. * 1 terminate the thread if it is idle for timeout_ seconds
    162. * 2 wait for the thread started util timeout_
    163. * 3 wait for the thread notified util timeout_
    164. * 4 wait for the thread terminated util timeout_
    165. */
    166. uint32_t timeout_;
    167. /**
    168. * if idleThreadNum_ is zero, make a new thread
    169. */
    170. std::atomic<uint32_t> idleThreadNum_;
    171. /**
    172. * when ThreadPool object is deleted, wait until runningNum_ is zero.
    173. */
    174. std::atomic<uint32_t> runningNum_;
    175. /**
    176. * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
    177. */
    178. std::atomic_bool needRun_;
    179. std::condition_variable needRunCondition_;
    180. };
    181. } // namespace OHOS::NetStack
    182. #endif /* NETSTACK_THREAD_POOL */

    源码赏析

    从这份源码里,可以看到queue是如何安全的被使用的。之前博主有篇文章,记录了多线程下使用queue造成的崩溃问题。链接在这里:c++的queue在多线程下崩溃原因分析_特立独行的猫a的博客-CSDN博客_c++ queue 多线程

    通过华为鸿蒙源码的学习研究,可以发现queue的安全使用方式top和pop以及empty的判断都是使用了 std::lock_guard互斥量原子操作的保护。也证实了博主上篇文章分析中提到的,类似队列这种操作,要确保在一个原子操作内完成,不可被打断。试想一个线程刚好pop,另外一个线程却刚要执行top会怎样?那样逻辑就错了。 

    这份源码的实现,没有使用一些较难理解的语法,基本上就是使用线程+优先级队列实现的。提前创建指定数目的线程,每次取一个任务并执行。任务队列负责存放线程需要处理的任务,工作线程负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费者的模型。

    ThreadPool使用

    以下是该版本thread_pool的简单使用示例,可以看到使用稍微麻烦了些。必须定义格式如下的task类,必须实现operator<和Execute()方法。

    需要注意的是,若有多个同一个实现的task实例放入thread_pool,Execute()方法内的逻辑可是在多线程环境下的,需注意多线程下变量访问的保护。如同以下示例,同一个task类的多个实例放入了thread_pool,不加std::lock_guard打印出的显示是乱的。

    1. #include "doctest.h"
    2. DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
    3. #include
    4. DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
    5. //#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
    6. //#define DOCTEST_CONFIG_DISABLE
    7. #include
    8. #include
    9. #include "thread_pool.h"
    10. //
    11. // Created by Administrator on 2022/8/10.
    12. //
    13. class Task {
    14. public:
    15. Task() = default;
    16. explicit Task(std::string context){
    17. mContext = context;
    18. }
    19. bool operator<(const Task &e) const{
    20. return priority_ < e.priority_;
    21. }
    22. void Execute(){
    23. std::lock_guard guard(mutex_);
    24. std::cout << "task is execute,name is:"<
    25. }
    26. public:
    27. uint32_t priority_;
    28. private:
    29. std::string mContext;
    30. static std::mutex mutex_;
    31. };
    32. #define DEFAULT_THREAD_NUM 3
    33. #define MAX_THREAD_NUM 6
    34. #define TIME_OUT 500
    35. std::mutex Task::mutex_;
    36. static int myTest(){
    37. static OHOS_NetStack::ThreadPool threadPool_(TIME_OUT);
    38. Task task1("name_1");
    39. Task task2("name_2");
    40. Task task3("name_3");
    41. Task task4("name_4");
    42. threadPool_.Push(task1);
    43. threadPool_.Push(task2);
    44. threadPool_.Push(task3);
    45. threadPool_.Push(task4);
    46. //system("pause");
    47. return 0;
    48. }
    49. TEST_CASE("threadPool simple use example, test by doctest unit tool") {
    50. myTest();
    51. }

    结果输出:

    引用

    c++11线程池的实现原理及回调函数的使用_特立独行的猫a的博客-CSDN博客_c++多线程回调

  • 相关阅读:
    c++的4中类型转换操作符(static_cast,reinterpret_cast,dynamic_cast,const_cast),RTTI
    【云原生 | 58】Docker三剑客之Docker Swarm中的调度器
    运筹说 第71期|论文速读之时间背包问题
    算法通关村第二关终于学会链表反转了
    Grafana监控安装和监控看板创建
    【offer拿到手软系列】面试小贴士
    微信小程序全局水印组件
    .NET周报【10月第2期 2022-10-17】
    Java学习----JUC包和信号量
    数据库数据转json字符串及ajax请求数据渲染
  • 原文地址:https://blog.csdn.net/qq8864/article/details/126266933