• c++11 thread


    构造

    构造std::thread对象时,如果不带参则会创建一个空的thread对象,但底层线程并没有真正被创建,一般可将其他std::thread对象通过move移入其中;如

    1. std::thread t3(f2, std::ref(n)); // pass by reference
    2. std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread

    如果带参则会创建新线程,而且会被立即运行。包括函数指针、函数对象、lambda等,能让使用者对其进行函数调用操作

    如 :explicit thread(_Fn&& _Fx, _Args&&... _Ax)。

    1. std::thread thread1(funct1,this);
    2. class Func
    3. {
    4. public:
    5. void operator() () const
    6. {
    7. do_something();
    8. }
    9. };
    10. std::thread myThread((Func()));
    11. std::thread myThread([]{
    12. do_something();
    13. });

    std::thread对象不能被复制和赋值,只能被移动。 

    析构

    std::thread对象析构时,会先判断是否可joinable(),如果可联结,则程序会直接被终止出错。这意味着创建thread对象以后,要在随后的某个地方调用join或detach以便让std::thread处于不可联结状态。

    1. std::thread t2(f1, n + 1); // pass by value
    2. t2.join();

    联结

    joinable():用于判断std::thread对象联结状态,一个std::thread对象只可能处于可联结或不可联结两种状态之一。

          a. 可联结:当线程己运行或可运行、或处于阻塞时是可联结的。注意,如果某个底层线程已经执行完任务,但是没有被join的话,仍然处于joinable状态。

             即std::thread对象(对象由父线程所有)与底层线程保持着关联时,为joinable状态。

          b. 不可联结:

         ① 当不带参构造的std::thread对象为不可联结,因为底层线程还没创建。

         ② 己移动的std::thread对象为不可联结。

         ③ 己调用join或detach的对象为不可联结状态。因为调用join()以后,底层线程己结束,而detach()会把std::thread对象和对应的底层线程之间的连接断开。

    join():等待子线程,调用线程处于阻塞模式。join()执行完成之后,底层线程id被设置为0,即joinable()变为false。join() 仅能调用一次;只要 std::thread 对象曾经调用过 join(),线程就不再可汇合(joinable)

     detach():分离子线程,与当前线程的连接被断开,子线程成为后台线程,被C++运行时库接管。则   *this 不再代表任何的线程执行实例。 joinable() = false     get_id() = std::thread::id()

    用这函数,主要传给子线程的参数不能是当前线程的局部变量,不然生命周期结束了,还在子线程使用,会崩溃。

    函数

    get_id:获取线程ID,返回一个类型为std::thread::id的对象。

    swap:交换两个线程对象所代表的底层句柄。

    std::this_thread::sleep_until: 线程休眠至某个指定的时刻(time point),该线程才被重新唤醒。

    std::this_thread::sleep_for: 线程休眠某个指定的时间片(time span),该线程才被重新唤醒,不过由于线程调度等原因,实际休眠时间可能比 sleep_duration 所表示的时间片更长

    1. std::chrono::milliseconds dura( 2000 );
    2. std::this_thread::sleep_for( dura );

    c++11线程池的基础

    std::mutex互斥量

    lock、unlock

    std::lock_guard    RAII(资源获取即初始化)。

    std::mutex mt;

    std::lock_guard lock(mt);

    std::unique_lock 

    RAII(资源获取即初始化)。std::unique_lock 占用更多的空间,也比 std::lock_guard 略慢。但 std::unique_lock 对象可以不占有关联的互斥,具备这份灵活性需要付出代价:需要存储并且更新互斥信息。

    std::mutex mt;

    std::unique_lock lock(mt);

    条件变量std::condition_variable

    std::condition_variable condit;

    wait函数

    1.只有独占锁一个参数变量,void wait( std::unique_lockstd::mutex& lock );

    wait()将解锁互斥量,并阻塞到本行,阻塞到其他某个线程调用notify_one()成员函数为止。然后wait函数返回,互斥量上锁

    2.带独占锁参数和参数lambda表达式

    a.如果第二个参数的lambda表达式返回值是false,那么wait()将解锁互斥量,并阻塞到本行.

    阻塞到其他某个线程调用notify_one()成员函数并且lambda表达式为true. wait才能返回

    b.如果第二个参数的lambda表达式返回值是true,那么wait()直接返回并继续执行

    互斥量被上锁,效果就是调了notify_one效果一样

    notify_one

    不带参数,wait函数返回,互斥量上锁

    带参数,需要等lambda表达式为true,wait函数才返回,互斥量上锁

    notify_all

    和notify_one一样

    wait_until、wait_for

    可以指定一个时间段,在当前线程收到通知或者指定的时间 rel_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_for 返回。

    其返回值有2种状态:timeout和no_timeout 。即std::cv_status::timeoutstd::cv_status::tno_timeout

    所以wait_for 函数和windows的等待函数waiforsingleobject函数一样。

    1. DWORD WINAPI datathread(PVOID p)
    2. {
    3. CMainFrame *pframe = (CMainFrame*)p;
    4. std::unique_lock<std::mutex> lock((pframe->m_mutex));
    5. while ((pframe->cond.wait_for(lock, std::chrono::seconds(2))) ==
    6. std::cv_status::timeout)
    7. {
    8. int a = 1;
    9. }
    10. return 0;
    11. }
    12. void CMainFrame::OnnewThread()
    13. {
    14. std::thread thr(datathread, this);
    15. thr.detach();
    16. }
    17. void CMainFrame::Onclosethread()
    18. {
    19. std::unique_lock<std::mutex> lock(m_mutex);
    20. cond.notify_all();
    21. }

    std::future

    C++11中的std::future是一个模板类。std::future提供了一种用于访问异步操作结果的机制。std::future所引用的共享状态不能与任何其它异步返回的对象共享.

    async和packaged_task有一个共同点它们都可以返回一个future对象,用户可以通过这个futureget方法获取最终的结果。

    1. std::packaged_task<int (int,int)> task(Add);
    2. future<int> result = task.get_future();
    3. //启动任务,非异步
    4. task(4,8);
    5. cout << "task_thread :" << result.get() << "\n";

    . get函数:

    (1).当共享状态就绪时,返回存储在共享状态中的值(或抛出异常)。

    (2).如果共享状态尚未就绪(即提供者尚未设置其值或异常),则该函数将阻塞调用的线程直到就绪.

    wait函数

    会让当前线程阻塞,直到子线程返回来

    packaged_task

    创建thread线程结合一起使用,使函数对象异步执行。

    类似于funcitonpackaged_task可以绑定一个可调用对象, 并执行,但是它的返回类型是void,获取它的返回值必须用get_future函数。:

    packaged_task类模板也是定义于future头文件中,它包装任何可调用 (Callable) 目标,包括函数、 lambda 表达式、 bind 表达式或其他函数对象,使得能异步调用它,其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。简言之,将一个普通的可调用函数对象转换为异步执行的任务。

    常用的函数

    1.get_future   

    得到std::future 的值

    2.reset

    1. //包装可调用目标时lambda
    2. packaged_task<int(int,int)> task([](int a, int b){ return a + b;});
    3. //仿函数形式,启动任务,非异步
    4. task(2, 10);
    5. future<int> result = task.get_future();
    6. //获取共享状态中的值,直到ready才能返回结果或者异常
    7. cout << "task_lambda :" << result.get() << "\n";
    8. //包装普通函数
    9. std::packaged_task<int (int,int)> task(Add);
    10. future<int> result = task.get_future();
    11. //启动任务,非异步
    12. task(4,8);
    13. cout << "task_thread :" << result.get() << "\n";
    14. //或者通过线程启动任务,异步启动
    15. thread td(move(task), 2, 10);
    16. td.join();
    17. //获取执行结果
    18. cout << "task_thread :" << result.get() << "\n";

    结论:

    1.packaged_task你需要自己调用thread,而async创建后,就会默认执行,后面直接调用get方法就好了。

    2.想真正实现异步,需要使用线程启动任务。

    async

    C++11中的std::async是个模板函数。std::async异步调用函数,在某个时候以Args作为参数(可变长参数)调用Fn,无需等待Fn执行完成就可返回,返回结果是个std::future对象。Fn返回的值可通过std::future对象的get成员函数获取。一旦完成Fn的执行,共享状态将包含Fn返回的值并ready。

    std::launch::async  创建新线程执行

    1. 默认参数为std::launch::async,创建新线程执行
    2. int cal(int a, int b)
    3. {
    4. return a + b;
    5. }
    6. std::future<int> result = async(cal,1,2);
    7. result.wait();

    launch::deferred   //任务已经创建,线程不创建,不执行函数

    std::future result=std::async(launch::deferred,gatewayFunction,para);

    cout<

    总结:

    async和deferred的不同之处是async强制任务创建新线程执行函数,而deferred不是,所以deferred是在调用处中延迟执行任务。与判断联合使用,可以防止线程过多导致系统崩溃。

    async和packaged_task有一个共同点它们都可以返回一个future对象,用户可以通过这个futureget方法获取最终的结果。

    延伸:c++11线程池

    参考:基于C++11实现线程池 - 知乎

    1. #pragma once
    2. #include <queue>
    3. #include <mutex>
    4. #include <functional>
    5. #include <vector>
    6. #include <thread>
    7. #include <future>
    8. template <typename T>
    9. class SafeQueue
    10. {
    11. private:
    12. std::queue<T> m_queue; //利用模板函数构造队列
    13. std::mutex m_mutex; // 访问互斥信号量
    14. public:
    15. SafeQueue() {}
    16. SafeQueue(SafeQueue &&other) {}
    17. ~SafeQueue() {}
    18. bool empty() // 返回队列是否为空
    19. {
    20. std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
    21. return m_queue.empty();
    22. }
    23. int size()
    24. {
    25. std::unique_lock<std::mutex> lock(m_mutex); // 互斥信号变量加锁,防止m_queue被改变
    26. return m_queue.size();
    27. }
    28. // 队列添加元素
    29. void enqueue(T &t)
    30. {
    31. std::unique_lock<std::mutex> lock(m_mutex);
    32. m_queue.emplace(t);
    33. }
    34. // 队列取出元素
    35. bool dequeue(T &t)
    36. {
    37. std::unique_lock<std::mutex> lock(m_mutex); // 队列加锁
    38. if (m_queue.empty())
    39. return false;
    40. t = std::move(m_queue.front()); // 取出队首元素,返回队首元素值,并进行右值引用
    41. m_queue.pop(); // 弹出入队的第一个元素
    42. return true;
    43. }
    44. };
    45. class ThreadPool
    46. {
    47. private:
    48. class ThreadWorker // 内置线程工作类
    49. {
    50. private:
    51. int m_id; // 工作id
    52. ThreadPool *m_pool; // 所属线程池
    53. public:
    54. // 构造函数
    55. ThreadWorker(ThreadPool *pool, const int id) : m_pool(pool), m_id(id)
    56. {
    57. }
    58. // 重载()操作
    59. void operator()()
    60. {
    61. std::function<void()> func; // 定义基础函数类func
    62. bool dequeued; // 是否正在取出队列中元素
    63. while (!m_pool->m_shutdown)
    64. {
    65. {
    66. // 为线程环境加锁,互访问工作线程的休眠和唤醒
    67. std::unique_lock<std::mutex> lock(m_pool->m_conditional_mutex);
    68. TRACE(_T("---queue size=%d\n--"), m_pool->m_queue.size());
    69. // 如果任务队列为空,阻塞当前线程
    70. if (m_pool->m_queue.empty())
    71. {
    72. m_pool->m_conditional_lock.wait(lock); // 等待条件变量通知,开启线程
    73. }
    74. // 取出任务队列中的元素
    75. dequeued = m_pool->m_queue.dequeue(func);
    76. }
    77. // 如果成功取出,执行工作函数
    78. if (dequeued)
    79. func();
    80. }
    81. int a = 1;
    82. }
    83. };
    84. bool m_shutdown; // 线程池是否关闭
    85. SafeQueue<std::function<void()>> m_queue; // 执行函数安全队列,即任务队列
    86. std::vector<std::thread> m_threads; // 工作线程队列
    87. std::mutex m_conditional_mutex; // 线程休眠锁互斥变量
    88. std::condition_variable m_conditional_lock; // 线程环境锁,可以让线程处于休眠或者唤醒状态
    89. public:
    90. // 线程池构造函数
    91. ThreadPool(const int n_threads = 4)
    92. : m_threads(std::vector<std::thread>(n_threads)), m_shutdown(false)
    93. {
    94. }
    95. ThreadPool(const ThreadPool &) = delete;
    96. ThreadPool(ThreadPool &&) = delete;
    97. ThreadPool &operator=(const ThreadPool &) = delete;
    98. ThreadPool &operator=(ThreadPool &&) = delete;
    99. // Inits thread pool
    100. void init()
    101. {
    102. for (int i = 0; i < m_threads.size(); ++i)
    103. {
    104. m_threads.at(i) = std::thread(ThreadWorker(this, i)); // 分配工作线程
    105. }
    106. }
    107. void wait()
    108. {
    109. for (int i = 0; i < m_threads.size(); ++i)
    110. {
    111. m_threads.at(i).join(); // 将线程加入到等待队列
    112. }
    113. }
    114. // Waits until threads finish their current task and shutdowns the pool
    115. void shutdown()
    116. {
    117. m_shutdown = true;
    118. m_conditional_lock.notify_all(); // 通知,唤醒所有工作线程
    119. for (int i = 0; i < m_threads.size(); ++i)
    120. {
    121. if (m_threads.at(i).joinable()) // 判断线程是否在等待
    122. {
    123. m_threads.at(i).join(); // 将线程加入到等待队列
    124. }
    125. }
    126. }
    127. // Submit a function to be executed asynchronously by the pool
    128. template <typename F, typename... Args>
    129. auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))>
    130. {
    131. // Create a function with bounded parameter ready to execute
    132. std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // 连接函数和参数定义,特殊函数类型,避免左右值错误
    133. // Encapsulate it into a shared pointer in order to be able to copy construct
    134. auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
    135. // Warp packaged task into void function
    136. std::function<void()> warpper_func = [task_ptr]()
    137. {
    138. (*task_ptr)();
    139. };
    140. // 队列通用安全封包函数,并压入安全队列
    141. m_queue.enqueue(warpper_func);
    142. // 唤醒一个等待中的线程
    143. m_conditional_lock.notify_one();
    144. // 返回先前注册的任务指针
    145. return task_ptr->get_future();
    146. }
    147. };
    1. // test.cpp
    2. #include
    3. #include
    4. #include "thread_pool.h"
    5. std::random_device rd; // 真实随机数产生器
    6. std::mt19937 mt(rd()); //生成计算随机数mt
    7. std::uniform_int_distribution<int> dist(-1000, 1000); //生成-1000到1000之间的离散均匀分布数
    8. auto rnd = std::bind(dist, mt);
    9. // 设置线程睡眠时间
    10. void simulate_hard_computation()
    11. {
    12. std::this_thread::sleep_for(std::chrono::milliseconds(2000 + rnd()));
    13. }
    14. // 添加两个数字的简单函数并打印结果
    15. void multiply(const int a, const int b)
    16. {
    17. simulate_hard_computation();
    18. const int res = a * b;
    19. std::cout << a << " * " << b << " = " << res << std::endl;
    20. }
    21. // 添加并输出结果
    22. void multiply_output(int &out, const int a, const int b)
    23. {
    24. simulate_hard_computation();
    25. out = a * b;
    26. std::cout << a << " * " << b << " = " << out << std::endl;
    27. }
    28. // 结果返回
    29. int multiply_return(const int a, const int b)
    30. {
    31. simulate_hard_computation();
    32. const int res = a * b;
    33. std::cout << a << " * " << b << " = " << res << std::endl;
    34. return res;
    35. }
    36. void example()
    37. {
    38. // 创建3个线程的线程池
    39. ThreadPool pool(3);
    40. // 初始化线程池
    41. pool.init();
    42. // 提交乘法操作,总共30个
    43. for (int i = 1; i <= 3; ++i)
    44. for (int j = 1; j <= 10; ++j)
    45. {
    46. pool.submit(multiply, i, j);
    47. }
    48. // 使用ref传递的输出参数提交函数
    49. int output_ref;
    50. auto future1 = pool.submit(multiply_output, std::ref(output_ref), 5, 6);
    51. // 等待乘法输出完成
    52. future1.get();
    53. std::cout << "Last operation result is equals to " << output_ref << std::endl;
    54. // 使用return参数提交函数
    55. auto future2 = pool.submit(multiply_return, 5, 3);
    56. // 等待乘法输出完成
    57. int res = future2.get();
    58. std::cout << "Last operation result is equals to " << res << std::endl;
    59. // 关闭线程池
    60. pool.shutdown();
    61. }
    62. int main()
    63. {
    64. example();
    65. return 0;
    66. }

  • 相关阅读:
    牛客day 8
    【CQF Math Class 数学笔记】
    计算机毕业设计(附源码)python装修服务分析系统
    4-11 Isomorphic
    LeetCode-380. Insert Delete GetRandom O(1) [C++][Java]
    Vue--解决Scss报错:Syntax Error: TypeError: this.getOptions is not a function
    ChatGPT、AIGC、大语言模型都是啥?
    Leecode 链表
    如何配置 logback?30分钟让你彻底学会代码熬夜敲
    get_post (攻防世界)(简单php)
  • 原文地址:https://blog.csdn.net/baidu_16370559/article/details/127797775