• RocketMQ源码解析-topic创建机制


    以下源码基于Rocket MQ 4.7.0

    1. RocketMQ Topic创建机制

    RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建

    autoCreateTopicEnable=true/false

    下面会结合源码来深度分析一下自动创建和手动创建的过程。

    2. 自动Topic

    默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中

    1. // Will be created at broker when isAutoCreateTopicEnable
    2. public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
    3. 复制代码

    自动创建开关是下BrokerConfig类中有一个私有变量:

    1. @ImportantField
    2. private boolean autoCreateTopicEnable = true;
    3. 复制代码

    这变量可以通过配置文件配置来进行修改,代码中的默认值为true,所以在默认的情况下Rocket MQ是会自动创建Topic的。
    在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ系统内置的一些系统Topic(这里只关注一下TBW102):

    1. {
    2. // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
    3. if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
    4. String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
    5. TopicConfig topicConfig = new TopicConfig(topic);
    6. this.systemTopicList.add(topic);
    7. topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
    8. .getDefaultTopicQueueNums()); //8
    9. topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
    10. .getDefaultTopicQueueNums()); //8
    11. int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
    12. topicConfig.setPerm(perm);
    13. this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
    14. }
    15. }
    16. 复制代码

    这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable() 这样一段代码,在开启允许自动创建的时候,会把当前Topic的信息存入topicConfigTable变量中。然后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保存。在BrokerController中定义了这样的一个定时任务来执行这个心跳包的发送:

    1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    2. @Override
    3. public void run() {
    4. try {
    5. BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
    6. } catch (Throwable e) {
    7. log.error("registerBrokerAll Exception", e);
    8. }
    9. }
    10. }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
    11. 复制代码

    这里就说明了如何把每个Broker的系统自定义的Topic注册到NameServer。接下来看在发送过程中如何从NameServer获取Topic的路由信息: DefaultMQProducerImpl.sendDefaultImpl

    1. private SendResult sendDefaultImpl(
    2. Message msg,
    3. final CommunicationMode communicationMode,
    4. final SendCallback sendCallback,
    5. final long timeout
    6. ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    7. //省略代码
    8. //获取路由信息
    9. TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    10. }
    11. 复制代码

    通过DefaultMQProducerImpl.tryToFindTopicPublishInfo方法获取Topic的路由信息。

    1. private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    2. TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    3. //第一次从缓存中获取--肯定没有因为还没创建
    4. if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    5. this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    6. //从NameServer获取--也是没有,因为没有创建
    7. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    8. topicPublishInfo = this.topicPublishInfoTable.get(topic);
    9. }
    10. if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    11. return topicPublishInfo;
    12. } else {
    13. //第二次从这里获取
    14. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    15. topicPublishInfo = this.topicPublishInfoTable.get(topic);
    16. return topicPublishInfo;
    17. }
    18. }
    19. 复制代码

    下面来看一下 MQClientInstance.updateTopicRouteInfoFromNameServer 的方法:

    1. public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    2. DefaultMQProducer defaultMQProducer) {
    3. //省略代码
    4. if (isDefault && defaultMQProducer != null) {
    5. //使用默认的TBW102 Topic获取数据
    6. topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
    7. 1000 * 3);
    8. if (topicRouteData != null) {
    9. for (QueueData data : topicRouteData.getQueueDatas()) {
    10. int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
    11. data.setReadQueueNums(queueNums);
    12. data.setWriteQueueNums(queueNums);
    13. }
    14. }
    15. } else {
    16. //这是正常的
    17. topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    18. }
    19. //省略代码
    20. }
    21. 复制代码

    如果isDefault=true并且defaultMQProducer不为空,从nameserver中获取默认路由信息,此时会获取所有已开启自动创建开关的broker的默认“TBW102”topic路由信息,并保存默认的topic消息队列数量。

    这里会比较一下配在在 DefaultMQProducer.defaultTopicQueueNums中的默认值和TBW102中的值哪个更小。

    1. if (topicRouteData != null) {
    2. TopicRouteData old = this.topicRouteTable.get(topic);
    3. boolean changed = topicRouteDataIsChange(old, topicRouteData);
    4. if (!changed) {
    5. changed = this.isNeedUpdateTopicRouteInfo(topic);
    6. } else {
    7. log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    8. }
    9. }
    10. 复制代码

    判断获取默认的是否存在,如果存在把当前的Topic的信息更新。也就是把TBW102 Topic的数据更新为自动创建的数据。

    1. if (changed) {
    2. TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
    3. for (BrokerData bd : topicRouteData.getBrokerDatas()) {
    4. this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
    5. }
    6. // Update Pub info
    7. {
    8. TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
    9. publishInfo.setHaveTopicRouterInfo(true);
    10. Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
    11. while (it.hasNext()) {
    12. Entry<String, MQProducerInner> entry = it.next();
    13. MQProducerInner impl = entry.getValue();
    14. if (impl != null) {
    15. impl.updateTopicPublishInfo(topic, publishInfo);
    16. }
    17. }
    18. }
    19. // Update sub info
    20. {
    21. Set<MessageQueue> subscribeInfo = topicRouteData2TopicSubscribeInfo(topic, topicRouteData);
    22. Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    23. while (it.hasNext()) {
    24. Entry<String, MQConsumerInner> entry = it.next();
    25. MQConsumerInner impl = entry.getValue();
    26. if (impl != null) {
    27. impl.updateTopicSubscribeInfo(topic, subscribeInfo);
    28. }
    29. }
    30. }
    31. log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
    32. this.topicRouteTable.put(topic, cloneTopicRouteData);
    33. return true;
    34. }
    35. 复制代码

    更新本地的缓存。这样TBW102 Topic的负载和一些默认的路由信息就会被自己创建的Topic使用。这里就是整个自动创建的过程.
    总结一下就是:通过使用系统内部的一个TBW102的Topic的配置来自动创建当前用户的要创建的自定义Topic。

    3. 手动创建--预先创建

    手动创建也叫预先创建,就是在使用Topic之前就创建,可以通过命令行或者通过RocketMQ的管理界面创建Topic。

    通过界面控制台创建

    项目地址: github.com/apache/rock…

    TopicController主要负责Topic的管理

    1. @RequestMapping(value = "/createOrUpdate.do", method = { RequestMethod.POST})
    2. @ResponseBody
    3. public Object topicCreateOrUpdateRequest(@RequestBody TopicConfigInfo topicCreateOrUpdateRequest) {
    4. Preconditions.checkArgument(CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getBrokerNameList()) || CollectionUtils.isNotEmpty(topicCreateOrUpdateRequest.getClusterNameList()),
    5. "clusterName or brokerName can not be all blank");
    6. logger.info("op=look topicCreateOrUpdateRequest={}", JsonUtil.obj2String(topicCreateOrUpdateRequest));
    7. topicService.createOrUpdate(topicCreateOrUpdateRequest);
    8. return true;
    9. }
    10. 复制代码

    然后通过MQAdminExtImpl.createAndUpdateTopicConfig方法来创建:

    1. @Override
    2. public void createAndUpdateTopicConfig(String addr, TopicConfig config)
    3. throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    4. MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
    5. }
    6. 复制代码

    通过调用DefaultMQAdminExtImpl.createAndUpdateTopicConfig创建Topic

    1. @Override
    2. public void createAndUpdateTopicConfig(String addr, TopicConfig config) throws RemotingException, MQBrokerException,
    3. InterruptedException, MQClientException {
    4. this.mqClientInstance.getMQClientAPIImpl().createTopic(addr, this.defaultMQAdminExt.getCreateTopicKey(), config, timeoutMillis);
    5. }
    6. 复制代码

    最后通过MQClientAPIImpl.createTopic创建Topic

    1. public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
    2. final long timeoutMillis)
    3. throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    4. CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
    5. requestHeader.setTopic(topicConfig.getTopicName());
    6. requestHeader.setDefaultTopic(defaultTopic);
    7. requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
    8. requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
    9. requestHeader.setPerm(topicConfig.getPerm());
    10. requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
    11. requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
    12. requestHeader.setOrder(topicConfig.isOrder());
    13. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
    14. RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    15. request, timeoutMillis);
    16. assert response != null;
    17. switch (response.getCode()) {
    18. case ResponseCode.SUCCESS: {
    19. return;
    20. }
    21. default:
    22. break;
    23. }
    24. throw new MQClientException(response.getCode(), response.getRemark());
    25. }


     

  • 相关阅读:
    Linux思维导图
    解析 Python requests 库 POST 请求中的参数顺序问题
    前端导出图片和各种文件
    JavaScript - async 和 await 修饰符的基本使用方法
    数据结构 专项练习
    days month 間隔
    多元统计分析 实验一、多元统计数据的图标表示法
    报错:npm ERR code EPERM
    商业模式及其 SubDAO 深入研究
    在 JavaScript 中循环遍历数组的多种方法
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/126437753