• 04 RocketMQ - Producer 源码分析


    Producer 启动流程

    整体脉络
    在这里插入图片描述

    启动入口

    // Create a producer that belongs to the given producer group.
    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    // Tell the producer which NameServer address to use.
    producer.setNamesrvAddr("192.0.0.1:9876");
    // Start the producer (entry point of the start-up flow analysed below).
    producer.start();
    
    • 1
    • 2
    • 3
    • 4

    DefaultMQProducer::start()

    1. 组装 producerGroup
    2. 启动 producer
    3. 启动 traceDispatcher(仅当 traceDispatcher 不为 null 时;启动失败只记录告警日志,不影响 producer 启动)
    /**
     * Starts this producer.
     *
     * <p>In order: rewrites the producer group through {@code withNamespace}, starts the
     * underlying {@code defaultMQProducerImpl}, and finally starts {@code traceDispatcher}
     * when one is set. A dispatcher start failure is only logged as a warning; it is not
     * propagated to the caller.
     *
     * @throws MQClientException if starting the underlying producer implementation fails
     */
    public void start() throws MQClientException {
        // 1. Assemble the (namespaced) producer group name.
        final String namespacedGroup = withNamespace(this.producerGroup);
        this.setProducerGroup(namespacedGroup);

        // 2. Start the producer implementation.
        this.defaultMQProducerImpl.start();

        // 3. Without a dispatcher there is nothing left to start.
        if (traceDispatcher == null) {
            return;
        }
        try {
            traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
        } catch (MQClientException e) {
            // Best effort: the producer itself is already running at this point.
            log.warn("acl dispatcher start failed ", e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    DefaultMQProducerImpl::start()

    1. 检查生产者组是否满足要求
    2. 更改当前 instanceName为进程ID
    3. 创建或获得MQ客户端实例(IP@instanceName@unitName 相同则为同一实例)
    4. 注册当前生产者到 MQClientInstance 中,方便后续调用网络请求
    5. 启动生产者
    6. 发送心跳到所有的 broker
    7. 定时清除过期请求
    /**
     * Starts the producer implementation; delegates to {@code start(true)}, i.e. the
     * client factory is started as part of this call.
     *
     * @throws MQClientException propagated from {@code start(boolean)}
     */
    public void start() throws MQClientException {
            this.start(true);
        }
        
    /**
     * Starts the producer implementation according to the current {@code serviceState}.
     *
     * <p>Only the {@code CREATE_JUST} state performs the actual start-up: config check,
     * instance-name adjustment, client instance lookup, producer registration and
     * (optionally) starting the client factory. {@code RUNNING}, {@code START_FAILED} and
     * {@code SHUTDOWN_ALREADY} are rejected with an exception. After the switch a heartbeat
     * is sent and a periodic scan for expired requests is scheduled.
     *
     * @param startFactory when {@code true}, {@code mQClientFactory.start()} is invoked as
     *                     part of the start-up; when {@code false} that call is skipped
     * @throws MQClientException if the state does not allow starting, or if registering the
     *                           producer group fails
     */
    public void start(final boolean startFactory) throws MQClientException {
            switch (this.serviceState) {
                // First start of this instance.
                case CREATE_JUST:
                    // Mark as failed up front; the state only becomes RUNNING after every step below succeeds.
                    this.serviceState = ServiceState.START_FAILED;
                    // 1. Validate the producer configuration (producer group).
                    this.checkConfig();
                    // 2. Unless this is the client-inner producer group, switch the instance name to the PID.
                    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                        this.defaultMQProducer.changeInstanceNameToPID();
                    }
                    // 3. Create or fetch the MQ client instance.
                    // NOTE(review): per the original author's note, instances sharing the same
                    // IP@instanceName@unitName are the same instance — confirm in MQClientManager.
                    this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQProducer, rpcHook);
                    // 4. Register this producer with the client instance under its producer group.
                    boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                    if (!registerOK) {
                        // Roll the state back so a later start() with another group name can be attempted.
                        this.serviceState = ServiceState.CREATE_JUST;
                        throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
                    // Seed the publish-info table with an empty entry for the create-topic key.
                    this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
                    // 5. Start the client factory (skipped when startFactory is false).
                    if (startFactory) {
                        // The real start-up work is done by MQClientInstance.
                        mQClientFactory.start();
                    }
    
                    log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                        this.defaultMQProducer.isSendMessageWithVIPChannel());
                    this.serviceState = ServiceState.RUNNING;
                    break;
                // Any other known state means start() was already called (or the producer was shut down).
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The producer service state not OK, maybe started once, "
                        + this.serviceState
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
            // 6. Send a heartbeat to all brokers.
            this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            // 7. Periodically scan for expired requests: first run after 3 s, then every 1 s.
            this.timer.scheduleAtFixedRate(new TimerTask() {
                @Override
                public void run() {
                    try {
                        RequestFutureTable.scanExpiredRequest();
                    } catch (Throwable e) {
                        // Catch everything so a failing scan is only logged, never thrown out of run().
                        log.error("scan RequestFutureTable exception", e);
                    }
                }
            }, 1000 * 3, 1000);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    MQClientInstance::start()

    1. 如果 namesrvAddr 为空,通过 http 调用获取一下
    2. 开启消息发送服务 NRC
    3. 开启一系列定时任务(获取路由地址,修改路由信息,发送心跳等)
    4. 开启拉消息服务
    5. 开启负载均衡服务
    6. 再次调用 DefaultMQProducerImpl::start(false)
    /**
     * Starts this client instance; the whole start-up runs while holding this object's monitor.
     *
     * <p>Only the {@code CREATE_JUST} state performs work. {@code START_FAILED} is rejected
     * with an exception; every other state is a no-op.
     *
     * @throws MQClientException if a previous start attempt left the instance in
     *                           {@code START_FAILED}, or if a start-up step throws it
     */
    public void start() throws MQClientException {
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        // Mark as failed up front; only set to RUNNING once all steps below succeed.
                        this.serviceState = ServiceState.START_FAILED;
                        // 1. If no NameServer address is configured, fetch one via the client API.
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // 2. Start the client API used for sending requests.
                        this.mQClientAPIImpl.start();
                        // 3. Start the scheduled tasks (name server fetch, route update, heartbeat, ...).
                        this.startScheduledTask();
                        // 4. Start the pull-message service.
                        this.pullMessageService.start();
                        // 5. Start the rebalance service.
                        this.rebalanceService.start();
                        // 6. Start the producer held by this instance; passing false means that call
                        //    does not start the client factory again.
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    startScheduledTask() - 开启定时任务
    1. 未配置 namesrvAddr 时,每 2 分钟获取一次 NameServer 地址
    2. 30s一次修改路由信息
    3. 30s一次心跳
    4. 5s一次持久化消费进度
    5. 60s一次调整线程池
    /**
     * Registers the periodic background tasks of this client instance on
     * {@code scheduledExecutorService}. Every task catches and logs its own exceptions so
     * that an exception never escapes {@code run()}.
     */
    private void startScheduledTask() {
        // 1. Only when no NameServer address is configured: re-fetch the address,
        //    first after 10 s and then every 2 minutes.
        if (this.clientConfig.getNamesrvAddr() == null) {
            final Runnable fetchNameServerTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception ex) {
                        log.error("ScheduledTask fetchNameServerAddr exception", ex);
                    }
                }
            };
            this.scheduledExecutorService.scheduleAtFixedRate(
                fetchNameServerTask, 10 * 1000, 2 * 60 * 1000, TimeUnit.MILLISECONDS);
        }

        // 2. Refresh topic route info from the NameServer; the period is the configured
        //    poll-name-server interval, first run after 10 ms.
        final Runnable routeRefreshTask = new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception ex) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", ex);
                }
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(
            routeRefreshTask, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        // 3. Clean offline brokers, then send a heartbeat to all brokers; the period is the
        //    configured heartbeat interval, first run after 1 s.
        final Runnable heartbeatTask = new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception ex) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", ex);
                }
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(
            heartbeatTask, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        // 4. Persist all consumer offsets; the period is the configured persist interval,
        //    first run after 10 s.
        final Runnable persistOffsetTask = new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception ex) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", ex);
                }
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(
            persistOffsetTask, 10 * 1000, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        // 5. Adjust the thread pool once per minute, first run after 1 minute.
        final Runnable adjustThreadPoolTask = new Runnable() {
            @Override
            public void run() {
                try {
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception ex) {
                    log.error("ScheduledTask adjustThreadPool exception", ex);
                }
            }
        };
        this.scheduledExecutorService.scheduleAtFixedRate(adjustThreadPoolTask, 1, 1, TimeUnit.MINUTES);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    Producer 消息发送

    整体脉络
    在这里插入图片描述

    发送入口

    // Usage example: build a message and send it with the already started producer.
    try {
                // Topic "TopicTest", tag "TagA", body encoded with RemotingHelper.DEFAULT_CHARSET.
                Message msg = new Message("TopicTest",
                        "TagA",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Entry point of the send flow analysed below.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }catch (Exception e){
                // Demo-only error handling: just print the stack trace.
                e.printStackTrace();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    DefaultMQProducer::send()

    /**
     * Sends a message through the underlying producer implementation.
     *
     * <p>The message is validated first, then its topic is rewritten through
     * {@code withNamespace} before delegating to {@code defaultMQProducerImpl.send(msg)}.
     *
     * @param msg the message to send; its topic is modified in place
     * @return the result returned by the producer implementation
     * @throws MQClientException    if validation fails or the implementation throws it
     * @throws RemotingException    propagated from the implementation
     * @throws MQBrokerException    propagated from the implementation
     * @throws InterruptedException propagated from the implementation
     */
    public SendResult send(Message msg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // Validate before touching the message.
        Validators.checkMessage(msg, this);

        final String namespacedTopic = withNamespace(msg.getTopic());
        msg.setTopic(namespacedTopic);

        return this.defaultMQProducerImpl.send(msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    DefaultMQProducerImpl::send()

    /**
     * Sends a message using the send timeout configured on {@code defaultMQProducer}.
     *
     * @param msg the message to send
     * @return the result of {@code send(msg, timeout)}
     * @throws MQClientException    propagated from {@code send(Message, long)}
     * @throws RemotingException    propagated from {@code send(Message, long)}
     * @throws MQBrokerException    propagated from {@code send(Message, long)}
     * @throws InterruptedException propagated from {@code send(Message, long)}
     */
    public SendResult send(Message msg)
        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        final long configuredTimeout = this.defaultMQProducer.getSendMsgTimeout();
        return send(msg, configuredTimeout);
    }
    
    /**
     * Sends a message with an explicit timeout; delegates to {@code sendDefaultImpl} in
     * {@code CommunicationMode.SYNC} mode with no callback.
     *
     * @param msg     the message to send
     * @param timeout the timeout passed on to {@code sendDefaultImpl}
     * @return the result returned by {@code sendDefaultImpl}
     * @throws MQClientException    propagated from {@code sendDefaultImpl}
     * @throws RemotingException    propagated from {@code sendDefaultImpl}
     * @throws MQBrokerException    propagated from {@code sendDefaultImpl}
     * @throws InterruptedException propagated from {@code sendDefaultImpl}
     */
    public SendResult send(Message msg,
            long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    DefaultMQProducerImpl::sendDefaultImpl()

    校验消息、查找路由、选择队列、发送消息、计算不可用时长

    /**
     * Core send routine: validates the message, looks up the topic route, selects a queue,
     * sends the message and records the observed latency for the selected broker.
     *
     * <p>In {@code SYNC} mode up to {@code 1 + retryTimesWhenSendFailed} attempts are made;
     * in every other mode exactly one attempt is made. Each attempt receives the remaining
     * time budget ({@code timeout} minus the time already spent). If the budget is used up
     * before an attempt starts, the loop is left and a
     * {@code RemotingTooMuchRequestException} is thrown.
     *
     * @param msg               the message to send
     * @param communicationMode SYNC, ASYNC or ONEWAY
     * @param sendCallback      callback passed through to {@code sendKernelImpl}
     * @param timeout           overall time budget in milliseconds for all attempts
     * @return the send result in SYNC mode; {@code null} in ASYNC and ONEWAY mode
     * @throws MQClientException    if no route is found for the topic, or all attempts failed
     * @throws RemotingException    if the time budget ran out before an attempt
     *                              ({@code RemotingTooMuchRequestException})
     * @throws MQBrokerException    if the broker answered with a response code that is not retried
     *                              and no send result is available
     * @throws InterruptedException rethrown as soon as a send attempt is interrupted
     */
    private SendResult sendDefaultImpl(
            Message msg,
            final CommunicationMode communicationMode,
            final SendCallback sendCallback,
            final long timeout
        ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            this.makeSureStateOK();
            // 1. Validate the message.
            Validators.checkMessage(msg, this.defaultMQProducer);
            // Random id used only to correlate the log lines of this invocation.
            final long invokeID = random.nextLong();
            long beginTimestampFirst = System.currentTimeMillis();
            long beginTimestampPrev = beginTimestampFirst;
            long endTimestamp = beginTimestampFirst;
            // 2. Look up the route (publish info) of the topic.
            TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
            if (topicPublishInfo != null && topicPublishInfo.ok()) {
                boolean callTimeout = false;
                MessageQueue mq = null;
                Exception exception = null;
                SendResult sendResult = null;
                // Total attempts: 1 + configured retries in SYNC mode, otherwise a single attempt.
                // NOTE(review): the original note says the default is 2 retries (3 attempts) — confirm.
                int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
                int times = 0;
                // Broker names tried so far, reported in the final error message.
                String[] brokersSent = new String[timesTotal];
                for (; times < timesTotal; times++) {
                    // Broker used by the previous attempt (null on the first attempt).
                    String lastBrokerName = null == mq ? null : mq.getBrokerName();
                    // 3. Select a queue, passing the previous broker so the selector can avoid it.
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                    if (mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mq.getBrokerName();
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            if (times > 0) {
                                //Reset topic with namespace during resend.
                                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                            }
                            // Time already spent since the first attempt started.
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            if (timeout < costTime) {
                                // Budget exhausted: stop retrying; reported as a timeout after the loop.
                                callTimeout = true;
                                break;
                            }
                            // 4. Send the message with the remaining time budget.
                            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                            endTimestamp = System.currentTimeMillis();
                            // 5. Record the latency of this attempt for the broker (used by fault
                            //    tolerance); this happens on success and, in the handlers below, on failure.
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            // Normal completion: the measured send duration is what gets recorded above.
                            switch (communicationMode) {
                                case ASYNC:
                                    return null;
                                case ONEWAY:
                                    return null;
                                case SYNC:
                                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                            // Not SEND_OK and retry-on-another-broker enabled: next loop iteration.
                                            continue;
                                        }
                                    }
    
                                    return sendResult;
                                default:
                                    break;
                            }
                        } catch (RemotingException e) {
                            endTimestamp = System.currentTimeMillis();
                            // Failure: record with the isolation flag set to true.
                            // NOTE(review): the original note says the duration is then treated as 30000 ms —
                            // that happens inside updateFaultItem, which is not visible here; confirm.
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQClientException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            continue;
                        } catch (MQBrokerException e) {
                            endTimestamp = System.currentTimeMillis();
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                            log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
                            exception = e;
                            // Only the response codes listed below are retried; anything else ends the send.
                            switch (e.getResponseCode()) {
                                case ResponseCode.TOPIC_NOT_EXIST:
                                case ResponseCode.SERVICE_NOT_AVAILABLE:
                                case ResponseCode.SYSTEM_ERROR:
                                case ResponseCode.NO_PERMISSION:
                                case ResponseCode.NO_BUYER_ID:
                                case ResponseCode.NOT_IN_CURRENT_UNIT:
                                    continue;
                                default:
                                    // Prefer a result from an earlier attempt, if any, over throwing.
                                    if (sendResult != null) {
                                        return sendResult;
                                    }
    
                                    throw e;
                            }
                        } catch (InterruptedException e) {
                            endTimestamp = System.currentTimeMillis();
                            // Interruption is recorded without the isolation flag and is never retried.
                            this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                            log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                            log.warn(msg.toString());
    
                            // NOTE(review): the two log calls below repeat the two above, so the
                            // exception and the message are logged twice.
                            log.warn("sendKernelImpl exception", e);
                            log.warn(msg.toString());
                            throw e;
                        }
                    } else {
                        // No queue could be selected: give up retrying.
                        break;
                    }
                }
    
                // Reached when the loop ended without returning: a result may still be left from
                // a SYNC attempt that was not SEND_OK.
                if (sendResult != null) {
                    return sendResult;
                }
    
                String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                    times,
                    System.currentTimeMillis() - beginTimestampFirst,
                    msg.getTopic(),
                    Arrays.toString(brokersSent));
    
                info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    
                MQClientException mqClientException = new MQClientException(info, exception);
                // A timeout takes precedence over the aggregated client exception.
                if (callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }
    
                // Map the last recorded exception to a response code on the thrown exception.
                if (exception instanceof MQBrokerException) {
                    mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
                } else if (exception instanceof RemotingConnectException) {
                    mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
                } else if (exception instanceof RemotingTimeoutException) {
                    mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
                } else if (exception instanceof MQClientException) {
                    mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
                }
    
                throw mqClientException;
            }
    
            // No usable route for the topic: check the NameServer setting, then report the missing route.
            validateNameServerSetting();
    
            throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
                null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    checkMessage() - 校验消息
    /**
     * Validates a message before it is sent.
     *
     * <p>Checks, in order: the message is not {@code null}; the topic passes
     * {@code Validators.checkTopic} and {@code Validators.isNotAllowedSendTopic}; the body
     * is neither {@code null} nor empty; the body length does not exceed the producer's
     * configured maximum message size.
     *
     * @param msg               the message to validate
     * @param defaultMQProducer the producer whose {@code getMaxMessageSize()} bounds the body size
     * @throws MQClientException with {@code ResponseCode.MESSAGE_ILLEGAL} when the message or
     *                           its body is invalid, or as thrown by the topic checks
     */
    public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
        throws MQClientException {
        if (msg == null) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
        }

        // Topic validation.
        final String topic = msg.getTopic();
        Validators.checkTopic(topic);
        Validators.isNotAllowedSendTopic(topic);

        // Body validation.
        final byte[] body = msg.getBody();
        if (body == null) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
        }
        if (body.length == 0) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
        }

        // Upper bound comes from the producer configuration
        // (4 MB according to the original note — TODO confirm the default).
        if (body.length > defaultMQProducer.getMaxMessageSize()) {
            throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
                "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    tryToFindTopicPublishInfo() - 查找路由
    /**
     * Looks up the publish (route) info of a topic.
     *
     * <p>The local {@code topicPublishInfoTable} is consulted first. If the entry is missing
     * or not ok, an empty entry is registered and the route is refreshed from the NameServer.
     * If the entry still has no route info, a second refresh is attempted using the
     * three-argument {@code updateTopicRouteInfoFromNameServer(topic, true, defaultMQProducer)}.
     *
     * @param topic the topic to look up
     * @return the entry found in {@code topicPublishInfoTable} after the refresh attempts
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // Cached entry first.
        TopicPublishInfo publishInfo = this.topicPublishInfoTable.get(topic);

        final boolean needsRefresh = publishInfo == null || !publishInfo.ok();
        if (needsRefresh) {
            // Make sure an entry exists, then refresh it from the NameServer.
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            publishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (publishInfo.isHaveTopicRouterInfo() || publishInfo.ok()) {
            return publishInfo;
        }

        // Still no route for this topic: retry the lookup in "default" mode
        // (per the original note this falls back to the default topic).
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        return this.topicPublishInfoTable.get(topic);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    TopicPublishInfo - 路由信息
    在这里插入图片描述
    在这里插入图片描述

    selectOneMessageQueue() - 选择队列

    MQFaultStrategy::selectOneMessageQueue(),选择队列时存在两种策略:

    • 轮询(默认)
      消息第一次发送时,用private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); 取模队列数,得到发送的队列;消息重试发送时,规避上次发送失败的broker队列进行发送
    • 故障延迟
      发送消息时计算服务不可用时长,当选择队列时会跳过“不可用”的服务
      在这里插入图片描述
    /**
     * Selects the queue to send to.
     *
     * <p>With {@code sendLatencyFaultEnable} off, selection is delegated to
     * {@code tpInfo.selectOneMessageQueue(lastBrokerName)}. With it on, the queues are
     * walked round-robin and the first one whose broker is reported available by
     * {@code latencyFaultTolerance} is returned. If none is available, a fallback broker is
     * requested via {@code pickOneAtLeast()}; if that also yields nothing usable (or any
     * exception occurs), the plain {@code tpInfo.selectOneMessageQueue()} result is returned.
     *
     * @param tpInfo         publish info holding the candidate queues
     * @param lastBrokerName broker used by the previous attempt; only used when the
     *                       latency fault mechanism is disabled
     * @return the selected message queue
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // Latency fault mechanism disabled: plain round-robin.
        if (!this.sendLatencyFaultEnable) {
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }

        try {
            int cursor = tpInfo.getSendWhichQueue().getAndIncrement();
            // Walk the queues round-robin, starting at the cursor.
            for (int attempt = 0; attempt < tpInfo.getMessageQueueList().size(); attempt++) {
                // Position = cursor modulo queue count; clamp in case the remainder is negative.
                int slot = Math.abs(cursor++) % tpInfo.getMessageQueueList().size();
                if (slot < 0) {
                    slot = 0;
                }
                final MessageQueue candidate = tpInfo.getMessageQueueList().get(slot);
                if (latencyFaultTolerance.isAvailable(candidate.getBrokerName())) {
                    return candidate;
                }
            }

            // No broker is currently considered available: ask for a fallback broker.
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            // Number of write queues of that broker (as the variable name suggests).
            final int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums <= 0) {
                latencyFaultTolerance.remove(notBestBroker);
            } else {
                final MessageQueue fallback = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    // Redirect the selected queue to the fallback broker and one of its queue ids.
                    fallback.setBrokerName(notBestBroker);
                    fallback.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return fallback;
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }

        return tpInfo.selectOneMessageQueue();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    TopicPublishInfo::selectOneMessageQueue()

    /**
     * Selects a queue while trying to avoid the broker used by the previous attempt.
     *
     * @param lastBrokerName broker name of the previous send attempt for the current
     *                       message, or {@code null} on the first attempt
     * @return a queue on a different broker when one is found within one pass over the
     *         queue list; otherwise the result of {@link #selectOneMessageQueue()}
     */
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // First attempt (no retry): nothing to avoid.
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        }

        // Retry: make at most one pass over the queues looking for another broker.
        for (int attempt = 0; attempt < this.messageQueueList.size(); attempt++) {
            final int counter = this.sendWhichQueue.getAndIncrement();
            int slot = Math.abs(counter) % this.messageQueueList.size();
            if (slot < 0) {
                slot = 0;
            }
            final MessageQueue candidate = this.messageQueueList.get(slot);
            final boolean sameBroker = candidate.getBrokerName().equals(lastBrokerName);
            if (!sameBroker) {
                return candidate;
            }
        }

        // Every queue tried is on the previous broker: fall back to plain selection.
        return selectOneMessageQueue();
    }
    
    
    /**
     * Default queue selection: advances {@code sendWhichQueue} and returns the queue at
     * that counter modulo the number of queues.
     *
     * @return the queue at the computed position in {@code messageQueueList}
     */
    public MessageQueue selectOneMessageQueue() {
        final int counter = this.sendWhichQueue.getAndIncrement();
        // Math.abs(Integer.MIN_VALUE) stays negative, so clamp the remainder at 0.
        final int slot = Math.max(Math.abs(counter) % this.messageQueueList.size(), 0);
        return this.messageQueueList.get(slot);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
  • 相关阅读:
    内网渗透-常用反弹shell方法总结
    CentOS7.9.2009离线安装yum命令
    Go 中的方法
    PHY6222系统级SOC蓝牙芯片低功耗高性能蓝牙MESH组网智能家居
    mac: docker安装及其Command not found: docker
    redis篇
    flink1.18.0 sql-client报错
    AIGC入门 - LLM 信息概览
    GUI编程--PyQt5--QPushButton
    【DR_CAN-MPC学习笔记】1.最优化控制和MPC基本概念
  • 原文地址:https://blog.csdn.net/qq_33512765/article/details/126744886