• 多线程消息处理(支持动态调整线程数)


    MultiThreadEx.h

    #pragma once

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <cstdint>
    #include <functional>
    #include <list>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <thread>
    /// Unit of work run by a pool thread: a callable taking no arguments and returning nothing.
    using TaskFun = std::function<void(void)>;

    /// Task queue served by worker threads whose number is adjusted while running.
    ///
    /// start() launches the initial workers, post() queues a task (and may launch
    /// another worker when the backlog is large), and stop() shuts the workers down
    /// and joins them. Workers that stay idle above the minimum count retire themselves.
    class CMultiThreadEx {
    public:
        CMultiThreadEx();
        ~CMultiThreadEx();

        // The pool owns threads and mutexes; copying it is meaningless, so forbid it explicitly.
        CMultiThreadEx(const CMultiThreadEx&) = delete;
        CMultiThreadEx& operator=(const CMultiThreadEx&) = delete;

        /// Launches the minimum number of workers. Returns false if already running.
        bool start();
        /// Signals every worker to exit and joins them. Does nothing if not running.
        void stop();
        /// Queues @p task_fun for execution. Returns false if the pool is not running.
        bool post(const TaskFun& task_fun);

    private:
        /// Worker loop executed by every pool thread.
        void exec();
        /// Creates one worker and registers it in threads_.
        void add_thread();
        /// Marks the calling worker as retired (to be joined later by another thread).
        void del_thread();
        /// Joins and unregisters the workers previously marked by del_thread().
        void del_thread_oper();

    private:
        std::atomic_bool exist_{false};       ///< true between start() and stop()
        std::mutex thread_mutex_;             ///< guards threads_ and threads_need_del_
        std::map<std::thread::id, std::shared_ptr<std::thread>> threads_;  ///< live workers by id
        std::list<std::thread::id> threads_need_del_;                      ///< retired workers awaiting join
        std::atomic<int32_t> thread_num_{0};  ///< number of active workers
        std::atomic<int32_t> task_num_{0};    ///< number of queued tasks
        std::queue<TaskFun> tasks_;           ///< pending tasks, FIFO
        std::mutex task_mutex_;               ///< guards tasks_
        std::condition_variable condition_;   ///< signalled on new task and on stop
        int32_t min_thead_num_;               ///< workers kept alive even when idle
        int32_t max_thead_num_;               ///< intended upper limit on the worker count
        int32_t task_avg_num_;                ///< queued tasks per worker before another worker is added
    };

    MultiThreadEx.cpp:

    #include "MultiThreadEx.h"
    #include <chrono>
    3. CMultiThreadEx::CMultiThreadEx()
    4. : exist_(false)
    5. , min_thead_num_(1)
    6. , max_thead_num_(1024)
    7. , task_avg_num_(10)
    8. {
    9. }
    10. CMultiThreadEx::~CMultiThreadEx() {}
    11. bool CMultiThreadEx::start() {
    12. if (exist_) {
    13. return false;
    14. }
    15. exist_ = true;
    16. for (size_t i = 0; i < min_thead_num_; i++)
    17. {
    18. add_thread();
    19. }
    20. return true;
    21. }
    22. void CMultiThreadEx::stop() {
    23. if (!exist_) {
    24. return;
    25. }
    26. exist_ = false;
    27. condition_.notify_all();
    28. for (auto& iter_thread : threads_)
    29. {
    30. if (iter_thread.second->joinable()) {
    31. iter_thread.second->join();
    32. }
    33. }
    34. return;
    35. }
    36. bool CMultiThreadEx::post(const TaskFun& task_fun) {
    37. if (!exist_) {
    38. return false;
    39. }
    40. {
    41. std::lock_guard lock(task_mutex_);
    42. tasks_.push(task_fun);
    43. ++task_num_;
    44. }
    45. if (thread_num_ < min_thead_num_
    46. || task_num_ > thread_num_ * task_avg_num_)
    47. {
    48. add_thread();
    49. }
    50. condition_.notify_one();
    51. return true;
    52. }
    53. void CMultiThreadEx::exec() {
    54. while (exist_)
    55. {
    56. TaskFun task = nullptr;
    57. del_thread_oper();
    58. {
    59. std::unique_lock lock(task_mutex_);
    60. if (tasks_.empty()) {
    61. if (thread_num_ > min_thead_num_) {
    62. auto ret =condition_.wait_for(lock, std::chrono::milliseconds(1000));
    63. if (std::cv_status::timeout == ret)
    64. {
    65. // 退出线程
    66. del_thread();
    67. break;
    68. }
    69. if (!exist_) {
    70. break;
    71. }
    72. }
    73. condition_.wait(lock);
    74. }
    75. if (!exist_) {
    76. break;
    77. }
    78. if (!tasks_.empty())
    79. {
    80. task = tasks_.front();
    81. tasks_.pop();
    82. --task_num_;
    83. }
    84. }
    85. if (nullptr != task) {
    86. task();
    87. }
    88. }
    89. return;
    90. }
    91. void CMultiThreadEx::add_thread() {
    92. std::lock_guard lock(thread_mutex_);
    93. std::shared_ptr sptr_thread =
    94. std::make_shared(std::thread(std::bind(&CMultiThreadEx::exec, this)));
    95. threads_.insert(std::make_pair(sptr_thread->get_id(), sptr_thread));
    96. ++thread_num_;
    97. }
    98. void CMultiThreadEx::del_thread(){
    99. std::lock_guard lock(thread_mutex_);
    100. auto id = std::this_thread::get_id();
    101. auto iter_thread = threads_.find(id);
    102. if (threads_.end() != iter_thread) {
    103. threads_need_del_.push_back(id);
    104. --thread_num_;
    105. }
    106. }
    107. void CMultiThreadEx::del_thread_oper(){
    108. std::lock_guard lock(thread_mutex_);
    109. if (threads_need_del_.empty())
    110. {
    111. return;
    112. }
    113. for (auto& id : threads_need_del_)
    114. {
    115. auto iter_thread = threads_.find(id);
    116. if (threads_.end() != iter_thread){
    117. if (iter_thread->second->joinable()) {
    118. iter_thread->second->join();
    119. }
    120. threads_.erase(id);
    121. }
    122. }
    123. }

    main:

    #include <iostream>
    #include "MultiThreadEx.h"
    3. int main() {
    4. std::mutex m;
    5. CMultiThreadEx t;
    6. t.start();
    7. auto fun = [&]()
    8. {
    9. std::lock_guard lock(m);
    10. static int count = 0;
    11. std::cout << "Count: " << ++count << "thread-id: " << std::this_thread::get_id() << std::endl;
    12. std::this_thread::sleep_for(std::chrono::seconds(1));
    13. };
    14. for (int i = 0; i < 20; ++i)
    15. {
    16. t.post(fun);
    17. //std::this_thread::sleep_for(std::chrono::microseconds(500));
    18. }
    19. getchar();
    20. t.post(fun);
    21. getchar();
    22. t.stop();
    23. return 0;
    24. }

  • 相关阅读:
    【图形学】28 更多的透明等式和参数
    检查代码混淆率proguard-rate
    【Python脚本进阶】2.5、编写自己的0day概念验证代码(终)
    项目开发过程中,成员提离职,怎么办?
    Java使用opencv实现人脸识别、人脸比对
    C++设计模式结构型模式———桥接模式
    C#WPF用户控件及自定义控件实例
    【学习笔记】 字符串基础 : 后缀自动机(基础篇)
    6. Python数据类型之浮点数
    社区动态——恭喜海豚调度中国区用户组新晋 9 枚“社群管理员”
  • 原文地址:https://blog.csdn.net/zhaodongdong2012/article/details/134483967