• RocketMQ源码分析(六)之Broker消息接收


    版本

    1. 基于rocketmq-all-4.3.1版本

    Broker处理消息流程

    1. RocketMQ客户端发送消息方法是MQClientAPIImpl#sendMessage,发送消息的请求命令是RequestCode.SEND_MESSAGE(10)

      public SendResult sendMessage(
          final String addr,
          final String brokerName,
          final Message msg,
          final SendMessageRequestHeader requestHeader,
          final long timeoutMillis,
          final CommunicationMode communicationMode,
          final SendCallback sendCallback,
          final TopicPublishInfo topicPublishInfo,
          final MQClientInstance instance,
          final int retryTimesWhenSendFailed,
          final SendMessageContext context,
          final DefaultMQProducerImpl producer
      ) throws RemotingException, MQBrokerException, InterruptedException {
          long beginStartTime = System.currentTimeMillis();
          RemotingCommand request = null;
          if (sendSmartMsg || msg instanceof MessageBatch) {
              SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
              request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
          } else {
              request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
          }
          ...省略...
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    2. 在Broker端源码可以找到注册RequestCode.SEND_MESSAGE命令的地方是BrokerController#registerProcessor。对应的处理类是SendMessageProcessor

      public void registerProcessor() {
         SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
         sendProcessor.registerSendMessageHook(sendMessageHookList);
         sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
      	//可以看到处理类是SendMessageProcessor
         this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        ...省略...
      }  
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    SendMessageProcessor

    1. SendMessageProcessor#processRequest是客户端发送消息的处理方法(单个消息和批量消息),此方法是一个模板方法,增加钩子处理函数,批量消息和单条消息处理逻辑。SendMessageProcessor#processRequest整体的流程如下 在这里插入图片描述

    2. 单条消息和批量消息都是调用AbstractSendMessageProcessor#msgCheck进行主要参数检查。批量消息不支持私信队列,因为只有消费失败时Consumer才会发送单条消息到私信队列,并不会发送批量消息进去死信队列,所以不存在重试Topic

    3. AbstractSendMessageProcessor#msgCheck的执行逻辑

      • 检查Broker是否有写权限
      • 检查topic是否可以进行消息发送,主要针对默认主题,默认主题不能发送消息,仅供路由查找
      • 如果Topic不存在,则创建Topic。在NameServer端存储Topic的配置信息,默认路径为${ROCKET_HOME}/store/config/topic.json
  • 相关阅读:
    十大热门骨传导蓝牙耳机排行榜,精选最佳的五款骨传导蓝牙耳机
    YOLOv7训练自己的数据集
    文件加密软件哪个好丨2023年最值得收藏的6款文件加密软件
    关于计算机找不到d3dx9_43.dll,无法继续执行代码修复方法
    【论文阅读】CVPR2021: MP3: A Unified Model to Map, Perceive, Predict and Plan
    Linux 常用systemctl service 脚本
    PMP 80个输入输出总结
    基于微信小程序的电影院买票选座系统
    KingbaseES V8R6集群管理运维案例之---repmgr standby switchover故障
    Python编程学习第一篇——制作一个小游戏休闲一下
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126394165