• 聊聊RocketMQMessageListener的实现机制


    本文主要研究一下RocketMQMessageListener的实现机制

    示例

    @Service
    @RocketMQMessageListener(nameServer = "${demo.rocketmq.myNameServer}", topic = "${demo.rocketmq.topic.user}", consumerGroup = "user_consumer")
    public class UserConsumer implements RocketMQListener {
        @Override
        public void onMessage(User message) {
    
            System.out.printf("######## user_consumer received: %s ; age: %s ; name: %s \n", message, message.getUserAge(), message.getUserName());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    实现了RocketMQListener接口的类,再配合@RocketMQMessageListener注解就可以实现对rocketmq消息的消费

    RocketMQListener

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQListener.java

    public interface RocketMQListener {
        void onMessage(T message);
    }
    
    • 1
    • 2
    • 3

    RocketMQListener接口定义了onMessage方法

    RocketMQMessageListener

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface RocketMQMessageListener {
    
        String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
        String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
        String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
        String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
        String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    
        /**
         * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
         * load balance. It's required and needs to be globally unique.
         *
         *
         * See here for further discussion.
         */
        String consumerGroup();
    
        /**
         * Topic name.
         */
        String topic();
    
        /**
         * Control how to selector message.
         *
         * @see SelectorType
         */
        SelectorType selectorType() default SelectorType.TAG;
    
        /**
         * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
         */
        String selectorExpression() default "*";
    
        /**
         * Control consume mode, you can choice receive message concurrently or orderly.
         */
        ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    
        /**
         * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
         */
        MessageModel messageModel() default MessageModel.CLUSTERING;
    
        /**
         * Max consumer thread number.
         * This property control consumer thread pool executor maximumPoolSize see
         * {@link ConsumeMessageService#updateCorePoolSize(int)}
         * @see issues#546
         */
        int consumeThreadMax() default 64;
    
        /**
         * consumer thread number.
         */
        int consumeThreadNumber() default 20;
    
        /**
         * Max re-consume times.
         *
         * In concurrently mode, -1 means 16;
         * In orderly mode, -1 means Integer.MAX_VALUE.
         */
        int maxReconsumeTimes() default -1;
    
        /**
         * Maximum amount of time in minutes a message may block the consuming thread.
         */
        long consumeTimeout() default 15L;
    
        /**
         * Timeout for sending reply messages.
         */
        int replyTimeout() default 3000;
    
        /**
         * The property of "access-key".
         */
        String accessKey() default ACCESS_KEY_PLACEHOLDER;
    
        /**
         * The property of "secret-key".
         */
        String secretKey() default SECRET_KEY_PLACEHOLDER;
    
        /**
         * Switch flag instance for message trace.
         */
        boolean enableMsgTrace() default false;
    
        /**
         * The name value of message trace topic.If you don't config,you can use the default trace topic name.
         */
        String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
    
        /**
         * The property of "name-server".
         */
        String nameServer() default NAME_SERVER_PLACEHOLDER;
    
        /**
         * The property of "access-channel".
         */
        String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
     
        /**
         * The property of "tlsEnable" default false.
         */
        String tlsEnable() default "false";
    
        /**
         * The namespace of consumer.
         */
        String namespace() default "";
    
        /**
         * Message consume retry strategy in concurrently mode.
         *
         * -1,no retry,put into DLQ directly
         * 0,broker control retry frequency
         * >0,client control retry frequency
         */
        int delayLevelWhenNextConsume() default 0;
    
        /**
         * The interval of suspending the pull in orderly mode, in milliseconds.
         *
         * The minimum value is 10 and the maximum is 30000.
         */
        int suspendCurrentQueueTimeMillis() default 1000;
    
        /**
         * Maximum time to await message consuming when shutdown consumer, in milliseconds.
         * The minimum value is 0
         */
        int awaitTerminationMillisWhenShutdown() default 1000;
    
        /**
         * The property of "instanceName".
         */
        String instanceName() default "DEFAULT";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145

    RocketMQMessageListener注解定义了consumerGroup、topic、selectorType、selectorExpression、consumeMode、messageModel、consumeThreadMax、consumeThreadNumber、maxReconsumeTimes、consumeTimeout、replyTimeout、accessKey、secretKey、enableMsgTrace、customizedTraceTopic、nameServer、accessChannel、tlsEnable、namespace、delayLevelWhenNextConsume、suspendCurrentQueueTimeMillis、awaitTerminationMillisWhenShutdown、instanceName属性

    RocketMQMessageListenerBeanPostProcessor

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListenerBeanPostProcessor.java

    public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {
    
        private ApplicationContext applicationContext;
    
        private AnnotationEnhancer enhancer;
    
        private ListenerContainerConfiguration listenerContainerConfiguration;
    
        @Override
        public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
            return bean;
        }
    
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            Class targetClass = AopUtils.getTargetClass(bean);
            RocketMQMessageListener ann = targetClass.getAnnotation(RocketMQMessageListener.class);
            if (ann != null) {
                RocketMQMessageListener enhance = enhance(targetClass, ann);
                if (listenerContainerConfiguration != null) {
                    listenerContainerConfiguration.registerContainer(beanName, bean, enhance);
                }
            }
            return bean;
        }
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            buildEnhancer();
            this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class);
        }
    
        private void buildEnhancer() {
            if (this.applicationContext != null) {
                Map enhancersMap =
                        this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false);
                if (enhancersMap.size() > 0) {
                    List enhancers = enhancersMap.values()
                            .stream()
                            .sorted(new OrderComparator())
                            .collect(Collectors.toList());
                    this.enhancer = (attrs, element) -> {
                        Map newAttrs = attrs;
                        for (AnnotationEnhancer enh : enhancers) {
                            newAttrs = enh.apply(newAttrs, element);
                        }
                        return attrs;
                    };
                }
            }
        }
    
        private RocketMQMessageListener enhance(AnnotatedElement element, RocketMQMessageListener ann) {
            if (this.enhancer == null) {
                return ann;
            } else {
                return AnnotationUtils.synthesizeAnnotation(
                        this.enhancer.apply(AnnotationUtils.getAnnotationAttributes(ann), element), RocketMQMessageListener.class, null);
            }
        }
    
        public interface AnnotationEnhancer extends BiFunction, AnnotatedElement, Map> {
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    RocketMQMessageListenerBeanPostProcessor实现了ApplicationContextAware、BeanPostProcessor、InitializingBean接口,其中postProcessAfterInitialization方法判断bean有没有RocketMQMessageListener注解,如果有的话则通过enhance进行增强,另外会通过listenerContainerConfiguration的registerContainer进行注册
    其afterPropertiesSet方法会执行buildEnhancer方法,该方法会获取AnnotationEnhancer的bean实例,然后排序好,最后构造AnnotationEnhancer,其作用就是把这些enhancers挨个apply上去

    registerContainer

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java

        public void registerContainer(String beanName, Object bean, RocketMQMessageListener annotation) {
            Class clazz = AopProxyUtils.ultimateTargetClass(bean);
    
            if (RocketMQListener.class.isAssignableFrom(bean.getClass()) && RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " cannot be both instance of " + RocketMQListener.class.getName() + " and " + RocketMQReplyListener.class.getName());
            }
    
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass()) && !RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
                throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName() + " or " + RocketMQReplyListener.class.getName());
            }
    
            String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
            String topic = this.environment.resolvePlaceholders(annotation.topic());
    
            boolean listenerEnabled =
                (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                    .getOrDefault(topic, true);
    
            if (!listenerEnabled) {
                log.debug(
                    "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                    consumerGroup, topic);
                return;
            }
            validate(annotation);
    
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    
            genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
            DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultRocketMQListenerContainer.class);
            if (!container.isRunning()) {
                try {
                    container.start();
                } catch (Exception e) {
                    log.error("Started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }
    
            log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    ListenerContainerConfiguration的registerContainer方法会根据注解信息及对应的bean构造DefaultRocketMQListenerContainer并注册到genericApplicationContext,同时执行其start方法

    DefaultRocketMQListenerContainer

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

    public class DefaultRocketMQListenerContainer implements InitializingBean,
        RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    
        private DefaultMQPushConsumer consumer;
    
        private RocketMQListener rocketMQListener;
    
        @Override
        public void start() {
            if (this.isRunning()) {
                throw new IllegalStateException("container already running. " + this.toString());
            }
    
            try {
                consumer.start();
            } catch (MQClientException e) {
                throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
            }
            this.setRunning(true);
    
            log.info("running container: {}", this.toString());
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            initRocketMQPushConsumer();
    
            this.messageType = getMessageType();
            this.methodParameter = getMethodParameter();
            log.debug("RocketMQ messageType: {}", messageType);
        }
    
    	private void initRocketMQPushConsumer() throws MQClientException {
            if (rocketMQListener == null && rocketMQReplyListener == null) {
                throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
            }
            Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
            Assert.notNull(nameServer, "Property 'nameServer' is required");
            Assert.notNull(topic, "Property 'topic' is required");
    
            RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(),
                this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
            boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace();
            if (Objects.nonNull(rpcHook)) {
                consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                    enableMsgTrace, this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
                consumer.setVipChannelEnabled(false);
            } else {
                log.debug("Access-key or secret-key not configure in " + this + ".");
                consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                    this.applicationContext.getEnvironment().
                        resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            }
            consumer.setNamespace(namespace);
    
            String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
            if (customizedNameServer != null) {
                consumer.setNamesrvAddr(customizedNameServer);
            } else {
                consumer.setNamesrvAddr(nameServer);
            }
            if (accessChannel != null) {
                consumer.setAccessChannel(accessChannel);
            }
    
            consumer.setConsumeThreadMax(consumeThreadMax);
            consumer.setConsumeThreadMin(consumeThreadNumber);
            consumer.setConsumeTimeout(consumeTimeout);
            consumer.setMaxReconsumeTimes(maxReconsumeTimes);
            consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
            consumer.setInstanceName(instanceName);
            switch (messageModel) {
                case BROADCASTING:
                    consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING);
                    break;
                case CLUSTERING:
                    consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING);
                    break;
                default:
                    throw new IllegalArgumentException("Property 'messageModel' was wrong.");
            }
    
            switch (selectorType) {
                case TAG:
                    consumer.subscribe(topic, selectorExpression);
                    break;
                case SQL92:
                    consumer.subscribe(topic, MessageSelector.bySql(selectorExpression));
                    break;
                default:
                    throw new IllegalArgumentException("Property 'selectorType' was wrong.");
            }
    
            switch (consumeMode) {
                case ORDERLY:
                    consumer.setMessageListener(new DefaultMessageListenerOrderly());
                    break;
                case CONCURRENTLY:
                    consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                    break;
                default:
                    throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
    
            //if String is not is equal "true" TLS mode will represent the as default value false
            consumer.setUseTLS(new Boolean(tlsEnable));
    
            if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
                ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
            } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
                ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
            }
    
        }
        //......
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117

    DefaultRocketMQListenerContainer的start方法(SmartLifecycle)执行consumer的start方法;其afterPropertiesSet方法(InitializingBean)会执行initRocketMQPushConsumer方法来创建consumer
    initRocketMQPushConsumer方法主要是创建DefaultMQPushConsumer,设置messageModel,根据selectorType执行subscribe方法,根据consumeMode来设置messageListener(DefaultMessageListenerOrderly或者DefaultMessageListenerConcurrently),最后针对RocketMQPushConsumerLifecycleListener执行prepareStart方法

    DefaultMessageListenerOrderly

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

        public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
    
            @SuppressWarnings("unchecked")
            @Override
            public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
                for (MessageExt messageExt : msgs) {
                    log.debug("received msg: {}", messageExt);
                    try {
                        long now = System.currentTimeMillis();
                        handleMessage(messageExt);
                        long costTime = System.currentTimeMillis() - now;
                        log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                    } catch (Exception e) {
                        log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                        context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
    
                return ConsumeOrderlyStatus.SUCCESS;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    DefaultMessageListenerOrderly实现了MessageListenerOrderly的consumeMessage接口,它内部遍历msgs,挨个执行handleMessage,有异常的话则设置context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis),然后返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT

    DefaultMessageListenerConcurrently

    rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java

        public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {
    
            @SuppressWarnings("unchecked")
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt messageExt : msgs) {
                    log.debug("received msg: {}", messageExt);
                    try {
                        long now = System.currentTimeMillis();
                        handleMessage(messageExt);
                        long costTime = System.currentTimeMillis() - now;
                        log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                    } catch (Exception e) {
                        log.warn("consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}", messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), e);
                        context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
    
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    DefaultMessageListenerConcurrently实现了MessageListenerConcurrently接口的consumeMessage方法,它内部遍历msgs,挨个执行handleMessage,有异常的话设置context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume),然后返回ConsumeConcurrentlyStatus.RECONSUME_LATER

    handleMessage

        private void handleMessage(
            MessageExt messageExt) throws MQClientException, RemotingException, InterruptedException {
            if (rocketMQListener != null) {
                rocketMQListener.onMessage(doConvertMessage(messageExt));
            } else if (rocketMQReplyListener != null) {
                Object replyContent = rocketMQReplyListener.onMessage(doConvertMessage(messageExt));
                Message message = MessageBuilder.withPayload(replyContent).build();
    
                org.apache.rocketmq.common.message.Message replyMessage = MessageUtil.createReplyMessage(messageExt, convertToBytes(message));
                DefaultMQProducer producer = consumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer();
                producer.setSendMsgTimeout(replyTimeout);
                producer.send(replyMessage, new SendCallback() {
                    @Override public void onSuccess(SendResult sendResult) {
                        if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                            log.error("Consumer replies message failed. SendStatus: {}", sendResult.getSendStatus());
                        } else {
                            log.debug("Consumer replies message success.");
                        }
                    }
    
                    @Override public void onException(Throwable e) {
                        log.error("Consumer replies message failed. error: {}", e.getLocalizedMessage());
                    }
                });
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    handleMessage方法则是委托给了rocketMQListener.onMessage(doConvertMessage(messageExt)),即回调业务自定义的RocketMQListener

    start

    org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java

        /**
         * This method gets internal infrastructure readily to serve. Instances must call this method after configuration.
         *
         * @throws MQClientException if there is any client error.
         */
        @Override
        public void start() throws MQClientException {
            setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
            this.defaultMQPushConsumerImpl.start();
            if (null != traceDispatcher) {
                try {
                    traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
                } catch (MQClientException e) {
                    log.warn("trace dispatcher start failed ", e);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    DefaultMQPushConsumer的start方法先执行setConsumerGroup,然后委托给了defaultMQPushConsumerImpl.start(),如果有traceDispatcher的话,则执行traceDispatcher.start方法
    defaultMQPushConsumerImpl.start()方法会触发MQClientInstance.start(),后者会触发pullMessageService.start()以及rebalanceService.start()(会在doRebalance的时候触发defaultMQPushConsumerImpl.executePullRequestImmediately,往pullRequestQueue放pullRequest),前者会从pullRequestQueue获取pullRequest然后执行DefaultMQPushConsumerImpl.pullMessage方法,里头是执行pullAPIWrapper.pullKernelImpl,然后通过pullCallback往processQueue.putMessage,再触发consumeMessageService.submitConsumeRequest,它会回调listener.consumeMessage来消费消息

    小结

    RocketMQMessageListenerBeanPostProcessor实现了ApplicationContextAware、BeanPostProcessor、InitializingBean接口,其中postProcessAfterInitialization方法判断bean有没有RocketMQMessageListener注解,如果有的话则通过enhance进行增强,另外会通过listenerContainerConfiguration的registerContainer进行注册

    ListenerContainerConfiguration的registerContainer方法会根据注解信息及对应的bean构造DefaultRocketMQListenerContainer并注册到genericApplicationContext,同时执行其start方法
    DefaultRocketMQListenerContainer的start方法(SmartLifecycle)执行consumer的start方法;其afterPropertiesSet方法(InitializingBean)会执行initRocketMQPushConsumer方法来创建consumer
    start方法主要是执行pullMessageService.start()以及rebalanceService.start(),前者负责从pullRequestQueue获取pullRequest然后拉取消息放到processQueue,然后触发回调listener.consumeMessage来消费消息;后者负责rebalance,一开始会触发defaultMQPushConsumerImpl.executePullRequestImmediately,即往pullRequestQueue放pullRequest
    pushConsumer本质上还是基于pull的模式来的,从RocketMQMessageListenerBeanPostProcessor --> DefaultRocketMQListenerContainer.start --> DefaultMQPushConsumer.start --> defaultMQPushConsumerImpl.start() --> pullMessageService.start()以及rebalanceService.start()

  • 相关阅读:
    日常工作中需要避免的9个React坏习惯
    被裁员一个月后,我被面试官的一套性能优化面试题给问自闭了
    u盘文件突然不见了怎么样才能恢复呢?
    C#演示单例模式
    大规模语言模型高效参数微调--BitFit/Prefix/Prompt 微调系列
    ExtJS 教程汇总
    VUE学习二:事件监听(v-on)、条件判断(v-if/v-else-if/v-else)、循环遍历(v-for)
    朗强科技:HDMI网线延长器有什么优势
    SpringBoot 整合WebService
    Kafka系列之二Docker集群安装运行
  • 原文地址:https://blog.csdn.net/hello_ejb3/article/details/133965604