• RocketMq5 消息消费及相关源码浅阅


    如何消费

    example

    public class RocketMQConsumerDemo { 
    	public static void main(String[] args) throws Exception{ 
    	//创建消费者 
    	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group-java"); 
    	//设置 broker 地址 
    	consumer.setNamesrvAddr("192.168.66.66:9876"); 	
    	consumer.subscribe("topicA", "*");
    	consumer.registerMessageListener(new MessageListenerConcurrently() { 	
    		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    			for (MessageExt messageExt : list) { 
    				System.out.println("接受消息:队列 ID==>"+messageExt.getQueueId() + " 消息 Id:" + messageExt.getMsgId()); 	
    			}
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; 
    		} }); 	
    			consumer.start(); 
    			//System.in.read();
    	} 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    注意: 订阅消息必须要使得消费者订阅主题必须存在。

    问题 1:

    一个消费者组不同的消费者是否可以同时消费多个主题??(试验)
    在这里插入图片描述
    答案:同一个消费者组中多个消费者不能同时消费多个主题topic,如果出现此现象,会导致部分消息无法消费,如上图。

    在这里插入图片描述
    什么原因导致这样现象呢??

    答案:负载均衡算法问题导致了这一现象。(平均分配算法,环形平均分配算法…)

    负载算法:

    在这里插入图片描述

    • 根据组名,把主题下的队列平均分配给同一个消费者组下的消费者
    • topicA主题:4个队列先平均分配,分配给A 2个队列,B 2个队列,但是B没有订阅TopocA,因此就会导致2,3队列无法被消费。
    • topicB主题:4个队列先平均分配,分配给A 2个队列,B 2个队列,但是A没有订阅topicB主题,因此就会导致0,1无法被消费。

    问题2:

    不同的消费者组,是否可以消费相同同的topic ??

    可以的
    
    • 1

    消费模型

    一对多 (topic 多个消费者组)

    在这里插入图片描述

    一对一

    在这里插入图片描述
    注意:默认消费模式就是 一个消费者组消费一个主题(使用负载均衡算法进行分配)

    消费流程

    开始执行消息拉取

    由demo中 consumer.start(); 方法进入:

     public void start() throws MQClientException {
            this.defaultMQPushConsumerImpl.start();
     }
    
    • 1
    • 2
    • 3

    defaultMQPushConsumerImpl.start()方法实现:

    	public void start() throws MQClientException {
            switch (this.serviceState) {
                case CREATE_JUST:
                    log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),
                        this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                    this.serviceState = ServiceState.START_FAILED;
    
                    // 检查配置
                    this.checkConfig();
    
                    // Rebalance负载均衡 复制订阅数据
                    this.copySubscription();
    
                    // 设置instanceName,为一个字符串化的数字,比如10072
                    if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
                        this.defaultMQPushConsumer.changeInstanceNameToPID();
                    }
    
                    // 获取MQClient对象,clientId为ip@instanceName,比如192.168.0.1@10072
                    this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    
                    // 设置负载均衡器
                    this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                    //默认这是消费模式为集群模式,每条消息被同一组的消费者中的一个消费
                    //还可以设置为广播模式,每条消息被同一个组的所有消费者都消费一次
                    this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                    //默认是AllocateMessageQueueAveragely,均分策略
                    this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                    this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                    // 拉取API封装
                    this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    
                    //生成消费进度处理器,集群模式下消费进度保存在Broker上,因为同一组内的消费者要共享进度;广播模式下进度保存在消费者端
                    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                    }
                    //若是广播模式,加载本地的消费进度文件
                    this.offsetStore.load();
    
                    // 根据监听是顺序模式还是并发模式来生成相应的ConsumerService
                    if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                        this.consumeOrderly = true;
                        this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly)this.getMessageListenerInner());
                    } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                        this.consumeOrderly = false;
                        this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently)this.getMessageListenerInner());
                    }
                    this.consumeMessageService.start();
    
                    // 设置MQClient对象
                    boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                    if (!registerOK) {
                        this.serviceState = ServiceState.CREATE_JUST;
                        this.consumeMessageService.shutdown();
                        throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                            + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                            null);
                    }
                    mQClientFactory.start();
                    log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
    
                    // 设置服务状态
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                case START_FAILED:
                case SHUTDOWN_ALREADY:
                    throw new MQClientException("The PushConsumer service state not OK, maybe started once, "//
                        + this.serviceState//
                        + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                        null);
                default:
                    break;
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    在这里插入图片描述

    消费服务拉取

    mQClientFactory.start()实现:

    	public void start() throws MQClientException {
    
            synchronized (this) {
                switch (this.serviceState) {
                    case CREATE_JUST:
                        this.serviceState = ServiceState.START_FAILED;
                        // If not specified,looking address from name server
                        if (null == this.clientConfig.getNamesrvAddr()) {
                            //如果url未指定,可以通过Http请求从其他处获取
                            this.mQClientAPIImpl.fetchNameServerAddr();
                        }
                        // Start request-response channel
                        this.mQClientAPIImpl.start();
                        // 启动多个定时任务
                        this.startScheduledTask();
                        // Start pull service  拉取消费服务
                        this.pullMessageService.start();
                        // Start Consumer rebalance service 负载均衡
                        this.rebalanceService.start();
                        //启动内部默认的生产者,用于消费者SendMessageBack,但不会执行MQClientInstance.start(),也就是当前方法不会被执行
                        this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                        log.info("the client factory [{}] start OK", this.clientId);
                        this.serviceState = ServiceState.RUNNING;
                        break;
                    case RUNNING:
                        break;
                    case SHUTDOWN_ALREADY:
                        break;
                    case START_FAILED:
                        throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                    default:
                        break;
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    负载均衡

    this.rebalanceService.start()线程开启:

    	public void start() {
            this.thread.start();
        }
    
    • 1
    • 2
    • 3
    	/**
         * consumer负载均衡线程服务
         */
        @SuppressWarnings("SpellCheckingInspection")
        private final RebalanceService rebalanceService;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    查看该类run()方法实现:

    重点每隔20秒重新负载均衡

    this.mqClientFactory.doRebalance();
    
    • 1
    /**
     * Rebalance Service
     * consumer负载均衡线程服务
     */
    public class RebalanceService extends ServiceThread {
    
        /**
         * 等待间隔,单位:毫秒
         */
        private static long waitInterval =
            Long.parseLong(System.getProperty(
                "rocketmq.client.rebalance.waitInterval", "20000"));
    
        private final Logger log = ClientLogger.getLog();
        /**
         * MQClient对象
         */
        private final MQClientInstance mqClientFactory;
    
        public RebalanceService(MQClientInstance mqClientFactory) {
            this.mqClientFactory = mqClientFactory;
        }
    
        @Override
        public void run() {
            log.info(this.getServiceName() + " service started");
    
            while (!this.isStopped()) {
                //每等待20S执行一次负载均衡
                this.waitForRunning(waitInterval);
                this.mqClientFactory.doRebalance();
            }
    
            log.info(this.getServiceName() + " service end");
        }
    
        @Override
        public String getServiceName() {
            return RebalanceService.class.getSimpleName();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    每隔20秒重新负载均衡

    查看mqClientFactory.doRebalance()方法

    	/**
         * 消费者进行平衡
         */
        public void doRebalance() {
            for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
                MQConsumerInner impl = entry.getValue();
                if (impl != null) {
                    try {
                        impl.doRebalance();
                    } catch (Throwable e) {
                        log.error("doRebalance exception", e);
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    查看mqClientFactory.doRebalance()方法实现

    	@Override
        public void doRebalance() {
            if (this.rebalanceImpl != null) {
                this.rebalanceImpl.doRebalance(false);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    执行分配消费队列

    	/**
         * 执行分配消费队列
         *
         * @param isOrder 是否顺序消息
         */
        public void doRebalance(final boolean isOrder) {
            // 分配每个 topic 的消息队列
            Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
            if (subTable != null) {
                for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                    final String topic = entry.getKey();
                    try {
                        this.rebalanceByTopic(topic, isOrder);
                    } catch (Throwable e) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("rebalanceByTopic Exception", e);
                        }
                    }
                }
            }
            // 移除未订阅的topic对应的消息队列
            this.truncateMessageQueueNotMyTopic();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    默认集群消费

    查看 this.rebalanceByTopic(topic, isOrder)实现:

    	/**
         * 消费者对 单个Topic 重新进行平衡
         *
         * @param topic   Topic
         * @param isOrder 是否顺序
         */
        private void rebalanceByTopic(final String topic, final boolean isOrder) {
            switch (messageModel) {
                case BROADCASTING: {  //广播模式,每条消息被同一组的所有消费者均消费
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    if (mqSet != null) {
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                        if (changed) {
                            this.messageQueueChanged(topic, mqSet, mqSet);
                            log.info("messageQueueChanged {} {} {} {}", //
                                consumerGroup, //
                                topic, //
                                mqSet, //
                                mqSet);
                        }
                    } else {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                    break;
                }
                case CLUSTERING: {     //默认是集群模式,每条消息被同一消费者组的一个消费,
                    // 获取 topic 对应的 队列 和 consumer信息
                    Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                    List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                    if (null == mqSet) {
                        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                            log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                        }
                    }
    
                    if (null == cidAll) {
                        log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                    }
    
                    if (mqSet != null && cidAll != null) {
                        // 排序 消费队列 和 消费者数组。因为是在Client进行分配队列,排序后,各Client的顺序才能保持一致。
                        List<MessageQueue> mqAll = new ArrayList<>();
                        mqAll.addAll(mqSet);
    
                        Collections.sort(mqAll);
                        Collections.sort(cidAll);
    
                        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;   //AllocateMessageQueueAveragely
    
                        // 根据 队列分配策略 分配消费队列
                        List<MessageQueue> allocateResult;
                        try {
                            allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
                        } catch (Throwable e) {
                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e);
                            return;
                        }
    
                        Set<MessageQueue> allocateResultSet = new HashSet<>();
                        if (allocateResult != null) {
                            allocateResultSet.addAll(allocateResult);
                        }
    
                        // 更新消费队列
                        boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                        if (changed) {
                            log.info(
                                "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, "
                                    + "rebalanceResultSize={}, rebalanceResultSet={}",
                                strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                                allocateResultSet.size(), allocateResultSet);
                            this.messageQueueChanged(topic, mqSet, allocateResultSet);
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80

    集群消费方法算法

     allocateResult = strategy.allocate(this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll);
    
    • 1

    平均分配算法

    /**
     * Average Hashing queue algorithm
     * 队列分配策略 - 平均分配
     * 如果 队列数 和 消费者数量 相除有余数时,余数按照顺序"1"个"1"个分配消费者。
     * 例如,5个队列,3个消费者时,分配如下:
     * - 消费者0:[0, 1] 2个队列
     * - 消费者1:[2, 3] 2个队列
     * - 消费者2:[4, 4] 1个队列
     *
     * 代码块 (mod > 0 && index < mod) 判断即在处理相除有余数的情况。
     */
    public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
    
        private final Logger log = ClientLogger.getLog();
    
        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {
            // 校验参数是否正确
            if (currentCID == null || currentCID.length() < 1) {
                throw new IllegalArgumentException("currentCID is empty");
            }
            if (mqAll == null || mqAll.isEmpty()) {
                throw new IllegalArgumentException("mqAll is null or mqAll empty");
            }
            if (cidAll == null || cidAll.isEmpty()) {
                throw new IllegalArgumentException("cidAll is null or cidAll empty");
            }
    
            List<MessageQueue> result = new ArrayList<>();
            if (!cidAll.contains(currentCID)) {
                log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
                return result;
            }
            // 平均分配
            int index = cidAll.indexOf(currentCID); // 第几个consumer。
            int mod = mqAll.size() % cidAll.size(); // 余数,即多少消息队列无法平均分配。
    
            //队列总数 <= 消费者总数时,分配当前消费者1个队列
            //不能均分 &&  当前消费者序号(从0开始) < 余下的队列数 ,分配当前消费者 mqAll / cidAll +1 个队列
            //不能均分 &&  当前消费者序号(从0开始) >= 余下的队列数 ,分配当前消费者 mqAll / cidAll 个队列
            int averageSize = mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
    
            int startIndex = (mod > 0 && index < mod) ? index * averageSize
                : index * averageSize + mod; // 有余数的情况下,[0, mod) 平分余数,即每consumer多分配一个节点;第index开始,跳过前mod余数。
            int range = Math.min(averageSize, mqAll.size() - startIndex); // 分配队列数量。之所以要Math.min()的原因是,mqAll.size() <= cidAll.size(),部分consumer分配不到消费队列。
            for (int i = 0; i < range; i++) {
                result.add(mqAll.get((startIndex + i) % mqAll.size()));
            }
            return result;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    环形平均分配

    /**
     * Cycle average Hashing queue algorithm
     * 队列分配策略 - 环状分配
     */
    public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
        private final Logger log = ClientLogger.getLog();
    
        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            // 校验参数是否正确
            if (currentCID == null || currentCID.length() < 1) {
                throw new IllegalArgumentException("currentCID is empty");
            }
            if (mqAll == null || mqAll.isEmpty()) {
                throw new IllegalArgumentException("mqAll is null or mqAll empty");
            }
            if (cidAll == null || cidAll.isEmpty()) {
                throw new IllegalArgumentException("cidAll is null or cidAll empty");
            }
    
            List<MessageQueue> result = new ArrayList<MessageQueue>();
            if (!cidAll.contains(currentCID)) {
                log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
                    consumerGroup,
                    currentCID,
                    cidAll);
                return result;
            }
    
            // 环状分配
            int index = cidAll.indexOf(currentCID);
            for (int i = index; i < mqAll.size(); i++) {
                if (i % cidAll.size() == index) {
                    result.add(mqAll.get(i));
                }
            }
            return result;
        }
    
        @Override
        public String getName() {
            return "AVG_BY_CIRCLE";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    机房broker健康算法

    /**
     * Computer room Hashing queue algorithm, such as Alipay logic room
     * 分配策略 - 首先筛选可消费Broker对应的消费队列,筛选后的消费队列平均分配。
     * TODO 疑问:Broker、Consumer如何配置
     */
    public class AllocateMessageQueueByMachineRoom implements AllocateMessageQueueStrategy {
        /**
         * 消费者消费brokerName集合
         */
        private Set<String> consumeridcs;
    
        @Override
        public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
            List<String> cidAll) {
            // 参数校验
            List<MessageQueue> result = new ArrayList<MessageQueue>();
            int currentIndex = cidAll.indexOf(currentCID);
            if (currentIndex < 0) {
                return result;
            }
            // 计算符合当前配置的消费者数组('consumeridcs')对应的消费队列
            List<MessageQueue> premqAll = new ArrayList<MessageQueue>();
            for (MessageQueue mq : mqAll) {
                String[] temp = mq.getBrokerName().split("@");
                if (temp.length == 2 && consumeridcs.contains(temp[0])) {
                    premqAll.add(mq);
                }
            }
            // 平均分配
            int mod = premqAll.size() / cidAll.size();
            int rem = premqAll.size() % cidAll.size();
            int startIndex = mod * currentIndex;
            int endIndex = startIndex + mod;
            for (int i = startIndex; i < endIndex; i++) {
                result.add(mqAll.get(i));
            }
            if (rem > currentIndex) {
                result.add(premqAll.get(currentIndex + mod * cidAll.size()));
            }
            return result;
        }
    
        @Override
        public String getName() {
            return "MACHINE_ROOM";
        }
    
        public Set<String> getConsumeridcs() {
            return consumeridcs;
        }
    
        public void setConsumeridcs(Set<String> consumeridcs) {
            this.consumeridcs = consumeridcs;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    消费者路由

    在这里插入图片描述
    注意:消费者也必须和namesvr保存长连接,consumer从namesvr每隔30s获取topic路由信息。从而根据负载策略进行消费的消费。队列(ip)和broker对应关系。

  • 相关阅读:
    【一文带你详细学习RocketMQ存储设计方案、RocketMQ中消息文件存储结构、过期文件删除机制、零拷贝与MMAP内存映射】
    软件测试 -- 入门 4 软件测试原则
    开始MySQL之路——MySQL三大日志(binlog、redo log和undo log)概述详解
    UVA-1602 网格动物 题解答案代码 算法竞赛入门经典第二版
    01 【基础语法与基本数据类型】
    一篇博文,带你入门数据库SQL语言
    SpringBoot使用RabbitMQ实现延迟队列
    cpp primer笔记090-动态内存
    计算机网络_03_tcp/ip四层模型
    Java基础-day05
  • 原文地址:https://blog.csdn.net/Miaoshuowen/article/details/126397153