• RocketMQ 消息负载均衡策略解析——图解、源码级解析


    🍊 Java学习:Java从入门到精通总结

    🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

    🍊 绝对不一样的职场干货:大厂最佳实践经验指南


    📆 最近更新:2022年6月25日

    🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

    🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


    Producer发送消息时,会首先获取Topic路由信息(通过本地 + 注册中心拉取),RocketMQ的架构里有多个Broker服务器,而消息队列也会存在于多个Broker服务器里,所以就需要负载均衡策略来将流量尽可能均匀的打到所有服务器上。

    在这里插入图片描述

    本章节就介绍一下RocketMQ中常用的四种负载均衡策略。



    默认策略

    找到Producer发送消息时选择消息队列的逻辑,在DefaultMQProducerImpl类中定义了sendDefaultImpl方法:
    在这里插入图片描述
    进入到selectOneMessageQueue方法里:

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
        }
    
    • 1
    • 2
    • 3

    上述代码的MQFaultStrategy类中定义了selectOneMessageQueue方法:

    public class MQFaultStrategy {
    	/**
         * 默认负载均衡策略
         * 
         * @param tpInfo
         * @param lastBrokerName
         * @return
         */
        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        	// 检查消息延迟容错开关
            if (this.sendLatencyFaultEnable) {
                try {
                    // 按顺序依次选择
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        // 选取时仍然会先选择相同集群下的其他MessageQueue
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }
    				
    				// 从其他Broker里选择一个,该列表的节点根据是否可用,超时时间和最新可用时间做了排序
                    /*
                     * ......
                     */
                    
                } catch (Exception e) {
                }
                // 默认策略
                return tpInfo.selectOneMessageQueue();
            }
            // 延迟容忍开关没开时的默认策略
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    根据源码可以很清楚地看到,默认策略就是依次选择消息队列进行发送,具体的执行细节如下:

    1. 判断延迟容错开关是否打开了,如果打开了就根据默认策略返回一个MQ,否则直接使用TopicPublishInfo中的selectOneMessageQueue(lastBrokerName)方法返回一个MQ
    2. 获取当前轮询到的MQ的索引。当第一次发送消息时,ThreadLocal里存的值如果为空就随机生成一个数字,否则就给这个数字加1

    这里tpInfo.getSendWhichQueue()是存在于ThreadLocal里的,有关资料参考 https://javaguide.cn/java/concurrent/threadlocal/

    1. 如果上一次发送该集群超时失败,选取时仍然会先选择相同集群下的其他MessageQueue
    2. 如果第3步里没有选出来,则从之前失败过的列表中选择一个较好的Broker

    如何选一个较好的Broker呢?
    RocketMQ的实现是按照该列表的节点根据是否可用,超时时间和最新可用时间做了排序

    1. 如果第3、4步都没有选出来,则走到默认策略(轮询出一个新的MQ来)
    public MessageQueue selectOneMessageQueue() {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            return this.messageQueueList.get(pos);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 第一步里如果没有打开延迟容错开关,进入
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
            if (lastBrokerName == null) {
                return selectOneMessageQueue();
            } else {
                int index = this.sendWhichQueue.getAndIncrement();
                for (int i = 0; i < this.messageQueueList.size(); i++) {
                    int pos = Math.abs(index++) % this.messageQueueList.size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = this.messageQueueList.get(pos);
                    if (!mq.getBrokerName().equals(lastBrokerName)) {
                        return mq;
                    }
                }
                return selectOneMessageQueue();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    从不是上一次使用的Broker里选一个MQ出来。


    上面过程,用流程图总结如下:
    在这里插入图片描述



    随机策略

    使用方式

    在编程中,想使用随机策略的话也非常简单,只用传进去一个选择器即可:

    producer.send(message, new SelectMessageQueueByRandoom(), " ");
    
    • 1

    有一个比较有意思的问题,我这里是用的3.5.8版本的RocketMQ,上面方法里的【随机】一词拼写错误,正确的应该是Random,可能是一开始就手误了吧,后面为了兼容性不好直接修改名称。。。。

    源码

    public class SelectMessageQueueByRandoom implements MessageQueueSelector {
        private Random random = new Random(System.currentTimeMillis());
    
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            int value = random.nextInt();
            if (value < 0) {
                value = Math.abs(value);
            }
    
            value = value % mqs.size();
            return mqs.get(value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    SelectMessageQueueByRandoom的源码也很易读,就是随机选取一个MQ并返回



    Hash策略

    使用方式

    要使用Hash策略发送消息,只需传入一个SelectMessageQueueByHash对象即可:

    producer.send(message, new SelectMessageQueueByHash(), " ");
    
    • 1

    源码

    public class SelectMessageQueueByHash implements MessageQueueSelector {
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        	// arg的计算哈希值
            int value = arg.hashCode();
            if (value < 0) {
                value = Math.abs(value);
            }
    
            value = value % mqs.size();
            return mqs.get(value);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    和随机策略类似,Hash负载均衡策略也很简单,通过arg的hash值来决定返回哪一个MQ



    就近策略

    使用方式

    要使用Hash策略发送消息,只需传入一个SelectMessageQueueByMachineRoom对象即可:

    producer.send(message, new SelectMessageQueueByMachineRoom(), " ");
    
    • 1

    源码

    public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
        private Set<String> consumeridcs;
    
    
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            return null;
        }
    
    
        public Set<String> getConsumeridcs() {
            return consumeridcs;
        }
    
    
        public void setConsumeridcs(Set<String> consumeridcs) {
            this.consumeridcs = consumeridcs;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    有意思的是机房策略的select代码在RocketMQ里并没有编写,而是直接返回null,如果用户有这个需求的话要自行编写!

  • 相关阅读:
    【MySQL 第一天安装教程】
    How to Name a Compound
    Web攻防03_MySQL注入_数据请求
    云原生|kubernetes |部署k8s图形化管理组件 kuboard v3
    Typecho仿卢松松博客主题模板/科技资讯博客主题模板
    大学生个人网页设计 HTML个人网页制作 web个人网站模板 简单静态HTML个人网页作品
    文献阅读4
    数据库实验7 完整性约束
    MySQL主从同步
    java基于ssm+jsp 多用户博客个人网站
  • 原文地址:https://blog.csdn.net/HNU_Csee_wjw/article/details/125566289