c++11虽然加入了线程库thread,然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现。比如备受期待的网络库至今标准库里还没有支持,常用acl或asio替代。鸿蒙OpenHarmony源码中的网络栈模块部分,也是十分漂亮的实现,值得学习研究。
c++的ThreadPool实现,网上有很多个版本,文章的末尾就有两种不同的实现。然而经过对比发现,还是OpenHarmony源码的实现最优雅。代码简练,且直观易懂。写的真漂亮!只是使用起来稍麻烦些,比如不支持lambda的写法。后续可基于此改造,使其支持lambda函数的调用。
简单来说就是有一堆已经创建好的线程(最大数目一定),初始时他们都处于空闲状态。当有新的任务进来,从线程池中取出一个空闲的线程处理任务然后当任务处理完成之后,该线程被重新放回到线程池中,供其他的任务使用。当线程池中的线程都在处理任务时,就没有空闲线程供使用,此时,若有新的任务产生,只能等待线程池中有线程结束任务空闲才能执行。
线程本来就是可重用的资源,不需要每次使用时都进行初始化。因此可以采用有限的线程个数处理无限的任务。既可以提高速度和效率,又降低线程频繁创建的开销。比如要异步干的活,就没必要等待。丢到线程池里处理,结果在回调中处理。频繁执行的异步任务,若每次都创建线程势必造成不小的开销。
该网络模块的github地址:communication_netstack: 网络协议栈
harmonyos\communication_netstack-master\utils\common_utils\include\thread_pool.h
网络协议栈模块作为电话子系统可裁剪部件,主要分为HTTP和socket模块。
网络协议栈模块的源码结构:
- /foundation/communication/netstack
- ├─figures # 架构图
- ├─frameworks # API实现
- │ └─js # JS API实现
- │ ├─builtin # 小型系统JS API实现
- │ └─napi # 标准系统JS API实现
- │ ├─http # http API
- │ ├─socket # socket API
- │ └─websocket # websocket API
- ├─interfaces # JS 接口定义
- ├─test # 测试
- └─utils # 工具
图 socket接口架构图
- /*
- * Copyright (c) 2022 Huawei Device Co., Ltd.
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- #ifndef NETSTACK_THREAD_POOL
- #define NETSTACK_THREAD_POOL
-
- #include
- #include
- #include
- #include
- #include
-
- namespace OHOS::NetStack {
- template <typename Task, const size_t DEFAULT_THREAD_NUM, const size_t MAX_THREAD_NUM> class ThreadPool {
- public:
- /**
- * disallow default constructor
- */
- ThreadPool() = delete;
-
- /**
- * disallow copy and move
- */
- ThreadPool(const ThreadPool &) = delete;
-
- /**
- * disallow copy and move
- */
- ThreadPool &operator=(const ThreadPool &) = delete;
-
- /**
- * disallow copy and move
- */
- ThreadPool(ThreadPool &&) = delete;
-
- /**
- * disallow copy and move
- */
- ThreadPool &operator=(ThreadPool &&) = delete;
-
- /**
- * make DEFAULT_THREAD_NUM threads
- * @param timeout if timeout and runningThreadNum_ < DEFAULT_THREAD_NUM, the running thread should be terminated
- */
- explicit ThreadPool(uint32_t timeout) : timeout_(timeout), idleThreadNum_(0), needRun_(true)
- {
- for (int i = 0; i < DEFAULT_THREAD_NUM; ++i) {
- std::thread([this] { RunTask(); }).detach();
- }
- }
-
- /**
- * if ~ThreadPool, terminate all thread
- */
- ~ThreadPool()
- {
- // set needRun_ = false, and notify all the thread to wake and terminate
- needRun_ = false;
- while (runningNum_ > 0) {
- needRunCondition_.notify_all();
- }
- }
-
- /**
- * push it to taskQueue_ and notify a thread to run it
- * @param task new task to Execute
- */
- void Push(const Task &task)
- {
- PushTask(task);
-
- if (runningNum_ < MAX_THREAD_NUM && idleThreadNum_ == 0) {
- std::thread([this] { RunTask(); }).detach();
- }
-
- needRunCondition_.notify_all();
- }
-
- private:
- bool IsQueueEmpty()
- {
- std::lock_guard
guard(mutex_) ; - return taskQueue_.empty();
- }
-
- bool GetTask(Task &task)
- {
- std::lock_guard
guard(mutex_) ; -
- // if taskQueue_ is empty, means timeout
- if (taskQueue_.empty()) {
- return false;
- }
-
- // if run to this line, means that taskQueue_ is not empty
- task = taskQueue_.top();
- taskQueue_.pop();
- return true;
- }
-
- void PushTask(const Task &task)
- {
- std::lock_guard
guard(mutex_) ; - taskQueue_.push(task);
- }
-
- class NumWrapper {
- public:
- NumWrapper() = delete;
-
- explicit NumWrapper(std::atomic<uint32_t> &num) : num_(num)
- {
- ++num_;
- }
-
- ~NumWrapper()
- {
- --num_;
- }
-
- private:
- std::atomic<uint32_t> &num_;
- };
-
- void Sleep()
- {
- std::mutex needRunMutex;
- std::unique_lock
lock(needRunMutex) ; -
- /**
- * if the thread is waiting, it is idle
- * if wake up, this thread is not idle:
- * 1 this thread should return
- * 2 this thread should run task
- * 3 this thread should go to next loop
- */
- NumWrapper idleWrapper(idleThreadNum_);
- (void)idleWrapper;
-
- needRunCondition_.wait_for(lock, std::chrono::seconds(timeout_),
- [this] { return !needRun_ || !IsQueueEmpty(); });
- }
-
- void RunTask()
- {
- NumWrapper runningWrapper(runningNum_);
- (void)runningWrapper;
-
- while (needRun_) {
- Task task;
- if (GetTask(task)) {
- task.Execute();
- continue;
- }
-
- Sleep();
-
- if (!needRun_) {
- return;
- }
-
- if (GetTask(task)) {
- task.Execute();
- continue;
- }
-
- if (runningNum_ > DEFAULT_THREAD_NUM) {
- return;
- }
- }
- }
-
- private:
- /**
- * other thread put a task to the taskQueue_
- */
- std::mutex mutex_;
- std::priority_queue
taskQueue_; - /**
- * 1 terminate the thread if it is idle for timeout_ seconds
- * 2 wait for the thread started util timeout_
- * 3 wait for the thread notified util timeout_
- * 4 wait for the thread terminated util timeout_
- */
- uint32_t timeout_;
- /**
- * if idleThreadNum_ is zero, make a new thread
- */
- std::atomic<uint32_t> idleThreadNum_;
- /**
- * when ThreadPool object is deleted, wait until runningNum_ is zero.
- */
- std::atomic<uint32_t> runningNum_;
- /**
- * when ThreadPool object is deleted, set needRun_ to false, mean that all thread should be terminated
- */
- std::atomic_bool needRun_;
- std::condition_variable needRunCondition_;
- };
- } // namespace OHOS::NetStack
- #endif /* NETSTACK_THREAD_POOL */
从这份源码里,可以看到queue是如何安全的被使用的。之前博主有篇文章,记录了多线程下使用queue造成的崩溃问题。链接在这里:c++的queue在多线程下崩溃原因分析_特立独行的猫a的博客-CSDN博客_c++ queue 多线程
通过华为鸿蒙源码的学习研究,可以发现queue的安全使用方式top和pop以及empty的判断都是使用了 std::lock_guard互斥量原子操作的保护。也证实了博主上篇文章分析中提到的,类似队列这种操作,要确保在一个原子操作内完成,不可被打断。试想一个线程刚好pop,另外一个线程却刚要执行top会怎样?那样逻辑就错了。
这份源码的实现,没有使用一些较难理解的语法,基本上就是使用线程+优先级队列实现的。提前创建指定数目的线程,每次取一个任务并执行。任务队列负责存放线程需要处理的任务,工作线程负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费者的模型。
以下是该版本thread_pool的简单使用示例,可以看到使用稍微麻烦了些。必须定义格式如下的task类,必须实现operator<和Execute()方法。
需要注意的是,若有多个同一个实现的task实例放入thread_pool,Execute()方法内的逻辑可是在多线程环境下的,需注意多线程下变量访问的保护。如同以下示例,同一个task类的多个实例放入了thread_pool,不加std::lock_guard打印出的显示是乱的。
- #include "doctest.h"
- DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_BEGIN
- #include
- DOCTEST_MAKE_STD_HEADERS_CLEAN_FROM_WARNINGS_ON_WALL_END
-
- //#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
- //#define DOCTEST_CONFIG_DISABLE
- #include
- #include
- #include "thread_pool.h"
- //
- // Created by Administrator on 2022/8/10.
- //
- class Task {
- public:
- Task() = default;
-
- explicit Task(std::string context){
- mContext = context;
- }
-
- bool operator<(const Task &e) const{
- return priority_ < e.priority_;
- }
-
- void Execute(){
- std::lock_guard
guard(mutex_) ; - std::cout << "task is execute,name is:"<
- }
-
- public:
- uint32_t priority_;
- private:
- std::string mContext;
- static std::mutex mutex_;
- };
-
-
- #define DEFAULT_THREAD_NUM 3
- #define MAX_THREAD_NUM 6
- #define TIME_OUT 500
-
- std::mutex Task::mutex_;
-
- static int myTest(){
- static OHOS_NetStack::ThreadPool
threadPool_(TIME_OUT) ; -
- Task task1("name_1");
- Task task2("name_2");
- Task task3("name_3");
- Task task4("name_4");
- threadPool_.Push(task1);
- threadPool_.Push(task2);
- threadPool_.Push(task3);
- threadPool_.Push(task4);
-
- //system("pause");
-
- return 0;
- }
-
- TEST_CASE("threadPool simple use example, test by doctest unit tool") {
- myTest();
- }
结果输出:
引用