在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢?
通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sendDefaultImpl()这个方法,并且我们找到了最关键的Topic信息:
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- 复制代码
这个方法就是通过topic从NameServer拉出对应的Broker信息:
- private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
- TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
- if (null == topicPublishInfo || !topicPublishInfo.ok()) {
- this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- }
-
- if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
- return topicPublishInfo;
- } else {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- return topicPublishInfo;
- }
- }
- 复制代码
1.一开始的话,是从当前缓存中找
Topic信息,第一次肯定是找不到的;2.找不到
Topic信息,那么就调用updateTopicRouteInfoFromNameServer(topic)从NameServer拉对应的信息,如果拉到了就更新到缓存中;3.如果依然找不到
Topic信息,说明没有任何Broker上面是有这个Topic的;但是我们还要拉开启了自动创建Topic配置的Broker信息,通过updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer)实现;
生产者客户端会从两个地方获取Broker信息,第一个就是从内存缓存中获取,第二个就是从NameServer中获取。从NameServer中分两次获取,一次是获取存在的Topic对应的Broker信息,第二次是获取还没有创建出来的Topic对应的Broker信息;
当客户端拿到了Topic对应的Broker信息后,它是如何选择目标Broker的呢?继续向下看,我们找到了关键代码:
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
- int times = 0;
- String[] brokersSent = new String[timesTotal];
- for (; times < timesTotal; times++) {
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
- if (mqSelected != null) {
- mq = mqSelected;
- brokersSent[times] = mq.getBrokerName();
- ......
- 复制代码
1.如果是同步发送消息,那么【总的发送次数】=1+【重试次数】,如果是异步发送,默认是1;我们当前是同步模式,所以会存在重试;
2.选择
Broker的关键代码就在selectOneMessageQueue()方法中,通过前面拿到的topicPublishInfo作为参数,lastBrokerName作为额外的考虑参数;
追踪代码,我们进入MQFaultStrategy.selectOneMessageQueue()中:
- public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
- if (this.sendLatencyFaultEnable) {
- try {
- int index = tpInfo.getSendWhichQueue().incrementAndGet();
- for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
- int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
- if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
- return mq;
- }
-
- final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
- int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
- if (writeQueueNums > 0) {
- final MessageQueue mq = tpInfo.selectOneMessageQueue();
- if (notBestBroker != null) {
- mq.setBrokerName(notBestBroker);
- mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
- }
- return mq;
- } else {
- latencyFaultTolerance.remove(notBestBroker);
- }
- } catch (Exception e) {
- log.error("Error occurred when selecting message queue", e);
- }
-
- return tpInfo.selectOneMessageQueue();
- }
-
- return tpInfo.selectOneMessageQueue(lastBrokerName);
- }
- 复制代码
1.如果开启了延迟故障规避,那么执行规避策略;
- 1.1:轮询找一个
Broker,该Broker要么不在规避名单内,要么已经度过了规避期(发送消息失败会将目标Broker放进规避名单,沉默一段时间);- 1.2:如果所有的
Broker都没有度过规避期,那么从比较好的那一部分Broker里面找一个出来;- 1.3:如果依然没有找到合适的
Broker,那么就随机选一个Broker;2.否则就随机选一个
Broker;
下面我们来看一下随机发送的策略是怎么实现的:
- public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
- if (lastBrokerName == null) {
- return selectOneMessageQueue();
- } else {
- for (int i = 0; i < this.messageQueueList.size(); i++) {
- int index = this.sendWhichQueue.incrementAndGet();
- int pos = Math.abs(index) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = this.messageQueueList.get(pos);
- if (!mq.getBrokerName().equals(lastBrokerName)) {
- return mq;
- }
- }
- return selectOneMessageQueue();
- }
- }
-
- public MessageQueue selectOneMessageQueue() {
- int index = this.sendWhichQueue.incrementAndGet();
- int pos = Math.abs(index) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- return this.messageQueueList.get(pos);
- }
- 复制代码
1.如果第一次发送消息,那么通过自增求余的方式从列表中找一个
Broker,其实就是轮询方式;2.如果不是第一次发送消息,那么会尽可能避开上一次的
Broker服务,也是为了让Broker服务负载均衡;3.如果没有避开上一次的
Broker,那么再向后找另一个Broker;除非只有一个Broker服务,否则会尽可能避开上次发送的Broker;
通过源码分析,我们已经知道了生产者是如何选择目标Broker的了:
1.第一次发消息,通过轮询的方式选择
Broker;2.后续发消息会规避上次的
Broker,同样采用轮询的方式选择Broker;3.在消息发送过程中,存在一个
Broker规避列表,用户可以通过setSendLatencyFaultEnable(true)开启故障规避策略,客户端会尽可能选择不在规避列表中的Broker,如果所有的Broker都在规避列表中,那么会选择一个相对比较好的Broker来用;