无论是系统间集成或是系统内部组件之间通讯,消息(信息流)都是系统设计最重要的因素。EIP将详细的讲述了从消息的角度进行集成设计考虑方方面面,是系统设计重要的参考资料。
Enterprise Integration Patterns(EIP) 对企业集成进行了分析, 并详细介绍了设计、构建和部署消息系统的模式。
图1 消息和集成映射图
EIP中将消息集成结构表示为三个核心组件:端点、通道、消息和三个关键逻辑:多步传递、路由和转换。 从集成抽象的角度来说, 这些概念不仅仅使用于消息系统,也适用于任何系统集成方式的抽象概念。 后面为保持和EIP名词一致,依然使用以上名字,但需要意识到这些同样适用于其他集成模式。
EIP中将消息集成结构表示为以下核心组件:
端点和客户端的区别:客户端实现的是具体中间件的通讯协议,而端点层在客户端和业务层之间,屏蔽客户端的差异性,提供统一的业务访问接口,将业务层的集成请求转换为具体中间件的通讯请求。
图2 EIP 消息系统模式全景图
消息的设计即系统流转的数据流的设计。
系统中流转消息的目的可以分为:
命令消息通常代表着期望得到命令执行的结果或者异常回复。对于传输响应消息的通道可以使用发送消息通道专门对应的响应通道,或则是共享的响应通道。
响应消息涉及两个设计点: 1)响应通道的指定,可以在请求体中包含返回地址,或则以规范预定的方式确定响应。2)响应和请求的关系: 所有响应消息都应该包含可用于确定请求的标识符。
信封包装器将应用程序数据包装在与消息传递基础架构兼容的结构中。消息到达目的地时解包。在设计某种特定的消息总线时,需要设计总线消息的通用结构,具体的应用业务数据结构包装在总线结构中。如RocketMq消息属性包括:主体、消息类型、消息队列、消息ID、过滤标签等。
在企业和企业的集成中, 某一个应用可能接收来自不同业务伙伴的消息,有不同的格式、含义。应引入消息规范器,将消息转换为通用的消息格式再传输至业务处理系统。
图3 消息规范化器
消息格式规范带来的思考是,在进行系统集成(消息集成)时,需要在不同的业务层次上设计不同级别的通用消息格式,要求每个应用程序以通用格式生成和使用消息。相当于这个系统集成生态中的业务消息协议栈。类似于TCP/IP网络模型中的协议栈。
模式名称 | 内容 |
内容丰富器 | 使用专门的转换器Content Enricher来访问外部数据源,以增加缺少信息的消息。 |
内容过滤器 | 使用内容过滤器从邮件中删除不重要的数据项,只留下重要的项目。 |
声明检查 | 在不牺牲信息内容的情况下减少跨系统发送的消息的数据量,消息数据存储在持久存储中,并将声明检查传递给后续组件。这些组件可以使用Claim Check来检索存储的信息。 |
海量消息
有时应用程序想要传输一个非常大的数据结构,这种数据结构可能不适用于单个消息。在这种情况下,将数据分解成更易于管理的块并将它们作为消息序列发送。这些块必须作为一个序列发送,而不仅仅是一堆消息,以便接收者可以重建原始数据结构
图4 消息序列
消息过期
消息内容可能是时间敏感的,因此如果在截止日期之前没有收到消息,则应该忽略并丢弃它。过期消息的处理可以由消费者确认忽略,或由消息系统处理:丢弃或路由到死信通道。
延迟\定期消息
消息的发送可能是基于时间延期像消费者发送。比如定时关闭未支付的订单,可以再创建订单的时候同时发送一条延迟的关闭未支付订单的消息。 当订单收到该消息时根据订单状态是否支付决定是否关闭订单。
RocketMQ的延迟消息原理:
RocketMQ为消息通道定义延迟消息存储的队列,消费消息,按照0到18级别来,0 表示不延迟,1表示延迟1s,大于等于18表示延迟2h按照级别一次类推。1-18级别的默认值的延迟值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m8m 9m 10m 20m 30m 1h 2h”。当发送者指定消息的延迟级别后,RocketMQ将消息放入对应级别的存储队列,通过内置的时间服务检查各自延迟队列中的消息是否达到发送时间,达到后则进行发送。
周期消息
周期性的消息是指消息按照特定的时间周期重复发送。可以建立专门负责周期投递的消费者,在接收到消息后根据消息的周期投递序列将消息再次发送到延期队列中。
可靠性:
如何处理消息系统崩溃或关闭的场景。
图5 发送端和接收端都支持消息持久化。
容错性/健壮性
死信队列
当消息传递系统确定它不能或不应该传递消息时,它可以选择将消息移动到死信通道。 如多次重试后未能成功发送到消费者或未收到消费者的响应。
无效的消息通道
当需要监控接收方接收消息的正确性是可以设置接收方无法处理的消息的特殊通道。接收方将不正确的消息移至该通道。
特定类型通道/受限传输通道
为每种数据类型使用单独的数据类型通道,以便特定通道上的所有数据属于同一类型。消息集成时可以考虑针对通道增加类型或其他的准入限制,以指明通道目的,并对通道进行保护。
通道清理
需要考虑当消费异常情况下消息得不到有效的消费,造成消息积压的情况下如何处理, 可以引入专门的通道清理器,按照清理规则对消息积压通道进行处理。
可扩展性/兼容性
要为无法改造接入消息系统的应用提供消息集成能力,可以考虑增加一个额外的通道适配层负责按照应用协议同应用进行通讯,并适配接入到消息系统。
同理, 如果涉及多个消息中间件的组网融合,可以引入消息适配或者桥接模式在多个消息系统间传递消息。经过桥接的消息网络可以起到企业消息总线的作用。
图6 通道适配和消息桥接
根据消息内容将每条消息路由到正确的收件人。如下图展示的RocketMq基于Tag将交易主题上的消息根据标签分发到不同系统消费者。
图7 Rocket MQ基于Tag的消息过滤功能
模式 | 描述 |
消息过滤器 | 使用一种特殊的消息路由器,即消息过滤器,根据一组标准从通道中消除不需要的消息。 |
动态路由器 | 在基于内容过滤器的基础上,将分发规则可动态配置 |
收件人名单 | 为每个收件人定义一个渠道。然后使用收件人列表检查传入的邮件,确定所需收件人的列表,并将邮件转发到与列表中收件人关联的所有通道。 |
拆分器 | 使用拆分器将复合消息分解为一系列单独的消息,每个消息都包含与一个项目相关的数据。 |
聚合器 | 使用有聚合器收集和存储单个消息,直到收到一组完整的相关消息,并将单个聚合消息发布到输出通道以供进一步处理。 |
排序器 | 使用排序器来收集和重新排序消息,以便它们可以按指定顺序发布到输出通道。 |
组合消息处理器 | 使用组合消息处理器来处理组合消息。组合消息处理器将消息拆分,将子消息路由到适当的目的地,并将响应重新聚合回单个消息。 |
分散-聚集 | 当一条消息需要发送给多个收件人时,使用Scatter-Gather将消息广播给多个收件人并将响应重新聚合回单个消息。 |
路由单 | 将路由单附加到每条消息,指定处理步骤的顺序。用一个特殊的消息路由器包装每个组件,该路由器读取路由单并将消息路由到列表中的下一个组件 |
流程管理器 | 使用一个中央处理单元,一个Process Manager,来维护序列的状态,并根据中间结果确定下一个处理步骤—动态确定路由单 |
消息代理 | 将消息的目的地与发送者分离并保持对消息流的集中控制。将路由实现逻辑提取,集中到消息代理期的角色上。 |
使客户端与消息系统的会话具有事务性,以便客户端可以指定事务边界。事务性客户端具体表现为消息系统提供事务控制的能力来由本地事务控制消息事务。比如由于本地事务的失败,可以回滚消息的发送。
RabbitMQ提供了发送端和接收端的事务控制,RocketMQ提供发送端的事务控制。
图8 RocketMQ事务消息处理流程
RocketMQ事务消息处理流程:
RabbitMQ接收端开启事务消息后,在接收端手工确认的情况下, 必须进行commit,才能移除消息。
// 手动确认
channel.basicAck(deliveryTag, true);
// 提交事务
channel.txCommit();
本地事务控制消息事务 OR 消息事务控制本地事务
上面表述了本地事务可以控制消息事务来完成事务消息的目的。另外一种类似做法是将消息的发送做完本地事务的最后一个步骤,根据消息的发送结果来确定本地事务是否回滚。这种做法在本地事务仅仅和一次消息发送(或远程调用)时可用,但当本地涉及多个消息的发送时则某一个消息的事务将无法控制到其他消息发送的事务。
比如以下处理流程:
Begin Local Transaction A
Send Message m1;
Send Message m2;
Commit Local Transaction A;
消息发送m2的发送失败,可以出发本地事务A的回滚,但无法改变消息m1消息成功发送的事实。
模式 | 描述 |
消息网关 | 这是一个包装特定于消息传递的方法调用并将使用于特定领域域的方法公开给应用程序的类。 |
消息映射器 | 创建一个单独的消息传递映射器,其中包含消息传递基础结构和域对象之间的映射逻辑。类似ORM框架映射的概念 |
轮询消费者 | 应用程序应该使用轮询消费者,当它想要接收消息时显式地进行调用。 |
事件驱动的消费者 | 应用程序应该使用事件驱动的消费者,即在通道上传递消息时自动传递消息。 |
竞争消费者 | 在单个通道上创建多个竞争消费者,以便消费者可以同时处理多个消息。如RocketMQ的消费组。 |
消息调度程序 | 这是从消费者的角度来看,消费者在通道上创建一个消息调度程序,它将使用来自通道的消息并将它们分发给执行者。类似IO多路复用概念 |
选择性消费者 | 消费者成为选择性消费者,它过滤由其通道传递的消息,以便它只接收符合其条件的消息。 |
持久订阅 | 持久订阅者使消息传递系统在订阅者断开连接时保存发布的消息。Apache RocketMQ 统一管理消息的存储时长,无论消息是否被消费,而RabbitMQ可以指定持久化队列和消息持久化来保证持久订阅 |
架构风格或则说是架构结构,表示了系统间的组件及其相关关系。管道和过滤器是一种常用的架构风格。任何对数据\消息\请求的业务处理过程中需要提供一种可扩展的增加额外处理逻辑的场景下都适用该架构风格,如MVC和Spring中的各类过滤器。处理复杂业务逻辑的场景设计可以考虑使用过道和过滤器风格来保障扩展性。
图9 消息系统中管道和过滤器风格的示意图。
管道和过滤器架构风格的核心组件包括:输入和输出、管道和过滤器。上图展示了输入的消息在管道上经过解密、认证、De-Dup过滤器处理后输出消息。实现该架构风格除了管道和过滤器外,还需要一个管道组装器负责将所需的过滤器组装到特定的处理管道上。
不同应用的集成就是不同应用的适配过程,如消息格式的适配,集成技术协议的适配等。可以考虑使用六边形架构风格。六边架构风格或者微内核风格,体现的是系统架构最重要的任务:识别系统的稳定点和不稳定点。
事件驱动架构在具体实现中是指组成应用的组件之间通过事件机制完成业务功能,EDA中各组件以异步方式响应事件。
响应式编程是以异步编程为核心理念编程模型,一个真正的响应式应用是指整个业务流程执行中各个层都使用响应式编程组件来完成,处理流程中的每一步都是异步的。常见的响应式编程框架有:Spring WebFlux(以Reactor框架基础的响应式web框架),Project Reactor,RxJava,Akka。
背压问题:当上游请求过多,下游服务来不及响应,导致 Buffer 溢出的这样一个问题。在响应式编程,由于线程不阻塞,遇到 IO 就会把当前参数和要做的事情缓存起来,这样无疑增大了很多吞吐量,同时内存占用也大了起来,如果不限制的话,很可能发生OutOfMemory异常。
响应式编程目前主要适用于对性能要求特别高的系统,在业务系统开发中并不是主流模型。