这一节我们来看下RocketMQ的延迟消息
- public class Producer {
-
- public static void main(String[] args) throws Exception {
- // 实例化一个生产者来产生延时消息
- DefaultMQProducer producer = new DefaultMQProducer("DELAY_P_G");
-
- producer.setNamesrvAddr("127.0.0.1:9876");
-
- // 启动生产者
- producer.start();
-
- for (int i = 0; i < 1; i++) {
- Message message = new Message(MQConstant.DELAY_TOPIC, ("Hello scheduled message " + i).getBytes());
-
- /**
- * MessageStoreConfig
- * messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
- *
- * 共18个等级,依次是从1-18
- * 比如,level=3, 表示延迟10s 消费
- */
- message.setDelayTimeLevel(4);
-
- // 发送消息
- SendResult send = producer.send(message);
- System.out.println("send = " + send);
- }
- // 关闭生产者
- producer.shutdown();
- }
- }
-
延迟消息的标志就是在发送时,通过消息对象Message的setDelayTimeLevel(int level)方法设置一个延迟等级,这样该条消息就是一个延迟消息了。那么延迟等级与延迟时间是如何对应的呢?

其实延迟消息和普通消息并没有多大的差异,只不过broker在存储消息时,会判断消息的延迟属性是否为空,如果不为空,则判断是延迟消息,进而会做一些额外的处理,那么我们就看下broker存储时判断是否为延迟消息的逻辑:
CommitLog#asyncPutMessage(..)
- public CompletableFuture
asyncPutMessage(final MessageExtBrokerInner msg) { - // Set the storage time
- msg.setStoreTimestamp(System.currentTimeMillis());
- // Set the message body BODY CRC (consider the most appropriate setting
- // on the client)
- msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
- // Back to Results
- AppendMessageResult result = null;
-
- StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
-
- String topic = msg.getTopic();
- int queueId = msg.getQueueId();
-
- final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
- if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
- //TODO:延迟消息的判定
- if (msg.getDelayTimeLevel() > 0) {
- if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
- msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
- }
-
- //TODO:将延迟消息的topic替换为broker固定的topic: SCHEDULE_TOPIC_XXXX
- topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
- //TODO: 将queueid替换为(延迟级别-1)
- queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
-
- //TODO:备份原始的topic/queueid, 留着后面解析
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
- msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
-
- //TODO:将消息topic设置为延迟topic,这样订阅该topic的消费者不能及时去消费了
- //TODO:等到延迟时间到了,将延迟topic在替换成原始topic,消费者就可以消费了
- msg.setTopic(topic);
- msg.setQueueId(queueId);
- }
- }
-
- //TODO:....省略后续存储逻辑,和普通消息一样
- }
-
其实它要做的事情很简单,简单总结下:
SCHEDULE_TOPIC_XXXX所有的延时消息共用这一个topic
相同延迟级别的消息会在同一个队列中
这样就处理完了一条延迟消息,然后就是存储消息,和普通一样,这里就不展示了。

