• 架构师系列-消息中间件(七)- RocketMQ 进阶(一)


    1. 能力目标

    • 能够理解和使用RocketMQ的顺序消息
    • 能够理解和使用RocketMQ的生产者保证策略
    • 能够理解和使用RocketMQ的消息投递策略
    • 能够理解和使用RocketMQ的消费者的重试策略

    2. 车辆调度

    2.1 业务分析
    2.1.1 车辆调度分析

     

    用户打车从派单服务到调度服务,首先将消息以顺序方式扔到RocketMQ中,然后消费的事务就会严格按照放入的顺序进行消费,用户首先拿到从RocketMQ推送的顺序消息,然后保持住,开始轮询检查Redis中的List中是否存在车辆,存在两种情况:

    2.1.1.1 没有拉取到车辆

    如果没有拉取到车辆,然后会延时一段时间,继续进行拉取,一直拉取不到的话一直进行自旋,一直等到拿到车辆才退出自旋。

    2.1.1.2 拉取到车辆

    如果拉取车辆就会将用户和拿到的车辆绑定到一起,开始后续操作,比如下订单等。

    2.1.2 司机自动接单

    当司机上线后,开启自动接单后,主题流程图下

    1. 会先将车辆状态设置为在Ready状态,
    2. 当车辆接到用户后会将车辆设置为Running状态,
    3. 用户下车后,会将车辆继续设置为Ready状态,并将车辆push进list
    2.1.3 用户下车

    如果用户点击下车,主体流程如下

    1. 会先将用户状态设置为Stop状态
    2. 然后会解除车辆和用户的绑定,
    3. 之后车辆会将会push到list的尾端,让其他的用户可以拉取到车辆信息。
    2.1.4 用户打车

    用户上车后流程如下

    1. 校验用户状态,然后将发送顺序消息到RabbitMQ
    2. 消费者获取到用户消息,开始轮询来拉取车辆信息,如果拉取不到休眠一会继续拉取,一直到拉取到
    3. 拉取到后校验是否超时,如果超时直接结束打车,否则删除RabbitMQ的超时检测Key,失效超时通知
    4. 设置用户状态为Running,后续就到了司机自动接单的流程了
    2.2 技术分析
    2.2.1 RocketMQ顺序消息

    打车需要排队,我们需要让前面的人能够被消费到,不能让这个顺序乱掉,这就需要用到RocketMQ的顺序消息

    2.2.2 Redis 轮询队列

    我们要让车辆在队列中,从MQ拿到一个车辆后,需要再从队列中拿取一个车辆如果拿不到则需要不断的轮询,一直到拿到车辆为止,如果打车玩完成还是需要将车辆归还队列,让其他的用户来打车,将一辆车重复利用起来

    3. 顺序消息

    3.1 顺序类型
    3.1.1 无序消息

    无序消息也指普通的消息,Producer 只管发送消息,Consumer 只管接收消息,至于消息和消息之间的顺序并没有保证。

    • Producer 依次发送 orderId 为 1、2、3 的消息
    • Consumer 接到的消息顺序有可能是 1、2、3,也有可能是 2、1、3 等情况,这就是普通消息。
    3.1.2 全局顺序

    对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费

     

    比如 Producer 发送orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费。

    3.1.3 局部顺序

    在实际开发有些场景中,我并不需要消息完全按照完全按的先进先出,而是某些消息保证先进先出就可以了。

     

    就好比一个打车涉及到不同地区 北京上海广州深圳,我不用管其它的订单,只保证同一个地区的订单ID能保证这个顺序就可以了。

    3.2 Rocket顺序消息

    RocketMQ可以严格的保证消息有序,但这个顺序,不是全局顺序,只是分区(queue)顺序,要全局顺序只能一个分区。

    3.2.1 问题梳理

    我们知道 生产的message最终会存放在Queue中,如果一个Topic关联了4个Queue,如果我们不指定消息往哪个队列里放,那么默认是平均分配消息到4个queue,

    好比有10条消息,那么这10条消息会平均分配在这4个Queue上,那么每个Queue大概放2个左右,这里有一点很重的是:同一个queue,存储在里面的message 是按照先进先出的原则

     

    之所以出现下面这个场景看起来不是顺序的,是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不同的queue(分区)

    3.2.2 解决方案

    这个时候思路就来了,我们让不同的地区用不同的queue,只要保证同一个地区的订单把他们放到同一个Queue那就保证消费者先进先出了。

    这就保证局部顺序了,即同一订单按照先后顺序放到同一Queue,那么获取消息的时候就可以保证先进先出。

    3.2.3 如何保证集群有序

    这里还有很关键的一点,在一个消费者集群的情况下,消费者1先去Queue拿消息,它拿到了 北京订单1,它拿完后,消费者2去queue拿到的是 北京订单2

    拿的顺序是没毛病了,但关键是先拿到不代表先消费完它,会存在虽然你消费者1先拿到北京订单1,但由于网络等原因,消费者2比你真正的先消费消息,这是不是很尴尬了。

    3.2.3.1 分布式锁

    Rocker采用的是分段锁,它不是锁整个Broker而是锁里面的单个Queue,因为只要锁单个Queue就可以保证局部顺序消费了。

    所以最终的消费者这边的逻辑就是

    • 消费者1去Queue拿 北京订单1,它就锁住了整个Queue,只有它消费完成并返回成功后,这个锁才会释放。
    • 然后下一个消费者去拿到 北京订单2 同样锁住当前Queue,这样的一个过程来真正保证对同一个Queue能够真正意义上的顺序消费,而不仅仅是顺序取出。
    3.2.4 消息类型对比

    全局顺序与分区顺序对比

    Topic消息类型支持事务消息支持定时/延时消息性能
    无序消息(普通、事务、定时/延时)最高
    分区顺序消息
    全局顺序消息一般

    发送方式对比

    Topic消息类型支持可靠同步发送支持可靠异步发送支持Oneway发送
    无序消息(普通、事务、定时/延时)
    分区顺序消息
    全局顺序消息
    3.2.5 注意事项
    1. 顺序消息暂不支持广播模式。
    2. 顺序消息不支持异步发送方式,否则将无法严格保证顺序。
    3. 建议同一个 Group ID 只对应一种类型的 Topic,即不同时用于顺序消息和无序消息的收发。
    4. 对于全局顺序消息,建议创建broker个数 >=2。
    3.3 代码示例
    3.3.1 队列选择器
    1. public class SelectorFactory {
    2. /**
    3. * 工厂模式获取MessageQueueSelector
    4. *
    5. * @param value
    6. * @return
    7. */
    8. public static MessageQueueSelector getMessageQueueSelector(String value) {
    9. //如果value不为空使用hash选择器
    10. if (StringUtils.isNotEmpty(value)) {
    11. return new SelectMessageQueueByHash();
    12. }
    13. //如果value为空使用随机选择器
    14. return new SelectMessageQueueByRandom();
    15. }
    16. }
     3.3.2 消息发送者

    1. @Component
    2. public class MQProducer {
    3. @Autowired
    4. DefaultMQProducer defaultMQProducer;
    5. /**
    6. * 同步发送消息
    7. * @param taxiBO
    8. */
    9. public void send(TaxiBO taxiBO) {
    10. if (null == taxiBO) {
    11. return;
    12. }
    13. SendResult sendResult = null;
    14. try {
    15. //获取消息对象
    16. Message message = RocketMQHelper.buildMessage(DispatchConstant.SEQ_TOPIC, taxiBO);
    17. //根据区域编码获取队列选择器
    18. MessageQueueSelector selector = SelectorFactory.getMessageQueueSelector(taxiBO.getAreaCode());
    19. //发送同步消息
    20. sendResult = defaultMQProducer.send(message, selector, taxiBO.getAreaCode(), 10000);
    21. } catch (MQClientException e) {
    22. e.printStackTrace();
    23. } catch (RemotingException e) {
    24. e.printStackTrace();
    25. } catch (MQBrokerException e) {
    26. e.printStackTrace();
    27. } catch (InterruptedException e) {
    28. e.printStackTrace();
    29. }
    30. if (null != sendResult) {
    31. System.out.println(sendResult.toString());
    32. }
    33. }
    34. }
    3.3.3 消息消费者

    消费者真正要达到消费顺序,需要分布式锁,所以这里需要将MessageListenerOrderly替换之前的MessageListenerConcurrently,因为它里面实现了分布式锁。

    1. /**
    2. * 消费消息
    3. */
    4. public abstract class MQConsumeMessageListenerProcessor implements MessageListenerOrderly {
    5. public static final Logger logger = LoggerFactory.getLogger(MQConsumeMessageListenerProcessor.class);
    6. /**
    7. * 消费有序消息
    8. *
    9. * @param list
    10. * @param consumeOrderlyContext
    11. * @return
    12. */
    13. @Override
    14. public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {
    15. if (CollectionUtils.isEmpty(list)) {
    16. logger.info("MQ接收消息为空,直接返回成功");
    17. return ConsumeOrderlyStatus.SUCCESS;
    18. }
    19. //消费消息
    20. for (MessageExt messageExt : list) {
    21. try {
    22. String topic = messageExt.getTopic();
    23. String tags = messageExt.getTags();
    24. String body = new String(messageExt.getBody(), "utf-8");
    25. //调用具体消费流程
    26. processMessage(topic, tags, body);
    27. logger.info("MQ消息topic={}, tags={}, 消息内容={}", topic, tags, body);
    28. } catch (Exception e) {
    29. logger.error("获取MQ消息内容异常{}", e);
    30. //暂停当前队列
    31. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    32. }
    33. }
    34. // TODO 处理业务逻辑
    35. return ConsumeOrderlyStatus.SUCCESS;
    36. }
    37. /**
    38. * 处理消息
    39. *
    40. * @param body
    41. */
    42. public abstract void processMessage(String topic, String tags, String body);
    43. }

    上面我们介绍了顺序消息,它主要将相同的消息投递到一个队列中的,具体如何投递呢

    4. 消息投递策略

    上面我们介绍了顺序消息,但是RocketMQ还支持那些投递策略呢、

    RocketMQ 的消息模型整体并不复杂,如下图所示:

    一个Topic(消息主题)可能对应多个实际的消息队列(MessgeQueue)

    在底层实现上,为了提高MQ的可用性和灵活性,一个Topic在实际存储的过程中,采用了多队列的方式,具体形式如上图所示,每个消息队列在使用中应当保证先入先出(FIFO,First In First Out)的方式进行消费。

    那么,基于这种模型,就会引申出两个问题:

    • 生产者 在发送相同Topic的消息时,消息体应当被放置到哪一个消息队列(MessageQueue)中?
    • 消费者 在消费消息时,应当从哪些消息队列中拉取消息?
    4.1 生产者投递策略

    生产者投递策略就是讲如何将一个消息投递到不同的queue中

    4.1.1 轮询算法投递

    默认投递方式:基于Queue队列轮询算法投递

    默认情况下,采用了最简单的轮询算法,这种算法有个很好的特性就是,保证每一个Queue队列的消息投递数量尽可能均匀,算法如下图所示:

    4.1.2 顺序投递策略

    在有些场景下,需要保证同类型消息投递和消费的顺序性。

    例如,假设现在有TOPIC topicTest,该 Topic下有4个Queue队列,该Topic用于传递订单的状态变迁,假设订单有状态:未支付已支付发货中(处理中)发货成功发货失败

    在时序上,生产者从时序上可以生成如下几个消息:

    订单T0000001:未支付 --> 订单T0000001:已支付 --> 订单T0000001:发货中(处理中) --> 订单T0000001:发货失败

    消息发送到MQ中之后,可能由于轮询投递的原因,消息在MQ的存储可能如下:

    这种情况下,我们希望消费者消费消息的顺序和我们发送是一致的,然而,有上述MQ的投递和消费机制,我们无法保证顺序是正确的,对于顺序异常的消息,消费者 即使有一定的状态容错,也不能完全处理好这么多种随机出现组合情况。

    基于上述的情况,RockeMQ采用了这种实现方案:对于相同订单号的消息,通过一定的策略,将其放置在一个 queue队列中,然后消费者再采用一定的策略(一个线程独立处理一个queue,保证处理消息的顺序性),能够保证消费的顺序性

    生产者在消息投递的过程中,使用了 MessageQueueSelector 作为队列选择的策略接口,其定义如下:

    1. public interface MessageQueueSelector {
    2. /**
    3. * 根据消息体和参数,从一批消息队列中挑选出一个合适的消息队列
    4. * @param mqs 待选择的MQ队列选择列表
    5. * @param msg 待发送的消息体
    6. * @param arg 附加参数
    7. * @return 选择后的队列
    8. */
    9. MessageQueue select(final List mqs, final Message msg, final Object arg);
    10. }

    4.1.3 自带实现类
    投递策略策略实现类说明
    随机分配策略SelectMessageQueueByRandom使用了简单的随机数选择算法
    基于Hash分配策略SelectMessageQueueByHash根据附加参数的Hash值,按照消息队列列表的大小取余数,得到消息队列的index
    基于机器机房位置分配策略SelectMessageQueueByMachineRoom开源的版本没有具体的实现,基本的目的应该是机器的就近原则分配
    4.2 消费者分配队列

    RocketMQ对于消费者消费消息有两种形式:

    • BROADCASTING:广播式消费,这种模式下,一个消息会被通知到每一个消费者
    • CLUSTERING: 集群式消费,这种模式下,一个消息最多只会被投递到一个消费者上进行消费 模式如下:

    对于使用了消费模式为MessageModel.CLUSTERING进行消费时,需要保证一个消息在整个集群中只需要被消费一次,实际上,在RoketMQ底层,消息指定分配给消费者的实现,是通过queue队列分配给消费者的方式完成的:也就是说,消息分配的单位是消息所在的queue队列

    queue队列指定给特定的消费者后,queue队列内的所有消息将会被指定到消费者进行消费。

    RocketMQ定义了策略接口AllocateMessageQueueStrategy,对于给定的消费者分组,和消息队列列表消费者列表当前消费者应当被分配到哪些queue队列,定义如下:

    1. /**
    2. * 为消费者分配queue的策略算法接口
    3. */
    4. public interface AllocateMessageQueueStrategy {
    5. /**
    6. * Allocating by consumer id
    7. *
    8. * @param consumerGroup 当前 consumer群组
    9. * @param currentCID 当前consumer id
    10. * @param mqAll 当前topic的所有queue实例引用
    11. * @param cidAll 当前 consumer群组下所有的consumer id set集合
    12. * @return 根据策略给当前consumer分配的queue列表
    13. */
    14. List allocate(
    15. final String consumerGroup,
    16. final String currentCID,
    17. final List mqAll,
    18. final List cidAll
    19. );
    20. /**
    21. * 算法名称
    22. *
    23. * @return The strategy name
    24. */
    25. String getName();
    26. }

    相应地,RocketMQ提供了如下几种实现:

    算法名称含义
    AllocateMessageQueueAveragely平均分配算法
    AllocateMessageQueueAveragelyByCircle基于环形平均分配算法
    AllocateMachineRoomNearby基于机房临近原则算法
    AllocateMessageQueueByMachineRoom基于机房分配算法
    AllocateMessageQueueConsistentHash基于一致性hash算法
    AllocateMessageQueueByConfig基于配置分配算法

    为了讲述清楚上述算法的基本原理,我们先假设一个例子,下面所有的算法将基于这个例子讲解。

    假设当前同一个topic下有queue队列 10个,消费者共有4个,如下图所示:

    4.2.1 平均分配算法

    这里所谓的平均分配算法,并不是指的严格意义上的完全平均,如上面的例子中,10个queue,而消费者只有4个,无法是整除关系,除了整除之外的多出来的queue,将依次根据消费者的顺序均摊。

    按照上述例子来看,10/4=2,即表示每个消费者平均均摊2个queue;而10%4=2,即除了均摊之外,多出来2个queue还没有分配,那么,根据消费者的顺序consumer-1consumer-2consumer-3consumer-4,则多出来的2个queue将分别给consumer-1consumer-2

    最终,分摊关系如下:

    • consumer-1:3个
    • consumer-2:3个
    • consumer-3:2个
    • consumer-4:2个

    4.2.2 环形平均分配

    环形平均算法,是指根据消费者的顺序,依次在由queue队列组成的环形图中逐个分配

    其基本模式如下:

     

    4.2.3 使用方式

    默认消费者使用使用了AllocateMessageQueueAveragely平均分配策略

    如果需要使用其他分配策略,使用方式如下

    1. //创建一个消息消费者,并设置一个消息消费者组,并指定使用一致性hash算法的分配策略
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(null,"rocket_test_consumer_group",null,new AllocateMessageQueueConsistentHash());
    3. .....

  • 相关阅读:
    ESP8266-Arduino编程实例-TSL2561亮度传感器驱动
    Xilinx ZYNQ 7000学习笔记五(Xilinx SDK 烧写镜像文件)
    启动应用程序出现mfc90u.dll找不到问题解决
    文本摘要实战:基于seq2seq+attention实现文本摘要
    Django模型(三)
    AtCoder Beginner Contest 276 (A~E)
    python基本语法
    ArcGIS API for JavaScript 4.x 实现动态脉冲效果
    botocore.exceptions.NoCredentialsError: Unable to locate credentials
    【微信小程序开发】页面导航与传参
  • 原文地址:https://blog.csdn.net/dengwei_dw/article/details/138199707