• 消息队列系列


    1. 消息队列系列

    1.1. 如何选型

    1.1.1. 消息队列模式

    消息队列目前主要 2 种模式, 分别为"点对点模式"和"发布/订阅模式"。

    1.1.1.1. 点对点模式

    一个具体的消息只能由一个消费者消费。多个生产者可以向同一个消息队列发送消息; 但是, 一个消息在被一个消息者处理的时候, 这个消息在队列上会被锁住或者被移除并且其他消费者无法处理该消息。需要额外注意的是, 如果消费者处理一个消息失败了, 消息系统一般会把这个消息放回队列, 这样其他消费者可以继续处理。

    1.1.1.2. 发布/订阅模式

    单个消息可以被多个订阅者并发的获取和处理。一般来说, 订阅有两种类型:

    • 临时 (ephemeral) 订阅, 这种订阅只有在消费者启动并且运行的时候才存在。一旦消费者退出, 相应的订阅以及尚未处理的消息就会丢失。
    • 持久 (durable) 订阅, 这种订阅会一直存在, 除非主动去删除。消费者退出后, 消息系统会继续维护该订阅, 并且后续消息可以被继续处理。

    1.1.2. 衡量标准

    对消息队列进行技术选型时, 需要通过以下指标衡量你所选择的消息队列, 是否可以满足你的需求:

    • 消息顺序: 发送到队列的消息, 消费时是否可以保证消费的顺序, 比如 A 先下单, B 后下单, 应该是 A 先去扣库存, B 再去扣, 顺序不能反。
    • 消息路由: 根据路由规则, 只订阅匹配路由规则的消息, 比如有 A/B 两者规则的消息, 消费者可以只订阅 A 消息, B 消息不会消费。
    • 消息可靠性: 是否会存在丢消息的情况, 比如有 A/B 两个消息, 最后只有 B 消息能消费, A 消息丢失。
    • 消息时序: 主要包括"消息存活时间"和"延迟/预定的消息", "消息存活时间"表示生产者可以对消息设置 TTL, 如果超过该 TTL, 消息会自动消失; "延迟/预定的消息"指的是可以延迟或者预订消费消息, 比如延时 5 分钟, 那么消息会 5 分钟后才能让消费者消费, 时间未到的话, 是不能消费的。
    • 消息留存: 消息消费成功后, 是否还会继续保留在消息队列。
    • 容错性: 当一条消息消费失败后, 是否有一些机制, 保证这条消息是一种能成功, 比如异步第三方退款消息, 需要保证这条消息消费掉, 才能确定给用户退款成功, 所以必须保证这条消息消费成功的准确性。
    • 伸缩: 当消息队列性能有问题, 比如消费太慢, 是否可以快速支持库容; 当消费队列过多, 浪费系统资源, 是否可以支持缩容。
    • 吞吐量: 支持的最高并发数。

    1.1.3. 消息队列比较

    下图是从网上摘抄过来的, 可以看到主流 MQ 的对比:

    下面简单介绍常用的消息队列:

    • Kafka: Apache Kafka 它最初由 LinkedIn 公司基于独特的设计实现为一个分布式的提交日志系统 ( a distributed commit log), 之后成为 Apache 项目的一部分。号称大数据的杀手锏, 谈到大数据领域内的消息传输, 则绕不开 Kafka, 这款为大数据而生的消息中间件, 以其百万级 TPS 的吞吐量名声大噪, 迅速成为大数据领域的宠儿, 在数据采集、传输、存储的过程中发挥着举足轻重的作用。
    • RabbitMQ: RabbitMQ 2007 年发布, 是使用 Erlang 语言开发的开源消息队列系统, 基于 AMQP 协议来实现。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP 协议更多用在企业系统内, 对数据一致性、稳定性和可靠性要求很高的场景, 对性能和吞吐量的要求还在其次。
    • RocketMQ: 是阿里开源的消息中间件, 它是纯 Java 开发, 具有高吞吐量、高可用性、适合大规模分布式系统应用的特点。RocketMQ 思路起源于 Kafka, 但并不是 Kafka 的一个 Copy, 它对消息的可靠传输及事务性做了优化, 目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog 分发等场景。
    • ActiveMQ: 是 Apache 出品, 最流行的, 能力强劲的开源消息总线。官方社区现在对 ActiveMQ 5.x 维护越来越少, 较少在大规模吞吐的场景中使用, 所以该消息队列也不是我们文章中重点讨论的内容。

    1.1.4. 优缺点

    1.1.4.1. Kafka

    优点:

    • 高吞吐、低延迟: kakfa 最大的特点就是收发消息非常快, kafka 每秒可以处理几十万条消息, 它的最低延迟只有几毫秒;
    • 高伸缩性: 每个主题 (topic) 包含多个分区 (partition), 主题中的分区可以分布在不同的主机 (broker) 中;
    • 持久性、可靠性: Kafka 能够允许数据的持久化存储, 消息被持久化到磁盘, 并支持数据备份防止数据丢失, Kafka 底层的数据存储是基于 Zookeeper 存储的, Zookeeper 我们知道它的数据能够持久存储;
    • 容错性: 非常高, kafka 是分布式的, 一个数据多个副本, 某个节点宕机, Kafka 集群能够正常工作;
    • 消息有序: 消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
    • 有优秀的第三方 Kafka Web 管理界面 Kafka-Manager, 在日志领域比较成熟, 被多家公司和多个开源项目使用;
    • 功能支持: 功能较为简单, 主要支持简单的 MQ 功能, 在大数据领域的实时计算以及日志采集被大规模使用。

    缺点:

    • Kafka 单机超过 64 个队列/分区, Load 会发生明显的飙高现象, 队列越多, load 越高, 发送消息响应时间变长;
    • 使用短轮询方式, 实时性取决于轮询间隔时间;
    • 消费失败不支持重试;
    • 支持消息顺序, 但是一台代理宕机后, 就会产生消息乱序;
    • 社区更新较慢。

    总结:

    • Kafka 主要特点是基于 Pull 的模式来处理消息消费, 追求高吞吐量, 一开始的目的就是用于日志收集和传输, 适合产生大量数据的互联网服务的数据收集业务。
    • 大型公司建议可以选用, 如果有日志采集功能, 肯定是首选 kafka。

    1.1.4.2. RabbitMQ

    优点:

    • 异步消息传递: 支持多种消息协议, 消息队列, 传送确认, 灵活的路由到队列, 多种交换类型;
    • 支持几乎所有最受欢迎的编程语言: Java, C, C ++, C#, Ruby, Perl, Python, PHP 等等;
    • 可以部署为高可用性和吞吐量的集群; , 跨多个可用区域和区域进行联合;
    • 可插入的身份验证, 授权, 支持 TLS 和 LDAP;
    • 提供了许多插件, 来从多方面进行扩展, 也可以编写自己的插件;
    • 提供了一个易用的用户界面, 使得用户可以监控和管理消息 Broker, 社区活跃度高。

    缺点:

    • erlang 开发, 很难去看懂源码, 基本职能依赖于开源社区的快速维护和修复 bug, 不利于做二次开发和维护;
    • RabbitMQ 确实吞吐量会低一些, 这是因为他做的实现机制比较重;
    • 需要学习比较复杂的接口和协议, 学习和维护成本较高。

    总结:

    • 结合 erlang 语言本身的并发优势, 性能较好, 社区活跃度也比较高, 但是不利于做二次开发和维护。不过 RabbitMQ 的社区十分活跃, 可以解决开发过程中遇到的 bug。
    • 如果你的数据量没有那么大, 小公司优先选择功能比较完备的 RabbitMQ。

    1.1.4.3. RocketMQ

    优点:

    • 支持发布/订阅 (Pub/Sub) 和点对点 (P2P) 消息模型;
    • 在一个队列中可靠的先进先出 (FIFO) 和严格的顺序传递;
    • 支持拉 (pull) 和推 (push) 两种消息模式;
    • 单一队列百万消息的堆积能力;
    • 支持多种消息协议, 如 JMS、MQTT 等;
    • 分布式高可用的部署架构, 满足至少一次消息传递语义;
    • 提供 docker 镜像用于隔离测试和云集群部署;
    • 提供配置、指标和监控等功能丰富的 Dashboard。

    缺点:

    • 支持的客户端语言不多, 目前是 java 及 c++, 其中 c++不成熟
    • 社区活跃度一般
    • 没有在 mq 核心中去实现 JMS 等接口, 有些系统要迁移需要修改大量代码

    总结:

    • 天生为金融互联网领域而生, 对于可靠性要求很高的场景, 尤其是电商里面的订单扣款, 以及业务削峰, 在大量交易涌入时, 后端可能无法及时处理的情况。
    • RoketMQ 在稳定性上可能更值得信赖, 这些业务场景在阿里双 11 已经经历了多次考验, 如果你的业务有上述并发场景, 建议可以选择 RocketMQ。

    1.1.4.4. ActiveMQ

    优点:

    • 支持来自 Java, C, C ++, C#, Ruby, Perl, Python, PHP 的各种跨语言客户端和协议;
    • 完全支持 JMS 客户端和 Message Broker 中的企业集成模式;
    • 支持许多高级功能, 如消息组, 虚拟目标, 通配符和复合目标;
    • 完全支持 JMS 1.1 和 J2EE 1.4, 支持瞬态, 持久, 事务和 XA 消息;
    • Spring 支持, 以便 ActiveMQ 可以轻松嵌入到 Spring 应用程序中, 并使用 Spring 的 XML 配置机制进行配置;
    • 专为高性能集群, 客户端 - 服务器, 基于对等的通信而设计;
    • CXF 和 Axis 支持, 以便 ActiveMQ 可以轻松地放入这些 Web 服务堆栈中以提供可靠的消息传递;
    • 可以用作内存 JMS 提供程序, 非常适合单元测试 JMS;
    • 支持可插拔传输协议, 例如 in-VM, TCP, SSL, NIO, UDP, 多播, JGroups 和 JXTA 传输;
    • 使用 JDBC 和高性能日志支持非常快速的持久性。

    缺点:

    • 官方社区现在对 ActiveMQ 5.x 维护越来越少, 较少在大规模吞吐的场景中使用。

    1.2. 原理初探之 Kafka

    Kafka 是由 Linkedin 公司开发的, 它是一个分布式的, 支持多分区、多副本, 基于 Zookeeper 的分布式消息流平台, 它同时也是一款开源的基于发布订阅模式的消息引擎系统。

    1.2.1. 基本概念

    • 消息: Kafka 中的数据单元被称为消息, 也被称为记录, 可以把它看作数据库表中某一行的记录。
    • 批次: 为了提高效率, 消息会分批次写入 Kafka, 批次就代指的是一组消息。
    • 主题: 消息的种类称为 主题 (Topic), 可以说一个主题代表了一类消息, 相当于是对消息进行分类。主题就像是数据库中的表。
    • 分区: 主题可以被分为若干个分区 (partition), 同一个主题中的分区可以不在一个机器上, 有可能会部署在多个机器上, 由此来实现 kafka 的伸缩性, 单一主题中的分区有序, 但是无法保证主题中所有的分区有序。
    • 生产者: 向主题发布消息的客户端应用程序称为生产者 (Producer), 生产者用于持续不断的向某个主题发送消息。
    • 消费者: 订阅主题消息的客户端程序称为消费者 (Consumer), 消费者用于处理生产者产生的消息。
    • 消费者群组: 生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样, 一个厨师对应多个顾客, 也就是一个生产者对应多个消费者, 消费者群组 (Consumer Group) 指的就是由一个或多个消费者组成的群体。
    • 偏移量: 偏移量 (Consumer Offset) 是一种元数据, 它是一个不断递增的整数值, 用来记录消费者发生重平衡时的位置, 以便用来恢复数据。
    • broker: 一个独立的 Kafka 服务器就被称为 broker, broker 接收来自生产者的消息, 为消息设置偏移量, 并提交消息到磁盘保存。
    • broker 集群: broker 是集群 的组成部分, broker 集群由一个或多个 broker 组成, 每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。
    • 副本: Kafka 中消息的备份又叫做 副本 (Replica), 副本的数量是可以配置的, Kafka 定义了两类副本: 领导者副本 (Leader Replica) 和 追随者副本 (Follower Replica), 前者对外提供服务, 后者只是被动跟随。
    • 重平衡: Rebalance。消费者组内某个消费者实例挂掉后, 其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

    1.2.2. 系统架构

    一个典型的 Kafka 集群中包含若干 Producer(可以是 web 前端产生的 Page View, 或者是服务器日志, 系统 CPU、Memory 等), 若干 broker(Kafka 支持水平扩展, 一般 broker 数量越多, 集群吞吐率越高), 若干 Consumer Group, 以及一个 Zookeeper 集群。Kafka 通过 Zookeeper 管理集群配置, 选举 leader, 以及在 Consumer Group 发生变化时进行 rebalance。Producer 使用 push 模式将消息发布到 broker, Consumer 使用 pull 模式从 broker 订阅并消费消息。

    1.2.3. 生产者

    1.2.3.1. 数据执行流程

    在 Kafka 中, 我们把产生消息的那一方称为生产者, 比如我们经常回去淘宝购物, 你打开淘宝的那一刻, 你的登陆信息, 登陆次数都会作为消息传输到 Kafka 后台, 当你浏览购物的时候, 你的浏览信息, 你的搜索指数, 你的购物爱好都会作为一个个消息传递给 Kafka 后台, 然后淘宝会根据你的爱好做智能推荐, 致使你的钱包从来都禁不住诱惑, 那么这些生产者产生的消息是怎么传到 Kafka 应用程序的呢? 发送过程是怎么样的呢?

    尽管消息的产生非常简单, 但是消息的发送过程还是比较复杂的, 如图:

    我们从创建一个 ProducerRecord 对象开始, ProducerRecord 是 Kafka 中的一个核心类, 它代表了一组 Kafka 需要发送的 key/value 键值对, 它由记录要发送到的主题名称 (Topic Name), 可选的分区号 (Partition Number) 以及可选的键值对构成。在发送 ProducerRecord 时, 我们需要将键值对对象由序列化器转换为字节数组, 这样它们才能够在网络上传输。然后消息到达了分区器。如果发送过程中指定了有效的分区号, 那么在发送记录时将使用该分区。如果发送过程中未指定分区, 则将使用 key 的 hash 函数映射指定一个分区。如果发送的过程中既没有分区号也没有, 则将以循环的方式分配一个分区。选好分区后, 生产者就知道向哪个主题和分区发送数据了。ProducerRecord 还有关联的时间戳, 如果用户没有提供时间戳, 那么生产者将会在记录中使用当前的时间作为时间戳。Kafka 最终使用的时间戳取决于 topic 主题配置的时间戳类型。然后, 这条消息被存放在一个记录批次里, 这个批次里的所有消息会被发送到相同的主题和分区上。由一个独立的线程负责把它们发到 Kafka Broker 上。Kafka Broker 在收到消息时会返回一个响应, 如果写入成功, 会返回一个 RecordMetaData 对象, 它包含了主题和分区信息, 以及记录在分区里的偏移量, 上面两种的时间戳类型也会返回给用户。如果写入失败, 会返回一个错误。生产者在收到错误之后会尝试重新发送消息, 几次之后如果还是失败的话, 就返回错误消息。

    上面写的有点多, 总结一下流程: 创建对象(主题、分区、key/value)-> 序列化数据 -> 到达分区(可自己指定, 也可以通过 key hash)-> 放入批次(相同主题和分区) -> 独立线程发送 -> 返回主题/分区/分区偏移量/时间戳。

    1.2.3.2. 分区策略

    Kafka 对于数据的读写是以分区为粒度的, 分区可以分布在多个主机 (Broker) 中, 这样每个节点能够实现独立的数据写入和读取, 并且能够通过增加新的节点来增加 Kafka 集群的吞吐量, 通过分区部署在多个 Broker 来实现负载均衡的效果, 下面我们看看数据如何选择分区。

    • 方式 1: 顺序轮询顺序分配, 消息是均匀的分配给每个 partition, 即每个分区存储一次消息, 见下图。轮训策略是 Kafka Producer 提供的默认策略, 如果你不使用指定的轮训策略的话, Kafka 默认会使用顺序轮训策略的方式。

    • 方式 2: 随机轮询本质上看随机策略也是力求将数据均匀地打散到各个分区, 但从实际表现来看, 它要逊于轮询策略, 所以如果追求数据的均匀分布, 还是使用轮询策略比较好。事实上, 随机策略是老版本生产者使用的分区策略, 在新版本中已经改为轮询了。

    • 方式 3: key hash 这个策略也叫做 key-ordering 策略, Kafka 中每条消息都会有自己的 key, 一旦消息被定义了 Key, 那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面, 由于每个分区下的消息处理都是有顺序的, 故这个策略被称为按消息键保序策略, 如下图所示:

    1.2.4. 消费者

    1.2.4.1. 消费者群组

    应用程序使用 KafkaConsumer 从 Kafka 中订阅主题并接收来自这些主题的消息, 然后再把他们保存起来。应用程序首先需要创建一个 KafkaConsumer 对象, 订阅主题并开始接受消息, 验证消息并保存结果。一段时间后, 生产者往主题写入的速度超过了应用程序验证数据的速度, 这时候该如何处理? 如果只使用单个消费者的话, 应用程序会跟不上消息生成的速度, 就像多个生产者像相同的主题写入消息一样, 这时候就需要多个消费者共同参与消费主题中的消息, 对消息进行分流处理。Kafka 消费者从属于消费者群组。一个群组中的消费者订阅的都是相同的主题, 每个消费者接收主题一部分分区的消息。下面是一个 Kafka 分区消费示意图。

    上图中的主题 T1 有四个分区, 分别是分区 0、分区 1、分区 2、分区 3, 我们创建一个消费者群组 1, 消费者群组中只有一个消费者, 它订阅主题 T1, 接收到 T1 中的全部消息。由于一个消费者处理四个生产者发送到分区的消息, 压力有些大, 需要帮手来帮忙分担任务, 于是就演变为下图

    这样一来, 消费者的消费能力就大大提高了, 但是在某些环境下比如用户产生消息特别多的时候, 生产者产生的消息仍旧让消费者吃不消, 那就继续增加消费者。
    如上图所示, 每个分区所产生的消息能够被每个消费者群组中的消费者消费, 如果向消费者群组中增加更多的消费者, 那么多余的消费者将会闲置, 如下图所示。

    向群组中增加消费者是横向伸缩消费能力的主要方式。总而言之, 我们可以通过增加消费组的消费者来进行水平扩展提升消费能力。这也是为什么建议创建主题时使用比较多的分区数, 这样可以在消费负载高的情况下增加消费者来提升性能。另外, 消费者的数量不应该比分区数多, 因为多出来的消费者是空闲的, 没有任何帮助。

    Kafka 一个很重要的特性就是, 只需写入一次消息, 可以支持任意多的应用读取这个消息。换句话说, 每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息, 应用需要有不同的消费组。对于上面的例子, 假如我们新增了一个新的消费组 G2, 而这个消费组有两个消费者, 那么就演变为下图这样。在这个场景中, 消费组 G1 和消费组 G2 都能收到 T1 主题的全量消息, 在逻辑意义上来说它们属于不同的应用。

    总结起来就是如果应用需要读取全量消息, 那么请为该应用设置一个消费组; 如果该应用消费能力不足, 那么可以考虑在这个消费组里增加消费者。

    1.2.4.2. 消费者重平衡

    我们从上面的消费者演变图中可以知道这么一个过程: 最初是一个消费者订阅一个主题并消费其全部分区的消息, 后来有一个消费者加入群组, 随后又有更多的消费者加入群组, 而新加入的消费者实例分摊了最初消费者的部分消息, 这种把分区的所有权通过一个消费者转到其他消费者的行为称为重平衡, 英文名也叫做 Rebalance 。如下图所示。

    重平衡非常重要, 它为消费者群组带来了高可用性 和 伸缩性, 我们可以放心的添加消费者或移除消费者, 不过在正常情况下我们并不希望发生这样的行为。在重平衡期间, 消费者无法读取消息, 造成整个消费者组在重平衡的期间都不可用。另外, 当分区被重新分配给另一个消费者时, 消息当前的读取状态会丢失, 它有可能还需要去刷新缓存, 在它重新恢复状态之前会拖慢应用程序。

    消费者通过向组织协调者 (Kafka Broker) 发送心跳来维护自己是消费者组的一员并确认其拥有的分区。对于不同不的消费群体来说, 其组织协调者可以是不同的。只要消费者定期发送心跳, 就会认为消费者是存活的并处理其分区中的消息。当消费者检索记录或者提交它所消费的记录时就会发送心跳。如果过了一段时间 Kafka 停止发送心跳了, 会话 (Session) 就会过期, 组织协调者就会认为这个 Consumer 已经死亡, 就会触发一次重平衡。如果消费者宕机并且停止发送消息, 组织协调者会等待几秒钟, 确认它死亡了才会触发重平衡。在这段时间里, 死亡的消费者将不处理任何消息。在清理消费者时, 消费者将通知协调者它要离开群组, 组织协调者会触发一次重平衡, 尽量降低处理停顿。

    重平衡是一把双刃剑, 它为消费者群组带来高可用性和伸缩性的同时, 还有有一些明显的缺点 (bug), 而这些 bug 到现在社区还无法修改。重平衡的过程对消费者组有极大的影响。因为每次重平衡过程中都会导致万物静止, 参考 JVM 中的垃圾回收机制, 也就是 Stop The World , STW。也就是说, 在重平衡期间, 消费者组中的消费者实例都会停止消费, 等待重平衡的完成, 而且重平衡这个过程很慢。…

    1.2.5. 特性分析

    这里才是内容的重点, 不仅需要知道 Kafka 的特性, 还需要知道支持这些特性的原因:

    • 消息路由(不支持): Kafka 在处理消息之前是不允许消费者过滤一个主题中的消息。一个订阅的消费者在没有异常情况下会接受一个分区中的所有消息。
    • 消息有序(支持): 当消费消息时, 如果消费失败, 消息不会被放回, 所以整个消费过程都是有序进行;
    • 消息时序(不支持): 消息直接发送, 不会延迟发送, 或者指定消息的 TTL。
    • 容错处理(集群支持/消息不支持): 集群容错能力高, 因为是分布式部署, 但是消息容错处理弱, 因为消息消费失败, 需要程序员手动处理, Kafka 不支持消息重新进行消费。
    • 伸缩(非常好): 通过扩充分区和消费者数量, 实现分区扩容, 并提升消费速度。
    • 持久化(非常好): 数据存储在磁盘, 可以随时订阅消费, 消费完后, 数据仍然保留。
    • 消息回溯(支持): 因为消息支持持久化, 就支持回溯, 可以理解是附带的功能。
    • 高吞吐(非常好): 因为 Kafka 内部同一个主题包含多个分区, 所以实现分布式存储, 然后消费者数量可以扩充到和分区数量一致, 保证了 Kafka 的高吞吐。

    1.3. 原理初探之 RabbitMQ

    RabbitMQ 是使用 Erlang 语言来编写的, 并且 RabbitMQ 是基于 AMQP 协议的。Erlang 语言在数据交互方面性能优秀, 有着和原生 Socket 一样的延迟, 这也是 RabbitMQ 高性能的原因所在。可谓"人如其名", RabbitMQ 像兔子一样迅速。

    1.3.1. 基本概念

    提到 RabbitMQ, 就不得不提 AMQP 协议。AMQP 协议是具有现代特征的二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议, 是应用层协议的一个开放标准, 为面向消息的中间件设计。先了解一下 AMQP 协议中间的几个重要概念:

    • Server: 接收客户端的连接, 实现 AMQP 实体服务。
    • Connection: 连接, 应用程序与 Server 的网络连接, TCP 连接。
    • Channel: 信道, 消息读写等操作在信道中进行。客户端可以建立多个信道, 每个信道代表一个会话任务。
    • Message: 消息, 应用程序和服务器之间传送的数据, 消息可以非常简单, 也可以很复杂。由 Properties 和 Body 组成。Properties 为外包装, 可以对消息进行修饰, 比如消息的优先级、延迟等高级特性; Body 就是消息体内容。
    • Virtual Host: 虚拟主机, 用于逻辑隔离。一个虚拟主机里面可以有若干个 Exchange 和 Queue, 同一个虚拟主机里面不能有相同名称的 Exchange 或 Queue。
    • Exchange: 交换器, 接收消息, 按照路由规则将消息路由到一个或者多个队列。如果路由不到, 或者返回给生产者, 或者直接丢弃。RabbitMQ 常用的交换器常用类型有 direct、topic、fanout、headers 四种, 后面详细介绍。
    • Binding: 绑定, 交换器和消息队列之间的虚拟连接, 绑定中可以包含一个或者多个 RoutingKey。
    • RoutingKey: 路由键, 生产者将消息发送给交换器的时候, 会发送一个 RoutingKey, 用来指定路由规则, 这样交换器就知道把消息发送到哪个队列。路由键通常为一个".“分割的字符串, 例如"com.rabbitmq”。
    • Queue: 消息队列, 用来保存消息, 供消费者消费。

    1.3.2. 系统架构

    1.3.2.1. 整体架构

    AMQP 协议模型由三部分组成: 生产者、消费者和服务端。生产者是投递消息的一方, 首先连接到 Server, 建立一个连接, 开启一个信道; 然后生产者声明交换器和队列, 设置相关属性, 并通过路由键将交换器和队列进行绑定。同理, 消费者也需要进行建立连接, 开启信道等操作, 便于接收消息。接着生产者就可以发送消息, 发送到服务端中的虚拟主机, 虚拟主机中的交换器根据路由键选择路由规则, 然后发送到不同的消息队列中, 这样订阅了消息队列的消费者就可以获取到消息, 进行消费。
    总结一下整体过程: 生产者投递消息 -> 和 Server 建立连接, 开启信道 -> 声明交换器和队列, 并通过路由键将交换机和队列绑定 -> 投递消息到虚拟主机 -> 消息发送到消息队列 -> 消费者建立连接 -> 消费消息 -> 关系信道和连接。

    1.3.3. 常用交换器

    RabbitMQ 常用的交换器类型有 direct、topic、fanout、headers 四种:

    • Direct Exchange: 见文知意, 直连交换机意思是此交换机需要绑定一个队列, 要求该消息与一个特定的路由键完全匹配。简单点说就是一对一的, 点对点的发送。
    • Fanout Exchange: 这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播, 每台子网内的主机都获得了一份复制的消息。简单点说就是发布订阅。
    • Topic Exchange: 直接翻译的话叫做主题交换机, 如果从用法上面翻译可能叫通配符交换机会更加贴切。这种交换机是使用通配符去匹配, 路由到对应的队列。通配符有两种: “*” 、 “#”。需要注意的是通配符前面必须要加上"."符号。
    • *符号: 有且只匹配一个词。比如 a. *可以匹配到"a.b"、“a.c”, 但是匹配不了"a.b.c"。
    • #符号: 匹配一个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”, 也可以匹配到"rabbit.a.b.c"。
    • Headers Exchange: 这种交换机用的相对没这么多。它跟上面三种有点区别, 它的路由不是用 routingKey 进行路由匹配, 而是在匹配请求头中所带的键值进行路由。创建队列需要设置绑定的头部信息, 有两种模式: 全部匹配和部分匹配。如上图所示, 交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值, 路由到对应的队列。

    1.3.4. 消费原理

    我们先看几个基本概念:

    • broker: 每个节点运行的服务程序, 功能为维护该节点的队列的增删以及转发队列操作请求。
    • master queue: 每个队列都分为一个主队列和若干个镜像队列。
    • mirror queue: 镜像队列, 作为 master queue 的备份。在 master queue 所在节点挂掉之后, 系统把 mirror queue 提升为 master queue, 负责处理客户端队列操作请求。注意, mirror queue 只做镜像, 设计目的不是为了承担客户端读写压力。

    集群中有两个节点, 每个节点上有一个 broker, 每个 broker 负责本机上队列的维护, 并且 borker 之间可以互相通信。集群中有两个队列 A 和 B, 每个队列都分为 master queue 和 mirror queue(备份)。那么队列上的生产消费怎么实现的呢?

    对于消费队列, 如下图有两个 consumer 消费队列 A, 这两个 consumer 连在了集群的不同机器上。RabbitMQ 集群中的任何一个节点都拥有集群上所有队列的元信息, 所以连接到集群中的任何一个节点都可以, 主要区别在于有的 consumer 连在 master queue 所在节点, 有的连在非 master queue 节点上。因为 mirror queue 要和 master queue 保持一致, 故需要同步机制, 正因为一致性的限制, 导致所有的读写操作都必须都操作在 master queue 上(想想, 为啥读也要从 master queue 中读? 和数据库读写分离是不一样的), 然后由 master 节点同步操作到 mirror queue 所在的节点。即使 consumer 连接到了非 master queue 节点, 该 consumer 的操作也会被路由到 master queue 所在的节点上, 这样才能进行消费。

    对于生成队列, 原理和消费一样, 如果连接到非 master queue 节点, 则路由过去。

    所以, 到这里小伙伴们就可以看到 RabbitMQ 的不足: 由于 master queue 单节点, 导致性能瓶颈, 吞吐量受限。虽然为了提高性能, 内部使用了 Erlang 这个语言实现, 但是终究摆脱不了架构设计上的致命缺陷。

    1.3.5. 高级特性

    1.3.5.1. 过期时间

    Time To Live, 也就是生存时间, 是一条消息在队列中的最大存活时间, 单位是毫秒, 下面看看 RabbitMQ 过期时间特性:

    • RabbitMQ 可以对消息和队列设置 TTL。
    • RabbitMQ 支持设置消息的过期时间, 在消息发送的时候可以进行指定, 每条消息的过期时间可以不同。
    • RabbitMQ 支持设置队列的过期时间, 从消息入队列开始计算, 直到超过了队列的超时时间配置, 那么消息会变成死信, 自动清除。
    • 如果两种方式一起使用, 则过期时间以两者中较小的那个数值为准。
    • 当然也可以不设置 TTL, 不设置表示消息不会过期; 如果设置为 0, 则表示除非此时可以直接将消息投递到消费者, 否则该消息将被立即丢弃。

    1.3.5.2. 消息确认

    为了保证消息从队列可靠地到达消费者, RabbitMQ 提供了消息确认机制。消费者订阅队列的时候, 可以指定 autoAck 参数, 当 autoAck 为 true 的时候, RabbitMQ 采用自动确认模式, RabbitMQ 自动把发送出去的消息设置为确认, 然后从内存或者硬盘中删除, 而不管消费者是否真正消费到了这些消息。当 autoAck 为 false 的时候, RabbitMQ 会等待消费者回复的确认信号, 收到确认信号之后才从内存或者磁盘中删除消息。消息确认机制是 RabbitMQ 消息可靠性投递的基础, 只要设置 autoAck 参数为 false, 消费者就有足够的时间处理消息, 不用担心处理消息的过程中消费者进程挂掉后消息丢失的问题。

    1.3.5.3. 持久化

    消息的可靠性是 RabbitMQ 的一大特色, 那么 RabbitMQ 是如何保证消息可靠性的呢? 答案就是消息持久化。持久化可以防止在异常情况下丢失数据。RabbitMQ 的持久化分为三个部分: 交换器持久化、队列持久化和消息的持久化。交换器持久化可以通过在声明队列时将 durable 参数设置为 true。如果交换器不设置持久化, 那么在 RabbitMQ 服务重启之后, 相关的交换器元数据会丢失, 不过消息不会丢失, 只是不能将消息发送到这个交换器了。队列的持久化能保证其本身的元数据不会因异常情况而丢失, 但是不能保证内部所存储的消息不会丢失。要确保消息不会丢失, 需要将其设置为持久化。队列的持久化可以通过在声明队列时将 durable 参数设置为 true。设置了队列和消息的持久化, 当 RabbitMQ 服务重启之后, 消息依然存在。如果只设置队列持久化或者消息持久化, 重启之后消息都会消失。当然, 也可以将所有的消息都设置为持久化, 但是这样做会影响 RabbitMQ 的性能, 因为磁盘的写入速度比内存的写入要慢得多。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。鱼和熊掌不可兼得, 关键在于选择和取舍。在实际中, 需要根据实际情况在可靠性和吞吐量之间做一个权衡。

    1.3.5.4. 死信队列

    当消息在一个队列中变成死信之后, 他能被重新发送到另一个交换器中, 这个交换器成为死信交换器, 与该交换器绑定的队列称为死信队列。消息变成死信有下面几种情况:

    • 消息被拒绝。
    • 消息过期
    • 队列达到最大长度

    DLX 也是一个正常的交换器, 和一般的交换器没有区别, 他能在任何的队列上面被指定, 实际上就是设置某个队列的属性。当这个队列中有死信的时候, RabbitMQ 会自动将这个消息重新发送到设置的交换器上, 进而被路由到另一个队列, 我们可以监听这个队列中消息做相应的处理。死信队列有什么用? 当发生异常的时候, 消息不能够被消费者正常消费, 被加入到了死信队列中。后续的程序可以根据死信队列中的内容分析当时发生的异常, 进而改善和优化系统。

    1.3.5.5. 延迟队列

    一般的队列, 消息一旦进入队列就会被消费者立即消费。延迟队列就是进入该队列的消息会被消费者延迟消费, 延迟队列中存储的对象是的延迟消息, "延迟消息"是指当消息被发送以后, 等待特定的时间后, 消费者才能拿到这个消息进行消费。延迟队列用于需要延迟工作的场景。最常见的使用场景: 淘宝或者天猫我们都使用过, 用户在下单之后通常有 30 分钟的时间进行支付, 如果这 30 分钟之内没有支付成功, 那么订单就会自动取消。除了延迟消费, 延迟队列的典型应用场景还有延迟重试。比如消费者从队列里面消费消息失败了, 可以延迟一段时间以后进行重试。3.5 特性分析这里才是内容的重点, 不仅需要知道 RabbitMQ 的特性, 还需要知道支持这些特性的原因:

    • 消息路由(支持): RabbitMQ 可以通过不同的交换器支持不同种类的消息路由;
    • 消息有序(不支持): 当消费消息时, 如果消费失败, 消息会被放回队列, 然后重新消费, 这样会导致消息无序;
    • 消息时序(非常好): 通过延时队列, 可以指定消息的延时时间, 过期时间 TTL 等;
    • 容错处理(非常好): 通过交付重试和死信交换器 (DLX) 来处理消息处理故障;
    • 伸缩(一般): 伸缩其实没有非常智能, 因为即使伸缩了, master queue 还是只有一个, 负载还是只有这一个 master queue 去抗, 所以我理解 RabbitMQ 的伸缩很弱(个人理解)。
    • 持久化(不太好): 没有消费的消息, 可以支持持久化, 这个是为了保证机器宕机时消息可以恢复, 但是消费过的消息, 就会被马上删除, 因为 RabbitMQ 设计时, 就不是为了去存储历史数据的。
    • 消息回溯(支持): 因为消息不支持永久保存, 所以自然就不支持回溯。
    • 高吞吐(中等): 因为所有的请求的执行, 最后都是在 master queue, 它的这个设计, 导致单机性能达不到十万级的标准。

    1.4. 原理初探之 RocketMQ

    RocketMQ 是一个纯 Java、分布式、队列模型的开源消息中间件, 前身是 MetaQ, 是阿里参考 Kafka 特点研发的一个队列模型的消息中间件, 后开源给 apache 基金会成为了 apache 的顶级开源项目, 具有高性能、高可靠、高实时、分布式特点。

    1.4.1. 基本概念

    先对常用的词汇有个基本认识, 相关词汇后面会再详细介绍:

    • NameServer: 一个功能齐全的服务器, 其角色类似 Dubbo 中的 Zookeeper。
    • Producer: 消息生产者, 负责产生消息, 一般由业务系统负责产生消息。
    • Consumer: 消息消费者, 负责消费消息, 一般是后台系统负责异步消费。
    • Broker: 消息中转角色, 负责存储消息, 转发消息。
    • Message: 消息, 一条消息必须有一个主题(Topic), 主题可以看做是你的信件要邮寄的地址。(一条消息也可以拥有一个可选的标签(Tag)和额处的键值对, 它们可以用于设置一个业务 Key 并在 Broker 上查找此消息以便在开发期间查找问题。)
    • Topic: 主题, 可以看做消息的归类, 它是消息的第一级类型。(比如一个电商系统可以分为: 交易消息、物流消息等, 一条消息必须有一个 Topic 。Topic 与生产者和消费者的关系非常松散, 一个 Topic 可以有 0 个、1 个、多个生产者向其发送消息, 一个生产者也可以同时向不同的 Topic 发送消息。一个 Topic 也可以被 0 个、1 个、多个消费者订阅。)
    • Tag: 子主题, 它是消息的第二级类型, 用于为用户提供额外的灵活性。(使用标签, 同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为: 交易创建消息、交易完成消息等, 一条消息可以没有 Tag 。标签有助于保持您的代码干净和连贯, 并且还可以为 RocketMQ 提供的查询系统提供帮助。)
    • Group: 分组, 一个组可以订阅多个 Topic。(分为 ProducerGroup, ConsumerGroup, 代表某一类的生产者和消费者, 一般来说同一个服务可以作为 Group, 同一个 Group 一般来说发送和消费的消息都是一样的。)
    • Producer Group: 生产者组, 代表某一类的生产者, 比如我们有多个秒杀系统作为生产者, 这多个合在一起就是一个 Producer Group 生产者组, 它们一般生产相同的消息。
    • Consumer Group: 消费者组, 代表某一类的消费者, 比如我们有多个短信系统作为消费者, 这多个合在一起就是一个 Consumer Group 消费者组, 它们一般消费相同的消息。
    • Queue: 队列, 在 Kafka 中叫 Partition。(每个 Queue 内部是有序的, 在 RocketMQ 中分为读和写两种队列, 一般来说读写队列数量一致, 如果不一致就会出现很多问题。)
    • Message Queue: 消息队列, 主题被划分为一个或多个子主题, 即消息队列。(一个 Topic 下可以设置多个消息队列, 发送消息时执行该消息的 Topic , RocketMQ 会轮询该 Topic 下的所有队列将消息发出去。消息的物理管理单位。一个 Topic 下可以有多个 Queue, Queue 的引入使得消息的存储可以分布式集群化, 具有了水平扩展能力。)

    1.4.2. 消息模型

    RockerMQ 中的消息模型就是按照主题模型所实现的, 在主题模型中, 消息的生产者称为发布者(Publisher), 消息的消费者称为订阅者(Subscriber), 存放消息的容器称为主题(Topic)。RocketMQ 中的主题模型到底是如何实现的呢?

    我们可以看到在整个图中有 Producer Group、Topic、Consumer Group 三个角色, 你可以看到图中生产者组中的生产者会向主题发送消息, 而主题中存在多个队列, 生产者每次生产消息之后是指定主题中的某个队列发送消息的。

    每个主题中都有多个队列(这里还不涉及到 Broker), 集群消费模式下, 一个消费者集群多台机器共同消费一个 topic 的多个队列, 一个队列只会被一个消费者消费。如果某个消费者挂掉, 分组内其它消费者会接替挂掉的消费者继续消费。就像上图中 Consumer1 和 Consumer2 分别对应着两个队列, 而 Consuer3 是没有队列对应的, 所以一般来讲要控制消费者组中的消费者个数和主题中队列个数相同。这个简直和 kafak 一毛一样啊!

    当然也可以消费者个数小于队列个数, 只不过不太建议。如下图:

    每个消费组在每个队列上维护一个消费位置, 为什么呢? 因为我们刚刚画的仅仅是一个消费者组, 我们知道在发布订阅模式中一般会涉及到多个消费者组, 而每个消费者组在每个队列中的消费位置都是不同的。如果此时有多个消费者组, 那么消息被一个消费者组消费完之后是不会删除的(因为其它消费者组也需要呀), 它仅仅是为每个消费者组维护一个消费位移(offset), 每次消费者组消费完会返回一个成功的响应, 然后队列再把维护的消费位移加一, 这样就不会出现刚刚消费过的消息再一次被消费了。

    可能你还有一个问题, 为什么一个主题中需要维护多个队列? 答案是提高并发能力。的确, 每个主题中只存在一个队列也是可行的。你想一下, 如果每个主题中只存在一个队列, 这个队列中也维护着每个消费者组的消费位置, 这样也可以做到发布订阅模式。如下图:

    但是, 这样我生产者是不是只能向一个队列发送消息? 又因为需要维护消费位置所以一个队列只能对应一个消费者组中的消费者, 这样是不是其他的 Consumer 就没有用武之地了? 从这两个角度来讲, 并发度一下子就小了很多。

    所以总结来说, RocketMQ 通过使用在一个 Topic 中配置多个队列, 并且每个队列维护每个消费者组的消费位置, 实现了主题模式/发布订阅模式。

    1.4.3. 系统架构

    讲完了消息模型, 我们理解起 RocketMQ 的技术架构起来就容易多了。RocketMQ 技术架构中有四大角色 NameServer、Broker、Producer、Consumer。这 4 大角色, 已经在基本概念中简单解释过, 对于相关词汇, 这里再重点解释一下。

    • Broker: 主要负责消息的存储、投递和查询以及服务高可用保证。说白了就是消息队列服务器嘛, 生产者生产消息到 Broker, 消费者从 Broker 拉取消息并消费。这里, 我还得普及一下关于 Broker、Topic 和队列的关系。上面我讲解了 Topic 和队列的关系——一个 Topic 中存在多个队列, 那么这个 Topic 和队列存放在哪呢? 一个 Topic 分布在多个 Broker 上, 一个 Broker 可以配置多个 Topic, 它们是多对多的关系。如果某个 Topic 消息量很大, 应该给它多配置几个队列, 并且尽量多分布在不同 Broker 上, 以减轻某个 Broker 的压力。Topic 消息量都比较均匀的情况下, 如果某个 broker 上的队列越多, 则该 broker 压力越大。
    • NameServer: 不知道你们有没有接触过 ZooKeeper 和 Spring Cloud 中的 Eureka, 它其实也是一个注册中心, 主要提供两个功能: Broker 管理和路由信息管理。说白了就是 Broker 会将自己的信息注册到 NameServer 中, 此时 NameServer 就存放了很多 Broker 的信息(Broker 的路由表), 消费者和生产者就从 NameServer 中获取路由表然后照着路由表的信息和对应的 Broker 进行通信(生产者和消费者定期会向 NameServer 去查询相关的 Broker 的信息)。
    • Producer: 消息发布的角色, 支持分布式集群方式部署。
    • Consumer: 消息消费的角色, 支持分布式集群方式部署。支持以 push 推, pull 拉两种模式对消息进行消费, 同时也支持集群方式和广播方式的消费, 它提供实时消息订阅机制。

    听完了上面的解释你可能会觉得, 这玩意好简单。不就是这样的么?

    嗯? 你可能会发现一个问题, 这老家伙 NameServer 干啥用的, 这不多余吗? 直接 Producer、Consumer 和 Broker 直接进行生产消息, 消费消息不就好了么? 但是, 我们上文提到过 Broker 是需要保证高可用的, 如果整个系统仅仅靠着一个 Broker 来维持的话, 那么这个 Broker 的压力会不会很大? 所以我们需要使用多个 Broker 来保证负载均衡。如果说, 我们的消费者和生产者直接和多个 Broker 相连, 那么当 Broker 修改的时候必定会牵连着每个生产者和消费者, 这样就会产生耦合问题, 而 NameServer 注册中心就是用来解决这个问题的。

    当然, RocketMQ 中的技术架构肯定不止前面那么简单, 因为上面图中的四个角色都是需要做集群的。我给出一张官网的架构图, 大家尝试理解一下。
    其实和我们最开始画的那张乞丐版的架构图也没什么区别, 主要是一些细节上的差别, 听我细细道来。

    • 第一、我们的 Broker 做了集群并且还进行了主从部署, 由于消息分布在各个 Broker 上, 一旦某个 Broker 宕机, 则该 Broker 上的消息读写都会受到影响。所以 RocketMQ 提供了 master/slave 的结构, salve 定时从 master 同步数据(同步刷盘或者异步刷盘), 如果 master 宕机, 则 slave 提供消费服务, 但是不能写入消息(后面我还会提到)。
    • 第二、为了保证 HA, 我们的 NameServer 也做了集群部署, 但是请注意它是去中心化的。也就意味着它没有主节点, 你可以很明显地看出 NameServer 的所有节点是没有进行 Info Replicate 的, 在 RocketMQ 中是通过单个 Broker 和所有 NameServer 保持长连接, 并且在每隔 30 秒 Broker 会向所有 Nameserver 发送心跳, 心跳包含了自身的 Topic 配置信息, 这个步骤就对应这上面的 Routing Info。
    • 第三、在生产者需要向 Broker 发送消息的时候, 需要先从 NameServer 获取关于 Broker 的路由信息, 然后通过轮询的方法去向每个队列中生产数据以达到负载均衡的效果。
    • 第四、消费者通过 NameServer 获取所有 Broker 的路由信息后, 向 Broker 发送 Pull 请求来获取消息数据。Consumer 可以以两种模式启动—— 广播(Broadcast)和集群(Cluster)。广播模式下, 一条消息会发送给同一个消费组中的所有消费者, 集群模式下消息只会发送给一个消费者。

    1.4.4. 高级特性&常见问题

    1.4.4.1. 顺序消费

    在上面的技术架构介绍中, 我们已经知道了 RocketMQ 在主题上是无序的、它只有在队列层面才是保证有序的。这又扯到两个概念——普通顺序和严格顺序。所谓普通顺序是指消费者通过同一个消费队列收到的消息是有顺序的, 不同消息队列收到的消息则可能是无顺序的。普通顺序消息在 Broker 重启情况下不会保证消息顺序性(短暂时间)。

    所谓严格顺序是指消费者收到的所有消息均是有顺序的。严格顺序消息即使在异常情况下也会保证消息的顺序性。但是, 严格顺序看起来虽好, 实现它可会付出巨大的代价。如果你使用严格顺序模式, Broker 集群中只要有一台机器不可用, 则整个集群都不可用。你还用啥? 现在主要场景也就在 binlog 同步。一般而言, 我们的 MQ 都是能容忍短暂的乱序, 所以推荐使用普通顺序模式。(这个严格顺序, 感觉没太懂, 后面再查一下相关资料。)

    那么, 我们现在使用了普通顺序模式, 我们从上面学习知道了在 Producer 生产消息的时候会进行轮询(取决你的负载均衡策略)来向同一主题的不同消息队列发送消息。那么如果此时我有几个消息分别是同一个订单的创建、支付、发货, 在轮询的策略下这三个消息会被发送到不同队列, 因为在不同的队列此时就无法使用 RocketMQ 带来的队列有序特性来保证消息有序性了。

    那么, 怎么解决呢? 其实很简单, 我们需要处理的仅仅是将同一语义下的消息放入同一个队列(比如这里是同一个订单), 那我们就可以使用 Hash 取模法来保证同一个订单在同一个队列中就行了。

    1.4.4.2. 重复消费

    就两个字——幂等。在编程中一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。比如说, 这个时候我们有一个订单的处理积分的系统, 每当来一个消息的时候它就负责为创建这个订单的用户的积分加上相应的数值。可是有一次, 消息队列发送给订单系统 FrancisQ 的订单信息, 其要求是给 FrancisQ 的积分加上 500。但是积分系统在收到 FrancisQ 的订单信息处理完成之后返回给消息队列处理成功的信息的时候出现了网络波动(当然还有很多种情况, 比如 Broker 意外重启等等), 这条回应没有发送成功。

    那么, 消息队列没收到积分系统的回应会不会尝试重发这个消息? 问题就来了, 我再发这个消息, 万一它又给 FrancisQ 的账户加上 500 积分怎么办呢? 所以我们需要给我们的消费者实现幂等, 也就是对同一个消息的处理结果, 执行多少次都不变。

    那么如何给业务实现幂等呢? 这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证, 因为 Redis 的 key 和 value 就是天然支持幂等的。当然还有使用数据库插入法, 基于数据库的唯一键来保证重复数据不会被插入多条。不过最主要的还是需要根据特定场景使用特定的解决方案, 你要知道你的消息消费是否是完全不可重复消费还是可以忍受重复消费的, 然后再选择强校验和弱校验的方式。毕竟在 CS 领域还是很少有技术银弹的说法。

    简单了来说, 幂等的校验, 还是需要业务方来支持, 因为你解决不了网络抖动问题哈~~

    1.4.4.3. 分布式事务

    如何解释分布式事务呢? 事务大家都知道吧? 要么都执行要么都不执行。在同一个系统中我们可以轻松地实现事务, 但是在分布式架构中, 我们有很多服务是部署在不同系统之间的, 而不同服务之间又需要进行调用。比如此时我下订单然后增加积分, 如果保证不了分布式事务的话, 就会出现 A 系统下了订单, 但是 B 系统增加积分失败或者 A 系统没有下订单, B 系统却增加了积分。前者对用户不友好, 后者对运营商不利, 这是我们都不愿意见到的。那么, 如何去解决这个问题呢?

    如今比较常见的分布式事务实现有 2PC、TCC 和事务消息(half 半消息机制)。每一种实现都有其特定的使用场景, 但是也有各自的问题, 都不是完美的解决方案。在 RocketMQ 中使用的是事务消息加上事务反查机制来解决分布式事务问题的。

    下面是上图的执行流程:

    1. A 服务先发送个 Half Message 给 Brock 端, 消息中携带 B 服务 即将要+100 元的信息。
    2. 当 A 服务知道 Half Message 发送成功后, 那么开始第 3 步执行本地事务。
    3. 执行本地事务(会有三种情况 1、执行成功。2、执行失败。3、网络等原因导致没有响应)
    4. 如果本地事务成功, 那么 Product 像 Brock 服务器发送 Commit, 这样 B 服务就可以消费该 message。
    5. 如果本地事务失败, 那么 Product 像 Brock 服务器发送 Rollback, 那么就会直接删除上面这条半消息。
    6. 如果因为网络等原因迟迟没有返回失败还是成功, 那么会执行 RocketMQ 的回调接口, 来进行事务的回查。

    1.4.4.4. 消息堆积

    消息中间件的主要功能是异步解耦, 还有个重要功能是挡住前端的数据洪峰, 保证后端系统的稳定性, 这就要求消息中间件具有一定的消息堆积能力, 消息堆积分以下两种情况:

    • 消息堆积在内存 Buffer, 一旦超过内存 Buffer, 可以根据一定的丢弃策略来丢弃消息, 如 CORBA Notification 规范中描述。适合能容忍丢弃消息的业务, 这种情况消息的堆积能力主要在于内存 Buffer 大小, 而且消息堆积后, 性能下降不会太大, 因为内存中数据多少对于对外提供的访问能力影响有限。
    • 消息堆积到持久化存储系统中, 例如 DB, KV 存储, 文件记录形式。当消息不能在内存 Cache 命中时, 要不可避免的访问磁盘, 会产生大量读 IO, 读 IO 的吞吐量直接决定了消息堆积后的访问能力。

    评估消息堆积能力主要有以下四点:

    • 消息能堆积多少条, 多少字节? 即消息的堆积容量。
    • 消息堆积后, 发消息的吞吐量大小, 是否会受堆积影响?
    • 消息堆积后, 正常消费的 Consumer 是否会受影响?
    • 消息堆积后, 访问堆积在磁盘的消息时, 吞吐量有多大?
      简单来说, RocketMQ 支持大量消息堆积, 消息会存在内存, 超出内存的消息会持久化到磁盘中。

    1.4.4.5. 定时消息

    定时消息是指消息发到 Broker 后, 不能立刻被 Consumer 消费, 要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度, 在 Broker 层面, 必须要做消息排序, 如果再涉及到持久化, 那么消息排序要不可避免的产生巨大性能开销。RocketMQ 支持定时消息, 但是不支持任意时间精度, 支持特定的 level, 例如定时 5s, 10s, 1m 等。

    简单来说, RocketMQ 支持定时消息, 但是只支持固定时间, 不支持任意精度时间。

    1.4.4.6. 回溯消费

    1.4.4.6.1. 同步刷盘和异步刷盘

    上面我讲了那么多的 RocketMQ 的架构和设计原理, 你有没有好奇, 在 Topic 中的队列是以什么样的形式存在的? 队列中的消息又是如何进行存储持久化的呢? 我在上文中提到的同步刷盘和异步刷盘又是什么呢? 它们会给持久化带来什么样的影响呢? 下面我将给你们一一解释。

    如上图所示, 在同步刷盘中需要等待一个刷盘成功的 ACK, 同步刷盘对 MQ 消息可靠性来说是一种不错的保障, 但是性能上会有较大影响, 一般地适用于金融等特定业务场景。而异步刷盘往往是开启一个线程去异步地执行刷盘操作。消息刷盘采用后台异步线程提交的方式进行, 降低了读写延迟, 提高了 MQ 的性能和吞吐量, 一般适用于如发验证码等对于消息保证要求不太高的业务场景。一般地, 异步刷盘只有在 Broker 意外宕机的时候会丢失部分数据, 你可以设置 Broker 的参数 FlushDiskType 来调整你的刷盘策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。

    简单来说, 同步刷盘是刷盘后请求再返回, 异步刷盘是直接返回请求, 再去慢慢刷盘, 可能会导致数据丢失。

    1.4.4.6.2. 同步复制和异步复制

    上面的同步刷盘和异步刷盘是在单个结点层面的, 而同步复制和异步复制主要是指的 Borker 主从模式下, 主节点返回消息给客户端的时候是否需要同步从节点。

    • 同步复制: 也叫 “同步双写”, 也就是说, 只有消息同步双写到主从结点上时才返回写入成功。
    • 异步复制: 消息写入主节点之后就直接返回写入成功。
      异步复制会不会也像异步刷盘那样影响消息的可靠性呢? 答案是不会的, 因为两者就是不同的概念, 对于消息可靠性是通过不同的刷盘策略保证的, 而像异步同步复制策略仅仅是影响到了可用性。为什么呢? 其主要原因是 RocketMQ 是不支持自动主从切换的, 当主节点挂掉之后, 生产者就不能再给这个主节点生产消息了。比如这个时候采用异步复制的方式, 在主节点还未发送完需要同步的消息的时候主节点挂掉了, 这个时候从节点就少了一部分消息。但是此时生产者无法再给主节点生产消息了, 消费者可以自动切换到从节点进行消费(仅仅是消费), 所以在主节点挂掉的时间只会产生主从结点短暂的消息不一致的情况, 降低了可用性, 而当主节点重启之后, 从节点那部分未来得及复制的消息还会继续复制。

    扩展知识 1: 在单主从架构中, 如果一个主节点挂掉了, 那么也就意味着整个系统不能再生产了。那么这个可用性的问题能否解决呢? 一个主从不行那就多个主从的呗, 别忘了在我们最初的架构图中, 每个 Topic 是分布在不同 Broker 中的。但是这种复制方式同样也会带来一个问题, 那就是无法保证严格顺序。在上文中我们提到了如何保证的消息顺序性是通过将一个语义的消息发送在同一个队列中, 使用 Topic 下的队列来保证顺序性的。如果此时我们主节点 A 负责的是订单 A 的一系列语义消息, 然后它挂了, 这样其他节点是无法代替主节点 A 的, 如果我们任意节点都可以存入任何消息, 那就没有顺序性可言了。(这点我并不认同, 我理解主从的对列信息应该是一样的, 我从主节点读到哪里, 如果主节点挂掉, 应该是可以到从结点去读取的, 如果不能这样, 搞个主从就完全没有意义了。因为主从的信息是一样的, 对队列的顺序是有影响的, 我不可能把不同的信息, 搞两个队列, 分别放到主从机器。)

    扩展知识 2: 在 RocketMQ 中采用了 Dledger 解决主从数据同步问题。他要求在写入消息的时候, 要求至少消息复制到半数以上的节点之后, 才给客⼾端返回写⼊成功, 并且它是⽀持通过选举来动态切换主节点的。这里我就不展开说明了, 读者可以自己去了解。也不是说 Dledger 是个完美的方案, 至少在 Dledger 选举过程中是无法提供服务的, 而且他必须要使用三个节点或以上, 如果多数节点同时挂掉他也是无法保证可用性的, 而且要求消息复制板书以上节点的效率和直接异步复制还是有一定的差距的。

    这个机制, 感觉就像大众化的版本, 基本思路都一样, 为了保证数据可用性, 我还是推荐同步复制, 当大多数节点复制成功, 就认为复制完毕, 和 ETCD 的 Raft 协议的日志同步原理一样。

    1.4.4.7. 容错机制

    在实际使用 RocketMQ 的时候我们并不能保证每次发送的消息都刚好能被消费者一次性正常消费成功, 可能会存在需要多次消费才能成功或者一直消费失败的情况, 那作为发送者该做如何处理呢?

    RocketMQ 提供了 ack 机制, 以保证消息能够被正常消费。发送者为了保证消息肯定消费成功, 只有使用方明确表示消费成功, RocketMQ 才会认为消息消费成功。中途断电, 抛出异常等都不会认为成功——即都会重新投递。当然, 如果消费者不告知发送者我这边消费信息异常, 那么发送者是不会知道的, 所以消费者在设置监听的时候需要给个回调。

    为了保证消息是肯定被至少消费成功一次, RocketMQ 会把这批消息重发回 Broker(topic 不是原 topic 而是这个消费租的 RETRY topic), 在延迟的某个时间点(默认是 10 秒, 业务可设置)后, 再次投递到这个 ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认 16 次), 就会投递到 DLQ 死信队列。应用可以监控死信队列来做人工干预。

    简单来说, 通过 ACK 保证消息一定能正常消费, 对于异常消息, 会重新放回 Broker, 但是这样就会打乱消息的顺序, 所以容错机制和消息严格顺序, 鱼和熊掌不可兼得。

    1.4.5. 特性分析

    这里才是内容的重点, 不仅需要知道 RocketMQ 的特性, 还需要知道支持这些特性的原因:

    • 消息路由(不支持): RocketMQ 在处理消息之前是不允许消费者过滤一个主题中的消息。一个订阅的消费者在没有异常情况下会接受一个队列中的所有消息;
    • 消息有序(部分支持): 需要将同一类的消息 hash 到同一个队列 Queue 中, 才能支持消息的顺序, 如果同一类消息散落到不同的 Queue 中, 就不能支持消息的顺序, 如果设定消息一定要正常消费, 那么就不能保证消息顺序。
    • 消息时序(可以支持): 可以发送定时消息, 但是只能制定系统定义好的时间, 不支持自定义时间;
    • 容错处理(支持): 通过 ACK 机制, 保证消息一定能正常消费, 这个和 RabbitMQ 很像;
    • 伸缩(支持): 整体架构其实和 kafaka 很像, 可以扩容 broker 和内部队列数, 或者增加消费组中的消费组数量, 提高消费能力。
    • 持久化(支持): 消息可以持久化到磁盘中, 所以支持消息的回溯, 和 kafaka 很像。
    • 消息回溯(支持): 因为消息支持持久化, 就支持回溯, 可以理解是附带的功能。
    • 高吞吐(非常好): 借鉴 kafaka 的设计, 不会出现 rabbitMQ 的单 Master 抗压力问题, 可以从多个 borker 写入和消费消息。
  • 相关阅读:
    SpringCloud 三种服务调用方式,你学会了吗?
    HCNP Routing&Switching之代理ARP
    C++中的decltype、std::declval 和 std::decay_t傻傻分不清楚
    sklearn实现多元线性回归 【Python机器学习系列(七)】
    Spring Boot 配置文件
    RestClient操作Elasticsearch(Java)
    java遇到的一些集合相关知识
    unity学习笔记之不通过路径通过拖拽获取lua脚本里定义的UI组件
    杂题——1097: 蛇行矩阵
    【力扣每日一题】2023.10.10 移动机器人
  • 原文地址:https://blog.csdn.net/wan212000/article/details/126177954