• RocketMQ源码阅读(十)消息消费-—消息队列负载均衡


    目录

    RebalanceService

    RebalanceImpl

    rebalanceByTopic

    AllocateMessageQueueStrategy

    updateProcessQueueTableInRebalance


    入口org.apache.rocketmq.client.impl.factory.MQClientInstance#start

    RocketMQ的消息消费流程分为三部分,一消息队列负载均衡,二消息的拉取,三消息的消费

    RebalanceService

    负载均衡定时任务的实现类

     org.apache.rocketmq.client.impl.factory.MQClientInstance#doRebalance

     org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#doRebalance

    RebalanceImpl

    org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance

    rebalanceByTopic

    广播模式所有队列都需订阅, 不用负载均衡,下边只关注集群模式

    1、获取所有消息队列和消费者实例

    2、使用负载均衡算法进行分配

    3、根据分配后最新的队列信息,进行消息拉取或停止原队列消费 

    1. private void rebalanceByTopic(final String topic, final boolean isOrder) {
    2. switch (messageModel) {
    3. case BROADCASTING: {
    4. 。。。。
    5. case CLUSTERING: {
    6. // 从缓存表获取该Topic所有的消息队列
    7. Set mqSet = this.topicSubscribeInfoTable.get(topic);
    8. // 从Broker获取该消费者组所有消费者列表
    9. List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
    10. if (mqSet != null && cidAll != null) {
    11. List mqAll = new ArrayList();
    12. mqAll.addAll(mqSet);
    13. Collections.sort(mqAll);
    14. Collections.sort(cidAll);
    15. // 负载均衡策略处理
    16. AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    17. List allocateResult = null;
    18. try {
    19. allocateResult = strategy.allocate(
    20. this.consumerGroup,
    21. this.mQClientFactory.getClientId(),
    22. mqAll,
    23. cidAll);
    24. } catch (Throwable e) {
    25. log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
    26. e);
    27. return;
    28. }
    29. Set allocateResultSet = new HashSet();
    30. if (allocateResult != null) {
    31. allocateResultSet.addAll(allocateResult);
    32. }
    33. // 根据最新的队列信息,更新本地
    34. // 如果有队列被移除,停止消费原队列
    35. // 如果有队列新增,新增拉取消息的任务
    36. boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
    37. if (changed) {
    38. log.info(
    39. "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
    40. strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
    41. allocateResultSet.size(), allocateResultSet);
    42. this.messageQueueChanged(topic, mqSet, allocateResultSet);
    43. }
    44. }
    45. break;
    46. }
    47. default:
    48. break;
    49. }
    50. }

    AllocateMessageQueueStrategy

    分配队列的策略接口,实现类有如下,假设有8个消费队列,3个消费者ABC

    AllocateMessageQueueAveragely  默认使用的,分配后A123, B456, C67

    AllocateMessageQueueAveragelyByCircle  分配后A147     B258    C36

    AllocateMessageQueueConsistentHash   环形一致性hash算法,下节

    AllocateMessageQueueByConfig   自定义配置,为每个消费者自定义要订阅的队列

    AllocateMessageQueueByMachineRoom   也是自定义配置,每个消费者只负载自定义配置的机房中Broker的队列信息,BrokerName也需要规范命名

    AllocateMessageQueueAveragely#allocate  代码如下

    updateProcessQueueTableInRebalance

    如果有删除的队列,停止消费消息,移除消息队列

    如果有新增的队列,新增消息拉取任务

  • 相关阅读:
    电脑分区如何合并?简单易行的操作方法!
    docker部署go项目
    Android NDK篇-C++语言之运算符重载 与多继承二义性
    Unity—项目小技术总结
    MVC医院信息管理系统源码 BS架构
    JAVA三道编程题
    网站系统告警哪家强
    CICD—Linux下Jenkins+Gitlab 自动化打包安卓-APK
    CS5801 HDMI转4K 4lane_DP/eDP方案
    中文GPTS详尽教程,字节扣子Coze插件使用全输出
  • 原文地址:https://blog.csdn.net/xyjy11/article/details/126174547