• RocketMQ消息生产者是如何选择Broker的


    前言

    RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢?

    从NameServer查询Topic信息

    通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()这个方法,并且我们找到了最关键的Topic信息:

    1. TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    2. 复制代码

    这个方法就是通过topicNameServer拉出对应的Broker信息:

    1.    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    2.        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    3.        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
    4.            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
    5.            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    6.            topicPublishInfo = this.topicPublishInfoTable.get(topic);
    7.       }
    8.        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
    9.            return topicPublishInfo;
    10.       } else {
    11.            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
    12.            topicPublishInfo = this.topicPublishInfoTable.get(topic);
    13.            return topicPublishInfo;
    14.       }
    15.   }
    16. 复制代码

    1.一开始的话,是从当前缓存中找Topic信息,第一次肯定是找不到的;

    2.找不到Topic信息,那么就调用updateTopicRouteInfoFromNameServer(topic)NameServer拉对应的信息,如果拉到了就更新到缓存中;

    3.如果依然找不到Topic信息,说明没有任何Broker上面是有这个Topic的;但是我们还要拉开启了自动创建Topic配置的Broker信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)实现;

    生产者客户端会从两个地方获取Broker信息,第一个就是从内存缓存中获取,第二个就是从NameServer中获取。从NameServer中分两次获取,一次是获取存在的Topic对应的Broker信息,第二次是获取还没有创建出来的Topic对应的Broker信息;

    如何选择Broker

    当客户端拿到了Topic对应的Broker信息后,它是如何选择目标Broker的呢?继续向下看,我们找到了关键代码:

    1. int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
    2.            int times = 0;
    3.            String[] brokersSent = new String[timesTotal];
    4.            for (; times < timesTotal; times++) {
    5.                String lastBrokerName = null == mq ? null : mq.getBrokerName();
    6.                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    7.                if (mqSelected != null) {
    8.                    mq = mqSelected;
    9.                    brokersSent[times] = mq.getBrokerName();
    10.                 ......
    11. 复制代码

    1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;

    2.选择Broker的关键代码就在selectOneMessageQueue()方法中,通过前面拿到的topicPublishInfo作为参数,lastBrokerName作为额外的考虑参数;

    追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()中:

    1.    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    2.        if (this.sendLatencyFaultEnable) {
    3.            try {
    4.                int index = tpInfo.getSendWhichQueue().incrementAndGet();
    5.                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    6.                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    7.                    if (pos < 0)
    8.                        pos = 0;
    9.                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    10.                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
    11.                        return mq;
    12.               }
    13.                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    14.                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    15.                if (writeQueueNums > 0) {
    16.                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    17.                    if (notBestBroker != null) {
    18.                        mq.setBrokerName(notBestBroker);
    19.                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
    20.                   }
    21.                    return mq;
    22.               } else {
    23.                    latencyFaultTolerance.remove(notBestBroker);
    24.               }
    25.           } catch (Exception e) {
    26.                log.error("Error occurred when selecting message queue", e);
    27.           }
    28.            return tpInfo.selectOneMessageQueue();
    29.       }
    30.        return tpInfo.selectOneMessageQueue(lastBrokerName);
    31.   }
    32. 复制代码

    1.如果开启了延迟故障规避,那么执行规避策略;

    • 1.1:轮询找一个Broker,该Broker要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);
    • 1.2:如果所有的Broker都没有度过规避期,那么从比较好的那一部分Broker里面找一个出来;
    • 1.3:如果依然没有找到合适的Broker,那么就随机选一个Broker

    2.否则就随机选一个Broker

    下面我们来看一下随机发送的策略是怎么实现的:

    1.    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    2.        if (lastBrokerName == null) {
    3.            return selectOneMessageQueue();
    4.       } else {
    5.            for (int i = 0; i < this.messageQueueList.size(); i++) {
    6.                int index = this.sendWhichQueue.incrementAndGet();
    7.                int pos = Math.abs(index) % this.messageQueueList.size();
    8.                if (pos < 0)
    9.                    pos = 0;
    10.                MessageQueue mq = this.messageQueueList.get(pos);
    11.                if (!mq.getBrokerName().equals(lastBrokerName)) {
    12.                    return mq;
    13.               }
    14.           }
    15.            return selectOneMessageQueue();
    16.       }
    17.   }
    18.    public MessageQueue selectOneMessageQueue() {
    19.        int index = this.sendWhichQueue.incrementAndGet();
    20.        int pos = Math.abs(index) % this.messageQueueList.size();
    21.        if (pos < 0)
    22.            pos = 0;
    23.        return this.messageQueueList.get(pos);
    24.   }
    25. 复制代码

    1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个Broker,其实就是轮询方式;

    2.如果不是第一次发送消息,那么会尽可能避开上一次的Broker服务,也是为了让Broker服务负载均衡;

    3.如果没有避开上一次的Broker,那么再向后找另一个Broker;除非只有一个Broker服务,否则会尽可能避开上次发送的Broker

    小结

    通过源码分析,我们已经知道了生产者是如何选择目标Broker的了:

    1.第一次发消息,通过轮询的方式选择Broker

    2.后续发消息会规避上次的Broker,同样采用轮询的方式选择Broker

    3.在消息发送过程中,存在一个Broker规避列表,用户可以通过setSendLatencyFaultEnable(true)开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker,如果所有的Broker都在规避列表中,那么会选择一个相对比较好的Broker来用;

  • 相关阅读:
    低代码合作开发的3个竞争优势是什么?
    Java中线程的状态
    python GIL全局锁、描述器
    股票价格预测 | Python实现基于LSTM与Transfomer的股票预测模型(pytorch)
    小程序的自定义组件
    MIPS汇编语言学习-01-两数求和以及环境配置、如何运行
    vue3回退页面不刷新解决
    动态规划太难了?是你没有找对方法,四题带你搞懂动态规划!
    java通过路径返回流给前台
    群晖7.2版本安装CloudDriver2(套件)挂载alist(xiaoya)到本地
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/128033837