用户打车从派单服务到调度服务,首先将消息以顺序方式扔到RocketMQ中,然后消费的事务就会严格按照放入的顺序进行消费,用户首先拿到从RocketMQ推送的顺序消息,然后保持住,开始轮询检查Redis中的List中是否存在车辆,存在两种情况:
如果没有拉取到车辆,然后会延时一段时间,继续进行拉取,一直拉取不到的话一直进行自旋,一直等到拿到车辆才退出自旋。
如果拉取车辆就会将用户和拿到的车辆绑定到一起,开始后续操作,比如下订单等。
当司机上线后,开启自动接单后,主题流程图下
如果用户点击下车,主体流程如下
用户上车后流程如下
打车需要排队,我们需要让前面的人能够被消费到,不能让这个顺序乱掉,这就需要用到RocketMQ的顺序消息
我们要让车辆在队列中,从MQ拿到一个车辆后,需要再从队列中拿取一个车辆如果拿不到则需要不断的轮询,一直到拿到车辆为止,如果打车玩完成还是需要将车辆归还队列,让其他的用户来打车,将一辆车重复利用起来
无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费
比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。
在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。
就好比一个打车涉及到不同地区 北京
,上海
、广州
、深圳
,我不用管其它的订单,只保证同一个地区的订单ID能保证这个顺序
就可以了。
RocketMQ可以严格的保证消息有序,但这个顺序,不是全局顺序,只是分区(queue)顺序,要全局顺序只能一个分区。
我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了4个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到4个queue,
好比有10条消息,那么这10条消息会平均分配在这4个Queue上,那么每个Queue大概放2个左右,这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则
之所以出现下面这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不同的queue(分区)
这个时候思路就来了,我们让不同的地区用不同的queue,只要保证同一个地区的订单把他们放到同一个Queue那就保证消费者先进先出了。
这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么获取消息的时候就可以保证先进先出。
这里还有很关键的一点,在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 北京订单1,它拿完后,消费者2去queue拿到的是 北京订单2
拿的顺序是没毛病了,但关键是先拿到不代表先消费完它,会存在虽然你消费者1先拿到北京订单1,但由于网络等原因,消费者2比你真正的先消费消息,这是不是很尴尬了。
Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。
所以最终的消费者这边的逻辑就是
全局顺序与分区顺序对比
Topic消息类型 | 支持事务消息 | 支持定时/延时消息 | 性能 |
---|---|---|---|
无序消息(普通、事务、定时/延时) | 是 | 是 | 最高 |
分区顺序消息 | 否 | 否 | 高 |
全局顺序消息 | 否 | 否 | 一般 |
发送方式对比
Topic消息类型 | 支持可靠同步发送 | 支持可靠异步发送 | 支持Oneway发送 |
---|---|---|---|
无序消息(普通、事务、定时/延时) | 是 | 是 | 是 |
分区顺序消息 | 是 | 否 | 否 |
全局顺序消息 | 是 | 否 | 否 |
- public class SelectorFactory {
- /**
- * 工厂模式获取MessageQueueSelector
- *
- * @param value
- * @return
- */
- public static MessageQueueSelector getMessageQueueSelector(String value) {
- //如果value不为空使用hash选择器
- if (StringUtils.isNotEmpty(value)) {
- return new SelectMessageQueueByHash();
- }
- //如果value为空使用随机选择器
- return new SelectMessageQueueByRandom();
- }
- }
- @Component
- public class MQProducer {
-
- @Autowired
- DefaultMQProducer defaultMQProducer;
-
- /**
- * 同步发送消息
- * @param taxiBO
- */
- public void send(TaxiBO taxiBO) {
- if (null == taxiBO) {
- return;
- }
- SendResult sendResult = null;
-
- try {
- //获取消息对象
- Message message = RocketMQHelper.buildMessage(DispatchConstant.SEQ_TOPIC, taxiBO);
- //根据区域编码获取队列选择器
- MessageQueueSelector selector = SelectorFactory.getMessageQueueSelector(taxiBO.getAreaCode());
- //发送同步消息
- sendResult = defaultMQProducer.send(message, selector, taxiBO.getAreaCode(), 10000);
- } catch (MQClientException e) {
- e.printStackTrace();
- } catch (RemotingException e) {
- e.printStackTrace();
- } catch (MQBrokerException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- if (null != sendResult) {
- System.out.println(sendResult.toString());
- }
- }
- }
消费者真正要达到消费顺序,需要分布式锁,所以这里需要将
MessageListenerOrderly
替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。
- /**
- * 消费消息
- */
- public abstract class MQConsumeMessageListenerProcessor implements MessageListenerOrderly {
- public static final Logger logger = LoggerFactory.getLogger(MQConsumeMessageListenerProcessor.class);
-
- /**
- * 消费有序消息
- *
- * @param list
- * @param consumeOrderlyContext
- * @return
- */
- @Override
- public ConsumeOrderlyStatus consumeMessage(List
list, ConsumeOrderlyContext consumeOrderlyContext) { -
- if (CollectionUtils.isEmpty(list)) {
- logger.info("MQ接收消息为空,直接返回成功");
- return ConsumeOrderlyStatus.SUCCESS;
- }
- //消费消息
- for (MessageExt messageExt : list) {
- try {
- String topic = messageExt.getTopic();
- String tags = messageExt.getTags();
- String body = new String(messageExt.getBody(), "utf-8");
- //调用具体消费流程
- processMessage(topic, tags, body);
- logger.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
- } catch (Exception e) {
- logger.error("获取MQ消息内容异常{}", e);
- //暂停当前队列
- return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
- }
- }
-
- // TODO 处理业务逻辑
- return ConsumeOrderlyStatus.SUCCESS;
- }
-
- /**
- * 处理消息
- *
- * @param body
- */
- public abstract void processMessage(String topic, String tags, String body);
- }
上面我们介绍了顺序消息,它主要将相同的消息投递到一个队列中的,具体如何投递呢
上面我们介绍了顺序消息,但是RocketMQ还支持那些投递策略呢、
RocketMQ 的消息模型整体并不复杂,如下图所示:
一个Topic(消息主题)
可能对应多个实际的消息队列(MessgeQueue)
在底层实现上,为了提高MQ的可用性和灵活性,一个Topic在实际存储的过程中,采用了多队列的方式,具体形式如上图所示,每个消息队列在使用中应当保证先入先出(FIFO,First In First Out)的方式进行消费。
那么,基于这种模型,就会引申出两个问题:
生产者投递策略就是讲如何将一个消息投递到不同的queue中
默认投递方式:基于
Queue队列
轮询算法投递
默认情况下,采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列
的消息投递数量尽可能均匀,算法如下图所示:
在有些场景下,需要保证同类型消息投递和消费的顺序性。
例如,假设现在有TOPIC topicTest
,该 Topic下有4个Queue队列
,该Topic用于传递订单的状态变迁,假设订单有状态:未支付
、已支付
、发货中(处理中)
、发货成功
、发货失败
。
在时序上,生产者从时序上可以生成如下几个消息:
订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中(处理中) --> 订单T0000001:发货失败
消息发送到MQ中之后,可能由于轮询投递的原因,消息在MQ的存储可能如下:
这种情况下,我们希望消费者
消费消息的顺序和我们发送是一致的,然而,有上述MQ的投递和消费机制,我们无法保证顺序是正确的,对于顺序异常的消息,消费者
即使有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。
基于上述的情况,RockeMQ
采用了这种实现方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 queue队列中
,然后消费者
再采用一定的策略(一个线程独立处理一个queue
,保证处理消息的顺序性),能够保证消费的顺序性
生产者在消息投递的过程中,使用了 MessageQueueSelector
作为队列选择的策略接口,其定义如下:
- public interface MessageQueueSelector {
- /**
- * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
- * @param mqs 待选择的MQ队列选择列表
- * @param msg 待发送的消息体
- * @param arg 附加参数
- * @return 选择后的队列
- */
- MessageQueue select(final List
mqs, final Message msg, final Object arg) ; - }
投递策略 | 策略实现类 | 说明 |
---|---|---|
随机分配策略 | SelectMessageQueueByRandom | 使用了简单的随机数选择算法 |
基于Hash分配策略 | SelectMessageQueueByHash | 根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index |
基于机器机房位置分配策略 | SelectMessageQueueByMachineRoom | 开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配 |
RocketMQ对于消费者消费消息有两种形式:
BROADCASTING
:广播式消费,这种模式下,一个消息会被通知到每一个消费者
CLUSTERING
: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者
上进行消费 模式如下:对于使用了消费模式为MessageModel.CLUSTERING
进行消费时,需要保证一个消息在整个集群中只需要被消费一次,实际上,在RoketMQ底层,消息指定分配给消费者的实现,是通过queue队列分配给消费者的方式完成的:也就是说,消息分配的单位是消息所在的queue队列
将
queue队列
指定给特定的消费者
后,queue队列
内的所有消息将会被指定到消费者
进行消费。
RocketMQ定义了策略接口AllocateMessageQueueStrategy
,对于给定的消费者分组
,和消息队列列表
、消费者列表
,当前消费者
应当被分配到哪些queue队列
,定义如下:
- /**
- * 为消费者分配queue的策略算法接口
- */
- public interface AllocateMessageQueueStrategy {
-
- /**
- * Allocating by consumer id
- *
- * @param consumerGroup 当前 consumer群组
- * @param currentCID 当前consumer id
- * @param mqAll 当前topic的所有queue实例引用
- * @param cidAll 当前 consumer群组下所有的consumer id set集合
- * @return 根据策略给当前consumer分配的queue列表
- */
- List
allocate( - final String consumerGroup,
- final String currentCID,
- final List
mqAll, - final List
cidAll - );
-
- /**
- * 算法名称
- *
- * @return The strategy name
- */
- String getName();
- }
相应地,RocketMQ提供了如下几种实现:
算法名称 | 含义 |
---|---|
AllocateMessageQueueAveragely | 平均分配算法 |
AllocateMessageQueueAveragelyByCircle | 基于环形平均分配算法 |
AllocateMachineRoomNearby | 基于机房临近原则算法 |
AllocateMessageQueueByMachineRoom | 基于机房分配算法 |
AllocateMessageQueueConsistentHash | 基于一致性hash算法 |
AllocateMessageQueueByConfig | 基于配置分配算法 |
为了讲述清楚上述算法的基本原理,我们先假设一个例子,下面所有的算法将基于这个例子讲解。
假设当前同一个topic下有queue队列 10
个,消费者共有4
个,如下图所示:
这里所谓的平均分配算法,并不是指的严格意义上的完全平均,如上面的例子中,10个queue,而消费者只有4个,无法是整除关系,除了整除之外的多出来的queue,将依次根据消费者的顺序均摊。
按照上述例子来看,10/4=2
,即表示每个消费者
平均均摊2个queue;而10%4=2
,即除了均摊之外,多出来2个queue
还没有分配,那么,根据消费者的顺序consumer-1
、consumer-2
、consumer-3
、consumer-4
,则多出来的2个queue
将分别给consumer-1
和consumer-2
。
最终,分摊关系如下:
consumer-1
:3个consumer-2
:3个consumer-3
:2个consumer-4
:2个环形平均算法,是指根据消费者的顺序,依次在由queue队列
组成的环形图中逐个分配
其基本模式如下:
默认消费者使用使用了
AllocateMessageQueueAveragely
平均分配策略
如果需要使用其他分配策略,使用方式如下
- //创建一个消息消费者,并设置一个消息消费者组,并指定使用一致性hash算法的分配策略
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null,"rocket_test_consumer_group",null,new AllocateMessageQueueConsistentHash());
- .....