• C++11 条件变量


    目录

    条件变量

    成员函数

    wait函数

    wait_for函数

    wait_until函数

    notify_one函数

    notify_all函数

    范例

    原子变量

    异步操作

    std::aysnc和std::future

    future的类型

    future的使用

    std::packaged_task

    std::promise

    小结:

    条件变量

    互斥量是多线程间同时访问某一共享变量时,保证变量可被安全访问的手段。但单靠互斥量无法实现线 程的同步。线程同步是指线程间需要按照预定的先后次序顺序进行的行为。C++11对这种行为也提供了 有力的支持,这就是条件变量。条件变量位于头文件condition_variable下。 http://www.cplusplus.com/reference/condition_variable/condition_variable

    条件变量使用过程:

    1. 拥有条件变量的线程获取互斥量;

    2. 循环检查某个条件,如果条件不满足则阻塞直到条件满足;如果条件满足则向下执行;

    3. 某个线程满足条件执行完之后调用notify_one或notify_all唤醒一个或者所有等待线程。

    条件变量提供了两类操作:wait和notify。这两类操作构成了多线程同步的基础。

    成员函数

    wait函数

    1. void wait (unique_lock& lck);
    2. template <class Predicate>
    3. void wait (unique_lock& lck, Predicate pred);

    包含两种重载,第一种只包含unique_lock对象,另外一个Predicate 对象(等待条件),这里必须使用 unique_lock,因为wait函数的工作原理:

    当前线程调用wait()后将被阻塞并且函数会解锁互斥量,直到另外某个线程调用notify_one或者 notify_all唤醒当前线程;一旦当前线程获得通知(notify),wait()函数也是自动调用lock(),同理不能使用lock_guard对象。

    如果wait没有第二个参数,第一次调用默认条件不成立,直接解锁互斥量并阻塞到本行,直到某一 个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线程会卡在这里,直到获取到互斥量,然后无条件地继续进行后面的操作。

    如果wait包含第二个参数,如果第二个参数不满足,那么wait将解锁互斥量并堵塞到本行,直到某一个线程调用notify_one或notify_all为止,被唤醒后,wait重新尝试获取互斥量,如果得不到,线程会卡在这里,直到获取到互斥量,然后继续判断第二个参数,如果表达式为false,wait对互斥量解锁,然后休眠,如果为true,则进行后面的操作。

    wait_for函数

    1. template <class Rep, class Period>
    2. cv_status wait_for (unique_lock& lck,
    3. const chrono::duration& rel_time);
    4. template <class Rep, class Period, class Predicate>
    5. bool wait_for (unique_lock& lck,
    6. const chrono::duration& rel_time, Predicate
    7. pred);

    和wait不同的是,wait_for可以执行一个时间段,在线程收到唤醒通知或者时间超时之前,该线程都会 处于阻塞状态,如果收到唤醒通知或者时间超时,wait_for返回,剩下操作和wait类似。

    wait_until函数

    1. template <class Clock, class Duration>
    2. cv_status wait_until (unique_lock& lck,
    3. const chrono::time_point& abs_time);
    4. template <class Clock, class Duration, class Predicate>
    5. bool wait_until (unique_lock& lck,
    6. const chrono::time_point& abs_time,
    7. Predicate pred);

    与wait_for类似,只是wait_until可以指定一个时间点,在当前线程收到通知或者指定的时间点超时之 前,该线程都会处于阻塞状态。如果超时或者收到唤醒通知,wait_until返回,剩下操作和wait类似

    notify_one函数

    void notify_one() noexcept;
    

    解锁正在等待当前条件的线程中的一个,如果没有线程在等待,则函数不执行任何操作,如果正在等待 的线程多余一个,则唤醒的线程是不确定的。

    notify_all函数

    void notify_all() noexcept;

    解锁正在等待当前条件的所有线程,如果没有正在等待的线程,则函数不执行任何操作。

    范例

    使用条件变量实现一个同步队列,同步队列作为一个线程安全的数据共享区,经常用于线程之间数据读取。

    1. template<typename T>
    2. class SyncQueue
    3. {
    4. private:
    5. bool IsFull() const
    6. {
    7. return _queue.size() == _maxSize;
    8. }
    9. bool IsEmpty() const
    10. {
    11. return _queue.empty();
    12. }
    13. public:
    14. SyncQueue(int maxSize) : _maxSize(maxSize)
    15. {
    16. }
    17. void Put(const T& x)
    18. {
    19. std::lock_guard locker(_mutex);
    20. while (IsFull())
    21. {
    22. std::cout << "full wait... size " << _queue.size() << std::endl;
    23. _notFull.wait(_mutex);
    24. }
    25. _queue.push_back(x);
    26. _notEmpty.notify_one();
    27. }
    28. void Take(T& x)
    29. {
    30. std::lock_guard locker(_mutex);
    31. while (IsEmpty())
    32. {
    33. std::cout << "empty wait.." << std::endl;
    34. _notEmpty.wait(_mutex);
    35. }
    36. x = _queue.front();
    37. _queue.pop_front();
    38. _notFull.notify_one();
    39. }
    40. bool Empty()
    41. {
    42. std::lock_guard locker(_mutex);
    43. return _queue.empty();
    44. }
    45. bool Full()
    46. {
    47. std::lock_guard locker(_mutex);
    48. return _queue.size() == _maxSize;
    49. }
    50. private:
    51. std::list _queue; //缓冲区
    52. std::mutex _mutex; //互斥量和条件变量结合起来使用
    53. std::condition_variable_any _notEmpty;//不为空的条件变量
    54. std::condition_variable_any _notFull; //没有满的条件变量
    55. int _maxSize; //同步队列最大的size
    56. };

    代码中用到了std::lock_guard,它利用RAII机制可以保证安全释放mutex。

    1. std::lock_guard locker(_mutex);
    2. while (IsFull())
    3. {
    4. std::cout << "full wait..." << std::endl;
    5. _notFull.wait(_mutex);
    6. }

    可以改成

    1. std::lock_guard locker(_mutex);
    2. _notFull.wait(_mutex, [this] {return !IsFull();});

    两种写法效果是一样的,但是后者更简洁,条件变量会先检查判断式是否满足条件,如果满足条件则重 新获取mutex,然后结束wait继续往下执行;如果不满足条件则释放mutex,然后将线程置为waiting状态继续等待。

    这里需要注意的是,wait函数中会释放mutex,而lock_guard这时还拥有mutex,它只会在出了作用域之后才会释放mutex,所以这时它并不会释放,但执行wait时会提前释放mutex。

    从语义上看这里使用lock_guard会产生矛盾,但是实际上并不会出问题,因为wait提前释放锁之后会处 于等待状态,在被notify_one或者notify_all唤醒后会先获取mutex,这相当于lock_guard的mutex在释放之后又获取到了,因此,在出了作用域之后lock_guard自动释放mutex不会有问题。 这里应该用unique_lock,因为unique_lock不像lock_guard一样只能在析构时才释放锁,它可以随时释放锁,因此在wait时让unique_lock释放锁从语义上更加准确。

    使用unique_lock和condition_variable_variable改写1-3-condition-sync-queue,改写为用等待一个判断式的方法来实现一个简单的队列。

    1. template<typename T>
    2. class SyncQueue
    3. {
    4. public:
    5. SyncQueue()
    6. {
    7. }
    8. void Put(const T& x)
    9. {
    10. std::lock_guard locker(_mutex);
    11. _queue.push_back(x);
    12. _notEmpty.notify_one();
    13. }
    14. void Take(T& x)
    15. {
    16. std::unique_lock locker(_mutex);
    17. _notEmpty.wait(locker, [this] {return !_queue.empty(); });
    18. x = _queue.front();
    19. _queue.pop_front();
    20. }
    21. bool Empty()
    22. {
    23. std::lock_guard locker(_mutex);
    24. return _queue.empty();
    25. }
    26. private:
    27. std::list _queue; //缓冲区
    28. std::mutex _mutex; //互斥量和条件变量结合起来使用
    29. std::condition_variable_any _notEmpty;//不为空的条件变量
    30. };

    原子变量

    1. // atomic::load/store example
    2. #include // std::cout
    3. #include // std::atomic, std::memory_order_relaxed
    4. #include // std::thread
    5. //std::atomic count = 0;//错误初始化
    6. std::atomic<int> count(0); // 准确初始化
    7. void set_count(int x)
    8. {
    9. std::cout << "set_count:" << x << std::endl;
    10. count.store(x, std::memory_order_relaxed); // set value atomically
    11. }
    12. void print_count()
    13. {
    14. int x;
    15. do {
    16. x = count.load(std::memory_order_relaxed); // get value atomically
    17. } while (x==0);
    18. std::cout << "count: " << x << '\n';
    19. }
    20. int main ()
    21. {
    22. std::thread t1 (print_count);
    23. std::thread t2 (set_count, 10);
    24. t1.join();
    25. t2.join();
    26. std::cout << "main finish\n";
    27. return 0;
    28. }

    异步操作

    std::future : 异步指向某个任务,然后通过future特性去获取任务函数的返回结果。std::aysnc: 异步运行某个任务函数

    std::packaged_task :将任务和feature绑定在一起的模板,是一种对任务的封装。 std::promise

    std::aysnc和std::future

    std::future期待一个返回,从一个异步调用的角度来说,future更像是执行函数的返回值,C++标准库使用std::future为一次性事件建模,如果一个事件需要等待特定的一次性事件,那么这线程可以获取一个future对象来代表这个事件。 异步调用往往不知道何时返回,但是如果异步调用的过程需要同步,或者说后一个异步调用需要使用前一个异步调用的结果。这个时候就要用到future。 线程可以周期性的在这个future上等待一小段时间,检查future是否已经ready,如果没有,该线程可以 先去做另一个任务,一旦future就绪,该future就无法复位(无法再次使用这个future等待这个事 件),所以future代表的是一次性事件。

    future的类型

    在库的头文件中声明了两种future,唯一future(std::future)和共享future(std::shared_future)这两个是参照。

    std::unique_ptr和std::shared_ptr设立的,前者的实例是仅有的一个指向其关联事件的实例,而后者可以有多个实例指向同一个关联事件,当事件就绪时,所有指向同一事件的std::shared_future实例会变成就绪。

    future的使用

    std::future是一个模板,例如std::future,模板参数就是期待返回的类型,虽然future被用于线程间通信,但其本身却并不提供同步访问,热门必须通过互斥元或其他同步机制来保护访问。 future使用的时机是当你不需要立刻得到一个结果的时候,你可以开启一个线程帮你去做一项任务,并期待这个任务的返回,但是std::thread并没有提供这样的机制,这就需要用到std::async和std::future (都在头文件中声明) std::async返回一个std::future对象,而不是给你一个确定的值(所以当你不需要立刻使用此值的时候才需要用到这个机制)。当你需要使用这个值的时候,对future使用get(),线程就会阻塞直到future就 绪,然后返回该值。

    1. //1-5-future
    2. #include
    3. #include
    4. #include
    5. using namespace std;
    6. int find_result_to_add()
    7. {
    8. std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟的影响
    9. std::cout << "find_result_to_add" << std::endl;
    10. return 1 + 1;
    11. }
    12. int find_result_to_add2(int a, int b)
    13. {
    14. // std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟的影响
    15. return a + b;
    16. }
    17. void do_other_things()
    18. {
    19. std::cout << "do_other_things" << std::endl;
    20. // std::this_thread::sleep_for(std::chrono::seconds(5));
    21. }
    22. int main()
    23. {
    24. // std::future std::async他是异步线程
    25. std::future<int> result = std::async(find_result_to_add); // 不会阻塞32行的运行
    26. // std::future result = std::async(find_result_to_add); // 如果使用decltype 需要把函数写进去
    27. // auto result = std::async(find_result_to_add); // 推荐的写法
    28. do_other_things();
    29. std::cout << "result: " << result.get() << std::endl; // 延迟是否有影响? get会阻塞等待函数返回
    30. // std::future result2 = std::async(find_result_to_add2, 10, 20); //错误
    31. //std::future result2 = std::async(find_result_to_add2, 10, 20); // 正确
    32. //std::cout << "result2: " << result2.get() << std::endl; // 延迟是否有影响?
    33. std::cout << "main finish" << endl;
    34. return 0;
    35. }

    std::async是为了让用户的少费点脑子的,它让这三个对象默契的工作。大概的工作过程是这样的:std::async先将异步操作用std::packaged_task包装起来,然后将异步操作的结果放到std::promise中,这个过程就是创造未来的过程。外面再通过future.get/wait来获取这个未来的结果

    std::async的原型async(std::launch::async | std::launch::deferred, f, args…),第一个参数是线程的创建策略,有两种策略,默认的策略是立即创建线程:

    跟thread类似,async允许你通过将额外的参数添加到调用中,来将附加参数传递给函数。如果传入的函数指针是某个类的成员函数,则还需要将类对象指针传入(直接传入,传入指针,或者是std::ref封装)。 默认情况下,std::async是否启动一个新线程,或者在等待future时,任务是否同步运行都取决于你给的参数。这个参数为std::launch类型

    std::launch::defered表明该函数会被延迟调用,直到在future上调用get()或者wait()为止

    std::launch::async,表明函数会在自己创建的线程上运行

    std::launch::any = std::launch::defered | std::launch::async

    std::launch::sync = std::launch::defered

    1. enum class launch
    2. {
    3. async,deferred,sync=deferred,any=async|deferred
    4. };

    PS:默认选项参数被设置为std::launch::any。如果函数被延迟运行可能永远都不会运行。

    future_status有三种状态:

    deferred:异步操作还没开始
    ready:异步操作已经完成
    timeout:异步操作超时

    std::packaged_task

    如果说std::async和std::feature还是分开看的关系的话,那么std::packaged_task就是将任务和feature 绑定在一起的模板,是一种封装对任务的封装

    The class template std::packaged_task wraps any Callable target (function, lambda expression, bind expression, or another function object) so that it can be invoked asynchronously. Its return value or exception thrown is stored in a shared state which can be accessed through std::future objects.

    可以通过std::packaged_task对象获取任务相关联的future,调用get_future()方法可以获得 std::packaged_task对象绑定的函数的返回值类型的future。std::packaged_task的模板参数是函数签名

    PS:例如int add(int a, intb)的函数签名就是int(int, int)

    1. //1-5-package_task
    2. #include
    3. #include
    4. #include
    5. using namespace std;
    6. int add(int a, int b, int c)
    7. {
    8. std::cout << "call add\n";
    9. return a + b + c;
    10. }
    11. void do_other_things()
    12. {
    13. std::cout << "do_other_things" << std::endl;
    14. }
    15. int main()
    16. {
    17. std::packaged_task<int(int, int, int)> task(add); // 1. 封装任务,还没有运行
    18. std::this_thread::sleep_for(std::chrono::seconds(5)); // 用来测试异步延迟的影响
    19. // do_other_things();
    20. std::future<int> result = task.get_future(); // 这里运行吗?这里只是获取 future
    21. // 这里才真正运行
    22. task(1, 1, 2); //必须要让任务执行,否则在get()获取future的值时会一直阻塞
    23. std::cout << "result:" << result.get() << std::endl;
    24. return 0;
    25. }

    std::promise

    std::promise为获取线程函数中的某个值提供便利,在线程函数中给外面传进来的promise赋值,当线程函数执行完成之后就可以通过promis获取该值了,值得注意的是取值是间接的通过promise内部提供的future来获取的。它的基本用法:

    1. std::promise<int> pr;
    2. std::thread t([](std::promise<int>& p){ p.set_value_at_thread_exit(9); },std::ref(pr));
    3. std::future<int> f = pr.get_future();
    4. auto r = f.get();

    std::promise提供了一种设置值的方式,它可以在这之后通过相关联的std::future对象进行读取。换种说法,之前已经说过std::future可以读取一个异步函数的返回值了,那么这个std::promise就提供一种方式手动让future就绪。

    小结:

    std::future提供了一个访问异步操作结果的机制,它和线程是一个级别的属于低层次的对象,在它之上高一层的是std::packaged_task和std::promise,他们内部都有future以便访问异步操作结果,std::packaged_task包装的是一个异步操作,而std::promise包装的是一个值,都是为了方便异步操作的,因为有时我需要获取线程中的某个值,这时就用std::promise,而有时我需要获一个异步操作的返回值,这时就用std::packaged_task。那std::promise和std::packaged_task之间又是什么关系呢?说他们没关系也没关系,说他们有关系也有关系,都取决于你了,因为我可以将一个异步操作的结果保存到std::promise中。如果读者还没搞清楚他们的关系的话,我就用更通俗的话来解释一下。比如,一个小伙子给一个姑娘表白真心的时候也许会说:”我许诺会给你一个美好的未来“或者”我会努力奋斗为你创造一个美好的未来“。姑娘往往会说:”我等着“。现在我来将这三句话用c++11来翻译一下:

    小伙子说:我许诺会给你一个美好的未来等于c++11中"std::promise a std::future";
    小伙子说:我会努力奋斗为你创造一个美好的未来等于c++11中"std::packaged_task a future";
    姑娘说:我等着等于c++11中"future.get()/wait()";

    小伙子两句话的个中差异,自己琢磨一下,这点差异也是std::promise和std::packaged_task的差异。现实中的山盟海誓靠不靠得住我不知道,但是c++11中的许诺和未来是一定可靠的,发起来了许诺就一定有未来。细想起来c++11标准的制定者选定的关键字真是贴切而有意思!

  • 相关阅读:
    pg数据表同步到hive表数据压缩总结
    内网渗透神器CobaltStrike之配置与基础操作(一)
    从赋能中心到科技成果转化基地:百度飞桨的人工智能探索之路
    NVIDIA DALI学习:数据加载
    I350网卡烧录oprom,通过UEFI PXE引导方案
    JavaAssist的进阶使用
    Unity和UE4两大游戏引擎,你该如何选择?
    几种常见的加密方式
    学习ASP.NET Core Blazor编程系列二十七——JWT登录(1)
    【机器学习】Sklearn导入手写数字数据集 Mnist 失败的解决办法
  • 原文地址:https://blog.csdn.net/jianfeng123123/article/details/126849097