• 【RocketMQ】深入剖析延迟消息核心实现原理


    一、背景

    电商相关业务的时候,有一个常见的需求场景是:用户下单之后,超过半小时不支付,就取消订单。现在我们在淘宝京东买东西,或者通过美团点外卖,下单之后,如果不在指定时间内支付,订单也会取消。 那么,如何实现这样的超时取消逻辑呢,通过消息队列延时消息,是一个非常稳定的实现方案。

    RocketMQ 就提供了这样的延时消息功能,producer 端在发送消息时,设置延迟级别,从秒级到分钟小时等等。消息在发送之后,会在消息队列的服务器进行存储。等过了设定的延迟时间之后,消息才会被 consumer 端消费到。

    如果我们在下单的时候,发送一条设置延时30分钟的消息,这条消息会在30分钟之后被下游系统消费,然后判断订单有没有支付,如果没有支付,则取消订单。那么这样,通过消息队列就完成了一个延迟取消的逻辑了。

    二、原理

    2.1、设置延时

    先来看一下如何设置消息的延时 消息体可以通过setDelayTimeLevel方法来设置延时级别

    1. public void produce() {
    2. Message msg = new Message("TopicTest",
    3. "TagA",
    4. "OrderID188",
    5. "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
    6. msg.setDelayTimeLevel(1)
    7. SendResult sendResult = producer.send(msg);
    8. }
    9. public void consume() {
    10. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    11. consumer.subscribe("TopicTest", "TagA");
    12. consumer.registerMessageListener(new MessageListenerConcurrently() {
    13. @Override
    14. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    15. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    16. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    17. }
    18. });
    19. consumer.start();
    20. }

    其实是将延迟信息存到 Message 的 property 中(property是一个保存meta信息的hashmap

    1. public void setDelayTimeLevel(int level) {
    2. this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    3. }
    4. void putProperty(final String name, final String value) {
    5. if (null == this.properties) {
    6. this.properties = new HashMap();
    7. }
    8. this.properties.put(name, value);
    9. }

    之后,broker收到 message之后,会根据 message 中设置的延时级别进行处理 可以看看延时级别的具体情况: 一共分为18个级别(1-18),对应时间从1s到2h

    1. public class MessageStoreConfig {
    2. private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    3. }

    那么整个系统是怎么做到让consumer在设定的延时之后,开始消费指定消息的呢?

    不得不说,RocketMQ 的设计还是挺巧妙的,我们接着往下看。

    2.2、消息预存

    对于broker收到的延时消息,并不是和普通消息一样,进入发送端指定的topic中, 而是进入专门的延时topic中,延时topic有18条队列(queueId 编号0-17),queueId 和 delayLevel 的关系是 queueId + 1 = delayLevel,是一一对应的。所以计算延时消息的待执行时间deliverTimestamp之后,会将消息存入对应延时级别的队列中。

    1. // 如果是延迟消息
    2. if (msg.getDelayTimeLevel() > 0) {
    3. if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
    4. msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
    5. }
    6. // 重设延迟消息的topic和queueId,topic为指定的RMQ_SYS_SCHEDULE_TOPIC
    7. topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
    8. queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    9. ...
    10. // 将实际的指定topic和queueId进行存入property,进行备份
    11. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    12. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
    13. msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    14. msg.setTopic(topic);
    15. msg.setQueueId(queueId);
    16. }

    之后,会由ScheduleMessageService来进行任务处理。ScheduleMessageService是broker启动时就开始执行的,用来处理延迟队列中的消息,处理的逻辑如下所示。

    1. public class ScheduleMessageService extends ConfigManager {
    2. // key: delayLevel | value: delayTimeMillis
    3. private final ConcurrentMap delayLevelTable = new ConcurrentHashMap(32);
    4. public void start() {
    5. // 创建一个Timer,用于执行定时任务
    6. this.timer = new Timer("ScheduleMessageTimerThread", true);
    7. // 这里对每个delayLevel的queue都创建一个DeliverDelayedMessageTimerTask,
    8. // 用来处理对应queue中的消息
    9. for (Map.Entry entry : this.delayLevelTable.entrySet()) {
    10. Integer level = entry.getKey();
    11. Long timeDelay = entry.getValue();
    12. Long offset = this.offsetTable.get(level);
    13. if (null == offset) {
    14. offset = 0L;
    15. }
    16. if (timeDelay != null) {
    17. this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
    18. }
    19. }
    20. }
    21. }

    ScheduleMessageService启动之后,会根据延时队列的数目创建一一对应的DeliverDelayedMessageTimerTask,然后周期执行。该类继承自TimerTask,是JDK的工具类,用于执行定时任务。

    2.3、消息转投

    可以看到DeliverDelayedMessageTimerTask实现的 run 方法,主要逻辑都在executeOnTimeup方法中,从对应的延迟队列中取出时间已到的 message,发送到 message 对应原始topic的队列中。只要队列没有发生消费积压,message 就会马上被消费了。(这部分的代码实现比较复杂,感兴趣可以去看对应的源码)

    1. class DeliverDelayedMessageTimerTask extends TimerTask {
    2. private final int delayLevel;
    3. private final long offset;
    4. public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
    5. this.delayLevel = delayLevel;
    6. this.offset = offset;
    7. }
    8. @Override
    9. public void run() {
    10. try {
    11. if (isStarted()) {
    12. this.executeOnTimeup();
    13. }
    14. } catch (Exception e) {
    15. // XXX: warn and notify me
    16. log.error("ScheduleMessageService, executeOnTimeup exception", e);
    17. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
    18. this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
    19. }
    20. }
    21. public void executeOnTimeup() {
    22. ConsumeQueue cq =
    23. ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
    24. delayLevel2QueueId(delayLevel));
    25. long failScheduleOffset = offset;
    26. if (cq != null) {
    27. // 这部分是核心逻辑,实现的是 从延时消息队列中取出 deliverTimestamp - now <= 0 的消息,
    28. // 将消息从延时queue移到原本指定Topic的queue中,这些消息就马上会被consumer消费。
    29. }
    30. ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
    31. failScheduleOffset), DELAY_FOR_A_WHILE);
    32. }
    33. }

    总体的原理示意图,如下所示:

    broker 在接收到延时消息的时候,会将延时消息存入到延时TOPIC的队列中,然后ScheduleMessageService中,每个 queue 对应的定时任务会不停地被执行,检查 queue 中哪些消息已到设定时间,然后转发到消息的原始TOPIC,这些消息就会被各自的 producer 消费了。

    三、拓展-消费重试

    平常在使用RocketMQ的时候,一般会依赖consumer的消费重试功能。 而consumer端的消费重试,其实也是通过这个和延时队列差不多的原理来实现的。

    1. public void consume() {
    2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    3. consumer.subscribe("TopicTest", "TagA");
    4. consumer.registerMessageListener(new MessageListenerConcurrently() {
    5. @Override
    6. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    7. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    8. // 这里如果返回RECONSUME_LATER,就会重试消费
    9. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    10. }
    11. });
    12. consumer.start();
    13. }

    RocketMQ规定,以下三种情况统一按照消费失败处理并会发起重试。

    • 业务消费方返回ConsumeConcurrentlyStatus.RECONSUME_LATER

    • 业务消费方返回null

    • 业务消费方主动/被动抛出异常

    业务代码中,一般会利用重试功能去做下游逻辑的重试。而RocketMQ的重试并不是固定时间间隔重复进行,二是采取的退避式重试,重试的时间间隔会不断变长。 这个时间间隔,和设置delayLevel的延时类似。

    Consumer客户端会通过processConsumeResult方法处理每一条消息的消费结果,如果判断需要进行重试,则会通过sendMessageBack方法将消息发送到broker,重试消息会带上已重试次数的信息。

    broker收到消息之后,SendMessageProcessor会对重试消息进行处理,设置topic为RETRY_TOPIC,具体逻辑如下:

    1. public class SendMessageProcessor
    2. extends AbstractSendMessageProcessor
    3. implements NettyRequestProcessor {
    4. private RemotingCommand asyncConsumerSendMsgBack(ChannelHandlerContext ctx, RemotingCommand request)
    5. throws RemotingCommandException {
    6. // 给重试消息设置新的topic
    7. String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
    8. int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
    9. // 根据已经发生重试的次数确定delayLevel
    10. if (0 == delayLevel) {
    11. delayLevel = 3 + msgExt.getReconsumeTimes();
    12. }
    13. msgExt.setDelayTimeLevel(delayLevel);
    14. // 重试次数+1
    15. msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    16. // 存储消息
    17. PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
    18. // ...
    19. }
    20. }

    存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同。

    整个消息重试的逻辑示意图如下所示:

    如图所示

    1. Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息。

    2. 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker。

    3. broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中。

    4. ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程。

    可以对比之前的延时消息流程去看,其实重试消息就是将失败的消息当作延时消息进行处理,只不过最后投入的是专门的重试消息队列中。

    四、总结

    延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的。

  • 相关阅读:
    周四见|物流人的一周资讯
    java基础 异常
    Spring之Bean生命周期源码解析
    算法 旋转数组最小数字-(二分查找+反向双指针)
    Raft介绍
    linux命令2
    Task04 吃瓜教程——第五章 神经网络
    深度优先搜索&广度优先搜索
    Java方向面试题(一)
    大数定律与中心极限定理
  • 原文地址:https://blog.csdn.net/weixin_63566550/article/details/134269885