• 【RocketMQ】消息的消费


    上一讲【RocketMQ】消息的拉取

    消息消费

    当RocketMQ进行消息消费的时候,是通过ConsumeMessageConcurrentlyServicesubmitConsumeRequest方法,将消息提交到线程池中进行消费,具体的处理逻辑如下:

    1. 如果本次消息的个数小于等于批量消费的大小consumeBatchSize,构建消费请求ConsumeRequest,直接提交到线程池中进行消费即可
    2. 如果本次消息的个数大于批量消费的大小consumeBatchSize,说明需要分批进行提交,每次构建consumeBatchSize个消息提交到线程池中进行消费
    3. 如果出现拒绝提交的异常,调用submitConsumeRequestLater方法延迟进行提交

    RocketMQ消息消费是批量进行的,如果一批消息的个数小于预先设置的批量消费大小,直接构建消费请求将消费任务提交到线程池处理即可,否则需要分批进行提交。

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        @Override
        public void submitConsumeRequest(
            final List msgs,
            final ProcessQueue processQueue,
            final MessageQueue messageQueue,
            final boolean dispatchToConsume) {
            final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
            // 如果消息的个数小于等于批量消费的大小
            if (msgs.size() <= consumeBatchSize) {
                // 构建消费请求
                ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
                try {
                    // 加入到消费线程池中
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    this.submitConsumeRequestLater(consumeRequest);
                }
            } else {
                // 遍历消息
                for (int total = 0; total < msgs.size(); ) {
                    // 创建消息列表,大小为consumeBatchSize,用于批量提交使用
                    List msgThis = new ArrayList(consumeBatchSize);
                    for (int i = 0; i < consumeBatchSize; i++, total++) {
                        if (total < msgs.size()) {
                            // 加入到消息列表中
                            msgThis.add(msgs.get(total));
                        } else {
                            break;
                        }
                    }
                    // 创建ConsumeRequest
                    ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                    try {
                        // 加入到消费线程池中
                        this.consumeExecutor.submit(consumeRequest);
                    } catch (RejectedExecutionException e) {
                        for (; total < msgs.size(); total++) {
                            msgThis.add(msgs.get(total));
                        }
                        // 如果出现拒绝提交异常,延迟进行提交
                        this.submitConsumeRequestLater(consumeRequest);
                    }
                }
            }
        }
    }
    
    折叠

    消费任务运行

    ConsumeRequestConsumeMessageConcurrentlyService的内部类,实现了Runnable接口,在run方法中,对消费任务进行了处理:

    1. 判断消息所属的处理队列processQueue是否处于删除状态,如果已被删除,不进行处理

    2. 重置消息的重试主题

      因为延迟消息的主题在后续处理的时候被设置为SCHEDULE_TOPIC_XXXX,所以这里需要重置。

    3. 如果设置了消息消费钩子函数,执行executeHookBefore钩子函数

    4. 获取消息监听器,调用消息监听器的consumeMessage进行消息消费,并返回消息的消费结果状态,状态有两种分别为CONSUME_SUCCESSRECONSUME_LATER

      CONSUME_SUCCESS:表示消息消费成功。

      RECONSUME_LATER:表示消费失败,稍后延迟重新进行消费。

    5. 获取消费的时长,判断是否超时

    6. 如果设置了消息消费钩子函数,执行executeHookAfter钩子函数

    7. 再次判断消息所属的处理队列是否处于删除状态,如果不处于删除状态,调用processConsumeResult方法处理消费结果

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        class ConsumeRequest implements Runnable {
            private final List msgs;
            private final ProcessQueue processQueue; // 处理队列
            private final MessageQueue messageQueue; // 消息队列
          
            @Override
            public void run() {
                // 如果处理队列已被删除
                if (this.processQueue.isDropped()) {
                    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
                    return;
                }
                // 获取消息监听器
                MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
                ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
                ConsumeConcurrentlyStatus status = null;
                // 重置消息重试主题名称 
                defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
                ConsumeMessageContext consumeMessageContext = null;
                // 如果设置了钩子函数
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    // ...
    // 执行钩子函数            
                  ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
                }
    
                long beginTimestamp = System.currentTimeMillis();
                boolean hasException = false;
                ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                try {
                    if (msgs != null && !msgs.isEmpty()) {
                        for (MessageExt msg : msgs) {
                            // 设置消费开始时间戳
                            MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
                        }
                    }
                    // 通过消息监听器的consumeMessage进行消息消费,并返回消费结果状态
                    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
                } catch (Throwable e) {
                    log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
                        RemotingHelper.exceptionSimpleDesc(e),
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue), e);
                    hasException = true;
                }
                // 计算消费时长
                long consumeRT = System.currentTimeMillis() - beginTimestamp;
                if (null == status) {
                    if (hasException) {
                        // 出现异常
                        returnType = ConsumeReturnType.EXCEPTION;
                    } else {
                        // 返回NULL
                        returnType = ConsumeReturnType.RETURNNULL;
                    }
                } else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) { // 判断超时
                    returnType = ConsumeReturnType.TIME_OUT; // 返回类型置为超时
                } else if (ConsumeConcurrentlyStatus.RECONSUME_LATER == status) { // 如果延迟消费
                    returnType = ConsumeReturnType.FAILED; // 返回类置为失败
                } else if (ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status) { // 如果成功状态
                    returnType = ConsumeReturnType.SUCCESS; // 返回类型为成功
                }
                // ...
                // 如果消费状态为空
                if (null == status) {
                    log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                        ConsumeMessageConcurrentlyService.this.consumerGroup,
                        msgs,
                        messageQueue);
                    // 状态置为延迟消费
                    status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                // 如果设置了钩子函数
                if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
                    consumeMessageContext.setStatus(status.toString());
                    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
                    // 执行executeHookAfter方法
                    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
                }
                ConsumeMessageConcurrentlyService.this.getConsumerStatsManager()
                    .incConsumeRT(ConsumeMessageConcurrentlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
                if (!processQueue.isDropped()) {
                    // 处理消费结果
                    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
                } else {
                    log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
                }
            }
        }
    }
    
    // 重置消息重试主题
    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
       public void resetRetryAndNamespace(final List msgs, String consumerGroup) {
            // 获取消费组的重试主题:%RETRY% + 消费组名称
            final String groupTopic = MixAll.getRetryTopic(consumerGroup);
            for (MessageExt msg : msgs) {
                // 获取消息的重试主题名称
                String retryTopic = msg.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                // 如果重试主题不为空并且与消费组的重试主题一致
                if (retryTopic != null && groupTopic.equals(msg.getTopic())) {
                    // 设置重试主题
                    msg.setTopic(retryTopic);
                }
                if (StringUtils.isNotEmpty(this.defaultMQPushConsumer.getNamespace())) {
                    msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
                }
            }
        }
      
    }
    
    // 消费结果状态
    public enum ConsumeConcurrentlyStatus {
        /**
         * 消费成功
         */
        CONSUME_SUCCESS,
        /**
         * 消费失败,延迟进行消费
         */
        RECONSUME_LATER;
    }
    
    折叠

    处理消费结果

    一、设置ackIndex

    ackIndex的值用来判断失败消息的个数,在processConsumeResult方法中根据消费结果状态进行判断,对ackIndex的值进行设置,前面可知消费结果状态有以下两种:

    • CONSUME_SUCCESS:消息消费成功,此时ackIndex设置为消息大小 - 1,表示消息都消费成功。
    • RECONSUME_LATER:消息消费失败,返回延迟消费状态,此时ackIndex置为-1,表示消息都消费失败。

    二、处理消费失败的消息

    广播模式

    广播模式下,如果消息消费失败,只将失败的消息打印出来不做其他处理。

    集群模式

    开启for循环,初始值为i = ackIndex + 1,结束条件为i < consumeRequest.getMsgs().size(),上面可知ackIndex有两种情况:

    • 消费成功:ackIndex值为消息大小-1,此时ackIndex + 1的值等于消息的个数大小,不满足for循环的执行条件,相当于消息都消费成功,不需要进行失败的消息处理。
    • 延迟消费:ackIndex值为-1,此时ackIndex+1为0,满足for循环的执行条件,从第一条消息开始遍历到最后一条消息,调用sendMessageBack方法向Broker发送CONSUMER_SEND_MSG_BACK消息,如果发送成功Broker会根据延迟等级,放入不同的延迟队列中,到达延迟时间后,消费者将会重新进行拉取,如果发送失败,加入到失败消息列表中,稍后重新提交消费任务进行处理。

    三、移除消息,更新拉取偏移量

    以上步骤处理完毕后,首先调用removeMessage从处理队列中移除消息并返回拉取消息的偏移量,然后调用updateOffset更新拉取偏移量。

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
        public void processConsumeResult(
            final ConsumeConcurrentlyStatus status,
            final ConsumeConcurrentlyContext context,
            final ConsumeRequest consumeRequest
        ) {
            // 获取ackIndex
            int ackIndex = context.getAckIndex();
            if (consumeRequest.getMsgs().isEmpty())
                return;
    
            switch (status) {
                case CONSUME_SUCCESS: // 如果消费成功
                    // 如果ackIndex大于等于消息的大小
                    if (ackIndex >= consumeRequest.getMsgs().size()) {
                        // 设置为消息大小-1
                        ackIndex = consumeRequest.getMsgs().size() - 1;
                    }
                    // 计算消费成功的的个数
                    int ok = ackIndex + 1;
                    // 计算消费失败的个数
                    int failed = consumeRequest.getMsgs().size() - ok;
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                    break;
                case RECONSUME_LATER: // 如果延迟消费
                    // ackIndex置为-1
                    ackIndex = -1;
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                        consumeRequest.getMsgs().size());
                    break;
                default:
                    break;
            }
            // 判断消费模式
            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING: // 广播模式
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                    }
                    break;
                case CLUSTERING: // 集群模式
                    List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
                    // 遍历消费失败的消息
                    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                        // 获取消息
                        MessageExt msg = consumeRequest.getMsgs().get(i);
                        // 向Broker发送延迟消息
                        boolean result = this.sendMessageBack(msg, context);
                        // 如果发送失败
                        if (!result) {
                            // 消费次数+1
                            msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                            // 加入失败消息列表中
                            msgBackFailed.add(msg);
                        }
                    }
                    // 如果不为空
                    if (!msgBackFailed.isEmpty()) {
                        consumeRequest.getMsgs().removeAll(msgBackFailed);
                        // 稍后重新进行消费
                        this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                    }
                    break;
                default:
                    break;
            }
            // 从处理队列中移除消息
            long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
            if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
                // 更新拉取偏移量
                this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
            }
        }
    }
    
    折叠

    发送CONSUMER_SEND_MSG_BACK消息

    延迟级别

    RocketMQ的延迟级别对应的延迟时间常量定义在MessageStoreConfigmessageDelayLevel变量中:

    public class MessageStoreConfig {
        private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    }
    

    延迟级别与延迟时间对应关系:

    延迟级别0 ---> 对应延迟时间1s,也就是延迟1秒后消费者重新从Broker拉取进行消费

    延迟级别1 ---> 延迟时间5s

    延迟级别2 ---> 延迟时间10s

    ...

    以此类推,最大的延迟时间为2h

    sendMessageBack方法中,首先从上下文中获取了延迟级别(ConsumeConcurrentlyContext中可以看到,延迟级别默认为0),并对主题加上Namespace,然后调用defaultMQPushConsumerImplsendMessageBack发送消息:

    public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
       public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
            // 获取延迟级别
            int delayLevel = context.getDelayLevelWhenNextConsume();
            // 对主题添加上Namespace
            msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
            try {
                // 向Broker发送消息
                this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
                return true;
            } catch (Exception e) {
                log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
            }
            return false;
        }
    }
    
    // 并发消费上下文
    public class ConsumeConcurrentlyContext {
        /**
         * -1,不进行重试,加入DLQ队列
         * 0, Broker控制重试频率
         * >0, 客户端控制
         */
        private int delayLevelWhenNextConsume = 0; // 默认为0
    }
    

    DefaultMQPushConsumerImpsendMessageBack方法中又调用了MQClientAPIImplconsumerSendMessageBack方法进行发送:

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
        public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
            try {
                // 获取Broker地址
                String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
                // 调用consumerSendMessageBack方法发送消息
                this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
                    this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
            } catch (Exception e) {
                // ...
            } finally {
                msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
            }
        }
    }
    

    MQClientAPIImplconsumerSendMessageBack方法中,可以看到设置的请求类型是CONSUMER_SEND_MSG_BACK,然后设置了消息的相关信息,向Broker发送请求:

    public class MQClientAPIImpl {
        public void consumerSendMessageBack(
            final String addr,
            final MessageExt msg,
            final String consumerGroup,
            final int delayLevel,
            final long timeoutMillis,
            final int maxConsumeRetryTimes
        ) throws RemotingException, MQBrokerException, InterruptedException {
            // 创建请求头
            ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
            // 设置请求类型为CONSUMER_SEND_MSG_BACK
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
            // 设置消费组
            requestHeader.setGroup(consumerGroup);
            requestHeader.setOriginTopic(msg.getTopic());
            // 设置消息物理偏移量
            requestHeader.setOffset(msg.getCommitLogOffset());
            // 设置延迟级别
            requestHeader.setDelayLevel(delayLevel);
            // 设置消息ID
            requestHeader.setOriginMsgId(msg.getMsgId());
            // 设置最大消费次数
            requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
            // 向Broker发送请求
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }
                default:
                    break;
            }
            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
        }
    }
    
    折叠

    Broker对请求的处理

    Broker对CONSUMER_SEND_MSG_BACK类型的请求在SendMessageProcessor中,处理逻辑如下:

    1. 根据消费组获取订阅信息配置,如果获取为空,记录错误信息,直接返回
    2. 获取消费组的重试主题,然后从重试队列中随机选取一个队列,并创建TopicConfig主题配置信息
    3. 根据消息的物理偏移量从commitlog中获取消息
    4. 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0
      • 如果条件满足,表示需要把消息放入到死信队列DLQ中,此时设置DLQ队列ID
      • 如果不满足,判断延迟级别是否为0,如果为0,使用3 + 消息的消费次数作为新的延迟级别
    5. 新建消息MessageExtBrokerInner,设置消息的相关信息,此时相当于生成了一个全新的消息(会设置之前消息的ID),会重新添加到CommitLog中,消息主题的设置有两种情况:
      • 达到了加入DLQ队列的条件,此时主题为DLQ主题(%DLQ% + 消费组名称),消息之后会添加到选取的DLQ队列中
      • 未达到DLQ队列的条件,此时主题为重试主题(%RETRY% + 消费组名称),之后重新进行消费
    6. 调用asyncPutMessage添加消息,详细过程可参考之前的文章【消息的存储】
    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
        // 处理请求
        public CompletableFuture asyncProcessRequest(ChannelHandlerContext ctx,
                                                                      RemotingCommand request) throws RemotingCommandException {
            final SendMessageContext mqtraceContext;
            switch (request.getCode()) {
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    // 处理请求
                    return this.asyncConsumerSendMsgBack(ctx, request);
                default:
                    // ...
            }
        }
      
        private CompletableFuture asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final ConsumerSendMsgBackRequestHeader requestHeader =
                    (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
            // ...
            // 根据消费组获取订阅信息配置
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
            // 如果为空,直接返回
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return CompletableFuture.completedFuture(response);
            }
            // ...
        
            // 获取消费组的重试主题
            String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
            // 从重试队列中随机选取一个队列
            int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
            int topicSysFlag = 0;
            if (requestHeader.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            // 创建TopicConfig主题配置信息
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            //...
        
            // 根据消息物理偏移量从commitLog文件中获取消息
            MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
            if (null == msgExt) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("look message by offset failed, " + requestHeader.getOffset());
                return CompletableFuture.completedFuture(response);
            }
            // 获取消息的重试主题
            final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            if (null == retryTopic) {
                MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
            }
            msgExt.setWaitStoreMsgOK(false);
            // 延迟等级获取
            int delayLevel = requestHeader.getDelayLevel();
            // 获取最大消费重试次数
            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                Integer times = requestHeader.getMaxReconsumeTimes();
                if (times != null) {
                    maxReconsumeTimes = times;
                }
            }
            // 判断消息的消费次数是否大于等于最大消费次数 或者 延迟等级小于0
            if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
                || delayLevel < 0) {
                // 获取DLQ主题
                newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
                // 选取一个队列
                queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
                // 创建DLQ的topicConfig
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                        DLQ_NUMS_PER_GROUP,
                        PermName.PERM_WRITE | PermName.PERM_READ, 0);
                // ...
            } else {
                 // 如果延迟级别为0
                if (0 == delayLevel) {
                    // 更新延迟级别
                    delayLevel = 3 + msgExt.getReconsumeTimes();
                }
                // 设置延迟级别
                msgExt.setDelayTimeLevel(delayLevel);
            }
            // 新建消息
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            msgInner.setTopic(newTopic); // 设置主题
            msgInner.setBody(msgExt.getBody()); // 设置消息
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties()); // 设置消息属性
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
            msgInner.setQueueId(queueIdInt); // 设置队列ID
            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost()); 
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);// 设置消费次数
            // 原始的消息ID
            String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
            // 设置消息ID
            MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
            // 添加重试消息
            CompletableFuture putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            return putMessageResult.thenApply((r) -> {
                if (r != null) {
                    switch (r.getPutMessageStatus()) {
                        case PUT_OK:
                            // ...
                            return response;
                        default:
                            break;
                    }
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark(r.getPutMessageStatus().name());
                    return response;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("putMessageResult is null");
                return response;
            });
        }
    }
    
    折叠

    延迟消息处理

    【消息的存储】文章可知,消息添加会进入到asyncPutMessage方法中,首先获取了事务类型,如果未使用事务或者是提交事务的情况下,对延迟时间级别进行判断,如果延迟时间级别大于0,说明消息需要延迟消费,此时做如下处理:

    1. 判断消息的延迟级别是否超过了最大延迟级别,如果超过了就使用最大延迟级别

    2. 获取RMQ_SYS_SCHEDULE_TOPIC,它是在TopicValidator中定义的常量,值为SCHEDULE_TOPIC_XXXX:

      public class TopicValidator {
          // ...
          public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
      }
      
    3. 根据延迟级别选取对应的队列,一般会把相同延迟级别的消息放在同一个队列中

    4. 备份之前的TOPIC和队列ID

    5. 更改消息队列的主题为RMQ_SYS_SCHEDULE_TOPIC,所以延迟消息的主题最终被设置为RMQ_SYS_SCHEDULE_TOPIC,放在对应的延迟队列中进行处理

    public class CommitLog {
        public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {
            // ...
            // 获取事务类型
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            // 如果未使用事务或者提交事务
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                // 判断延迟级别
                if (msg.getDelayTimeLevel() > 0) {
                    // 如果超过了最大延迟级别
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
                    // 获取RMQ_SYS_SCHEDULE_TOPIC
                    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                    // 根据延迟级别选取对应的队列
                    int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // 备份之前的TOPIC和队列ID
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                    // 设置SCHEDULE_TOPIC
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
            // ...
        }
    }
    
    折叠

    拉取进度持久化

    RocketMQ消费模式分为广播模式和集群模式,广播模式下消费进度保存在每个消费者端,集群模式下消费进度保存在Broker端。

    广播模式

    更新进度

    LocalFileOffsetStore中使用了一个ConcurrentMap类型的变量offsetTable存储消息队列对应的拉取偏移量,KEY为消息队列,value为该消息队列对应的拉取偏移量。

    在更新拉取进度的时候,从offsetTable中获取当前消息队列的拉取偏移量,如果为空,则新建并保存到offsetTable中,否则获取之前已经保存的偏移量,对值进行更新,需要注意这里只是更新了offsetTable中的数据,并没有持久化到磁盘,持久化的操作在persistAll方法中

    public class LocalFileOffsetStore implements OffsetStore {
        // offsetTable:KEY为消息队列,value为该消息队列的拉取偏移量
        private ConcurrentMap offsetTable =
            new ConcurrentHashMap();
      
        @Override
        public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
            if (mq != null) {
                // 获取之前的拉取进度
                AtomicLong offsetOld = this.offsetTable.get(mq);
                if (null == offsetOld) {
                    // 如果之前不存在,进行创建
                    offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
                }
                // 如果不为空
                if (null != offsetOld) {
                    if (increaseOnly) {
                        MixAll.compareAndIncreaseOnly(offsetOld, offset);
                    } else {
                        // 更新拉取偏移量
                        offsetOld.set(offset);
                    }
                }
            }
        }
    }
    

    加载进度

    由于广播模式下消费进度保存在消费者端,所以需要从本地磁盘加载之前保存的消费进度文件。

    LOCAL_OFFSET_STORE_DIR:消费进度文件所在的根路径

    public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
            "rocketmq.client.localOffsetStoreDir", System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
    

    在LocalFileOffsetStore的构造函数中可以看到,对拉取偏移量的保存文件路径进行了设置,为LOCAL_OFFSET_STORE_DIR + 客户端ID + 消费组名称 + offsets.json,从名字上看,消费进度的数据格式是以JSON的形式进行保存的:

    this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator + this.mQClientFactory.getClientId() + File.separator +
                this.groupName + File.separator + "offsets.json";
    

    在load方法中,首先从本地读取 offsets.json文件,并序列化为OffsetSerializeWrapper对象,然后将保存的消费进度加入到offsetTable中:

     public class LocalFileOffsetStore implements OffsetStore {
       
        // 文件路径
        public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
            "rocketmq.client.localOffsetStoreDir",
            System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
        private final String storePath;
        // ...
       
        public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
            this.mQClientFactory = mQClientFactory;
            this.groupName = groupName;
            // 设置拉取进度文件的路径
            this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
                this.mQClientFactory.getClientId() + File.separator +
                this.groupName + File.separator +
                "offsets.json";
        }
        @Override
        public void load() throws MQClientException {
            // 从本地读取拉取偏移量
            OffsetSerializeWrapper offsetSerializeWrapper = this.readLocalOffset();
            if (offsetSerializeWrapper != null && offsetSerializeWrapper.getOffsetTable() != null) {
                // 加入到offsetTable中
                offsetTable.putAll(offsetSerializeWrapper.getOffsetTable());
    
                for (Entry mqEntry : offsetSerializeWrapper.getOffsetTable().entrySet()) {
                    AtomicLong offset = mqEntry.getValue();
                    log.info("load consumer's offset, {} {} {}",
                            this.groupName,
                            mqEntry.getKey(),
                            offset.get());
                }
            }
        }
      
        // 从本地加载文件
        private OffsetSerializeWrapper readLocalOffset() throws MQClientException {
            String content = null;
            try {
                // 读取文件
                content = MixAll.file2String(this.storePath);
            } catch (IOException e) {
                log.warn("Load local offset store file exception", e);
            }
            if (null == content || content.length() == 0) {
                return this.readLocalOffsetBak();
            } else {
                OffsetSerializeWrapper offsetSerializeWrapper = null;
                try {
                    // 序列化
                    offsetSerializeWrapper =
                        OffsetSerializeWrapper.fromJson(content, OffsetSerializeWrapper.class);
                } catch (Exception e) {
                    log.warn("readLocalOffset Exception, and try to correct", e);
                    return this.readLocalOffsetBak();
                }
    
                return offsetSerializeWrapper;
            }
        }
    }
    
    折叠

    OffsetSerializeWrapper

    OffsetSerializeWrapper中同样使用了ConcurrentMap,从磁盘的offsets.json文件中读取数据后,将JSON转为OffsetSerializeWrapper对象,就可以通过OffsetSerializeWrapperoffsetTable获取到之前保存的每个消息队列的消费进度,然后加入到LocalFileOffsetStoreoffsetTable中:

    public class OffsetSerializeWrapper extends RemotingSerializable {
        private ConcurrentMap offsetTable =
            new ConcurrentHashMap();
    
        public ConcurrentMap getOffsetTable() {
            return offsetTable;
        }
    
        public void setOffsetTable(ConcurrentMap offsetTable) {
            this.offsetTable = offsetTable;
        }
    }
    
    

    持久化进度

    updateOffset更新只是将内存中的数据进行了更改,并未保存到磁盘中,持久化的操作是在persistAll方法中实现的:

    1. 创建OffsetSerializeWrapper对象
    2. 遍历LocalFileOffsetStore的offsetTable,将数据加入到OffsetSerializeWrapper的OffsetTable中
    3. OffsetSerializeWrapper转为JSON
    4. 调用string2File方法将JSON数据保存到磁盘文件
     public class LocalFileOffsetStore implements OffsetStore {
        @Override
        public void persistAll(Set mqs) {
            if (null == mqs || mqs.isEmpty())
                return;OffsetSerializeWrapper
            // 创建
            OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
            // 遍历offsetTable
            for (Map.Entry entry : this.offsetTable.entrySet()) {
                if (mqs.contains(entry.getKey())) {
                    // 获取拉取偏移量
                    AtomicLong offset = entry.getValue();
                    // 加入到OffsetSerializeWrapper的OffsetTable中
                    offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
                }
            }
            // 将对象转为JSON
            String jsonString = offsetSerializeWrapper.toJson(true);
            if (jsonString != null) {
                try {
                    // 将JSON数据保存到磁盘文件
                    MixAll.string2File(jsonString, this.storePath);
                } catch (IOException e) {
                    log.error("persistAll consumer offset Exception, " + this.storePath, e);
                }
            }
        }
    }
    
    折叠

    集群模式

    集群模式下消费进度保存在Broker端。

    更新进度

    集群模式下的更新进度与广播模式下的更新类型,都是只更新了offsetTable中的数据:

    public class RemoteBrokerOffsetStore implements OffsetStore {
        
        private ConcurrentMap offsetTable =
            new ConcurrentHashMap();
        @Override
        public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
            if (mq != null) {
                // 获取消息队列的进度
                AtomicLong offsetOld = this.offsetTable.get(mq);
                if (null == offsetOld) {
                    // 将消费进度保存在offsetTable中
                    offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
                }
                if (null != offsetOld) {
                    if (increaseOnly) {
                        MixAll.compareAndIncreaseOnly(offsetOld, offset);
                    } else {
                        // 更新拉取偏移量
                        offsetOld.set(offset);
                    }
                }
            }
        }
    }
    

    加载

    集群模式下加载消费进度需要从Broker获取,在消费者发送消息拉取请求的时候,Broker会计算消费偏移量,所以RemoteBrokerOffsetStore的load方法为空,什么也没有干:

    public class RemoteBrokerOffsetStore implements OffsetStore {
        @Override
        public void load() {
        }
    }
    

    持久化

    由于集群模式下消费进度保存在Broker端,所以persistAll方法中调用了updateConsumeOffsetToBroker向Broker发送请求进行消费进度保存:

    public class RemoteBrokerOffsetStore implements OffsetStore {
        @Override
        public void persistAll(Set mqs) {
            if (null == mqs || mqs.isEmpty())
                return;
    
            final HashSet unusedMQ = new HashSet();
    
            for (Map.Entry entry : this.offsetTable.entrySet()) {
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                if (offset != null) {
                    if (mqs.contains(mq)) {
                        try {
                            // 向Broker发送请求更新拉取偏移量
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                            log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                                this.groupName,
                                this.mQClientFactory.getClientId(),
                                mq,
                                offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {
                        unusedMQ.add(mq);
                    }
                }
            }
            // ...
        }
    }
    
    折叠

    持久化的触发

    MQClientInstance在启动定时任务的方法startScheduledTask中注册了定时任务,定时调用persistAllConsumerOffset对拉取进度进行持久化,persistAllConsumerOffset中又调用了MQConsumerInnerpersistConsumerOffset方法:

    public class MQClientInstance {
        private void startScheduledTask() {
            // ...
            // 注册定时任务,定时持久化拉取进度
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 持久化
                        MQClientInstance.this.persistAllConsumerOffset();
                    } catch (Exception e) {
                        log.error("ScheduledTask persistAllConsumerOffset exception", e);
                    }
                }
            }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
            // ...
        }
        
        private void persistAllConsumerOffset() {
            Iterator> it = this.consumerTable.entrySet().iterator();
            while (it.hasNext()) {
                Entry entry = it.next();
                MQConsumerInner impl = entry.getValue();
                // 调用persistConsumerOffset进行持久化
                impl.persistConsumerOffset();
            }
        }
    }
    
    折叠

    DefaultMQPushConsumerImplMQConsumerInner的一个子类,以它为例可以看到在persistConsumerOffset方法中调用了offsetStore的persistAll方法进行持久化:

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
        @Override
        public void persistConsumerOffset() {
            try {
                this.makeSureStateOK();
                Set mqs = new HashSet();
                Set allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
                mqs.addAll(allocateMq);
                // 拉取进度持久化
                this.offsetStore.persistAll(mqs);
            } catch (Exception e) {
                log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
            }
        }
    }
    

    总结

    参考
    丁威、周继锋《RocketMQ技术内幕》

    RocketMQ版本:4.9.3

  • 相关阅读:
    《算法导论》15.2 矩阵链乘法(含有C++代码)
    【CV】Reg2Net:一种用于计算机视觉任务的多尺度骨干架构
    JS利用循环解决的一些问题
    结构冲突-架构真题(三十三)
    transformers架构实现
    读书笔记:程序员的思维修炼:开发认知潜能的九堂课
    G. SlavicG‘s Favorite Problem(树的遍历DFS,BFS均可)
    python文件打包分发方法
    22. SAP ABAP OData 服务的 $count 和 $inlinecount 两个操作的区别
    BIT-6自定义类型和动态内存管理(11000字详解)
  • 原文地址:https://www.cnblogs.com/shanml/p/16513229.html