• 不科学,RocketMQ生产者在一个应用服务竟然不能向多个NameServer发送消息


    前言

    目前有两套RocketMQ集群,集群A包含topic名称为cluster_A_topic,集群B包含topic名称为cluster_B_topic,在应用服务OrderApp上通过RocketMQ Client创建两个DefaultMQProducer实例发送消息给集群A和集群B,架构图如下:

    根据上述架构图,我们给出的示例代码如下:

    1. // 创建第一个DefaultMQProducer
    2. DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
    3.    // 设置nameServer地址
    4. producer1.setNamesrvAddr("192.168.2.230:9876");
    5. try {
    6. producer1.start();
    7.      // 发送消息
    8. SendResult result1 = producer1.send(new Message("cluster_A_topic", "ping".getBytes(StandardCharsets.UTF_8)));
    9. switch (result1.getSendStatus()) {
    10. case SEND_OK:
    11. System.out.println("cluster_A_topic 发送成功!");
    12. break;
    13. case FLUSH_DISK_TIMEOUT:
    14. System.out.println("cluster_A_topic 持久化失败!");
    15. break;
    16. case FLUSH_SLAVE_TIMEOUT:
    17. System.out.println("cluster_A_topic 同步slave失败!");
    18. break;
    19. case SLAVE_NOT_AVAILABLE:
    20. System.out.println("cluster_A_topic 副本不可用!");
    21. }
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25.    // 创建第二个DefaultMQProducer
    26. DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_2");
    27.    // 设置nameServer地址
    28. producer2.setNamesrvAddr("192.168.2.231:9876");
    29. try {
    30. producer2.start();
    31.      // 发送消息
    32. SendResult result2 = producer2.send(new Message("cluster_B_topic", "ping".getBytes(StandardCharsets.UTF_8)));
    33. switch (result2.getSendStatus()) {
    34. case SEND_OK:
    35. System.out.println("cluster_B_topic 发送成功!");
    36. break;
    37. case FLUSH_DISK_TIMEOUT:
    38. System.out.println("cluster_B_topic 持久化失败!");
    39. break;
    40. case FLUSH_SLAVE_TIMEOUT:
    41. System.out.println("cluster_B_topic 同步slave失败!");
    42. break;
    43. case SLAVE_NOT_AVAILABLE:
    44. System.out.println("cluster_B_topic 副本不可用!");
    45. }
    46. return "ok";
    47. } catch (Exception e) {
    48. e.printStackTrace();
    49. } finally {
    50. producer1.shutdown();
    51. producer2.shutdown();
    52. }
    53. 复制代码

    结果竟然报错了,报错内容时cluster_B_topic不存在:

    经过不断的测试,发现只有放在最前面启动的DefaultMQProducer会生效,后面启动的DefaultMQProducer发送消息就报错说对应的topic不存在,而且报错的broker竟然是前面启动的DefaultMQProducer对应的broker。这就不科学了,难道RocketMQ不允许在一个应用上创建多个生产者?

    问题定位

    首先说明一下,当前使用的RocketMQ Client版本是4.8.0。为了确定是哪儿出了问题,不得不对源码来一波探索[哭泣脸😢]。

    我们都知道生产者是发送消息给Broker的,获取Broker信息是通过连接NameServer获取的。既然报错的Broker和目标Broker竟然不对应,肯定是后面启动的生产者获取的Broker不对。有了最基本的判断,我们先从DefaultMQProducer#start()入手,最终我们定位到这样一段代码DefaultMQProducerImpl#start(final boolean startFactory)

    1. public void start(final boolean startFactory) throws MQClientException {
    2.        switch (this.serviceState) {
    3.            case CREATE_JUST:
    4.                this.serviceState = ServiceState.START_FAILED;
    5.                this.checkConfig();
    6. // 如果生产者group名称不是`CLIENT_INNER_PRODUCER`,那么修改InstanceName值
    7.                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
    8.                    this.defaultMQProducer.changeInstanceNameToPID();
    9.               }
    10.            // 创建MQClientInstance实例
    11.                this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
    12.            // 注册生产者实例到MQClientInstance中
    13.                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
    14.                if (!registerOK) {
    15.                    this.serviceState = ServiceState.CREATE_JUST;
    16.                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
    17.                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
    18.                        null);
    19.               }
    20.            // 添加TBW102对应的topic信息,broker设置autoCreateTopicEnable = true才起作用
    21.            this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
    22.                if (startFactory) {
    23.                    // 启动刚刚创建的MQClientInstance实例
    24.                    mQClientFactory.start();
    25.               }
    26.                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
    27.                    this.defaultMQProducer.isSendMessageWithVIPChannel());
    28.                // 修改服务状态为RUNNING
    29.                this.serviceState = ServiceState.RUNNING;
    30.                break;
    31.            case RUNNING:
    32.            case START_FAILED:
    33.            case SHUTDOWN_ALREADY:
    34.                throw new MQClientException("The producer service state not OK, maybe started once, "
    35.                    + this.serviceState
    36.                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
    37.                    null);
    38.            default:
    39.                break;
    40.       }
    41. 复制代码

    上面的代码主要是创建了MQClientInstance实例,并且通过start()方法启动。

    通过针对这两段代码的debug,我们发现创建的两个DefaultMQProducer对象是共用了一个MQClientInstance实例,并且所有针对NameServerBroker的远程操作全部是通过MQClientInstance实例来做的。比如发送消息的时候需要找到对应的Broker下的消息队列:

    1. private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    2.        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    3.        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    4.            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    5.            // 从NameServer更新topic路由
    6.            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    7.            topicPublishInfo = this.topicPublishInfoTable.get(topic);
    8.       }
    9.        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    10.            return topicPublishInfo;
    11.       } else {
    12.            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    13.            topicPublishInfo = this.topicPublishInfoTable.get(topic);
    14.            return topicPublishInfo;
    15.       }
    16.   }
    17. 复制代码

    最终我们发现两个DefaultMQProducer对象都是去同一个NameServer下获取对应的topic信息,这下问题就定位到了:因为使用了同一个MQClientInstance实例导致不同的DefaultMQProducer去访问了同一个NameServer,同一个集群需要同时接收两个topic的消息,也就出现了前面的报错说topic不存在的情况。

    如何解决

    我们来看看MQClientInstance实例是如何保证唯一性的:

    1. public MQClientInstance getOrCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
    2.        // 生成clientID
    3.        String clientId = clientConfig.buildMQClientId();
    4.        // 从缓存中获取MQClientInstance
    5.        MQClientInstance instance = this.factoryTable.get(clientId);
    6.        if (null == instance) {
    7.            // 没有缓存的话就创建一个MQClientInstance
    8.            instance =
    9.                new MQClientInstance(clientConfig.cloneClientConfig(),
    10.                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    11.            // 新创建出来的再放进缓存
    12.            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
    13.            if (prev != null) {
    14.                instance = prev;
    15.                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    16.           } else {
    17.                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
    18.           }
    19.       }
    20.        // 返回MQClientInstance实例
    21.        return instance;
    22.   }
    23. 复制代码

    我们之所以拿到的MQClientInstance实例是同一个,是因为在同一个服务下创建的clientId相同:

    1.    public String buildMQClientId() {
    2.        StringBuilder sb = new StringBuilder();
    3.        sb.append(this.getClientIP());
    4.        sb.append("@");
    5.        sb.append(this.getInstanceName());
    6.        if (!UtilAll.isBlank(this.unitName)) {
    7.            sb.append("@");
    8.            sb.append(this.unitName);
    9.       }
    10.        return sb.toString();
    11.   }
    12. 复制代码

    两个clientId都是192.168.18.173@14933,为了防止clientId相同,我们可以在创建DefaultMQProducer实例是加上unitName值,保证两个unitName值不同来避免共享同一个MQClientInstance

    1. DefaultMQProducer producer1 = new DefaultMQProducer("producer_group_1");
    2. producer1.setNamesrvAddr("192.168.2.230:9876");
    3. producer1.setUnitName("producer1");
    4. producer1.start();
    5. DefaultMQProducer producer2 = new DefaultMQProducer("producer_group_1");
    6. producer2.setNamesrvAddr("192.168.2.231:9876");
    7. producer2.setUnitName("producer2");
    8. producer2.start();
    9. 复制代码

    通过上述代码修改后,两个消息都发送成功了。

    另一个办法就是升级RocketMQ Client4.9.0,我们来看一下RocketMQ Client 4.9.0是怎么解决这个问题的:

    1.    public void changeInstanceNameToPID() {
    2.        if (this.instanceName.equals("DEFAULT")) {
    3.            this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    4.       }
    5.   }
    6. 复制代码

    RocketMQ Client 4.9.0在后面补充了一个纳秒值,之前的代码是这样的:

    1.    public void changeInstanceNameToPID() {
    2.        if (this.instanceName.equals("DEFAULT")) {
    3.            this.instanceName = String.valueOf(UtilAll.getPid());
    4.       }
    5.   }
    6. 复制代码

    也就是说,在新的版本中,一个应用服务内创建多个DefaultMQProducer就会有多个MQClientInstance实例对应,不会再出现我们前面的报错。

  • 相关阅读:
    ClickHouse学习笔记之数据一致性
    Python GDAL+numpy遥感图像处理过程中背景像元处理方法
    Android---深入理解AQS和CAS原理
    什么是JMM
    苹果放出快捷指令专题介绍页面,大大提高了 Mac 使用效率
    P1104 生日
    2439. 最小化数组中的最大值-暴力解法+动态规划法
    奥特曼autMan机器人安装,开启插件市场+对接QQ、微信、公众号教程
    《代码大全2》第14章 组织直线型代码
    常见的Transforms(一)Compose & Normalize
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/127933842