RocketMQ是一款纯java、分布式、队列模型的消息中间件。支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
消息生产者,作用就是将消息发送到Broker。提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息。
消息消费者,负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户角度而言提供了拉取式消费和推动式消费。
消息中转角色,负责存储消息、转发消息。Broker也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
充当路由消息的提供者。接收broker的请求,注册broker的路有消息;接收client(producer/consumer)的请求,根据topic获取到broker ip列表。可集群部署,节点间无任何信息同步。
消息系统所传输信息的物理载体,生产和消费数据的最小单位。每个消息拥有唯一的Message ID,且可以携带具有业务标识的key。系统提供了通过Message ID和key查询消息的功能。
2.2、Topic
是一种消息的逻辑分类
2.3、Tag
用于同一Topic下区分不同类型的消息
2.4、Producer Group:
同一类producer的集合,这类producer发送同一类消息,且发送逻辑一致
2.5、Consumer Group:
同一类consumer的集合,这类consumer通常消费同一类消息且消费逻辑一致。消费者组使得消息消费方面实现负载均衡和容错的目标变得非常容易。RocketMQ支持两种消费模式:集群消费和广播消费。
(1)启动NameServer,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心
(2)Broker启动,跟所有的NameServer保持长连接,定时发动心跳包。心跳包中包含当前Broker信息(IP+Port)以及存储所有topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
(3)收发消息前,先创建topic,创建topic时需要指定该topic存储在哪些Broker上,也可以在发送消息时自动创建topic
(4)Producer发送消息,启动时先跟NameServer集群中的一台建立长连接,并从NameServer中获取当前发送的topic存在哪些Broker上,轮询从队列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息
(5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后跟Broker建立连接,开始消费消息
1.1、同步消息:
消息发送方发出数据后,会阻塞,直到Broker发回响应消息
1.2、异步消息:
消息发送方发出数据后,不等待Broker发回响应,接着发送下个数据包。MQ的异步发送,需要用户实现异步发送回调接口,在执行消息的异步发送时,应用不需要等待Broker响应即可直接返回,通过回调接口接收broker的响应
1.3、单向消息:
只负责发送消息,不等待broker回应且没有回调函数触发,即只发送请求,不等待应答
2.1、普通消息
2.2、顺序消息:
可以严格的保证消息有序,可以分为分区有序或全局有序。在默认情况下消息发送会采取轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序,但是如果控制发送的顺序消息发送到同一个queue中,就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序,如果是多个,则为分区有序
2.3、延时消息:
指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。broker有配置延时时间等级,不可以自定义任意时间
2.4、事务消息:
指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
(1)Push: broker主动向消费者推送
(2)Pull: 消费者在需要时,主动去broker拉取
注:Push和Pull都是基于拉模式来获取消息的,push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又马不停蹄的继续向服务器再次尝试拉取消息
(1)集群模式: 相同Consumer Group的每个Consumer实例平均分摊消息
(2)广播消费: 相同Consumer Group的每个Consumer实例接收全量消息
Producer的send方法本身支持内部重试:至多重试2次(同步发送为2次,异步发送为0次);如果发送失败,则轮转到下一个broker,这个方法的总耗时时间不超过sendMessageTimeout设置的值;如果本身向broker发送消息产生超时异常,就不会重试。
RocketMQ无法避免消息重复,如果业务对重复消费敏感,业务端必须做去重处理。可以借助关系数据库,首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。在消费之前判断唯一键是否存在。在实际使用中,可能会存在相同的消息有两个不同的msgId,这种情况就需要业务字段进行重复消费。
消息者在进行消息订阅时,除了可以指定要订阅消息的Topic外,还可以对指定Topic中的消息根据指定条件进行过滤,即可以订阅比Topic更加细粒度的消息类型。对于指定Topic消息的过滤有两种过滤方式:Tag过滤与SQL过滤。
通过consumer的subscribe()方法指定要订阅消息的Tag。如果订阅多个Tag的消息,Tag间使用或运算符(双竖线||)连接。
SQL过滤是一种通过特定表达式对事先埋入到消息中的用户属性进行筛选过滤的方式。通过SQL过滤,可以实现对消息的复杂过滤。不过,只有使用PUSH模式的消费者才能使用SQL过滤。默认情况下Broker没有开启消息的SQL过滤功能,需要在Broker加载的配置文件中添加如下属性,以开启该功能:
enablePropertyFilter = true
在启动Broker时需要指定这个修改过的配置文件。例如对于单机Broker的启动,其修改的配置文件是conf/broker.conf,启动时使用如下命令:
sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
SQL过滤表达式中支持多种常量类型与运算符。
(1)支持的常量类型
- 数值:比如:123,3.1415
- 字符:必须用单引号包裹起来,比如:‘abc’
- 布尔:TRUE 或 FALSE
- NULL:特殊的常量,表示空
(2)支持的运算符:
- 数值比较:>,>=,<,<=,BETWEEN,=
- 字符比较:=,<>,IN
- 逻辑运算 :AND,OR,NOT
- NULL判断:IS NULL 或者 IS NOT NULL