🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2022年6月25日
🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
Producer发送消息时,会首先获取Topic路由信息(通过本地 + 注册中心拉取),RocketMQ的架构里有多个Broker服务器,而消息队列也会存在于多个Broker服务器里,所以就需要负载均衡策略来将流量尽可能均匀的打到所有服务器上。
本章节就介绍一下RocketMQ中常用的四种负载均衡策略。
找到Producer发送消息时选择消息队列的逻辑,在DefaultMQProducerImpl
类中定义了sendDefaultImpl
方法:
进入到selectOneMessageQueue
方法里:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
上述代码的MQFaultStrategy
类中定义了selectOneMessageQueue
方法:
public class MQFaultStrategy {
/**
* 默认负载均衡策略
*
* @param tpInfo
* @param lastBrokerName
* @return
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// 检查消息延迟容错开关
if (this.sendLatencyFaultEnable) {
try {
// 按顺序依次选择
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 选取时仍然会先选择相同集群下的其他MessageQueue
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 从其他Broker里选择一个,该列表的节点根据是否可用,超时时间和最新可用时间做了排序
/*
* ......
*/
} catch (Exception e) {
}
// 默认策略
return tpInfo.selectOneMessageQueue();
}
// 延迟容忍开关没开时的默认策略
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
}
根据源码可以很清楚地看到,默认策略就是依次选择消息队列进行发送,具体的执行细节如下:
TopicPublishInfo
中的selectOneMessageQueue(lastBrokerName)
方法返回一个MQThreadLocal
里存的值如果为空就随机生成一个数字,否则就给这个数字加1这里
tpInfo.getSendWhichQueue()
是存在于ThreadLocal
里的,有关资料参考 https://javaguide.cn/java/concurrent/threadlocal/
如何选一个较好的Broker呢?
RocketMQ的实现是按照该列表的节点根据是否可用,超时时间和最新可用时间做了排序
public MessageQueue selectOneMessageQueue() {
int index = this.sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
return this.messageQueueList.get(pos);
}
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
return selectOneMessageQueue();
}
}
从不是上一次使用的Broker里选一个MQ出来。
上面过程,用流程图总结如下:
在编程中,想使用随机策略的话也非常简单,只用传进去一个选择器即可:
producer.send(message, new SelectMessageQueueByRandoom(), " ");
有一个比较有意思的问题,我这里是用的3.5.8版本的RocketMQ,上面方法里的【随机】一词拼写错误,正确的应该是
Random
,可能是一开始就手误了吧,后面为了兼容性不好直接修改名称。。。。
public class SelectMessageQueueByRandoom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
SelectMessageQueueByRandoom
的源码也很易读,就是随机选取一个MQ并返回
要使用Hash策略发送消息,只需传入一个SelectMessageQueueByHash
对象即可:
producer.send(message, new SelectMessageQueueByHash(), " ");
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg的计算哈希值
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
和随机策略类似,Hash负载均衡策略也很简单,通过arg的hash值来决定返回哪一个MQ
要使用Hash策略发送消息,只需传入一个SelectMessageQueueByMachineRoom
对象即可:
producer.send(message, new SelectMessageQueueByMachineRoom(), " ");
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
有意思的是机房策略的select代码在RocketMQ里并没有编写,而是直接返回null,如果用户有这个需求的话要自行编写!