• C++协程


    协程概念

    协程是一个可以暂停执行以便稍后恢复的函数。

    协程是无堆栈的:通过返回给调用者来暂停执行,并且恢复执行所需的数据与堆栈分开存储。

    这允许异步执行顺序的代码(例如,在没有显式回调的情况下处理非阻塞 I/O),并且还支持惰性计算无限序列和其他用途的算法。

    如果函数的定义执行以下任何一项,则该函数是协程:

    1、使用 co_await 运算符暂停执行直到恢复

    1. task<> tcp_echo_server()
    2. {
    3. char data[1024];
    4. while(true)
    5. {
    6. size_t n = co_await socket.async_read_some(buffer[data]);
    7. co_await async_write(socket,buffer(data,n));
    8. }
    9. }

    2、使用关键字 co_yield 暂停执行返回值

    1. generator<int> itoa(int n = 0)
    2. {
    3. while(true)
    4. {
    5. co_yield n++;
    6. }
    7. }

    3、使用关键字 co_return 完成执行返回值

    1. lazy<int> f()
    2. {
    3. co_return 7;
    4. }

    每个协程都必须具有满足许多要求的返回类型,如下所述:

    限制

    协程不能使用可变参数、普通返回语句或占位符返回类型(auto 或 Concept)。

    constexpr 函数、构造函数、析构函数和主函数不能是协程。

    执行

    每个协程都与

    a、从协程内部操纵的承诺对象。 协程通过此对象提交其结果或异常。

    b、协程句柄,从协程外部操作。 这是一个非拥有句柄,用于恢复协程的执行或销毁协程框架。

    c、协程状态,它是一个内部的堆分配(除非分配被优化),对象包含

          1)承诺(promise)对象

         2)参数(全部按值复制)

         3)当前暂停点的某种表示,以便 resume 知道在哪里继续, destroy 知道哪些局部变量在范围内

         4)生命周期跨越当前挂起点的局部变量和临时变量

    当协程开始执行时,它会执行以下操作:

    1、使用 operator new 分配协程状态对象(见下文)

    2、将所有函数参数复制到协程状态:移动或复制按值复制参数,按引用参数保持引用(如果在引用对象的生命周期结束后恢复协程,则可能会变得悬空)

    1. #include <coroutine>
    2. #include <iostream>
    3. struct promise;
    4. struct coroutine : std::coroutine_handle<promise>
    5. { using promise_type = struct promise; };
    6. struct promise {
    7. coroutine get_return_object()
    8. { return {coroutine::from_promise(*this)}; }
    9. std::suspend_always initial_suspend() noexcept { return {}; }
    10. std::suspend_always final_suspend() noexcept { return {}; }
    11. void return_void() {}
    12. void unhandled_exception() {}
    13. };
    14. struct S {
    15. int i;
    16. coroutine f() {
    17. std::cout << i;
    18. co_return;
    19. }
    20. };
    21. void bad1() {
    22. coroutine h = S{0}.f();
    23. // S{0} destroyed
    24. h.resume(); // resumed coroutine executes std::cout << i, uses S::i after free
    25. h.destroy();
    26. }
    27. coroutine bad2() {
    28. S s{0};
    29. return s.f(); // returned coroutine can't be resumed without committing use after free
    30. }
    31. void bad3() {
    32. coroutine h = [i = 0]() -> coroutine { // a lambda that's also a coroutine
    33. std::cout << i;
    34. co_return;
    35. }(); // immediately invoked
    36. // lambda destroyed
    37. h.resume(); // uses (anonymous lambda type)::i after free
    38. h.destroy();
    39. }
    40. void good() {
    41. coroutine h = [](int i) -> coroutine { // make i a coroutine parameter
    42. std::cout << i;
    43. co_return;
    44. }(0);
    45. // lambda destroyed
    46. h.resume(); // no problem, i has been copied to the coroutine frame as a by-value parameter
    47. h.destroy();
    48. }

    3、调用 promise 对象的构造函数。 如果 Promise 类型有一个接受所有协程参数的构造函数,则调用该构造函数,并带有复制后的协程参数。 否则调用默认构造函数。

    4、调用 promise.get_return_object() 并将结果保存在局部变量中。 当协程第一次挂起时,该调用的结果将返回给调用者。 任何在此步骤之前(包括该步骤)引发的异常都会传给调用者,而不是放在 Promise 中。

    5、调用 promise.initial_suspend() 并 co_await 其结果。 典型的 Promise 类型要么返回一个 suspend_always,用于延迟启动的协程,要么返回一个 suspend_never,用于急切启动的协程。

    6、当 co_await promise.initial_suspend() 恢复时,开始执行协程主体。

    7、当协程到达暂停点时,如果需要,在隐式转换为协程的返回类型后,将之前获得的返回对象返回给调用者/恢复者。

    8、当协程到达 co_return 语句时,它会执行以下操作:

    1)调用 promise.return_void() 为

    a、co_return;

    b、co_return expr 其中 expr 的类型为 void

    c、从返回 void 的协程的末尾掉落。 在这种情况下,如果 Promise 类型没有 Promise::return_void() 成员函数,则行为未定义。

    2)或为 co_return expr 调用 promise.return_value(expr),其中 expr 具有非 void 类型

    3)以与创建时相反的顺序销毁所有具有自动存储持续时间的变量。

    4)调用 promise.final_suspend() 并 co_await 其结果。

    9、如果协程以未捕获的异常结束,它将执行以下操作:

    1)捕获异常并从 catch-block 中调用 promise.unhandled_exception()

    2)调用 promise.final_suspend() 并 co_await 结果(例如,恢复协程或获取结果)。 从这一点恢复协程是未定义的行为。

    10、当协程状态因为 co_return 或未捕获的异常终止,或者因为它的句柄被销毁而被销毁时,它会执行以下操作:

    1)调用 promise 对象的析构函数。

    2)调用函数参数副本的析构函数。

    3)调用 operator delete 释放协程状态使用的内存

    4)将执行转移给调用者/resumer。

    堆分配

    协程状态通过非数组 operator new 在堆上分配。

    如果 Promise 类型定义了类级别的替换,则将使用它,否则将使用全局的 operator new。

    如果 Promise 类型定义了一个带有附加参数的 operator new 的放置形式,并且它们匹配一个参数列表,其中第一个参数是请求的大小(类型为 std::size_t),其余的是协程函数参数,这些参数将 传递给 operator new(这使得协程可以使用前导分配器约定)。

    可以优化对 operator new 的调用(即使使用自定义分配器),如果协程状态的生命周期严格嵌套在调用者的生命周期内,并且协程框架的大小在调用站点是已知的。在这种情况下,协程状态嵌入在调用者的堆栈帧中(如果调用者是普通函数)或协程状态(如果调用者是协程)

    如果分配失败,协程抛出 std::bad_alloc,除非 Promise 类型定义了成员函数 Promise::get_return_object_on_allocation_failure()。 如果定义了该成员函数,则分配使用 operator new 的 nothrow 形式,并且在分配失败时,协程立即将从 Promise::get_return_object_on_allocation_failure() 获得的对象返回给调用者。

    Promise

    Promise 类型由编译器使用 std::coroutine_traits 根据协程的返回类型确定。

    形式上,令 R 和 Args... 分别表示协程的返回类型和参数类型列表,Class T 和 /*cv-qual*/(如果有)分别表示协程所属的类类型及其 cv-qualification。如果它被定义为非静态成员函数,它的 Promise 类型由以下决定:

    1、std::coroutine_traits<R, Args...>::promise_type,如果协程没有定义为非静态成员函数,

    2、std::coroutine_traits<R, Class T /*cv-qual*/&, Args...>::promise_type,如果协程被定义为非右值引用限定的非静态成员函数,

    3、std::coroutine_traits<R, Class T /*cv-qual*/&&, Args...>::promise_type,如果协程被定义为右值引用限定的非静态成员函数。

    例如:

    1、如果协程定义为 task<float> foo(std::string x, bool flag);,则其 Promise 类型为 std::coroutine_traits<task<float>, std::string, bool>::promise_type。

    2、如果协程定义为 task<void> my_class::method1(int x) const;,则其 Promise 类型为 std::coroutine_traits<task<void>, const my_class&, int>::promise_type。

    3、如果协程定义为 task<void> my_class::method1(int x) &&;,则其 Promise 类型为 std::coroutine_traits<task<void>, my_class&&, int>::promise_type。

    co_await

    unary operator co_await 挂起协程并将控制权返回给调用者。 它的操作数是一个表达式,其类型必须要么定义 operator co_await,要么可以通过当前协程的 Promise::await_transform 转换为这种类型。

    co_await expr    

    首先,将 expr 转换为可等待对象,如下所示:

    1、如果 expr 由初始挂起点、最终挂起点或 yield 表达式产生,则awaitable是 expr。

    2、否则,如果当前协程的 Promise 类型有成员函数 await_transform,那么 awaitable 就是 promise.await_transform(expr)。

    3、否则,awaitable 就是 expr

    然后,得到awaiter对象,如下:

    1、如果 operator co_await 的重载决议给出了单个最佳重载,则awaiter是该调用的结果(对于成员重载,awaitable.operator co_await(),对于非成员重载,operator co_await(static_cast<Awaitable&&>(awaitable)))

    2、否则,如果重载决议没有找到 operator co_await,则awaiter是可等待的。

    3、否则,如果重载决议不明确,则程序格式错误。

    如果上面的表达式是纯右值,则 awaiter 对象是它的临时物化对象。 否则,如果上面的表达式是一个泛左值,那么 awaiter 对象就是它所引用的对象。

    然后,调用 awaiter.await_ready() (如果知道结果已准备好或可以同步完成,这是避免挂起成本的捷径)。 如果其结果,上下文转换为 bool 为假,则协程被挂起(其协程状态由局部变量和当前挂起点填充)。
    awaiter.await_suspend(handle) 被调用,其中 handle 是表示当前协程的协程句柄。 在该函数内部,挂起的协程状态可以通过该句柄观察到,并且该函数有责任安排它在某个执行程序上恢复或被销毁(返回错误计数作为调度)

    1、如果 await_suspend 返回 void,则立即将控制权返回给当前协程的调用者/恢复者(此协程保持挂起状态)。

    2、如果 await_suspend 返回布尔值,值 true 将控制权返回给当前协程的调用者/恢复者;值 false 恢复当前协程。

    3、如果 await_suspend 返回某个其他协程的协程句柄,则恢复该句柄(通过调用 handle.resume())(注意这可能会链接到最终导致当前协程恢复)

    4、如果 await_suspend 抛出异常,则捕获异常,恢复协程,立即重新抛出异常

    最后,调用 awaiter.await_resume() (无论协程是否挂起),其结果是整个 co_await expr 表达式的结果。

    如果协程在 co_await 表达式中被挂起,然后又被恢复,那么恢复点就在调用 awaiter.await_resume() 之前。

    注意,由于协程在进入 awaiter.await_suspend() 之前已完全挂起,因此该函数可以自由地跨线程传输协程句柄,而无需额外同步。例如,它可以将其放入回调中,计划在异步I/O操作完成时在线程池上运行。在这种情况下,由于当前协同路由可能已经恢复,因此执行了waiter对象的析构函数,当await_suspend()继续在当前线程上执行时,await_suspend()应将*this视为已销毁,并且在句柄发布到其他线程后不访问它。

    Example

    1. #include <coroutine>
    2. #include <iostream>
    3. #include <stdexcept>
    4. #include <thread>
    5. auto switch_to_new_thread(std::jthread& out) {
    6. struct awaitable {
    7. std::jthread* p_out;
    8. bool await_ready() { return false; }
    9. void await_suspend(std::coroutine_handle<> h) {
    10. std::jthread& out = *p_out;
    11. if (out.joinable())
    12. throw std::runtime_error("Output jthread parameter not empty");
    13. out = std::jthread([h] { h.resume(); });
    14. // Potential undefined behavior: accessing potentially destroyed *this
    15. // std::cout << "New thread ID: " << p_out->get_id() << '\n';
    16. std::cout << "New thread ID: " << out.get_id() << '\n'; // this is OK
    17. }
    18. void await_resume() {}
    19. };
    20. return awaitable{&out};
    21. }
    22. struct task{
    23. struct promise_type {
    24. task get_return_object() { return {}; }
    25. std::suspend_never initial_suspend() { return {}; }
    26. std::suspend_never final_suspend() noexcept { return {}; }
    27. void return_void() {}
    28. void unhandled_exception() {}
    29. };
    30. };
    31. task resuming_on_new_thread(std::jthread& out) {
    32. std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << '\n';
    33. co_await switch_to_new_thread(out);
    34. // awaiter destroyed here
    35. std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << '\n';
    36. }
    37. int main() {
    38. std::jthread out;
    39. resuming_on_new_thread(out);
    40. }

     Possible output:

    1. Coroutine started on thread: 140193066129216
    2. New thread ID: 140193045907200
    3. Coroutine resumed on thread: 140193045907200

    注意:awaiter 对象是协程状态的一部分(作为一个临时对象,其生命周期跨越一个暂停点),并在 co_await 表达式完成之前被销毁。 它可用于根据某些异步 I/O API 的要求维护每个操作的状态,而无需求助于额外的堆分配。

    标准库定义了两个简单的等待项:std::suspend_always 和 std::suspend_never。 

    co_yield

     Yield-expression 向调用者返回一个值并暂停当前协程:它是可恢复生成器函数的通用构建块

    1. co_yield expr
    2. co_yield braced-init-list

    它相当于

    co_await promise.yield_value(expr)

    典型的生成器的 yield_value 会将其参数存储(复制/移动或仅存储其地址,因为参数的生命周期跨越 co_await 内的暂停点)到生成器对象中并返回 std::suspend_always,将控制权转移给调用者/恢复者。

    1. #include <coroutine>
    2. #include <exception>
    3. #include <iostream>
    4. template<typename T>
    5. struct Generator {
    6. struct promise_type;
    7. using handle_type = std::coroutine_handle<promise_type>;
    8. struct promise_type { // required
    9. T value_;
    10. std::exception_ptr exception_;
    11. Generator get_return_object() {
    12. return Generator(handle_type::from_promise(*this));
    13. }
    14. std::suspend_always initial_suspend() { return {}; }
    15. std::suspend_always final_suspend() noexcept { return {}; }
    16. void unhandled_exception() { exception_ = std::current_exception(); } // saving
    17. // exception
    18. template<std::convertible_to<T> From> // C++20 concept
    19. std::suspend_always yield_value(From &&from) {
    20. value_ = std::forward<From>(from); // caching the result in promise
    21. return {};
    22. }
    23. void return_void() {}
    24. };
    25. handle_type h_;
    26. Generator(handle_type h) : h_(h) {}
    27. ~Generator() { h_.destroy(); }
    28. explicit operator bool() {
    29. fill();
    30. /*
    31. 唯一可靠地确定我们是否完成协程,是否将通过 C++ getter(下面的operator())在协程中生成下
    32. 一个值(co_yield)的唯一方法是执行/恢复协程,直到下一个 co_yield 点(或让它脱落)。然后
    33. 我们存储/缓存结果以承诺允许getter(下面的operator()在不执行协程的情况下获取它)。
    34. */
    35. return !h_.done();
    36. }
    37. T operator()() {
    38. fill();
    39. full_ = false; // 我们将移出先前缓存的结果以使 promise 再次为空
    40. return std::move(h_.promise().value_);
    41. }
    42. private:
    43. bool full_ = false;
    44. void fill() {
    45. if (!full_) {
    46. h_();
    47. if (h_.promise().exception_)
    48. std::rethrow_exception(h_.promise().exception_);
    49. // 在被调用的上下文中传播协程异常
    50. full_ = true;
    51. }
    52. }
    53. };
    54. Generator<uint64_t>
    55. fibonacci_sequence(unsigned n)
    56. {
    57. if (n==0)
    58. co_return;
    59. if (n>94)
    60. throw std::runtime_error("Too big Fibonacci sequence. Elements would overflow.");
    61. co_yield 0;
    62. if (n==1)
    63. co_return;
    64. co_yield 1;
    65. if (n==2)
    66. co_return;
    67. uint64_t a=0;
    68. uint64_t b=1;
    69. for (unsigned i = 2; i < n; i++)
    70. {
    71. uint64_t s=a+b;
    72. co_yield s;
    73. a=b;
    74. b=s;
    75. }
    76. }
    77. int main()
    78. {
    79. try {
    80. auto gen = fibonacci_sequence(10); // max 94 before uint64_t overflows
    81. for (int j=0; gen; j++)
    82. std::cout << "fib("<<j <<")=" << gen() << '\n';
    83. }
    84. catch (const std::exception& ex)
    85. {
    86. std::cerr << "Exception: " << ex.what() << '\n';
    87. }
    88. catch (...)
    89. {
    90. std::cerr << "Unknown exception.\n";
    91. }
    92. }

    Output:

    1. fib(0)=0
    2. fib(1)=1
    3. fib(2)=1
    4. fib(3)=2
    5. fib(4)=3
    6. fib(5)=5
    7. fib(6)=8
    8. fib(7)=13
    9. fib(8)=21
    10. fib(9)=34

  • 相关阅读:
    逆向分析 工具、加壳、安全防护篇
    RocketMQ—MAC单节点安装下载RocketMQ步骤
    无人机快递(物流)技术方案,无人机快递(物流)基础知识
    分享几个.NET开源的AI和LLM相关项目框架
    【附源码】Python计算机毕业设计特大城市地铁站卫生防疫系统
    CSP-J 2023 入门级 第一轮 阅读程序(3)
    Redis 全景图(3)--- Redis 应用于缓存
    C编译环境和预处理(非常详细,建议收藏)
    CommandInvokationFailure: Failed to update Android SDK package list. 报错的解决方法
    【毕业设计】基于java+SSH+jsp的物资租赁系统设计与实现(毕业论文+程序源码)——物资租赁系统
  • 原文地址:https://blog.csdn.net/qq_32285693/article/details/125594461