• C++异步:libunifex中的concepts详解!


    21fe6125bc6aaaa64f7c4665b72dbbb5.png

    导语 | 本篇我们将深入libunifex的concepts设计的方方面面,结合libunifex中的各种concepts实现,更加深入地了解整个库的实现。希望对此方面感兴趣的开发者们给予一点经验和帮助。

    前言

    在前文《C++异步变化:libunifex实现!》中我们对libunifex的整体实现做了概要性的介绍,本篇我们将深入libunifex的concepts设计的方方面面,结合libunifex中的各种concepts实现,更加深入了解整个库的实现。

    一、Sender/Receiver机制概述

    上一篇中我们也提到过,与普通函数通过return来返回值相比,libunifex中的Sender和Receiver所表达的是这样一种关系:一个作为生产者的Sender对象通过:

    • set_value。

    • set_done。

    • set_error这三个cpo的其中一个来向作为消费者的Receiver对象传递值的。

    531491314ca9019a4cfe3ac3832322ca.png

    如上图所示,类同于一个function或者一个lambda通过return value来返回调用结果,Sender其实就是一个异步任务中的function/lambda,只不过对比普通函数来说,Sender的对应的值传递的时机更复杂,同时也存在三个不同的分支来代表不同的异步处理状态。

    当然,实际我们要做到为异步任务提供标准接口的同时,我们还需要让我们的异步任务能够自由的被各种通用异步算法组合,同时我们还希望相关的实现是具备lazy性质的。


    (一)Structured Concurrency与Lazy Execution

    比较多Execution的文章,都会聊到Lazy Execution,但Lazy Execution感觉更多是结果,这种特性更多的是来源于Structured Concurrency本身的设计。

    这里我们只给出基本的概念,Structed Concurrency会在专门的一篇文章中进行展开。

    我们先来看一下整个execution的概览图:

    d401c3a826715aba3fc1955a1f8f2354.png

    execution相关的设计可以认为是一种从DSL->Compiler->Execute过程都很完备的小型专用语言。对于一个使用structured concurrency来表达的异步操作来说:

    • DSL(BNF组成)-Concurrency Pipeline::=Sender Factory{ '|' Sender Adapter } '|' Receiver。

    • Compiler-通常情况我们可以将connect()看成是编译过程,借由Compiler Time的特性支持,我们可以通过connect()产生runtime所需的OperationState。

    • Execute-这个阶段就很自然了,OperationState的start() 就是DSL本身执行的入口点,当然,执行结果最后是通过:set_value,set_error,set_done这几个 receiver cpos来传递的。

    所以对于lazy evaluate这个特性,因为设计本身就是有明显的DSL->Compiler->Execute过程的,必然就会具备lazy性质了(直到最后一步才执行Execute操作)。

    structured concurrency本身的完备设计和系统性,为我们定制,或者更进一步支持异构计算打下了良好的基础。

    接下来我们来具体的看一下整个connect-start的细节:

    (二)发起一个异步操作

    libunifex用的方式其实并不复杂,通过connect()操作,Sender和Receiver会生成一个中间态的OperationState对象,这个对象负责持有异步任务需要的所有状态和封装所有需要的逻辑。

    对于pipeline组织的异步任务来说,整个OperationState是一步一步按层组织到组织到一起的,每个OperationState负责自己的状态管理,并负责生成每一步的操作结果,整个connect的过程我们可以简单理解为一个树状的OperationState的组织过程。connect()返回的结果则是这个树的根节点,也是一个OperationState。

    到了真正执行OperationState的start()阶段,整个过程就简单了,我们以整个OperationState树的根节点作为入口,依次触发作为子节点OperationState的执行,直到整个异步任务执行完成。

    所以从这里我们其实也能很明显的看到,通过connect-start机制,状态的保存和生命周期控制都交由OperationState来负责了,connect的时候产生相应的OperationState,在start()执行结束后,相应的OperationState即可释放了,很多时候我们甚至可以很方便的在栈上分配一个OperationState,在start结束后再释放它,这样也避免了过多的堆分配导致的性能下降。

    这种构造,首先是很容易产生lazy evaluation的作用,利用生成的OperationState,我们可以将这个OperationState放在任何需要的地方执行,另外,整个OperationState这个时候也形成了一个类似AST(Abstract Syntax Tree)的存在,我们发现lambda post过程中丢失的节点与节点之间的值类型约束,清晰的上下文关系,因为这颗树的存在都解决了,是不是比大家想的要简单? 其实并不简单了,execution提案发展到这一步,大家看看中间各个版本的差异就能感知到了。只不过一些复杂问题的解决,往往到最后大家回头看,相关的方案总是简洁明了,理解起来相对顺畅的。

    相较于lambda post依赖lambda对异步执行的中间状态进行保存的即时方式,connect-start机制中整个异步任务中的状态管理和生命周期控制都借由OperationState的存在具像化了,一方面代码具备了可复用性,另外整个机制的约束性明显也从自由使用,自行保证正确性,向带约束的使用,误用的地方会有明显的compiler time报错转移了,避免大家踩一些不容易发现又容易写错的坑。另外对于lambda post下我们经常使用智能指针来对状态进行保存和传递的方式比,connect-start机制也能允许我们更多的使用性能更高的栈对象来处理异步逻辑。


    (三)Object-state的生命周期

    通过前面的讲述,不难理解,对于connect-start模型来说,一个OperationState的生命周期,其实是在connect()后,到start()调用set_value,set_done,set_error这三者之间的任何一个的时刻,再已经返回异步调用的结果后,我们都应该保证receiver此时是知道对应的OpreationState已经被销毁,任何对OperationState的操作都是非法的了。

    另外需要注意的一点是,OperationState对象应该是不存在move和copy语义的,正常来说,一个OperationState的构建都应该是作为connect()的返回值,作为值类型即时构建并返回的。


    (四)一个异步操作的完成

    libunifex中要完成一个异步任务,们以connect传入的receiver作为首个参数调用set_value,set_done,set_error这三者中的任何一个,相关的异步任务就执行完成了,三者的区别如下:

    • set_value用来表示一个成功执行的异步任务,同时后续会追加0..N个任务执行成功的返回值。

    • set_error毫无疑问用来表示一个执行失败的异步任务了。

    • set_done比较特殊,用来表示set_value和set_error之外的情况,很多时候是表示相关的结果不再需要了,比如A, B两个节点同时执行,我们只需要其中一个节点的结果,A成功执行,我们希望通过取消机制取消B的执行,这种情况如果B本身是一个可取消的节点,那就可以直接返回set_done了。很多时候你可以将set_done当成无返回值,或者返回值为空的情况。

    这里我们需要特别注意一下 set_value(receiver) 和set_done(receiver)之间的区别,前者表达的意思很明确,就是一次成功的无任何返回值的异步调用,而后者则可能表达的是异步任务并未成功执行的情况。


    二、Receiver Concept

    简单来说,任何能够使用set_value,set_error,set_done这三个receiver cpos中的一个的类型都可以作为一个合法的Receiver。

    我们也能将Receiver作为异步任务中的继续点来考虑,通过多层receiver cpos的级联,最终组成了我们的整个异步任务。

    libunifex中并没有单独存在的通用receiver concept,只有几类针对不同receiver cpos的receiver concept 定义:

    • value_receiver<Values...>-一个能够接受set_value(receiver,Values...)的receiver concept定义。

    • error_receiver<Error>-一个能够接受set_error(receiver,Error)的receiver concept定义。

    • done_receiver-一个能够接受set_done(receiver)receiver concept定义。

    相关的代码如下:

    1. namespace unifex
    2. {
    3. // CPOs
    4. inline constexpr unspecified set_value = unspecified;
    5. inline constexpr unspecified set_error = unspecified;
    6. inline constexpr unspecified set_done = unspecified;
    7. template<typename R>
    8. concept __receiver_common =
    9. std::move_constructible<R> &&
    10. std::destructible<R>;
    11. template<typename R>
    12. concept done_receiver =
    13. __receiver_common<R> &&
    14. requires(R&& r) {
    15. set_done((R&&)r);
    16. };
    17. template<typename R, typename... Values>
    18. concept value_receiver =
    19. __receiver_common<R> &&
    20. requires(R&& r, Values&&... values) {
    21. set_value((R&&)r, (Values&&)values...);
    22. };
    23. template<typename R, typename Error>
    24. concept error_receiver =
    25. __receiver_common<R> &&
    26. requires(R&& r, Error&& error) {
    27. set_error((R&&)r, (Error&&)error);
    28. };
    29. }

    这部分代码还是比较好理解的,receiver首先要同时满足可move构造和析构,然后就是done_receiver,value_receiver,error_receiver这个concept的定义了。

    小技巧: 注意上面的:inline constexpr unspecified set_value= unspecified;

    c++20后,很多cpo的定义都会使用这种方式在文档中出现,这基本都是代表对应的声明是一个cpo对象,因为cpo的声明其实并不简单,直接省略掉具体的cpo实现,会方便我们更好的阅读代码,比如上面的set_value, set_error,set_done这三个receiver cpos。

    不同的Sender类型一般有不同的完成通知,所以对于connect()传入的Receiver会有不一样的约束。很多时候我们会根据相关的Sender实现来组合相关的Receiver约束,例如一个Sender如果在不同的分支处理下同时存在set_value,set_done和set_error调用的话,我们需要同时复合三种类型的receiver约束来组合出最终的receiver约束。


    Receiver Query

    同时,Receiver还有一种特殊的用法,用于作为caller和callee传递上下文信息的存在,比如对于get_stop_token(receiver)这个cpo来说,它可以用来查询当前receiver支持的stop_token类型,从而在相应的sender处调用合适的代码来处理相关的逻辑。


    三、Sender Concept

    对比明确的Receiver定义,libunifex中并没有一个通用的Sender约束,这是因为我们编译期没法知道Sender是否触发了三个receiver cpos中的一个,libunifex通过一种间接的方式来对Sender进行约束:

    1. namespace unifex
    2. {
    3. // Sender CPOs
    4. inline constexpr unspecified connect = unspecified;
    5. // Test whether a given sender and receiver can been connected.
    6. template<typename S, typename R>
    7. concept sender_to =
    8. requires(S&& sender, R&& receiver) {
    9. connect((S&&)sender, (R&&)receiver);
    10. };
    11. }

    libunifex通过对connect接受的sender和receiver来间接的约束sender,也就是满足connect()调用的sender即是合法的sender。

    虽然libunifex官方想通过加sender_traits和is_sender<T>的方式来改善相关实现,但在泛型加上不定时机两者的作用下,相关的实现不可能太智能, 可能依然还是库作者向如TypedSender对应的类型traits实现,也就是约束是由对应Sender的实现者来手动维护的。

    四、TypedSender Concept

    TypedSender其实并不是一个标准的concept实现,它仅仅是提供了一种编译期的traits,让我们有能力在编译期查询对应TypedSender能够支持的set_value和set_error的参数类型,使我们可以利用这些信息更好的约束数据在异步节点间的传递,当类型不匹配的情况下,compiler会直接报错。我们来看一个实际的例子:

    1. struct some_typed_sender {
    2. template<template<typename...> class Variant,
    3. template<typename...> class Tuple>
    4. using value_types = Variant<Tuple<int>,
    5. Tuple<std::string, int>,
    6. Tuple<>>;
    7. template<template<typename...> class Variant>
    8. using error_types = Variant<std::exception_ptr>;
    9. static constexpr bool sends_done = true;
    10. ...
    11. };

    对于一个TypedSender实现,我们可以通过它内嵌的类型:

    • value_types-编译期获取其支持的set_value参数类型。

    • error_types-编译期获取其支持的set_error参数类型。

    • sends_done-该bool值用于判断对应sender是否支持以set_done调用结束异步操作。

    对于上面的例子来说,我们容易知道,对应的TypedSender定义可能会对关联的Receiver调用以下函数重载:

    • set_value(R&&,int)

    • set_value(R&&,std::string,int)

    • set_value(R&&)

    • set_error(R&&,std::exception_ptr)

    • set_done(R&&)

    不过我们的代码实现中,一般很少直接使用这些内嵌类型,而是通过sender_traits<Sender>来使用它们。

    比如:typename unifex::sender_traits<Sender>::template value_types<std::variant, std::tuple>


    五、OperationState Concept

    一个OperationState对象包含了一个异步任务的所有内部状态。它同样也不是一个直接的类型约束。

    我们知道一个OperationState是由connect(Sender,Receiver)返回,同时一个OperationState也是不能被move和copy的。对于一个OperationState对象,它只支持两种操作:

    • start()

    • 析构

    一个OperationState的析构只有在未调用start()前,或者在已经发出任务结束通知的情况下才是合法的。

    1. namespace unifex
    2. {
    3. // CPO for starting an async operation
    4. inline constexpr unspecified start = unspecified;
    5. // CPO for an operation-state object.
    6. template<typename T>
    7. concept operation_state =
    8. std::destructible<T> &&
    9. requires(T& operation) {
    10. start(operation);
    11. };
    12. }

    从上面的代码中可以看到, 对于OperationState的约束, 主要就两点:

    • 支持析构

    • 支持start()操作


    六、Scheduler Concept

    之前的execution概览图中,我们其实也能看到,真正负责异常操作执行的,就是Scheduler和更底层的ExecutionContext。libunifex中的Scheduler其实就是一个轻量的Wrapper,真正负责异步任务执行的是底层的Execution Context实现。对于非异构的实现,这里的Execution Context一般代表一个Cpu线程或者一组Cpu线程(线程池),最简单的情况是相关的任务被投递到一个线程上来执行。

    libunifex有两种方式挂接相关的异步任务到Scheduler对象上:

    • 调用schedule(Scheduler) 这个cpo生成一个sender,然后再利用pipeline关联异步操作到这个sender上。

    • via(sender,Scheduler) 和typed_via(sender,Scheduler)这两个sender adapter来中间切换后续任务执行的Execution Context。

    schedule()的实现机制并不复杂,Execution Context是cpu线程来举例说明,底层会保证在关联的线程上执行一个参数类型为void的set_value(),这样因为sender和sender adapter以及receiver的组合机制,在没有via()或者typed_via()等特殊节点存在的情况下,整个pipeline的相关内容都会在这个线程上被执行。这样也就间接的通过一个set_value()的调用位置决定了整个pipeline执行所归属的Execution Context。

    一个Scheduler Concept的定义如下:

    1. namespace unifex
    2. {
    3. // The schedule() CPO
    4. inline constexpr unspecified schedule = {};
    5. // The scheduler concept.
    6. template<typename T>
    7. concept scheduler =
    8. std::is_nothrow_copy_constructible_v<T> &&
    9. std::is_nothrow_move_constructible_v<T> &&
    10. std::destructible<T> &&
    11. std::equality_comparable<T> &&
    12. requires(const T cs, T s) {
    13. schedule(cs); // TODO: Constraint this returns a sender of void.
    14. schedule(s);
    15. };
    16. }

    (一)TimeScheduler

    TimeScheduler是Scheduler的扩展实现,除了Scheduler本身的支持外,TimeScheduler还支持在指定的时间执行异步任务的特性。扩展的能力包括以下:

    • typename TimeScheduler::time_point

    • now(ts) -> time_point

    • schedule_at(ts,time_point) -> sender_of<void>

    • schedule_after(ts,duration) -> sender_of<void>

    其中:

    • now(ts)-用来返回当前时刻点的cpo

    • schedule_at(ts,time_point)-用于产生一个在指定时刻点调度的sender

    • schedule_after(ts,duration)-用于产生一个在duration指定的时间段后调度的sender

    另外,通过time_point类型,用户可以按需定制实现自己的Scheduler来支持自定义的时间单位,以适配复杂的业务需要。

    具体的TimerScheduler的Concept定义如下:

    1. namespace unifex
    2. {
    3. // TimeScheduler CPOs
    4. inline constexpr unspecified now = unspecified;
    5. inline constexpr unspecified schedule_at = unspecified;
    6. inline constexpr unspecified schedule_after = unspecified;
    7. template<typename T>
    8. concept time_scheduler =
    9. scheduler<T> &&
    10. requires(const T scheduler) {
    11. now(scheduler);
    12. schedule_at(scheduler, now(scheduler));
    13. schedule_after(scheduler, now(scheduler) - now(scheduler));
    14. };
    15. }

    七、其他Concepts实现

    (一)StopToken

    cancellation机制相关的concept,又是一个比较难讲清的话题,本篇暂时不具体展开了,有时间会单独阐述这部分的具体实现。


    (二)ManySender/ManyReceiver Concept

    正常来说,一个Sender只会触发一次完成信号,ManySender允许多次触发完成信号,与ManySender对应,也会有ManyReceiver,这部分更多是对Sender/Receiver的一种扩展,本系列的讲述中也不会实际用到它们,这里先直接略过,感兴趣的同学可以自行查阅相关文档。


    (三)Stream Concept

    可以理解为一种lazy性质的ManySender,当消费者主动调用next()的时候,才会产生值,本系列的讲述中也不会直接使用到,这里直接略过了,感兴趣的读者可以自行查阅相关文档。


    八、总结

    可以说几个关键性的cpo:

    • receiver cpos:

      set_value()、set_done()、set_error()

    • start()

    • connect()

    再加上一系列的Concepts定义:

    • Receiver Concept

    • Sender Concept

    • OperationState Concept

    • Scheduler Concept

    • ...

    将整个execution的执行框架勾勒出来了,使整个异步操作有了一个通用的基础框架,而Execution Context则是框架运行的基础,各种Algorithm实现则是细节,后续我们会分别介绍,带大家深入框架运行的基础以及实现的细节,更进一步的了解libunifex的实现。


    参考资料:

    1.libunifex源码库

    2.Madokakaroto-浅谈The C++ Executors

    3.Madokakaroto -The C++ Executors设计与演化的简史

     作者简介

    e450f0645559508de39fdd24db56c47a.jpeg

    沈芳

    腾讯后台开发工程师

    IEG研发效能部开发人员,毕业于华中科技大学。目前负责CrossEngine Server的开发工作,对GamePlay技术比较感兴趣。

     推荐阅读

    万卷共知,一书一页总关情,TVP读书会带你突围阅读迷障!

    C++异步变化:libunifex实现!

    硬核!Redis知识总结,建议收藏

    C++特殊定制:揭秘cpo与tag_invoke!‍

    0f6436b85da542b82409f5dd30fae134.gif

    温馨提示:因公众号平台更改了推送规则,公众号推送的文章文末需要点一下“赞”和“在看”,新的文章才会第一时间出现在你的订阅列表里噢~

  • 相关阅读:
    centos8stream 编译安装 php-rabbit-mq模块
    弃用 ifconfig 吧,你值得收藏的 IpRoute2 简明指南
    DELL服务器,CPU一直会提示温度超过阈值。针对CPU temperature is greater than the upper crit
    qt使用AES加密、解密字符串
    nvidia-1080服务器上离线安装mmdetection2.23步骤
    关于UIScreen.main.bounds.height的值发生了变化的原因和解决方案
    java-php-net-python-银行招聘信息网计算机毕业设计程序
    【JVM基础14】——垃圾回收-强引用、软引用、弱引用、虚引用的区别
    SHELL编程基础2
    实践一年之久,vivo 如何基于 APISIX 进行业务基础架构的演进
  • 原文地址:https://blog.csdn.net/QcloudCommunity/article/details/125611481