• rocketMQ是如何利用MQFaultStrategy规避延迟故障的?


    背景

    RocketMq集群中,queue分布在各个不同的broker服务器中时,当尝试向其中一个queue发送消息时,如果出现耗时过长或者发送失败的情况,RocketMQ则会尝试重试发送。不妨细想一下,同样的消息第一次发送失败或耗时过长,可能是网络波动或者相关broker停止导致,如果短时间再次重试极有可能还是同样的情况

    RocketMQ为我们提供了延迟故障自动切换queue的功能,并且会根据故障次数和失败等级来预判故障时间并自动恢复,该功能是选配,默认关闭,可以通过如下配置开启。

    DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
    producer.setSendLatencyFaultEnable(true);
    
    • 1
    • 2

    注:该功能只有在没有指定queue时生效

    源码解读

    我们定位到queue决策和重试相关的源码中org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        // 参数校验
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 获得topic发布的信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        // 这里.ok()是判断是否有可用的queue,只有当queue不为空时才能将消息投递出去
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 同步执行需要设置一个最大重试次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 选择投递的queue,会自动规避最近故障的queue
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            // 为了防止namespace状态发生变更,重试期间利用namespace重新解析topic名称
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        if (timeout < costTime) {
                            // 如果超时则break停止投递
                            callTimeout = true;
                            break;
                        }
    
                        // 开始投递消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        endTimestamp = System.currentTimeMillis();
                        // 更新发送超时记录,用于规避再次故障
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        // 失败则尝试投递其他broker
                                        continue;
                                    }
                                }
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQClientException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    } catch (MQBrokerException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
                            continue;
                        } else {
                            if (sendResult != null) {
                                return sendResult;
                            }
    
                            throw e;
                        }
                    } catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
    
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }
    
            // 是否有响应数据,有则直接响应结果
            if (sendResult != null) {
                return sendResult;
            }
            // 下面就是异常类的包装和抛出操作
    
            String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
                times,
                System.currentTimeMillis() - beginTimestampFirst,
                msg.getTopic(),
                Arrays.toString(brokersSent));
    
            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
    
            MQClientException mqClientException = new MQClientException(info, exception);
            if (callTimeout) {
                throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
            }
    
            if (exception instanceof MQBrokerException) {
                mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
            } else if (exception instanceof RemotingConnectException) {
                mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
            } else if (exception instanceof RemotingTimeoutException) {
                mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
            } else if (exception instanceof MQClientException) {
                mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
            }
    
            // 将包装好的异常结果抛出
            throw mqClientException;
        }
    
        // 校验NameServer服务器是否正常
        validateNameServerSetting();
    
        // 抛出topic异常信息
        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150

    注意this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);这行代码,该代码在消息推送至broker完成之后会立马调用,捕获异常一样会调用该方法,我们前面说了出现耗时过长或者发送失败该broker都会被暂时标记为不可用,我们看看它底层是如何实现的。

    顺着代码定位到org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem

        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            // 配置如果开启则生效
            if (this.sendLatencyFaultEnable) {
                // 如果是个隔离异常则标记执行持续时长为30秒,并根据执行时长计算broker不可用时长
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                // 记录broker不可用的时长信息
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }
    
        private long computeNotAvailableDuration(final long currentLatency) {
            for (int i = latencyMax.length - 1; i >= 0; i--) {
                if (currentLatency >= latencyMax[i])
                    return this.notAvailableDuration[i];
            }
    
            return 0;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    以上方法有三个入参

    • brokerName broker的名称
    • currentLatency 截至当前延迟
    • isolation 是否隔离,该处会直接将隔离状态的延迟时长标记为30秒,也就是说为true时就认为执行时长为30秒

    computeNotAvailableDuration方法中使用了两个数组,我们看看这两个数组

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    
    • 1
    • 2

    以上latencyMax表示执行时长,notAvailableDuration表示broker不可用时长,他们索引位一一对应,该方法是反向遍历索引位置,假设我当前消息推送时长为600ms,对应latencyMax下标是2,那么在notAvailableDuration下标也是2,这个broker的不可用时长则是30000ms。

    下面我们看看不可用的broker是如何维护的,顺着逻辑定位到org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // 查看broker是否被标记过
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            // 没有则进行一次标记
            final FaultItem faultItem = new FaultItem(name);
            // 记录本次的耗时
            faultItem.setCurrentLatency(currentLatency);
            // 当前时间+不可用时间=截至时间
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    
            // 再次尝试放入map中,为了防止并发情况下key已存在,则使用putIfAbsent
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                // 放入时已存在则更新存在的对象
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // broker被标记过则直接更新
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    被标记的broker使用ConcurrentHashMap维护它的延迟对象,其中包含耗时时长和截至不可用的时间

    到这我们就知道异常不可用的原理了,接下来我们看看queue自动决策时相关的代码,我们再次定位到org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

    注意MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);这行代码,因为存在重试逻辑,所以这里有lastBrokerName,表示上次调用时使用的broker,topicPublishInfo表示要投递的topic相关信息。

    顺着逻辑进入到核心queue决策的方法org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 是否开启延迟故障功能
        if (this.sendLatencyFaultEnable) {
            try {
                // 使用threadlocal维护索引位置,做到线程隔离
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 遍历所有可用queue
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 索引位置对queue数量进行取模,保证分布尽量均匀
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 检查broker是否可用
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // 没有可用的broker走下面逻辑
                // 从疑似故障的broker中强行取一个broker出来
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // 从broker中取一个queue
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
    
            return tpInfo.selectOneMessageQueue();
        }
        // 重最后一个broker中取出一个queue
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    这里latencyFaultTolerance.isAvailable(mq.getBrokerName())就是利用了前面ConcurrentHashMap存储的延迟对象,通过与当前时间来判定是否可用

    public boolean isAvailable() {
        // startTimestamp是 上次调度故障的时间+故障恢复时间
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
    
    • 1
    • 2
    • 3
    • 4

    假如说这里全部broker都曾被标记为故障,且都还没有到达恢复时间怎么办呢?

    以上代码final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();会强制的从故障的broker中取一个出来,我们定位到org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#pickOneAtLeast

    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            tmpList.add(faultItem);
        }
    
        if (!tmpList.isEmpty()) {
            // 这里属于无效操作,忽略就好,官方已在最新版本修复
            Collections.shuffle(tmpList);
    
            // 进行排序
            Collections.sort(tmpList);
    
            // 这段逻辑表示只从延迟最低的一半broker中选择一个
            final int half = tmpList.size() / 2;
            if (half <= 0) {
                return tmpList.get(0).getName();
            } else {
                final int i = this.whichItemWorst.incrementAndGet() % half;
                return tmpList.get(i).getName();
            }
        }
    
        return null;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    注意以上逻辑,并不是随便拿一个出来,首先会根据最近一次调度的延迟来进行排序,然后折半,只从最快的那一半broker中取模一个broker出来。

    上面Collections.shuffle(tmpList);忽略就好,官方已说明这里将会被修复https://github.com/apache/rocketmq/pull/3945

    可以看到RocketMQ在消息发送端,仅仅是故障熔断、故障切换就做了很多考量,当然,本章所讲代码只适用于在没有手动指定queue且开启了发送延迟故障功能的情况。

  • 相关阅读:
    JMeter笔记7 | JMeter脚本回放
    电子签章软件怎么解救电子检测报告
    Java——List接口
    【小想法】第1期:模型工程化,向量相似度,早停机制,BERT微调小trick
    Docker(3)汇总
    ELF: better symbol lookup via DT_GNU_HASH
    动态调试python源码的步骤与案例
    MobileNetv1论文详解
    jvm概述
    mysql8.0英文OCP考试第131-140题
  • 原文地址:https://blog.csdn.net/qq_21046665/article/details/125892156