• RocketMQ源码(二十一)之延迟消息


    版本

    1. 基于rocketmq-all-4.3.1版本

    延迟消息

    1. RocketMQ支持发送延迟消息,但不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。Broker内部使用SCHEDULE_TOPIC_XXXX主题所有的延迟消息,根据延迟的level的个数,创建对应数量的ConsumeQueue,在创建ConsumeQueue时将其tagCode保存消息需要投递的时间。通过定时任务扫描ConsumeQueue,将满足条件的消息重新投递到原始的Topic中,这样消费者就可以消费了

    2. 预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

    3. 延迟消息的ConsumeQueue存储的tagsCode与普通消息不同

      • 延时消息的tagCode:存储的是消息到期的时间
      • 非延时消息的tagCode: tags字符串的hashCode
    4. 延迟消息整体交互图
      在这里插入图片描述

    发送延迟消息

    1. Producer发送延迟消息与普通发送没有太大区别,只需要设置延迟一个级别即可。延迟级别并不是时间,只是一个数字,如果超过最大值,则会被重置为最大值

      Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
      //messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
      //这里就表示10s
      message.setDelayTimeLevel(3);
      
      • 1
      • 2
      • 3
      • 4
    2. Message#setDelayTimeLevel的实现可以看到,是向消息扩展属性中添加一个DELAY属性

      public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
      public void setDelayTimeLevel(int level) {
          this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
      }
      
      • 1
      • 2
      • 3
      • 4
    3. Consumer提供消息重试的,在并发模式消费消费失败的情况下,可以返回一个枚举值 ConsumeConcurrentlyStatus.RECONSUME_LATER,那么消息之后将会进行重试。默认会进行重试16次,消息重试。Consumer发送的延迟时间间隔为:10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h。即消息重试的16个级别,即重试16次。

      SendMessageProcessor#consumerSendMsgBack
        
      if (0 == delayLevel) {
        	//从10ms开始
           delayLevel = 3 + msgExt.getReconsumeTimes();
       }
       msgExt.setDelayTimeLevel(delayLevel);
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

    Broker处理延迟消息

    1. Broker端接收处理与普通消息没有区别,只是在存储的时候有一些不同。CommitLog#putMessage对于延迟消息做了相关处理

      • 将消息的Topic更改为延迟消息特定的主题SCHEDULE_TOPIC_XXXX,根据延迟级别获取queueId(等于delayLevel-1
      • 将原始Topic、Queue备份在消息的扩展属性中(为了后续恢复原始,能被消费)
      • 保存消息到CommitLog中,异步生成ConsumeQueue和indexFile,这个和普通消息没什么区别
      final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
      // 事务prepare消息不支持延迟消息
      if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
          || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
          // Delay Delivery 0表示不延迟,大于0表示特定的延迟级别
          if (msg.getDelayTimeLevel() > 0) {
              if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                  msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
              }
              //延迟投递消息的topic
              topic = ScheduleMessageService.SCHEDULE_TOPIC;
              //根据延迟级别获取queueId(等于delayLevel - 1)
              queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
      
              // Backup real topic, queueId
              //存入真实的topic和queueId存入消息属性中
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
              msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
      				//更改Topic和queueId
              msg.setTopic(topic);
              msg.setQueueId(queueId);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
    2. 执行异步转发创建ConsumeQueue时,会对延迟消息进行单独处理。CommitLog#checkMessageAndReturnSize对延迟消息的特殊处理如下。这里将具体的时间保存在ConsumeQueue的TagCode(不再是Tag的hash,而是投递时间ms)好处是不需要再去检查CommitLog文件,定时任务只需要检查ConsumeQueue即可,这样可以大大提高效率。如果满足条件,再去查询CommitLog将消息投递出去

       // Timing message processing
       {
           String t = propertiesMap.get(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
           if (ScheduleMessageService.SCHEDULE_TOPIC.equals(topic) && t != null) {
               int delayLevel = Integer.parseInt(t);
      				 // 如果延迟级别边界溢出,则重置为最大
               if (delayLevel > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                   delayLevel = this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel();
               }
      
               if (delayLevel > 0) {
                   // 计算具体的投递时间,并将改时间保存在ConsumeQueue的tagCode中
                 	// 投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间
                   tagsCode = this.defaultMessageStore.getScheduleMessageService().computeDeliverTimestamp(delayLevel,
                       storeTimestamp);
               }
           }
       }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    Broker延迟消息投递

    1. RocketMQ通过ScheduleMessageService定时扫描ConsumeQueue来判断消息是否需要被投递

      public class ScheduleMessageService extends ConfigManager {
          private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
          // 内置的Topic,用于保存所有的定时消息
          public static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
          // 第一次执行定时任务的延迟时间
          private static final long FIRST_DELAY_TIME = 1000L;
          // 第一次以后定时任务的检查间隔时间,默认100ms
          private static final long DELAY_FOR_A_WHILE = 100L;
          // 延迟消息投递失败,默认10s后再次重新投递
          private static final long DELAY_FOR_A_PERIOD = 10000L;
          // 延迟级别和延迟时间的映射关系
          private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
              new ConcurrentHashMap<Integer, Long>(32);
          // 延迟级别与消费偏移量的关系
          private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
              new ConcurrentHashMap<Integer, Long>(32);
          // 定时线程
          private final Timer timer = new Timer("ScheduleMessageTimerThread", true);
      
        	...省略...
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
    2. DefaultMessageStore#start 启动时会调用ScheduleMessageService#start,启动延迟消息投递线程以及延迟消息偏移量持久化线程

      public void start() {
          // 每个延迟级别都有一个独立的定时任务
          for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
              // 延迟级别
              Integer level = entry.getKey();
              // 延迟时间
              Long timeDelay = entry.getValue();
              // 延迟级别对应的ConsumeQueue的偏移量,从此点开始扫描
              Long offset = this.offsetTable.get(level);
              if (null == offset) {
                  offset = 0L;
              }
              // 延迟时间
              if (timeDelay != null) {
                	 // 从现在起过FIRST_DELAY_TIME毫秒(1000)仅执行一次
                  this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
              }
          }
      
          this.timer.scheduleAtFixedRate(new TimerTask() {
      
              @Override
              public void run() {
                  try {
                      ScheduleMessageService.this.persist();
                  } catch (Throwable e) {
                      log.error("scheduleAtFixedRate flush exception", e);
                  }
              }
          }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
    3. ScheduleMessageService.DeliverDelayedMessageTimerTask#DeliverDelayedMessageTimerTask检查队列中没有投递的第一条消息,如果这条消息没有到期,则之后所有的消息都不会进行检查。如果到期了,则投递,并继续检查下一条消息。如果投递失败,则10s后重新投递。如果延迟消息量比较大,可能会造成消息到期后需要很久才能被消费。

      public void executeOnTimeup() {
          // 延迟级别与queueId是一对一关系,可以相互转换
          ConsumeQueue cq =
              ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
                  delayLevel2QueueId(delayLevel));
      
          long failScheduleOffset = offset;
      
          if (cq != null) {
              SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
              if (bufferCQ != null) {
                  try {
                      long nextOffset = offset;
                      int i = 0;
                      ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                      for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                          long offsetPy = bufferCQ.getByteBuffer().getLong();
                          int sizePy = bufferCQ.getByteBuffer().getInt();
                          // tagsCode存储的是投递时间
                          long tagsCode = bufferCQ.getByteBuffer().getLong();
      
                          if (cq.isExtAddr(tagsCode)) {
                              if (cq.getExt(tagsCode, cqExtUnit)) {
                                  tagsCode = cqExtUnit.getTagsCode();
                              } else {
                                  //can't find ext content.So re compute tags code.
                                  log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                      tagsCode, offsetPy, sizePy);
                                  long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                  tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                              }
                          }
      
                          long now = System.currentTimeMillis();
                          // tagsCode保存的是超时时间,纠正当前消息的真正投递时间
                          long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
      
                          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                          // 判断延迟消息是否过期
                          long countdown = deliverTimestamp - now;
      
                          if (countdown <= 0) {
                              MessageExt msgExt =
                                  ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                      offsetPy, sizePy);
      
                              if (msgExt != null) {
                                  try {
                                      // 恢复原始消息,清除延迟属性。重新投递消息到原始的Topic和queueId中
                                      MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                      PutMessageResult putMessageResult =
                                          ScheduleMessageService.this.defaultMessageStore
                                              .putMessage(msgInner);
      
                                      if (putMessageResult != null
                                          && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                          continue;
                                      } else {
                                          // XXX: warn and notify me
                                          log.error(
                                              "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                                              msgExt.getTopic(), msgExt.getMsgId());
                                          // 如果投递失败,则重新投递,并且更新偏移量
                                          ScheduleMessageService.this.timer.schedule(
                                              new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                  nextOffset), DELAY_FOR_A_PERIOD);
                                          ScheduleMessageService.this.updateOffset(this.delayLevel,
                                              nextOffset);
                                          return;
                                      }
                                  } catch (Exception e) {
                                      /*
                                       * XXX: warn and notify me
                                       */
                                      log.error(
                                          "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                              + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                              + offsetPy + ",sizePy=" + sizePy, e);
                                  }
                              }
                          } else {
                              // 重新投递
                              ScheduleMessageService.this.timer.schedule(
                                  new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                  countdown);
                              ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                              return;
                          }
                      } // end of for
      
                      // 继续调度下一个,更新消费偏移量
                      nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                      ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                          this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                      ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                      return;
                  } finally {
      
                      bufferCQ.release();
                  }
              } // end of if (bufferCQ != null)
              else {
                  // 如果偏移量不正确,打印错误日志
                  long cqMinOffset = cq.getMinOffsetInQueue();
                  if (offset < cqMinOffset) {
                      failScheduleOffset = cqMinOffset;
                      log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                          + cqMinOffset + ", queueId=" + cq.getQueueId());
                  }
              }
          } // end of if (cq != null)
      
          ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
              failScheduleOffset), DELAY_FOR_A_WHILE);
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
  • 相关阅读:
    ZFS存储池速度以及RAID说明
    西宁市初中生地会考报名照片尺寸要求及手机自拍方法
    穿越功耗墙,我们该从哪些方面提升“性能”?
    Java 大厂面试必刷题 Day1:何为面向对象编程的思想?面向对象三大特征是什么?
    MySQL 开发规范
    跨数据中心Multi-Fabric解决方案:L2和L3网络的高效连接和扩展
    为什么个人IP对任何行业都至关重要
    Java 基础 面试 多线程
    【ElasticSearch】学习使用DSL和RestClient编写查询语句
    华为OD机试真题-找数字-2024年OD统一考试(C卷D卷)
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126504629