上面broker将延迟消息写到了commitlog中,由于broker替换了我们的原始topic,所以订阅该topic的消费者此时还无法消费该消息,只有当时间到了消费者才可以消费,那么我们就看下broker是如何处理的。 首先处理延迟消息的是ScheduleMessageService类,我们简单看下它的类结构:
broker启动时,会启动该类
- public class ScheduleMessageService extends ConfigManager {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
-
- private static final long FIRST_DELAY_TIME = 1000L;
- private static final long DELAY_FOR_A_WHILE = 100L;
- private static final long DELAY_FOR_A_PERIOD = 10000L;
-
- //TODO:broker启动时会初始化这个Map,key是延迟等级,共计18个,value就是延迟等级对应的时间
- private final ConcurrentMap
/* level */, Long/* delay timeMillis */> delayLevelTable = - new ConcurrentHashMap
(32); -
- private final ConcurrentMap
/* level */, Long/* offset */> offsetTable = - new ConcurrentHashMap
(32); -
- //TODO:省略其他属性和方法
-
- //TODO:broker启动时,会调用该方法
- public void start() {
- if (started.compareAndSet(false, true)) {
- super.load();
- this.timer = new Timer("ScheduleMessageTimerThread", true);
- for (Map.Entry
entry : this.delayLevelTable.entrySet()) { - Integer level = entry.getKey();
- Long timeDelay = entry.getValue();
- Long offset = this.offsetTable.get(level);
- if (null == offset) {
- offset = 0L;
- }
-
- if (timeDelay != null) {
- //TODO:处理延迟消息
- this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
- }
- }
-
- this.timer.scheduleAtFixedRate(new TimerTask() {
-
- @Override
- public void run() {
- try {
- //TODO:持久化
- if (started.get()) ScheduleMessageService.this.persist();
- } catch (Throwable e) {
- log.error("scheduleAtFixedRate flush exception", e);
- }
- }
- }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
- }
- }
-
- //TODO:省略其他方法
- }
-
关注的地方主要就是2个,一个是处理延迟消息,一个是持久化,那么我们就分别看下:
Broker中同一等级的所有延时消息会被写入到consumequeue 目录中SCHEDULE_TOPIC_XXXX目录下相同Queue中。即一个Queue中消息投递时间的延迟等级时间是相同的。那么投递时间就取决于于 消息存储时间了。即按照消息被发送到Broker的时间进行排序的。
- //TODO:遍历延迟等级列表
- for (Map.Entry
Long> entry : this.delayLevelTable.entrySet()) { - Integer level = entry.getKey();
- Long timeDelay = entry.getValue();
- Long offset = this.offsetTable.get(level);
- if (null == offset) {
- offset = 0L;
- }
-
- if (timeDelay != null) {
- this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
- }
- }
-
delayLevelTable存放的就是如下图:

key是延迟等级,value是对应的延迟时间。共计18个
每一个延迟级别对应一个DeliverDelayedMessageTimerTask,所以相同延迟级别的消息共用同一个线程。
接下来我们就看下DeliverDelayedMessageTimerTask的逻辑:
- class DeliverDelayedMessageTimerTask extends TimerTask {
- private final int delayLevel;
- private final long offset;
-
- public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
- this.delayLevel = delayLevel;
- this.offset = offset;
- }
-
- @Override
- public void run() {
- try {
- if (isStarted()) {
- //TODO:核心逻辑
- this.executeOnTimeup();
- }
- } catch (Exception e) {
- // XXX: warn and notify me
- log.error("ScheduleMessageService, executeOnTimeup exception", e);
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
- this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
- }
- }
-
- //TODO:....省略其他方法.......
- }
-
继续看下executeOnTimeup()方法的逻辑,内容比较多,不过还是很容易理解
- public void executeOnTimeup() {
- //TODO:根据延迟topic和延迟queueid 去获取Consumequeue
- ConsumeQueue cq =
- ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
- delayLevel2QueueId(delayLevel));
-
- long failScheduleOffset = offset;
-
- if (cq != null) {
- SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
- if (bufferCQ != null) {
- try {
- //TODO:offset用来标记队列读取到哪里了
- long nextOffset = offset;
- int i = 0;
- ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
- for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
- long offsetPy = bufferCQ.getByteBuffer().getLong();
- int sizePy = bufferCQ.getByteBuffer().getInt();
- long tagsCode = bufferCQ.getByteBuffer().getLong();
-
- //TODO:....省略部分代码.....
-
- long now = System.currentTimeMillis();
- //TODO:计算投递时间,时间存储在了tag hashcode 中了
- long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
- nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
- long countdown = deliverTimestamp - now;
-
- //TODO:投递时间到了
- if (countdown <= 0) {
- //TODO:去broker中将消息读取出来
- MessageExt msgExt =
- ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
- offsetPy, sizePy);
-
- if (msgExt != null) {
- try {
- //TODO:构建新的消息体,将原来的消息信息设置到这里,并将topic和queueid设置为原始的topic和queueid(前面备份过)
- MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
- if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
- log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
- msgInner.getTopic(), msgInner);
- continue;
- }
-
- //TODO:将消息再次写入commitlog中,topic是原始topic,这样消费者就可以去消费了
- PutMessageResult putMessageResult =
- ScheduleMessageService.this.writeMessageStore
- .putMessage(msgInner);
-
- //TODO:....省略部分代码......
-
- } catch (Exception e) {
- /*
- * XXX: warn and notify me
- */
- log.error(
- "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
- + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
- + offsetPy + ",sizePy=" + sizePy, e);
- }
- }
- } else {
- ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
- countdown);
- ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
- return;
- }
- } // end of for
-
- nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
- this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
- ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
- return;
- } finally {
-
- bufferCQ.release();
- }
- } // end of if (bufferCQ != null)
- else {
-
- long cqMinOffset = cq.getMinOffsetInQueue();
- if (offset < cqMinOffset) {
- failScheduleOffset = cqMinOffset;
- log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
- + cqMinOffset + ", queueId=" + cq.getQueueId());
- }
- }
- } // end of if (cq != null)
-
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
- failScheduleOffset), DELAY_FOR_A_WHILE);
- }
-
我们简单对代码总结下:

3.1 则根据索引单元中的commitlog offset 和 msg size 将该条消息A从commitlog中读取出 来.
3.2 将读取出来的消息属性复制到一个新的消息对象体B中,将A中备份的原始topic, queueid 读取 出来重新设置到B中,并清除延迟属性,使其成为一条普通消息.
3.3 调用CommitLog#putMessage(msg)方法,再次将消息B写入到commitlog中。这样消费者就可以消费到订阅了该topic的消息。
4.1 计算剩余投递时间countdown(投递时间-当前时间), 然后开启一个JDK的Timer延迟任务,延迟时间就是countdown,继续执行
DeliverDelayedMessageTimerTask的逻辑。
这里简单说下:同一个Queue(delayLevel - 1)中消息投递时间的延迟等级是相同的。那么投递时间就取决于消息存储时间了。即按照消息被发送到Broker的时间进行排序的。

持久化其实也非常的简单,就是通过定时任务,每隔10s将延迟队列的消费进度offset写到文件中。
文件默认路径:
$user.home/store/config/delayOffset.json

key 就是延迟等级,value 就是对应的消费进度offset。
本文从源码的角度分析了RocketMq是如何发送延迟消息的,那么我们就简单总结下:
setDelayTimeLevel(int level) 来设置延迟等级,RocketMQ默认支持18种延迟等级,每个延迟等级对应不同的延迟时间SCHEDULE_TOPIC_XXXX