• 你肯定不知道RocketMQ生产者是如何规避故障Broker的


    前言

    在消息发送过程中,生产者从NameServer中获取到了指定Topic对应的Broker信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的。那么Broker有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker的呢?

    收集故障Broker

    我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl类中查找:

    1. this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    2. 复制代码

    无论是发送成功还是失败,RocketMQ生产者客户端都会做这一步操作:

    1. // 发送成功的话,isolation传false,失败isolation传true
    2. public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    3.    if (this.sendLatencyFaultEnable) {
    4.        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
    5.        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    6.   }
    7. }
    8. private long computeNotAvailableDuration(final long currentLatency) {
    9.    for (int i = latencyMax.length - 1; i >= 0; i--) {
    10.        if (currentLatency >= latencyMax[i])
    11.            return this.notAvailableDuration[i];
    12.   }
    13.    return 0;
    14. }
    15. private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    16. 复制代码

    如果Broker产生故障,那么会创建一个FaultItem对象记录故障的Broker,并把结果放进故障规避表faultItemTable中,数据格式如下:

    1. "broker-a": {
    2.  // broker名称
    3.  "name": "broker-a",
    4.  "currentLatency": 发送消息消耗的时间,毫秒值之差,
    5.  // 解除规避的时间,绝对时间
    6.  "startTimestamp": 时间戳毫秒值
    7. },
    8. "broker-b": {
    9.  // broker名称
    10.  "name": "broker-b",
    11.  "currentLatency": 发送消息消耗的时间,毫秒值之差,
    12.  // 解除规避的时间,绝对时间
    13.  "startTimestamp": 时间戳毫秒值
    14. }
    15. 复制代码

    发送成功的Broker设置的故障规避时间为0,发送失败的Broker将被设置为规避30秒;

    选择Broker

    MQFaultStrategy.selectOneMessageQueue()方法中,我们分三部分来分析如何选择Broker。

    • 轮询选择一个可用的Broker
    1. // 轮询的基本套路,一个自增变量
    2. int index = tpInfo.getSendWhichQueue().incrementAndGet();
    3. for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    4.     // 通过对队列数量取模,获取选定的Broker所在的位置
    5.     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    6.     if (pos < 0)
    7.         pos = 0;
    8.     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    9.     // 判断Broker是否在规避时间内,如果不在规避时间内,就选择这个Broker,否则继续循环直至所有Broker都在规避时间内
    10.     if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
    11.         return mq;
    12. }
    13. 复制代码

    1.轮询的基本套路都是通过一个自增变量来对所有的Broker数量取模,这样就可以命中一个Broker;

    2.针对命中的Broker判断是否在规避时间范围内,不在规避时间内就可以返回;否则只能进入第二个方案;

    • 选择一个相对延迟低的Broker
    1. // 把所有规避列表中的Broker按延迟高低排序,并从延迟低的Broker中选择一个
    2. final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    3. int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    4. // 判断该Broker是否允许写消息
    5. if (writeQueueNums > 0) {
    6.    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    7.    if (notBestBroker != null) {
    8.        mq.setBrokerName(notBestBroker);
    9.        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
    10.   }
    11.    // 返回选中的Broker
    12.    return mq;
    13. }
    14. 复制代码

    1.从规避列表中找到延时比较低的Broker;

    2.判断该Broker是否允许写消息,允许写消息的话就直接返回,否则再进入下一个方案;

    • 默认的选择
    1. return tpInfo.selectOneMessageQueue();
    2. 复制代码

    最后直接轮询一个Broker直接返回:

    1.    public MessageQueue selectOneMessageQueue() {
    2.        int index = this.sendWhichQueue.incrementAndGet();
    3.        int pos = Math.abs(index) % this.messageQueueList.size();
    4.        if (pos < 0)
    5.            pos = 0;
    6.        return this.messageQueueList.get(pos);
    7.   }
    8. 复制代码

    该方案是默认方案,没有开启故障规避配置的话,所有Broker的选择都是使用的该方案;

    小结

    RocketMQ通过设置故障规避表的方式,把所有的Broker的延迟数据都保留在故障规避表中,根据该列表制定了以下几种策略:

    1.优先选择不在规避时间范围内的Broker

    2.如果所有Broker都在规避时间内,优先选择延迟低的Broker

    3.如果依然没有选中合适的Broker,那么就直接挑一个Broker来用;

  • 相关阅读:
    【Python & turtle】绘制一个有趣的的Emoticons
    RT-Thread实战笔记-小白一看就会的平衡车教程(附源码)
    【毕业设计】基于超声波与红外的自动调速风扇系统(代码开源) -物联网 嵌入式 stm32
    视频理解【论文学习】
    如何开发一款高效便捷的搬家服务小程序
    宏集新闻 | 虹科传感器事业部正式更名为宏集科技
    RKMEDIA--VI的使用
    C++ 中的 this 指针
    NFTScan | 10.09~10.15 NFT 市场热点汇总
    SpringBoot-黑马程序员-学习笔记(三)
  • 原文地址:https://blog.csdn.net/m0_71777195/article/details/128049080