• RocketMQ广播消费本地Offset文件丢失问题探秘



    ​ 今天本来在用RocketMQ做一个大的功能改造,中间有个小问题随意跟了下源码,突然还发现一个小BUG。一通跟踪调试,虽然最后还是没有解决问题,但是受益匪浅。记录一下。

    一、问题出发点

    ​ 我们知道RocketMQ的消费者有两种消费模式MessageModel,一种是集群消费,一种是广播消费。这两种消费机制最本质的区别在于,集群消费是在Broker端保存各个ConsumeGroup的消费进度Offset,而广播消费则是在Consumer本地记录消费进度Offset。

    ​ 集群消费用得比较多,之前也认真跟过源码。但是广播消费因为用得比较少,所以基本也跟你一样,背过一点八股文,也就没有太过关注。但是,今天一个偶然的业务场景用到了广播消费。当我想要去本地看一下Offset的记录时,却发现怎么也找不到。这到底是我哪个业务配置配错了?还是万能的八股文欺骗了我?这背后藏着什么样的秘密?

    问题1:Offset的本地存储的目录如何进行定制?

    ​ 在RocketMQ的原生API中,提供了DefaultLitePullConsumer和DefaultMQPushConsumer(还有一个DefaultMQPullConsumer已经过时了)。在这两个消费实例中,都可以设定一个MessageModel属性,这个属性有两个枚举值,MessageModel.BROADCASTING(广播消费)和MessageModel.CLUSTERING(集群消费)。对于广播消费的消费者,默认会在消费者程序所在的机器本地的${user.home}/rocketmq_offset/${clientIp}@DEFAULT/${group}/offset.json文件中保存消费进度。

    其中$部分问变量。

    ${user.home}为系统所属用户目录。在windows下默认是C:\Users\${用户名}。

    ${clientIp}是消费者端的IP地址。

    ${group}是消费者指定的所属消费者组。

    ​ 这里就有了第一个问题:这个本地存储的目录如何进行定制?

    问题2:rocketmq-spring-boot-starter插件如何配置这个本地存储目录?

    ​ 在使用RocketMQ进行业务开发时,经常会用到SpringBoott框架来整合RocketMQ的客户端。于是,会使用到rocketmq提供的下面的Maven依赖包。

    <dependency>
        <groupId>org.apache.rocketmqgroupId>
        <artifactId>rocketmq-spring-boot-starterartifactId>
        <version>2.2.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    ​ 然后,就可以使用配置或注解的方式来声明客户端,而不用再去接触RocketMQ的原生消费者API了。例如

    @Component
    @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",messageModel= MessageModel.BROADCASTING)
    public class SpringConsumer implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            System.out.println("Received message : "+ message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ​ 工作起来没有问题,但是,当你沿着上面的思路,想要去本地找一找这个消费者的本地缓存时,你会发现一个问题,找不到本地Offset文件(不要怀疑,如果你是用windows开发,一定找不到)。

    ​ 于是,我开始冒出另外的两个问题:**问题2:rocketmq-spring-boot-starter中有几种声明消费者的方式?**是不是我的使用方式不对?**问题3:rocketmq-spring-boot-starter中是如何指定本地目录地址的?**要怎么找到本地缓存?

    什么?你还不会用RocketMQ,不知道我在说什么?试试看看我的RocketMQ教程。顺便支持鼓励一下。

    二、问题原因分析

    ​ 查问题的时候是忙忙碌碌瞎查的,现在回顾问题,就把思路整理一下,也便于你理解。

    1、RocketMQ中是如何记录广播消费的本地Offset的?

    ​ 原生的RocketMQ消费者API是基础,所以这个问题必须先梳理清楚。 以下分析RocketMQ源码用的是4.9.1版本

    ​ 原生的RocketMQ提供了两个消费者对象,DefaultLitePullConsumer和DefaultMQPushConsumer(还有一个DefaultMQPullConsumer已经过时了)。在这两个消费实例中,都可以设定一个MessageModel属性,这个属性有两个枚举值,MessageModel.BROADCASTING和MessageModel.CLUSTERING。这个属性是怎么跟Offset文件存储对应上的呢?其实从服务的启动过程中就能很清晰的跟踪到。

    1.1、看看DefaultMQPushConsumer。

    他的start()启动方法中,会启动一个defaultMQPushConsumerImpl实例,而这个defaultMQPushConsumerImpl示例会调用一个同步的start()方法。这个start()是同步的,针对Offset文件存盘问题,是为了保证存盘的Offset文件,在内存中只有一份统一的副本。带着今天的问题,可以在这个start()方法中找到一段很刺眼的代码:

    if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                        this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                    } else {
                        //从这里可以看出,广播模式与集群模式的最本质区别就是offset存储的地方不一样。
                        switch (this.defaultMQPushConsumer.getMessageModel()) {
                            //广播模式是在消费者本地存储offset
                            case BROADCASTING:
                                this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            //集群模式是在Broker远端存储offset
                            case CLUSTERING:
                                this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                                break;
                            default:
                                break;
                        }
                        this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    ​ 这里就能看到广播模式和集群模式在Offset存储上的区别。

    1.2:再来看看DefaultLitePullConsumer。

    在他的启动过程中,类似的会创建一个defaultLitePullConsumerImpl对象。而在defaultLitePullConsumerImpl对象的启动过程中,会调用到一个initOffsetStore()方法。这个方法里的实现也是一样的刺眼:

        private void initOffsetStore() throws MQClientException {
            if (this.defaultLitePullConsumer.getOffsetStore() != null) {
                this.offsetStore = this.defaultLitePullConsumer.getOffsetStore();
            } else {
                switch (this.defaultLitePullConsumer.getMessageModel()) {
                    case BROADCASTING:
                        this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
                        break;
                    case CLUSTERING:
                        this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultLitePullConsumer.getConsumerGroup());
                        break;
                    default:
                        break;
                }
                this.defaultLitePullConsumer.setOffsetStore(this.offsetStore);
            }
            this.offsetStore.load();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    ​ 所以,问题自然就集中到了这个LocalFileOffsetStore中了。

    1.3:看LocalFileOffsetStore是如何指定存储路径的

    RocketMQ的源码中采用了充血模型的实现方式,所有关于LocalFileOffset的业务动作,都集中到了LocalFileOffsetStore.java当中。在他的构造方法当中就直接维护了一个storePath属性来维护本地存储地址。

    public class LocalFileOffsetStore implements OffsetStore {
        //本地存储目录
        public final static String LOCAL_OFFSET_STORE_DIR = System.getProperty(
            "rocketmq.client.localOffsetStoreDir",
            System.getProperty("user.home") + File.separator + ".rocketmq_offsets");
        .....
        public LocalFileOffsetStore(MQClientInstance mQClientFactory, String groupName) {
            .....
            //本地存储文件
            this.storePath = LOCAL_OFFSET_STORE_DIR + File.separator +
                this.mQClientFactory.getClientId() + File.separator +
                this.groupName + File.separator +
                "offsets.json";
        }
        .....
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    ​ 先看LOCAL_OFFSET_STORE_DIR,这是本地存储的根目录。跟rocketmq.client.localOffsetStoreDir这个系统属性有关,**所以要定制本地存储的目录,只需要设定rocketmq.client.localOffsetStoreDir系统属性即可。**而这个系统属性还没有支持前端配置,所以,修改的方式,只能是在应用启动时手动进行指定。 例如 System.setProperty(“rocketmq.client.localOffsetStoreDir”,“D:/.rockemtq_offset”)。

    ​ 然后,再来看offsets.json的具体存储路径storePath。看这个结构,就跟最开始直接指出的结论对应上了。但是这样其中还有一个变量,就是clinetId。这个属性是如何指定的呢?是不是可以定制呢?

    ​ 这里面的mQClientFactory是一个MQClientInstance的实例对象。而MQClientInstance则是RocketMQ中对所有客户端的抽象。生产者和消费者最终都会交由MQClientInstance进行统一的服务启动。在MQClientInstance的start()方法中给所有的客户端定义了一个统一的启动标准,不同的客户端只要按照标准去注册不同的信息即可。而ClientId就是MQClientInstance在初始化的过程中指定的一个属性(初始化过程见MQClientManager.java#getOrCreateMQClientInstance方法)。ClientId的具体生成逻辑则是在ClientConfig对象中的buildMQClientId方法中。

    //指定instanceName
    private String instanceName = System.getProperty("rocketmq.client.name", "DEFAULT");
    //构建clientId
    public String buildMQClientId() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.getClientIP());
    
            sb.append("@");
            sb.append(this.getInstanceName());
            if (!UtilAll.isBlank(this.unitName)) {
                sb.append("@");
                sb.append(this.unitName);
            }
            return sb.toString();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    ​ 这里面又引出了一个变量instanceName,这个变量对于广播消息的本地Offset存储路径起了很重要的作用。

    • 这个instanceName,默认值是DEFAULT。

    • 可以由系统属性rocketmq.client.name来替代这个默认值,这个系统属性依然是没有配置属性定制的,只能手动修改。

    • 这个instanceName在ClientConfig中还有对应的setter方法。可以由消费者客户端自行指定。而常用的RocketMQ客户端都是ClientConfig的子类,所以,他们都可以通过setter方法定制instanceName。
      在这里插入图片描述

    这个unitMode和unitName不太清楚具体干嘛的,unitMode默认是false,然后在源码中搜索了一下,好像也就在ClientConfig配置本地存储目录时用到了,在核心业务中并没有太多的作用。

    github仓库中只有一条关于unitMode的issue,回复也是目前并没有做什么实际的工作。参见https://github.com/apache/rocketmq/issues/639

    在这里插入图片描述

    1.4:Offsets.json文件何时写入?

    ​ 基于充血模型设计的好处在于,系统内的所有核心业务能力都在充血实体中统一整理,不需要去其他地方到处乱搜。对于Offsets.json文件的核心写入能力,也一起体现在了LocalFileOffsetStore类中。在LocalFileOffsetStore类中,有一个persistAll方法,实现了文件的写入。

    public void persistAll(Set<MessageQueue> mqs) {
            if (null == mqs || mqs.isEmpty())
                return;
    
            OffsetSerializeWrapper offsetSerializeWrapper = new OffsetSerializeWrapper();
            for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
                if (mqs.contains(entry.getKey())) {
                    AtomicLong offset = entry.getValue();
                    offsetSerializeWrapper.getOffsetTable().put(entry.getKey(), offset);
                }
            }
    
            String jsonString = offsetSerializeWrapper.toJson(true);
            if (jsonString != null) {
                try {
                    MixAll.string2File(jsonString, this.storePath);
                } catch (IOException e) {
                    log.error("persistAll consumer offset Exception, " + this.storePath, e);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    ​ 在这个方法中可以看到,如果offsets.json文件写入失败,RocketMQ只是记录一条log日志就没事了,甚至连异常都没有往外抛。这意味着如果广播消息本地的offsets.json进度没有更新,RocketMQ不会做任何的补救措施。

    ​ 不过,RocketMQ的客户端会启动一个线程,不断的尝试将这些offset偏移信息写入到文件当中,这也算是一种处理的方式把。

    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    try {
                        MQClientInstance.this.persistAllConsumerOffset();
                    } catch (Exception e) {
                        log.error("ScheduledTask persistAllConsumerOffset exception", e);
                    }
                }
            }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1.5: 针对问题的总结

    ​ 经过一通整理,回到之前提到的广播消息的offsets文件内容定制的问题,可以总结出这样几个结论:

    • 消费者端存储广播消费的本地offsets文件的默认缓存目录是 System.getProperty(“user.home”) + File.separator + “.rocketmq_offsets” ,可以通过定制 rocketmq.client.localOffsetStoreDir 系统属性进行修改。
    • 本地offsets文件在缓存目录中的具体位置与消费者的clientIp 和 instanceName有关。其中instanceName默认是DEFAULT,可以通过定制系统属性 rocketmq.client.name 进行修改。另外,每个消费者对象也可以单独设定instanceName。
    • RocketMQ会通过定时任务不断尝试本地Offsets文件的写入,但是,如果本地Offsets文件写入失败,RocketMQ不会进行任何的不就,也就是说不会对业务有很大的影响。

    2、rocketmq-spring-boot-starter如何创建RocketMQ的消费者?

    ​ 接下来开始梳理rocketmq-spring-boot-starter对消费者端的封装。梳理封装的过程,其实也是在整理如何高效的使用RocketMQ的原生API,这一对于加深对于RocketMQ原生API的理解是非常有帮助的。如果你也尝试跟着这篇文章一起梳理源码,记得带上之前剔除的小问题。问题的本身相对比较简单,但是带上一个具体的问题去看源码,绝对会让你看源码的感觉不一样。

    ​ rocketmq-spring-boot-starter中在RocketMQ消费者这一块,封装了封装了两种模式,Push模式和Pull模式。

    • Push模式就是通过@RocketMQMessageListener注解声明一个RocketMQListener接口的实现类来声明一个消费者。Broker推过来的消息,会自行进入其中的onMessage方法进行处理。
    • Pull模式是通过内置的RocketMQTemplate对象的receive方法以及一系列的sendAndReceive方法,由消费者端主动去拉取消息。消息拉取过来后,可以通过里面的convert对象转换成所需要的结果对象。
    • 在Pull模式中,如果一个restTemplate实例不够用,还可以通过@ExtRocketMQTemplateConfiguration和@ExtRocketMQConsumerConfiguration两个注解来注册额外的rocketmqTemplate实例。一个用来初始化rocketmqTemplate中的消息发送者。例如发送事务消息时,一个restTemplate只能对应一个事务监听逻辑。如果你的项目中有多个事务消息逻辑,就需要注册多个restTemplate实例,来对应不同的事务监听器。另一个用来初始化rocketmqTemplate中的消费者。

    老规矩,如果使用还不太熟练,可以看下我的RocketMQ教程。支持鼓励一下。

    ​ 接下来,就按Push模式和Pull模式分开来梳理。其中,Push模式是分析的重点,因为这是平常用得做多的一种模式。Pull模式用得比较少,就当做是补充的彩蛋了。

    2.1、Push模式

    ​ Push模式对于@RocketMQMessageListener注解的处理方式,入口在rocketmq-spring-boot-2.2.1.jar中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration类中。

    怎么找到的?评经验猜以及碰运气。

    ​ 这个ListenerContainerConfiguration类继承了Spring当中的SmartInitializingSingleton接口,当Spring容器当中所有非懒加载的实例加载完成后,就会触发他的afterSingletonsInstantiated方法进行初始化。在这个方法中会去扫描所有带有注解@RocketMQMessageListener注解的类,将他注册到内部一个Container容器当中。

        public void afterSingletonsInstantiated() {
            Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    
            beans.forEach(this::registerContainer);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    ​ 这里这个Container可以认为是客户端实例的一个容器,通过这个容器来封装RocketMQ的原生API。

    ​ registerContainer的方法挺长的,我这里截取出跟今天的主题相关的几行重要的源码:

    private void registerContainer(String beanName, Object bean) {
           .....
    	//获取Bean上面的注解
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
    
        	...
        //检查注解的配置情况
            validate(annotation);
    
            String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
                counter.incrementAndGet());
            GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
    		//将扫描到的注解转化成为Container,并注册到上下文中。
            genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
                () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
            DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultRocketMQListenerContainer.class);
        	//启动容器,这里就相当于是启动了消费者
            if (!container.isRunning()) {
                try {
                    container.start();
                } catch (Exception e) {
                    log.error("Started container failed. {}", container, e);
                    throw new RuntimeException(e);
                }
            }
    
            log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    ​ 这其中最关注的,当然是创建容器的createRocketMQListenerContainer方法中。而在这个方法中,你基本看不到RocketMQ的原生API,都是在创建并维护一个DefaultRocketMQListenerContainer对象。而这个DefaultRocketMQListenerContainer类,就是我们今天关注的重点。

    ​ DefaultRocketMQListenerContainer类实现了InitializingBean接口,自然要先关注他的afterPropertiesSet方法。这是Spring提供的对象初始化的扩展机制。

        public void afterPropertiesSet() throws Exception {
            initRocketMQPushConsumer();
    
            this.messageType = getMessageType();
            this.methodParameter = getMethodParameter();
            log.debug("RocketMQ messageType: {}", messageType);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    ​ 这个方法就是用来初始化RocketMQ消费者的。在这个方法里就会创建一个RocketMQ原生的DefaultMQPushConsumer消费者。同样,方法很长,抽取出比较关注的重点源码。

    private void initRocketMQPushConsumer() throws MQClientException {
           .....
            //检查并创建consumer对象。
            if (Objects.nonNull(rpcHook)) {
                consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),
                    enableMsgTrace, this.applicationContext.getEnvironment().
                    resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
                consumer.setVipChannelEnabled(false);
            } else {
                log.debug("Access-key or secret-key not configure in " + this + ".");
                consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,
                    this.applicationContext.getEnvironment().
                        resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));
            }
            // 定制instanceName,有没有很熟悉!!!
            consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
    		.....
           	//设定广播消费还是集群消费。
            switch (messageModel) {
                case BROADCASTING:
                    consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);
                    break;
                case CLUSTERING:
                    consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);
                    break;
                default:
                    throw new IllegalArgumentException("Property 'messageModel' was wrong.");
            }
        	//维护消费者的其他属性。   
        	...
               //指定Consumer的消费监听 --》在消费监听中就会去调用onMessage方法。
               switch (consumeMode) {
                case ORDERLY:
                    consumer.setMessageListener(new DefaultMessageListenerOrderly());
                    break;
                case CONCURRENTLY:
                    consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                    break;
                default:
                    throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    ​ 这整个就是在维护RocketMQ的原生消费者对象。其中的使用方式,其实有很多地方是很值得借鉴的,尤其是消费监听的处理。

    ​ 针对今天的这个问题,要关注的内容其实不多,只要关心这一句。

    consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
    
    • 1

    ​ 这里会设定消费者对象的instanceName。而这个instanceName属性,根据之前的分析,就会直接影响到广播消费的本地Offsets.json的存储路径。那接下来看RocketMQUtil.getInstanceName(nameServer)方法是如何产生instanceName的。

        public static String getInstanceName(String identify) {
            char separator = '@';
            StringBuilder instanceName = new StringBuilder();
            instanceName.append(identify)
                    .append(separator).append(UtilAll.getPid())
                    .append(separator).append(System.nanoTime());
            return instanceName.toString();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ​ 这里需要注意的是方法的入参,采用的是nameServer地址。而nameServer地址是怎么配的?默认是 ${brokerIP1}:9876。是的,问题就出在这里,nameServer中这个冒号!

    ​ 在windows文件系统中,是不允许创建带冒号的文件夹的!所以,使用rocketmq-spring-boot-starter后,广播消息的本地offsets.json文件就无法落盘了。
    在这里插入图片描述

    ​ 这样就给今天这个奇怪的问题找到了最终的答案!至于说广播消费的本地Offset落不了盘会有什么影响。之前提过,RocketMQ本身就只是记录一条log日志而已。

    2.2、Pull模

    ​ Pull模式的实现其实是通过在RocketMQTemplate实例中注入一个DefaultLitePullConsumer实例来实现的。只要注入并启动了这个DefaultLitePullConsumer示例后,后续就可以通过template实例的receive方法,来调用DefaultLitePullConsumer的poll方法,主动去Pull获取消息了。

    ​ 初始化DefaultLitePullConsumer的代码依然是在rocketmq-spring-boot-2.2.1.jar包中。不过处理类是org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration。这个配置类会配置在jar包中的spring.factories文件中,通过SpringBoot的自动装载机制加载进来。

        @Bean(CONSUMER_BEAN_NAME)
        @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
        @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"}) //解析的springboot配置属性。
        public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
                throws MQClientException {
            RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
            String nameServer = rocketMQProperties.getNameServer();
            String groupName = consumerConfig.getGroup();
            String topicName = consumerConfig.getTopic();
            Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
            Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
            Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
    		
            ...
            //创建消费者   
            DefaultLitePullConsumer litePullConsumer = RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
                    groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize);
            litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
            litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
            return litePullConsumer;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    ​ 然后在创建消费者实例的RocketMQUtil.createDefaultLitePullConsumer方法中,依然可以看到这一行熟悉的代码。

    litePullConsumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));
    
    • 1

    ​ 至于通过@ExtRocketMQConsumerConfiguration注解声明其他的RocketMQTemplate实例的处理方式,可以参看ExtConsumerResetConfiguration。在这个类中会处理注解并同样会创建RocketMQTemplate实例以及RocketMQ的DefaultLitePullConsumer实例。

    三、过程中的收获

    ​ 借助RocketMQ广播消费的本地Offset文件丢失这一个小问题,把offset本地文件的写入过程以及Springboot集成RocketMQ的过程梳理了一遍。有很多细枝末节的逻辑,可以慢慢被捋顺,对RocketMQ的熟悉感也会慢慢培养起来。

    ​ 另外,本地Offset都梳理清楚了,那集群消费的远程Offset还远吗?

  • 相关阅读:
    每天学一个MySQL函数(一):CONCAT
    构建高可用性的 SQL Server:Docker 容器下的主从同步实现
    okcc坐席进线太慢,怎么办?
    [springboot专栏]缓存雪崩、穿透、击穿的代码级解决方案
    [HDLBits] Lemmings4
    FasterNet 与 RT-DTER 的 碰撞,打造 Faster-DTER 目标检测网络 | 《Run, Don’t Walk: Chasing Higher FLOPS for Faster 》
    原型和原型链
    工业树莓派的应用:助力构建智慧能源管理系统
    【ReID】1、行人重识别模型
    使用python提取轮廓做定制的毛笔字帖
  • 原文地址:https://blog.csdn.net/roykingw/article/details/126351010