RocketMQ 设计基于主题的发布与订阅模式,其核心功能包括消息发送、消息存储 (Broker)、消息消费,整体设计追求简单与性能第一,主要体现在如下三个方面。
首先,NameServer 设计极其简单,摒弃了业界常用的使用 Zookeeper 充当信息管理的“注册中心”,而是自研 NameServer 来实现元数据的管理(Topie 路由信息等)。从实际需求出发,因为 Topic路由信息无须在集群之间保持强一致,追求最终一致性,并且能容忍分钟级的不一致。正是基于此种情况,RocketMQ 的 NameServer 集群之间互不通信,极大地降低了 NameServer 实现的复杂程度,对网络的要求也降低了不少,但是性能相比较 Zookeeper 有了极大的提升。
其次是高效的IO 存储机制。RocketMQ 追求消息发送的高吞吐量,RocketMQ 的消息存储文件设计成文件组的概念,组内单个文件大小固定,方便引人内存映射机制,所有主题的消息存储基于顺序写,极大地提供了消息写性能,同时为了兼顾消息消费与消息查找, 引入了消息消费队列文件与索引文件。
最后是容忍存在设计缺陷,适当将某些工作下放给 RocketMQ 使用者。消息中间件的实现者经常会遇到一个难题:如何保证消息一定能被消息消费者消费,并且保证只消费一次。 RocketMQ 的设计者给出的解决办法是不解决这个难题,而是退而求其次,只保证消息被消费者消费,但设计上允许消息被重复消费,这样极大地简化了消息中间件的内核,使得实现消息发送高可用变得非常简单与高效,消息重复问题由消费者在消息消费时实现幂等。
RocketMQ 作为一款消息中间件,需要解决如下问题。
架构模式
RocketMQ 与大部分消息中间件一样,采用发布订阅模式,基本的参与组件主要包括:消息发送者、消息服务器(消息存储)、消息消费、路由发现。
顺序消息
所谓顺序消息,就是消息消费者按照消息达到消息存储服务器的顺序消费。RocketMQ 可以严格保证消息有序。
消息过滤是指在消息消费时,消息消费者可以对同一主题下的消息按照规则只消费自己感兴趣的消息。RocketMQ 消息过滤支持在服务端与消费端的消息过滤机制。
消息存储
消息存储消息中间件的一个核心实现是消息的存储,对消息存储一般有如下两个维度的考量:
消息堆积能力和消息存储性能。RocketMQ 追求消息存储的高性能,引人内存映射机制,所有主题的消,息顺序存储在同一个文件中。同时为了避免消息无限在消息存储服务器中累积, 引入了消息文件过期机制与文件存储空间报警机制。
消息高可用性
通常影响消息可靠性的有以下几种情况。
针对上述情況,情况 1~4 的 RocketMQ 在同步刷盘机制下可以确保不丢失消息,在异步刷盘模式下,会丢失少量消息。情况5~6属于单点故障,一旦发生,该节点上的消息全部丢失,如果开启了异步复制机制,RoketMQ 能保证只丢失少量消息,RocketMQ 在后续版本中将引人双写机制,以满足消息可靠性要求极高的场合。
消息到达(消费)低延迟
RocketMQ 在消息不发生消息堆积时,以长轮询模式实现准实时的消息推送模式。
确保消息必须被消费一次
RocketMQ 通过消息消费确认机制(ACK) 来确保消息至少被消费一次,但由于 ACK 消息有可能丢失等其他原因,RocketMQ 无法做到消息只被消费一次,有重复消费的可能。
回溯消息
回溯消息是指消息消费端已经消费成功的消息,由于业务要求需要重新消费消息。 RocketMQ 支持按时间回湖消息,时间维度可精确到毫秒,可以向前或向后回湖。
消息堆积
消息中间件的主要功能是异步解耦,必须具备应对前端的数据洪峰,提高后端系统的可用性,必然要求消息中间件具备一定的消息堆积能力。RocketMQ 消息存储使用磁盘文件 (内存映射机制),并且在物理布局上为多个大小相等的文件组成逻辑文件组,可以无限循环使用。RocketMQ 消息存储文件并不是永久存储在消息服务器端,而是提供了过期机制,默认保留3天。
定时消息
定时消息是指消息发送到 Broker 后,不能被消息消费端立即消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意精度的定时消息消费,必须在消息服务端对消息进行排序,势必带来很大的性能损耗,故 RocketMQ 不支持任意进度的定时消息,而只支持特定延迟级别。
消息重试机制
消息重试是指消息在消费时,如果发送异常,消息中间件需要支持消息重新投递, RocketMQ 支持消息重试机制。
Nameserver 集群,tobic 的路由注册中心,为客户端根据 Topic 提供路由服务,从而引导客户端向 Broker 发送消息。Nameserver 之间的节点不通信。路由信息在 Nameserver 集群中数据一致性采取的最终一致性。
消息存储服务器,分为两种角色:Master 与 Slave,上图中呈现的就是2主2 从的部署架构,在 RocketMQ 中,主服务承担读写操作,从服务器作为一个备份,当主服务器存在压力时,从服务器可以承担读服务(消息消费)。所有 Broker,包含 Slave 服务器每隔 30s 会向 Nameserver 发送心跳包,心跳包中会包含存在在 Broker 上所有的 topic 的路由信息。
消息客户端,包括 Producer(消息发送者)和 Consumer(消费消费者).客户端在同一时间只会连接一台 nameserver,只有在连接出现异常时才会向尝试连接另外一台。客户端每隔 30s向 Nameserver 发起 topic 的路由信息查询。
温馨提示:Nameserver 是在内存中存储 Topic 的路由信息,持久化 Topic 路由信息的地方是在 Broker 中,即${ ROCKETMQ HOME)/store/config/topics。json。
在RocketMQ4.5.0 版本后引入了多副本机制,即一个复制组(m-s)可以演变为基于 raft 协议的复制组,复制组内部使用 raft 协议保证 broker 节点数据的强一致性,该部署架构在金融行业用的比较多。
在 RocketMQ 的消息消费模式采用的是发布与订阅模式。
topic:一类消息的集合,消息发送者将一类消息发送到一个主题中,例如订单模块将订单发送到 order_topic 中,而用户登录时,将登录事件发送到 user_login_topic 中。
consumegroup:消息消费组,一个消费单位的“群体”,消费组首先在启动时需要订阅需要消费的 topic。一个 topic 可以被多个消费组订阅,同样一个消费组也可以订阅多个主题。一个消费组拥有多个消费者。
例如我们在开发一个订单系统,其中有一个子系统:order-service-app,在该项目中会创建一个消费组 order_consumer 来订阅 order_topic,并且基于分布式部署, order-service-app 的部署情况如下:
即 order-service-app 部署了 3台服务器,每一个jvm 进程可以看做是消费组 order_consumer 消费组的其中一个消费者。
那这三个消费者如何来分工来共同消费 order_topic 中的消息呢?
在 RocketMQ 中支持广播模式与集群模式。
广播模式:一个消费组内的所有消费者每一个都会处理 topic 中的每一条消息,通常用于刷新内存缓存。
集群模式:一个消费组内的所有消费者共同消费一个 topic 中的消息,即分工协作, 个消费者消费一部分数据,启动负载均衡,
集群模式是非常普遍的模式,符合分布式架构的基本理念,即横向扩容,当前消费者如果无法快速及时处理消息时,可以通过增加消费者的个数横向扩容,快速提高消费能力,及时处理挤压的消息。
那集群模式下,消费者是如何来分配消息的呢?
例如上面实例中 order_topic 有16个队列,那一个拥有3个消费者的消费组如何来分配队列中。
在MQ 领域有一个不成文的约定:同一个消费者同一时间可以分配多个队列,但一个队列同一时间只会分配给一个消费者。
RocketMQ 提供了众多的队列负载算法,其中最常用的两种平均分配算法。
为了说明这两种分配算法的分配规则,现在对16 个队列,进行编号,用 q0-q15 表示, 消费者用 c0~c2 表示。
AllocateMessageQueueAveragely 分配算法的队列负载机制如下:
c0: q0 q1 q2 q3 q4 q5
c1: q6 q7 q8 q9 q10
c2: q11 q12 q13 q14 q15
其算法的特点是用总数除以消费者个数,余数按消费者顺序分配给消费者,故 c0 会多分配一个队列,而且队列分配是连续的.
AlocateMessageQueueAveragelyByCircle 分配算法的队列负载机制如下:
c0: q0 q3 q6 q9 q12 q15
c1: q1 q4 q7 q10 q13
c2: q2 q5 q8 q11 q14
该分配算法的特点就是轮流一个一个分配。
温馨提示:如果 topic 的队列个数小于消费者的个数,那有些消费者无法分配到消息。 在RocketMQ 中一个 topic 的队列数直接决定了最大消费者的个数,但 topic 队列个数的增加对 RocketMQ 的性能不会产生影响。
在实际过程中,对主题进行扩容(增加队列个数)或者对消费者进行扩容、缩容是一件非常寻常的事情,那如果新增一个消费者,该消费者消费哪些队列呢?这就涉及到消息消费队列的重新分配,即消费队列重平衡机制。
在RocketMQ 客户端中会每隔 20s 去查询当前 topic 的所有队列、消费者的个数,运用队列负载算法进行重新分配,然后与上一次的分配结果进行对比,如果发生了变化,则进行队列重新分配;如果没有发生变化,则忽略。
消费者消费一条消息后需要记录消费的位置,这样在消费端重启的时候,继续从上一次消费的位点开始进行处理新的消息。在 RocketMQ 中,消息消费位点的存储是以消费组为单位的。
集群模式下,消息消费进度存储在 broker 端,$(ROCKETMQ_HOMEY/store/conf ig/consumerOffset。json 是其具体的存储文件,其中内容截图如下:
可见消费进度的Key 为:topic@consumeGroup,然后每一个队列一个偏移量。
广播模式的消费进度文件存储在用户的主目录,默认文件全路劲名:${USERHOME}/.rocketmq_offsets。
RocketMQ 提供了并发消费、顺序消费两种消费模型。
并发消费:对一个队列中消息,每一个消费者内部都会创建一个线程池,对队列中的消息多线程处理,即偏移量大的消息比偏移量小的消息有可能先消费。
顺序消费:在某一项场景,例如 MySQL binlog 场景,需要消息按顺序进行消费。在 RocketMQ 中提供了基于队列的顺序消费模型,即尽管一个消费组中的消费者会创建一个多线程,但针对同一个 Queue,会加锁。
温馨提示:并发消费模型中,消息消费失败默认会重试16 次,每一次的间隔时间不一样;而顺序消费,如果一条消息消费失败,则会一直消费,直到消费成功。故在顺序消费的
使用过程中,应用程序需要区分系统异常、业务异常,如果是不符合业务规则导致的异常, 则重试多少次都无法消费成功,这个时候一定要告警机制,及时进行人为干预,否则消费会积压。
事务消息并不是为了解决分布式事务,而是提供消息发送与业务落库的一致性,其实现原理就是一次分布式事务的具体运用,请看如下示例:
上述伪代码中,将订单存储关系型数据库中和将消息发送到MQ 这是两个不同介质的两个操作,如果能保证消息发送、数据库存储这两个操作要么同时成功,要么同时失败, RocketMQ 为了解决该问题引入了事务消息。
温馨提示,本节主要的目的是让大家知晓各个术语的概念,由于事务消息的使用,将在该专栏的后续文章中详细介绍。
开源版本的 RocketMQ 目前并不支持任意精度的定时消息。所谓的定时消息就是将消息发送到 Broker,但消费端不会立即消费,而是要到指定延迟时间后才能被消费端消费。
RocketMQ 目前支持指定级别的延迟,其延迟级别如下:
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消息过滤是指消费端可以根据某些条件对一个 topic 中的消息进行过滤,即只消费一个主题下满足过滤条件的消息。
RocketMQ 目前主要的过滤机制是基于 tag 的过滤与基于消息属性的过滤,基于消息属性的过滤支持 SQL92 表达式,对消息进行过滤。
本文的主要目的是介绍 RocketMQ 常见的术语,例如 nameserver、 broker、主题、 消费组、消费者、队列负载算法、队列重平衡机制、并发消费、顺序消费、消费进度存储、 定时消息、事务消息、消息过滤等基本概念,为后续打下坚实基础。