想必你已经了解RocketMQ的设计哲学是,同一时刻同一个Queue只能被一个Consumer持有,
但同一个Consumer可以同时消费多个队列,为了订阅、消费模型的高效,Rocket总是希望将Queue分配的足够均匀,
日常使用时,Consumer的上下线,Queue的动态扩缩容,都可能会破坏分配均衡性,
故而Rocket提供了一套完整的Rebalance机制针对上述状况。
- 总结下来Rebalance一共三个触发条件,两个主动,一个被动。满足任意一个都会触发
-
- 1.Consumer启动之时执行start方法主动执行负载均衡逻辑;
- 2.定时任务触发;
- 3.Broker下发通知告知Client需要进行负载均衡;
-
- 今天重新翻阅代码的时候发现
- 很巧合三个触发条件或多或少跟DefaultMQPushConsumerImpl.start()都有关系;
-
DefaultMQPushConsumerImpl创建实例时,会初始化rebalanceImpl成员变量 private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); 此时此刻这个rebalanceImpl对象没有一点作用,因为ta的关键成员属性尚且为null,下文中的start肩负起了赋值重任。
下面是摘录start()主要代码:
- public synchronized void start() throws MQClientException {
- switch (this.serviceState) {
- case CREATE_JUST:
- /* 检查配置 */
- this.checkConfig();
- /* 构建 Topic 订阅信息——SubscriptionData,并添加至 RebalanceImpl 的订阅信息中 */
- this.copySubscription();
- /* 初始化 MQClientInstance */
- this.mQClientFactory = MQClientManager.getInstance()
- .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
- /**
- * 丰富 rebalanceImpl 对象属性,注意到了吗之前初始化的对象充血了
- * 之前产生的 rebalanceImpl 对象直到此刻才算真正意义上的初始化完成
- * rebalanceImpl就是负载均衡的相关实现
- */
- this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
- this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
- this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
- this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
-
- /**
- * 向 MQClientInstance 注册消费者,并启动 MQClientInstance
- * 一个 JVM 中的所有消费者、生产者持有同一个 MQClientInstance,MQClientInstance 只会启动一次
- */
- boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
- break;
- case
- ...;
- default:
- break;
- }
-
- /* Consumer启动成功,立即向所有Broker发送心跳 */
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
-
- /*
- * 注意到了嘛,Consumer上线会立即触发一次负载均衡
- * 但是这里并不是调用一下负载均衡的实现那么简单,这里其实是唤醒了相关服务线程
- * 下文笔者会着重介绍
- */
- this.mQClientFactory.rebalanceImmediately();
- }
-
见名知意,Consumer负载均衡相关的操作全部都委托给RebalanceImpl对象。 每一个Consumer的对象都持有一个RebalanceImpl实例,每个RebalanceImpl实例也只服务于一个Consumer。
二者是一个相互持有,循环引用的关系。
我们来看一下这个对象的关键成员属性:
- RebalancePushImpl extends RebalanceImpl {
- protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64);
-
- /* ConcurrentMap<topic, Set<MessageQueue>>, Topic与分给自己的MessageQueue信息 */
- protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>();
- /* ConcurrentMap<topic, SubscriptionData>, Topic与订阅数据 */
- protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>();
-
- /* 负载算法的具体实现,究竟如何分配就是由这个总指挥决定的 */
- protected AllocateMessageQueueStrategy allocateMessageQueueStrategy;
-
- /* Consumer实例 */
- private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
- }
-
其中有一个ProcessQueue对象尤为瞩目,因为我没做任何注释,如果你有阅读Rocket源码应该知道他是 Consumer消费Message过程中极其重要的一环,举足轻重,你可以认为ta是Client端的消息承载者。 因为跟负载时机关系不大所以此处不再赘述。
负责进行均衡负载的就是doRebalance(),实际上真正执行负载逻辑的是rebalanceByTopic();
rebalanceByTopic()是负载均衡的最终落脚点,即系统中所有需要负载的调用最后都会走到这里来。
集群消费模式下的实现 摘录关键代码:
- private void rebalanceByTopic(String topic, boolean isOrder) {
- /* 获取到该Topic下的所有队列 */
- Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
- /* 获取该Topic下ConsumerGroup此消费组所有的消费者Id */
- List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
- if (mqSet != null && cidAll != null) {
- List<MessageQueue> mqAll = new ArrayList<>(mqSet);
-
- /* 这两个排序极其关键 */
- Collections.sort(mqAll);
- Collections.sort(cidAll);
-
- /* 负载均衡算法 */
- AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
- List<MessageQueue> allocateResult;
- /* 调用具体的算法实现进行队列分配 */
- allocateResult = strategy.allocate(
- this.consumerGroup,
- this.mQClientFactory.getClientId(),
- mqAll,
- cidAll
- );
- }
- }
-
看似轻描淡写实则举足轻重的两句代码:
Collections.sort(mqAll);
Collections.sort(cidAll);
这两句代码意义在于该ConsumeGroup下的所有Consumer得到的队列顺序、消费者Id顺序都是一致的。在分配视图一致性得到保证的前提下,分配算法又是相同的,这样一来尽管各个Consumer在负载均衡的时候不进行任何信息交换,但是却可以互不干扰有条不紊的将队列均衡的分配完毕。
如何分配的具体细节在allocateMessageQueueStrategy中,RocketMQ也默认支持多种分配算法,比较简单,笔者不想赘述。
负载均衡的实现讲完了,那谁会调用ta,如何调用ta,在什么时机下会调用ta,一个个问题萦绕心头,分析上述问题之前我们总是绕不开RebalanceService对象。
RocketMQ中有一类对象地位超然,特立独行,一个对象主宰一个领域,ta们往往只受操作系统的约束。 似乎操作系统也格外偏爱他们,因为会给他们分配时间片,直接调度他们的运行(其实原因很简单,后面会有答案)。 ta就是传说中的ServiceThread,江湖中人往往称呼ta们为服务线程。
- public abstract class ServiceThread implements Runnable {
- protected boolean isDaemon = false;
-
- /* 这能不被钟爱吗,直接持有一个独立线程 */
- private Thread thread;
-
- /* 执行start的时候申请一个线程 */
- public void start() {
- /* 只允许申请一次 */
- if (!started.compareAndSet(false, true)) {
- return;
- }
- stopped = false;
- this.thread = new Thread(this, getServiceName());
- /* 设置为非守护线程 */
- this.thread.setDaemon(isDaemon);
- this.thread.start();
- }
- }
-
ServiceThread家族兴旺,除了专门负责负载均衡的RebalanceService,还有一众兄弟姐妹:
FlushRealTimeService:异步刷盘服务线程
CommitRealTimeService:异步刷盘服务线程
GroupCommitService: 同步刷盘服务线程
......
每一个都是Rocket能够稳定运行的背后功臣,上述三个其实就是刷盘相关的服务线程。 (涉及的面太广,如果有人想看,我尝试分析一下)
有了上面的铺垫,RebalanceService应该很好理解了,单独持有一个线程进行负载均衡,当然ta也不是无休止的一直进行负载处理。
- public class RebalanceService extends ServiceThread {
- /* 负载均衡时间间隔,默认20s,支持配置 */
- private final static long waitInterval =Long.parseLong(
- System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")
- );
-
- private final MQClientInstance mqClientFactory;
-
- public void run() {
- /* 只要该线程未终止就会一直执行 */
- while (!this.isStopped()) {
- /* 喜欢才会放肆,但爱是克制,休息20s */
- this.waitForRunning(waitInterval);
- /* 执行负载均衡 */
- this.mqClientFactory.doRebalance();
- }
- }
- }
-
- 理解了上面的RebalanceService,应该就理解了定时触发的逻辑,只需要定时唤醒服务线程即可
- 每个Java服务单点只会启动一个RebalanceService服务实例,同时也只会启动一个mqClientFactory实例
- 单点内所有的Consumer实例都会共用该实例对象。
- 每次定时触发mqClientFactory.doRebalance()都会对该JVM下持有的所有Consumer进行负载均衡
-
- /**
- * RebalanceService 线程默认每隔20s调用该方法
- * ⚠️:每个 Java 服务单点只会启动一个 MQClientInstance 实例,单点内所有的 Consumer 实例都会持有该实例对象
- * @see #registerConsumer Consumer 对象会将自己注册进 MQClientInstance
- * @see #consumerTable Consumer对象注册表
- *
- * ⚠️:一个 Java 服务单点只有一个 RebalanceService 服务线程
- * ⚠️:但每个 Consumer 实例都持有一个 RebalanceImpl 对象
- */
- public void doRebalance() {
- for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
- MQConsumerInner impl = entry.getValue();
- if (impl != null) {
- try {
- impl.doRebalance();
- } catch (Throwable e) {
- log.error("doRebalance exception", e);
- }
- }
- }
- }
-
- 写这篇记录之时我还在思考,为什么刚刚那个定时任务不是交给JDK中的ScheduledExecutorService
- 事实上,RocketMQ中的很多定时任务也都是这么做的。
-
- 可是直到刚刚我才明白,因为RebalanceService支持主动唤醒,提前执行任务。
-
- Consumer上线时候触发主动负载均衡就是因为唤醒了RebalanceService线程,
- start()最后会调用rebalanceImmediately()
- public void rebalanceImmediately() {
- this.rebalanceService.wakeup();
- }
-
-
Broker下发通知指挥Consumer需要负载均衡则明显复杂很多,但万变不离其宗,无非是多几次Rpc调用而已,无非是网络传输了一遭而已。
- 每当DefaultMQPushConsumerImpl实例,调用start之后,总是会向Broker发送一个心跳
- 调用栈如下:
- DefaultMQPushConsumerImpl.start()
- -> MQClientInstance.sendHeartbeatToAllBrokerWithLock()
- -> MQClientInstance.sendHeartbeatToAllBroker()
- -> MQClientAPIImpl.sendHearbeat()
-
Consumer启动之后会立即发出一个心跳包告知Broker。
- public int sendHearbeat(
- String addr, HeartbeatData heartbeatData, long timeoutMillis
- ) throws RemotingException, MQBrokerException, InterruptedException {
- /* 又是一次 Rpc 远程调用 */
- RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
- request.setLanguage(clientConfig.getLanguage());
- request.setBody(heartbeatData.encode());
- RemotingCommand response = this.remotingClient.invokeSync(
- addr, request, timeoutMillis
- );
- }
-
根据RequestCode.HEART_BEAT得知此次Rpc的处理器为ClientManageProcessor ClientManageProcessor.heartBeat() -> ConsumerManager.registerConsumer()
仅摘录关键代码:
- public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
- ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
- Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable
- ) {
- /* 有新的Consumer上线则有新的SocketChannel建立 */
- boolean r1 = consumerGroupInfo.updateChannel(
- clientChannelInfo, consumeType, messageModel, consumeFromWhere
- );
-
- /* 判断订阅信息是否发生变化 */
- boolean r2 = consumerGroupInfo.updateSubscription(subList);
-
- if (r1 || r2) {
- if (isNotifyConsumerIdsChangedEnable) {
- /* 触发ConsumerGroupEvent.CHANGE事件 */
- this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
- }
- }
- }
-
调用进行到DefaultConsumerIdsChangeListener.handle()
可以看到如果是CHANGE事件则调用Broker2Client.notifyConsumerIdsChanged()
- public void notifyConsumerIdsChanged(Channel channel, String consumerGroup) {
- /* 构造Rpc请求头 */
- NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
- requestHeader.setConsumerGroup(consumerGroup);
- /* 构造Rpc请求对象 */
- RemotingCommand request = RemotingCommand.createRequestCommand(
- RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader
- );
-
- /* 又是一次RPC */
- this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
- }
-
这个Rpc请求最终会交给ClientRemotingProcessor.notifyConsumerIdsChanged()处理
- public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
- RemotingCommand request) {
- NotifyConsumerIdsChangedRequestHeader requestHeader =(NotifyConsumerIdsChangedRequestHeader) request
- .decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class);
- /* 调用负载均衡逻辑 */
- this.mqClientFactory.rebalanceImmediately();
- }