• C++多线程学习06


    参考

    参考资料:恋恋风辰官方博客

    测试异步执行std::async

    1. #include
    2. #include
    3. #include
    4. #include
    5. // 模拟一个从数据库查询的操作
    6. std::string fetchDataFromDB(std::string query)
    7. {
    8. // 模拟一个耗时操作
    9. std::this_thread::sleep_for(std::chrono::seconds(5));
    10. return "Data is: " + query;
    11. }
    12. void use_async()
    13. {
    14. // async异步调用fetchDataFromDB
    15. std::future result_from_DB = std::async(std::launch::async, fetchDataFromDB, "Test Query");
    16. // 主线程做其他事情
    17. std::cout << "Do something else... \n";
    18. // 从future中获取数据
    19. std::string DB_Data = result_from_DB.get();
    20. std::cout << DB_Data << std::endl;
    21. std::cout << "Async test finished! \n";
    22. }
    23. int main()
    24. {
    25. // 1. 测试异步执行
    26. use_async();
    27. std::cout << "Finished!\n";
    28. }

    异步执行时,数据如果没有准备好,取数据时会阻塞。

    使用std::package_task打包任务

    1. int my_task()
    2. {
    3. std::this_thread::sleep_for(std::chrono::seconds(5));
    4. std::cout << "My Task run 5 seconds \n";
    5. return 42;
    6. }
    7. void use_package()
    8. {
    9. // 创建一个包装了任务的 std::packaged_task 对象
    10. // package_task和future一起,可以实现一个线程池操作
    11. std::packaged_task<int()> task(my_task);
    12. // 获取与任务有关的std::future对象, 代表一个异步操作结果
    13. std::future<int> result = task.get_future();
    14. // 在另一个线程上执行
    15. // package_task只支持移动构造
    16. std::thread t(std::move(task));
    17. t.detach(); // 后台执行,和变量t解耦,但是主线程依然是守护线程
    18. // 等待任务执行完毕,获取结果
    19. int value = result.get();
    20. std::cout << "The result is: " << value << std::endl;
    21. }
    22. int main()
    23. {
    24. // 2. 使用std::package_task打包任务
    25. use_package();
    26. std::cout << "Finished!\n";
    27. }

    std::promise获取值

    1. void set_value(std::promise<int> prom)
    2. {
    3. std::this_thread::sleep_for(std::chrono::seconds(5));
    4. prom.set_value(10);
    5. std::cout << "promise set value success. \n";
    6. }
    7. void use_promise()
    8. {
    9. std::promise<int> prom;
    10. // promise与future对象关联
    11. std::future<int> fut = prom.get_future();
    12. // 新线程中使用promise
    13. std::thread t(set_value, std::move(prom));
    14. // 在主线程中等待future的值
    15. std::cout << "Waiting for the thread to set the value...\n";
    16. std::cout << "Value set by the thread: " << fut.get() << '\n';
    17. t.join();
    18. }
    19. int main()
    20. {
    21. // 3. 使用promise
    22. /*
    23. promise类似package,package需要等待函数全部执行完毕后,才可以通过future获取值;
    24. promise:绑定函数执行过程中如果设置了value后,可以在中途获取到.
    25. */
    26. use_promise();
    27. std::cout << "Finished!\n";
    28. }

    shared_future

    1. void myFunction(std::promise<int>&& promise) {
    2. // 模拟一些工作
    3. std::this_thread::sleep_for(std::chrono::seconds(1));
    4. promise.set_value(42); // 设置 promise 的值
    5. }
    6. void threadFunction(std::shared_future<int> future)
    7. {
    8. try {
    9. int result = future.get();
    10. std::cout << "Result: " << result << std::endl;
    11. }
    12. catch (const std::future_error& e) {
    13. std::cout << "Future error: " << e.what() << std::endl;
    14. }
    15. }
    16. void use_shared_future()
    17. {
    18. std::promise<int> promise;
    19. std::shared_future<int> future = promise.get_future();
    20. // 移动promise到线程中,执行类似计算的操作
    21. std::thread my_thread1(myFunction, std::move(promise));
    22. // 定义取结果的线程
    23. std::thread get_result1(threadFunction, future);
    24. std::thread get_result2(threadFunction, future);
    25. my_thread1.join();
    26. get_result1.join();
    27. get_result2.join();
    28. }
    29. void use_shared_future_error()
    30. {
    31. std::promise<int> promise;
    32. // 隐式转换,默认转换为shared_future
    33. std::shared_future<int> future = promise.get_future();
    34. // 移动promise到线程中,执行类似计算的操作
    35. std::thread my_thread1(myFunction, std::move(promise));
    36. // 定义取结果的线程
    37. std::thread get_result1(threadFunction, std::move(future));
    38. std::thread get_result2(threadFunction, std::move(future));
    39. my_thread1.join();
    40. get_result1.join();
    41. get_result2.join();
    42. }
    43. int main()
    44. {
    45. // 3. shared_future使用方法:供多个线程使用
    46. use_shared_future();
    47. /*
    48. 在shared_future中,定义的future不可以被移动构造,否则另一个线程无法使用
    49. */
    50. // use_shared_future_error(); // 错误用法
    51. std::cout << "Finished!\n";
    52. }

    抛出异常

    1. // 定义异常,通过子线程抛出
    2. void set_exception(std::promise<void> prom)
    3. {
    4. try
    5. {
    6. throw std::runtime_error("An error occurred! \n");
    7. }
    8. catch (const std::exception&)
    9. {
    10. prom.set_exception(std::current_exception());
    11. }
    12. }
    13. void use_promise_exception()
    14. {
    15. std::promise<void> prom;
    16. std::future<void> fut = prom.get_future();
    17. // 在新线程中设置promise异常
    18. std::thread t(set_exception, std::move(prom));
    19. // 主线程中获取异常
    20. try
    21. {
    22. std::cout << "Waiting for the thread to set the exception...\n";
    23. fut.get();
    24. }
    25. catch (const std::exception& e)
    26. {
    27. std::cout << "Exception set by the thread: " << e.what() << '\n';
    28. }
    29. t.join();
    30. }
    31. void use_promise_destruct()
    32. {
    33. std::thread t;
    34. std::future<int> fut;
    35. {
    36. std::promise<int> prom;
    37. fut = prom.get_future();
    38. t = std::thread(set_value, std::move(prom));
    39. }
    40. // 作用域结束后,prom可能因为release或者linux系统原因马上释放,导致错误.
    41. // 主线程中获取future值
    42. std::cout << "Waiting for the thread to set the value...\n";
    43. std::cout << "Value set by the thread: " << fut.get() << '\n';
    44. t.join();
    45. }
    46. int main()
    47. {
    48. // 4. 使用promise,在主线程中获取异常
    49. // 子线程抛出异常,主线程一定需要try捕获,否则会导致主线程崩溃。
    50. use_promise_exception();
    51. // 作用域原因的错误用法
    52. // 可能出错,也可能不会出错; 解决方法就是将prom按照智能指针方式传递.
    53. // use_promise_destruct();
    54. std::cout << "Finished!\n";
    55. }

    线程池

    首先在头文件中定义ThreadPool.h:

    1. #ifndef __THREAD_POOL_H__
    2. #define __THREAD_POOL_H__
    3. #include
    4. #include
    5. #include
    6. #include
    7. #include
    8. #include
    9. #include
    10. #include
    11. class ThreadPool
    12. {
    13. public:
    14. ThreadPool& operator=(const ThreadPool&) = delete;
    15. // 实现一个单例模式
    16. static ThreadPool& instance()
    17. {
    18. static ThreadPool instance;
    19. return instance;
    20. }
    21. using Task = std::packaged_task<void()>;
    22. ~ThreadPool()
    23. {
    24. stop();
    25. }
    26. // 一个返回类型后置语法
    27. template<class F, class ...Args>
    28. auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))>
    29. {
    30. using RetType = decltype(f(args...));
    31. // 如果线程池已经结束了,那么直接返回空的future.
    32. if (stop_.load()) {
    33. return std::future{};
    34. }
    35. // 将回调函数f以及可变参数绑定一起,生成一个无参函数。
    36. auto task = std::make_sharedRetType()>>(
    37. std::bind(std::forward(f), std::forward(args)...));
    38. // 通过std::future获取运行结果
    39. std::future ret = task->get_future();
    40. {
    41. // 将需要执行的任务放入任务队列, 等待线程领取.
    42. std::lock_guard cv_mt(cv_mt_);
    43. // 传递一个回调函数, lambda表达式,在弹出时执行
    44. // 通过lambda表达式捕获task,保证task在回调之前避免被销毁,类似伪闭包.
    45. tasks_.emplace([task] { (*task)(); });
    46. }
    47. // 通知挂起的线程执行
    48. cv_lock_.notify_one();
    49. return ret;
    50. }
    51. int idleThreadCount()
    52. {
    53. return thread_num_;
    54. }
    55. private:
    56. ThreadPool(unsigned int num = 5): stop_(false)
    57. {
    58. {
    59. if (num < 1) thread_num_ = 1;
    60. else thread_num_ = num;
    61. }
    62. start();
    63. }
    64. void start()
    65. {
    66. for (int i = 0; i < thread_num_; i++) {
    67. pool_.emplace_back([this]() {
    68. while (!this->stop_.load()) {
    69. Task task;
    70. {
    71. std::unique_lock cv_mt(cv_mt_);
    72. // 通过谓词判断,如果线程池已经停止,或者任务队列为空
    73. // 通过条件变量让线程挂起
    74. this->cv_lock_.wait(cv_mt, [this] {
    75. return this->stop_.load() || !this->tasks_.empty();
    76. });
    77. if (this->tasks_.empty())
    78. return;
    79. task = std::move(this->tasks_.front());
    80. this->tasks_.pop();
    81. }
    82. this->thread_num_--;
    83. task();
    84. this->thread_num_++;
    85. }
    86. });
    87. }
    88. }
    89. void stop()
    90. {
    91. // 原子操作,将停止信号设置为true.
    92. stop_.store(true);
    93. // 通知所有线程,停止前避免卡死.
    94. cv_lock_.notify_all();
    95. for (auto& td : pool_) {
    96. if (td.joinable()) {
    97. std::cout << "join thread " << td.get_id() << std::endl;
    98. td.join();
    99. }
    100. }
    101. }
    102. private:
    103. std::mutex cv_mt_;
    104. std::condition_variable cv_lock_;
    105. std::atomic_bool stop_;
    106. std::atomic_int thread_num_;
    107. std::queue tasks_; // 任务队列
    108. std::vector pool_; // 线程池
    109. };
    110. #endif // __THREAD_POOL_H__

    然后在主函数中测试调用:

    1. #include
    2. #include
    3. #include
    4. #include
    5. #include "ThreadPool.h"
    6. int main()
    7. {
    8. // 5. 使用线程池, 以及使用例子
    9. int m = 0;
    10. ThreadPool::instance().commit([](int& m) {
    11. m = 1024;
    12. std::cout << "Inner set m is " << m << std::endl;
    13. }, m);
    14. std::this_thread::sleep_for(std::chrono::seconds(3));
    15. std::cout << "Main: m is " << m << std::endl;
    16. ThreadPool::instance().commit([](int& m) {
    17. m = 1024;
    18. std::cout << "Inner set m is " << m << std::endl;
    19. }, std::ref(m));
    20. std::this_thread::sleep_for(std::chrono::seconds(3));
    21. std::cout << "Main: m is " << m << std::endl;
    22. /*
    23. 1. 如果任务有顺序要求,不可以用线程池.
    24. 2. 任务是有顺序要求的.
    25. */
    26. std::cout << "Finished!\n";
    27. }

    最后的测试结果:

  • 相关阅读:
    C++指针
    完整实现-通过DelayQueue实现延时任务
    C++----二叉树的进阶
    【网络】计算机网络基础概念入门
    HackTheBox You know 0xDiablos pwn题目
    C++常见容器实现原理
    linux 中的根文件系统
    【 第十四章 网络编程要素(IP和端口,网络通信协议),实现TCP的网络编程】
    spring boot启动报错java.lang.UnsupportedOperationException
    SpringCloud之Feign集成Ribbon
  • 原文地址:https://blog.csdn.net/weixin_42130300/article/details/136291061