RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建。可以通过设置broker的配置文件来禁用或者允许自动创建。默认是开启的允许自动创建
autoCreateTopicEnable=true/false
下面会结合源码来深度分析一下自动创建和手动创建的过程。
默认情况下,topic不用手动创建,当producer进行消息发送时,会从nameserver拉取topic的路由信息,如果topic的路由信息不存在,那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic,这定义在org.apache.rocketmq.common.MixAll类中
- // Will be created at broker when isAutoCreateTopicEnable
- public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
- 复制代码
自动创建开关是下BrokerConfig类中有一个私有变量:
- @ImportantField
- private boolean autoCreateTopicEnable = true;
- 复制代码
这变量可以通过配置文件配置来进行修改,代码中的默认值为true,所以在默认的情况下Rocket MQ是会自动创建Topic的。
在Broker启动,会调用TopicConfigManager的构造方法,在构造方法中定义了一系列RocketMQ系统内置的一些系统Topic(这里只关注一下TBW102):
- {
- // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
- if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
- String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
- TopicConfig topicConfig = new TopicConfig(topic);
- this.systemTopicList.add(topic);
- topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
- .getDefaultTopicQueueNums()); //8
- topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
- .getDefaultTopicQueueNums()); //8
- int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
- topicConfig.setPerm(perm);
- this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
- }
- }
- 复制代码
这里有 this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()
这样一段代码,在开启允许自动创建的时候,会把当前Topic的信息存入topicConfigTable变量中。然后通过发送定期发送心跳包把Topic和Broker的信息发送到NameServer的RouteInfoManager中进行保存。在BrokerController中定义了这样的一个定时任务来执行这个心跳包的发送:
- this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- BrokerController.this.registerBrokerAll(true, false, brokerCon