目录
updateProcessQueueTableInRebalance
入口org.apache.rocketmq.client.impl.factory.MQClientInstance#start
RocketMQ的消息消费流程分为三部分,一消息队列负载均衡,二消息的拉取,三消息的消费
负载均衡定时任务的实现类
org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#doRebalance
org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance
广播模式所有队列都需订阅, 不用负载均衡,下边只关注集群模式
1、获取所有消息队列和消费者实例
2、使用负载均衡算法进行分配
3、根据分配后最新的队列信息,进行消息拉取或停止原队列消费
- private void rebalanceByTopic(final String topic, final boolean isOrder) {
- switch (messageModel) {
- case BROADCASTING: {
- 。。。。
- case CLUSTERING: {
- // 从缓存表获取该Topic所有的消息队列
- Set
mqSet = this.topicSubscribeInfoTable.get(topic); -
- // 从Broker获取该消费者组所有消费者列表
- List
cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); -
- if (mqSet != null && cidAll != null) {
- List
mqAll = new ArrayList(); - mqAll.addAll(mqSet);
-
- Collections.sort(mqAll);
- Collections.sort(cidAll);
-
- // 负载均衡策略处理
- AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
-
- List
allocateResult = null; - try {
- allocateResult = strategy.allocate(
- this.consumerGroup,
- this.mQClientFactory.getClientId(),
- mqAll,
- cidAll);
- } catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
- }
-
- Set
allocateResultSet = new HashSet(); - if (allocateResult != null) {
- allocateResultSet.addAll(allocateResult);
- }
-
- // 根据最新的队列信息,更新本地
- // 如果有队列被移除,停止消费原队列
- // 如果有队列新增,新增拉取消息的任务
- boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
- if (changed) {
- log.info(
- "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
- strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
- allocateResultSet.size(), allocateResultSet);
- this.messageQueueChanged(topic, mqSet, allocateResultSet);
- }
- }
- break;
- }
- default:
- break;
- }
- }
分配队列的策略接口,实现类有如下,假设有8个消费队列,3个消费者ABC
AllocateMessageQueueAveragely 默认使用的,分配后A123, B456, C67
AllocateMessageQueueAveragelyByCircle 分配后A147 B258 C36
AllocateMessageQueueConsistentHash 环形一致性hash算法,下节
AllocateMessageQueueByConfig 自定义配置,为每个消费者自定义要订阅的队列
AllocateMessageQueueByMachineRoom 也是自定义配置,每个消费者只负载自定义配置的机房中Broker的队列信息,BrokerName也需要规范命名
AllocateMessageQueueAveragely#allocate 代码如下
如果有删除的队列,停止消费消息,移除消息队列
如果有新增的队列,新增消息拉取任务