• 闭关之 C++ 函数式编程笔记(五):系统设计和程序测试


    第12章 并发系统的函数式设计

    • Code
      • Actor_Web_Service

    12.1 Actor 模型:组件思想

    • Actor 是一个可以接收和发送消息的独立组件
      • 对消息逐个处理
    • 最小的 Actor 类应该具备接收和发送消息的能力
    • C++ 的 Actor 框架
      • 在 actor-framework.org 可以找到该模型的完整实现
      • 代替品,SObjectizer库,性能更好,但是没有内置的跨进程分布式 Actor 的支持
    • 简化类型的 Actor
      • 它不需要关心谁向谁发送消息
        • 将由外部控制器负责
      • 设计如下所述
        • Actor 只能收发一种类型的消息(收发消息类型不一定相同)
          • 如果需要支持输入输出不同类型的消息
            • 使用 std::variantstd::any
        • 在外部的控制器中指定某个 Actor 向哪个 Actor 发送消息,以便以函数式的方式组合 Actor
          • 而不是由 Actor 自行选择发送消息的对象
          • 尾部的控制器可以指定 Actor 的监听源(发送消息的 Actor)
        • 外部的控制器还可以决定哪些消息异步处理,哪些不需要异步处理
    • 注意
      • 现在大多数软件使用了一种事件循环,用于异步传递消息。但这里并不实现这样的系统,这里重点关注易于调整,适用于任何事件驱动的系统设计
    • Actor 分三种
      • 宿 sink
        • 只接收消息的 Actor
      • 源 sources
        • 只发送消息的 Actor
      • 一般 Actor
        • 即可以接收消息,也可以发送消息
    • Code_12_1_1

    12.2 创建简单的消息源

    • 现代 C++ Json 库
      • https://gitee.com/learnlov/mirrors_nlohmann_json.git
    • 继承 std::enable_shared_from_this
      • 允许 std::shared_ptr 管理实例,安全创建自己的共享指针
      • shared_from_this()
        • 创建该实例另一个共享指针
    • Code
      • service.ixx

    12.3 将反应流建模为 monad

    • 异步流(Asynchronous Stream) 或 反应流 (Reactive Stream)
      • 可以有任意数目的值,对于每个新到达的值,都可以调用延续函数
      • 看起来像集合
        • 它们包含相同类型的元素
        • 只是并不是所有的元素都可立即获取
    • 反应流是不是 monad
      • monad 应符合的条件
        • 它必须是一个通用类型
        • 需要一个构造函数,创建包含给定值的反应流实例的函数
        • 需要转换函数,返回反应流的函数,该函数把来自源流的值转换后发出
        • 需要一个连接函数,从给定的流接收所有的消息,并把它们逐个发送
        • 需要遵守 monad 法则
    • 可以执行以下操作使反应流成为 monad
      • 创建一个流转换 Actor
      • 创建一个按给定值创建流的 Actor
      • 创建一个可以同时监听多个流的 Actor,并发送这些流

    12.3.1 创建宿 (sink) 接收消息

    • 宿
      • 是一个只接收消息而不发送的 Actor
    • 单一所有者设计
      • 使用移动语义和右值引用,保证宿对象是 sender 的唯一所有者
      • 相似的,其他的 Actor 变成了它们各自发送者的所有者
        • 者意味着,当流水线销毁时,所有Actor也被消耗
      • 缺点
        • 系统中不能有多个组件监听一个 Actor
        • 也不能在不同的数据流中共享 Actor
      • 解决思路
        • 允许共享 Actor 的所有关系,并允许每个 sender 持有多个监听器
    • Code
      • sink.ixx
      • 分离 sink 和 sink_impl 的原因在于
        • 支持反应流中类 range 语法
        • 两个 sink 函数根据传递参数数目的不同返回不同的类型
        • 如果 sink 不是一个合适的函数,这是很难实现的
    • 为每个创建的转换定义 operator |
      • 每个函数接收任何发送者对象最为第一参数和定义了转换的_helper类作为参数,可以增强 main 函数的可读性
      • Code
        • main.ixx

    12.3.2 转换反应流

    • 把反应流变成一个 monad
    • 最重要的任务是创建 transform 流修改器
      • 它应接收一个反应流和任意一个函数作为参数
      • 并返回一个新流
        • 使用给定的函数对原来的流进行转换,并发送转换后的消息
      • 换言之,transform 转换器是一个即可接收消息又可发送消息的 Actor
    • 流转换器的实现
      • transform.ixx
      • 与宿 Actor 不同, transform 不会立即连接到它的发送者
      • 如果没有人发送消息,也就不需要处理
      • 只有当 on_message 被调用时
        • 也就是当需要监听消息时,才需要监听发送者发送的消息
    • 使用示例
      • main.ixx

    12.3.3 创建给定值的流

    • transform 函数使得反应流成了一个仿函数,为了使它是一个正常的 monad,需要能够从给定的值创建流,而且需要 join 函数
    • 实现一个简单的功能
      • 给定一个值,或一系列值,根据给定值创建一个发送它们的流
        • 这个流不接收任何消息,只是发送消息
    • Code
      • values.ixx
      • 这个类可以用作反应流的 monad 构造函数,
      • 示例
        • 通过向宿对象发送消息,检查它能否正常工作
        • main.ixx
    • std::initializer_list
      • 对 STL 的 container 的初始化

    12.3.4 连接流

    • 把反应流作为 monad 的最后一件事就是定义 join 函数
      • 对于 join 函数,接收的消息都是新的流,需要监听来自这些流消息,并把它们进行传递
    • Code
      • join.ixx
    • 示例
      • main.ixx

    12.4 过滤反应流

    • 为了过滤,创建一个类似于 transform 的流修改器
      • 它可以接收消息并只发送符合谓词要求的消息
      • 与 transform 和 join 不同的是, 过滤器监听和发送的是同一类型的消息
      • 用于需要剔除无效数据或不感兴趣的数据
    • Code
      • filter
    • 示例
      • main.ixx

    12.5 反应流的错误处理

    • 使用 expected 作为错误处理数据结构
      • 添加 mtry 函数
    • Code
      • mtry.ixx
    • 示例
      • main.ixx
    • 注意
      • 这节代码没有跑起来

    12.6 响应客户端

    • 到目前为止,Web 服务只能接收客户端的请求但不能响应
    • 需要创建一个类模板,保存消息和 socket 指针
    • 还需要创建一个 reply 成员函数
      • 可以向客户端发送信息
      template <typename MessageType>
      struct with_client {
          MessageType value;
          tcp::socket *socket;
      
          void reply(const std::string& message) const
          {
              asio::async_write(
                      *socket,
                      asio::buffer(message, message.length()),
                                      [](auto, auto) {});
          }
      };
      
      • 它是一个含有而外信息的通用类型
    • with_client 创建 join 函数
    • 需要逐个修改所有的转换,直到它们全部理解引入的 with_client 类型
    • 应用示例
          auto transform = [](auto f) {
                  return reactive::operators::transform(lift_with_client(f));
              };
          auto filter = [](auto f) {
                  return reactive::operators::filter(apply_with_client(f));
              };
      
          asio::io_service event_loop;
      
          auto pipeline =
              service(event_loop)
                  | transform(trim)
      
                  // Ignoring comments and empty messages
                  | filter([] (const std::string &message) {
                      return message.length() > 0 && message[0] != '#';
                  })
      
                  // Trying to parse the input
                  | transform([] (const std::string &message) {
                      return mtry([&] {
                          return json::parse(message);
                      });
                  })
      
                  // Converting the result into the bookmark
                  | transform([] (const auto& exp) {
                      return mbind(exp, bookmark_from_json);
                  })
      
                  | sink([] (const auto &message) {
                      const auto exp_bookmark = message.value;
      
                      if (!exp_bookmark) {
                          message.reply("ERROR: Request not understood\n");
                          return;
                      }
      
                      if (exp_bookmark->text.find("C++") != std::string::npos) {
                          message.reply("OK: " + to_string(exp_bookmark.get()) + "\n");
                      } else {
                          message.reply("ERROR: Not a C++-related link\n");
                      }
                  });
      
      
    • 注意:
      • 这节的代码也没有跑起来。不过不影响,已经理解其思想

    12.7 创建状态可修改的 Actor

    • 可变状态是实现 join 转换的灵魂
      • 需要保持源的活跃
    • 应用场景
      • 为了保证 service 的响应速度,不能对所有消息指定相同的优先级
      • 客户端试图发起 DoS 攻击
        • 消息限制
          • 如果客户端收到一条需要处理的消息,则在一定时间间隔拒绝后续发来的消息
          • 需要创建一个 Actor 接收消息并记住发送消息的客户端,并限制时间间隔。在一段时间后,才再次接收客户端消息。
          • 这就是 Actor 具有可变状态
        • 在普通并发系统中,可变需要同步,但是在基于 Actor 的系统中却不是这样,Actor 是一个独立于其他 Actor的单线程组件,因此不需要任何同步

    12.8 用 Actor 编写分布式系统

    • Actor 并不关心它们是否位于同一线程、同一进程中的不同线程,同一计算机的不同进程或不同的计算机中,只要它们可以相互发送消息即可
    • 因此 Actor 很容易地扩展服务的模式,无须修改它的主逻辑
    • 如果改为分布式,只需要改变系统的消息投递机制

    第十三章 测试与调试

    13.1 程序编译正确吗?

    • 把错误从运行时转移到编译时
      • 编译时检测到的错误越多,运行时的错误就会越少
    • 示例
      • 使用更强的类型而不是原始值进行编码
      • 创建一个处理距离的类
        template < typename Representation
                , typename Ratio = std::ratio<1>
                >
        class distance {
        public:
            explicit constexpr distance(Representation value)
                : value(value)
            {
            }
        
            distance operator+ (const distance &other) const
            {
                return distance(value + other.value);
            }
        
            Representation value;
        };
        
      • 根据不同的测量单位创建不同的类型
        template <typename Representation>
        using meters = distance<Representation>;
        
        template <typename Representation>
        using kilometers = distance<Representation, std::kilo>;
        
        template <typename Representation>
        using centimeters = distance<Representation, std::centi>;
        
        template <typename Representation>
        using miles = distance<Representation, std::ratio<1609>>;
        
        
        namespace distance_literals {
        
            constexpr kilometers<long double> operator ""_km(long double distance)
            {
                return kilometers<long double>(distance);
            }
        
            constexpr miles<long double> operator ""_mi(long double distance)
            {
                return miles<long double>(distance);
            }
        
        }
        
        
      • 当试图混合匹配不同的单位时,会触发编译错误
      • 这时提供一个转换函数
      • 以这种方式把运行时错误提前到编译期

    13.2 单元测试与纯函数

    13.3 自动产生测试

    13.4 测试基于 monad 的并发系统

    • 这三节的测试方法都是常规技巧
  • 相关阅读:
    多重背包问题 ← 规模小时可转化为0-1背包问题
    windows 下编译redis7 最新版,提供下载地址
    CSS三种样式表、样式表优先级、CSS选择器
    将 RDBMS 恐龙迁移到云端
    在linux下(CentOS7中)配置MySQL5.7数据库,且实现远程访问
    Flask 项目创建:
    如何完成一次 git pr
    2023版 STM32实战1 LED灯驱动(电路与代码都讲解)
    【前端面试】(特别篇)面试准备——相互了解阶段
    Golang基础-面向过程篇
  • 原文地址:https://blog.csdn.net/jiamada/article/details/127126790