• RocketMQ创建topic流程解析


    RocketMQ使用topic来分隔各个业务逻辑的消息,发送消息前需要创建topic。

    topic的创建有两种方式,一种是broker支持在收发消息时自动创建,比如producer发过来的消息带了一个不存在的topic,如果broker设置成可自动创建的话,会自动尝试创建topic。
    另外一种就是通过管理接口创建,这种方式生产环境用的更多一些,因为可以由管理员来统一管理topic。

    客户端创建topic

    RocketMQ提供了管理接口MQAdmin来支持用户的后台管理需求,比如topic创建,消息查询等。默认实现方法是MQAdminImpl.createTopic()

    1. public void createTopic(String key, String newTopic, int queueNum, int topicSysFlag) throws MQClientException {
    2. try {
    3. //1、一般使用defaultTopic获取已经存在的broker data,所有的broker默认都支持defaultTopic
    4. TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(key, timeoutMillis);
    5. List brokerDataList = topicRouteData.getBrokerDatas();
    6. if (brokerDataList != null && !brokerDataList.isEmpty()) {
    7. Collections.sort(brokerDataList);
    8. boolean createOKAtLeastOnce = false;
    9. MQClientException exception = null;
    10. StringBuilder orderTopicString = new StringBuilder(); //没用到
    11. //2、轮询所有broker,在master上创建topic,中间有一个broker失败,则中止创建
    12. for (BrokerData brokerData : brokerDataList) {
    13. String addr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
    14. if (addr != null) {
    15. TopicConfig topicConfig = new TopicConfig(newTopic);
    16. //3、设置queue的数量
    17. topicConfig.setReadQueueNums(queueNum);
    18. topicConfig.setWriteQueueNums(queueNum);
    19. //4、设置topic的属性,比如可读、可写
    20. topicConfig.setTopicSysFlag(topicSysFlag);
    21. boolean createOK = false;
    22. for (int i = 0; i < 5; i++) {//重试4次
    23. try {
    24. this.mQClientFactory.getMQClientAPIImpl().createTopic(addr, key, topicConfig, timeoutMillis);
    25. createOK = true;
    26. createOKAtLeastOnce = true;
    27. break;
    28. } catch (Exception e) {
    29. if (4 == i) {
    30. exception = new MQClientException("create topic to broker exception", e);
    31. }
    32. }
    33. }
    34. if (createOK) {
    35. orderTopicString.append(brokerData.getBrokerName());
    36. orderTopicString.append(":");
    37. orderTopicString.append(queueNum);
    38. orderTopicString.append(";");
    39. }
    40. }
    41. }
    42. if (exception != null && !createOKAtLeastOnce) {
    43. throw exception;
    44. }
    45. } else {
    46. throw new MQClientException("Not found broker, maybe key is wrong", null);
    47. }
    48. } catch (Exception e) {
    49. throw new MQClientException("create new topic failed", e);
    50. }
    51. }

    MQAdminImpl.createTopic中的参数如下:

    key:这个参数是系统已经存在的一个topic的名称,新建的topic会跟它在相同的broker上创建
    newTopic:新建的topic的唯一标识
    queueNum:指定topic中queue的数量
    topicSysFlag:topic的标记位设置,没有特殊要求就填0就可以了。可选值在TopicSysFlag中定义

    函数MQAdminImpl.createTopic()的流程如下:

    • 第1步,根据提供的key代表的topic去获取broker的路由,如果想在所有broker创建,一般使用DefaultTopic,因为这个topic是在所有broker上都存在的。
    • 第2步,轮询所有的broker,在master上创建topic,中间有一个broker失败,则中止创建,返回失败。因为master和slave的配置数据也会自动同步,所以只需要在master上创建。
    • 第3,4步,设置参数
    • 第5步,调用MQClientAPIImpl接口创建,失败会重试4次。
    MQClientAPIImpl#createTopic
    1. public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
    2. final long timeoutMillis)
    3. throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    4. CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
    5. requestHeader.setTopic(topicConfig.getTopicName());
    6. requestHeader.setDefaultTopic(defaultTopic);
    7. requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
    8. requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
    9. //设置topic的权限,可读,可写
    10. requestHeader.setPerm(topicConfig.getPerm());
    11. //设置topic支持的消息过滤类型
    12. requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
    13. requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
    14. //设置是否是顺序消息topic
    15. requestHeader.setOrder(topicConfig.isOrder());
    16. RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
    17. RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
    18. request, timeoutMillis);
    19. assert response != null;
    20. switch (response.getCode()) {
    21. case ResponseCode.SUCCESS: {
    22. return;
    23. }
    24. default:
    25. break;
    26. }
    27. throw new MQClientException(response.getCode(), response.getRemark());
    28. }

    函数createTopic就是设置好topic的参数然后发送RequestCode.UPDATE_AND_CREATE_TOPIC命令给Broker。后续的处理逻辑主要在Broker中。

    Broker创建topic

    Broker在接收到RequestCode.UPDATE_AND_CREATE_TOPIC命令后,进入函数AdminBrokerProcessor#updateAndCreateTopic

    1. private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
    2. RemotingCommand request) throws RemotingCommandException {
    3. final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    4. final CreateTopicRequestHeader requestHeader =
    5. (CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
    6. log.info("updateAndCreateTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
    7. //1、判断topicName的合法性,不能和clusterName同名
    8. if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
    9. String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
    10. log.warn(errorMsg);
    11. response.setCode(ResponseCode.SYSTEM_ERROR);
    12. response.setRemark(errorMsg);
    13. return response;
    14. }
    15. try {//2、先回复客户端创建成功,后更新broker缓存
    16. response.setCode(ResponseCode.SUCCESS);
    17. response.setOpaque(request.getOpaque());
    18. response.markResponseType();
    19. response.setRemark(null);
    20. ctx.writeAndFlush(response);
    21. } catch (Exception e) {
    22. log.error("Failed to produce a proper response", e);
    23. }
    24. TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
    25. topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
    26. topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
    27. topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
    28. topicConfig.setPerm(requestHeader.getPerm());
    29. topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
    30. //3、更新TopicConfigManager中的topic配置信息。不存在则创建,存在则更新,并且持久化到文件中
    31. this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
    32. //4、broker将topic信息同步到nameserv
    33. this.brokerController.registerIncrementBrokerData(topicConfig,this.brokerController.getTopicConfigManager().getDataVersion());
    34. return null;
    35. }

    broker接收到RequestCode.UPDATE_AND_CREATE_TOPIC命令后,代码流程如下:

         1、判断topicName的合法性,不能和clusterName同名。

         2、先回复客户端创建成功,后更新broker缓存。

         3、更新TopicConfigManager中的topic配置信息。不存在则创建,存在则更新,并且持久化到文件中。

         4、broker将topic信息同步到nameserv。

    在将topic保存后,broker会将新增的topic同步给NameServer,同步的过程跟broker注册是一样的。这样NameServer中就记录了topic的路由信息,后续发送消息的时候,客户端就可以从Broker中获取到Topic的路由信息。

  • 相关阅读:
    vue3+element Plus实现弹框的拖拽、可点击底层页面功能
    java毕业设计软件基于SpringBoot在线电影订票|影院购票系统
    使用 MAUI 进行数据可视化:与 图表控件LightningChart JS 的兼容性项目模板
    ChatGPT成知名度最高生成式AI产品,使用频率却不高
    Could not run ‘aten::slow_conv3d_forward‘ with arguments from the ‘CUDA‘ bac
    C-结构体
    【DC综合】DC工具 report_timing 命令的一些选项
    LeetCode - 解题笔记 - 214 - Shortest Palindrome
    项目Git分支管理规范
    柯桥在PPT中如何制作翻书动画?
  • 原文地址:https://blog.csdn.net/bao2901203013/article/details/126199065