问题:RocketMQ批量发送消息是负载均衡的吗?
答:在rocketmq 4.0版本中不是负载均衡的,而是随机挑选的一个队列。
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); 其中index是一个随机整数.
源码中先有函数batch,将Collection
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
- // 判断是否开启发送延迟故障容错机制
- if (this.sendLatencyFaultEnable) {
- try {
- // 获取一个随机整数作为index
- int index = tpInfo.getSendWhichQueue().getAndIncrement();
-
- // 遍历tpInfo.getMessageQueueList(),选择一个MessageQueue
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
- int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
-
- // 判断mq所属的BrokerName是否可用
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
- // 如果lastBrokerName为空或者与mq所属的BrokerName相等,则返回该mq
- if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
- return mq;
- }
- }
-
- // 获取一个不是最佳Broker的BrokerName
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
-
- // 根据notBestBroker获取对应的writeQueueNums
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
-
- if (writeQueueNums > 0) {
- // 选择一个MessageQueue
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
-
- if (notBestBroker != null) {
- // 设置mq的BrokerName为notBestBroker
- mq.setBrokerName(notBestBroker);
-
- // 设置mq的QueueId为tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums
- mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
- }
-
- return mq;
- } else {
- // 移除notBestBroker
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- // 记录错误日志
- log.error("Error occurred when selecting message queue", e);
- }
-
- // 返回选择的MessageQueue
- return tpInfo.selectOneMessageQueue();
- }
-
- // 返回选择的MessageQueue
- return tpInfo.selectOneMessageQueue(lastBrokerName);
- }
而后有函数selectOneMessageQueue,为MessageBatch选取一个队列:
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
- if (this.sendLatencyFaultEnable) {
- try {
- int index = tpInfo.getSendWhichQueue().getAndIncrement();// index为随机整数
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
- int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
- if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
- return mq;
- }
- }
-
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
- if (writeQueueNums > 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- mq.setBrokerName(notBestBroker);
- mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
- }
- return mq;
- } else {
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- log.error("Error occurred when selecting message queue", e);
- }
-
- return tpInfo.selectOneMessageQueue();
- }
-
- return tpInfo.selectOneMessageQueue(lastBrokerName);
- }
最后调用函数sendKernelImpl发送消息:
- private SendResult sendKernelImpl(final Message msg,
- final MessageQueue mq,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- ...
- }