• RocketMQ消息消费-客户端拉取消息前的准备工作


    我们以一段构建消费者的代码开始本篇文章。

    1. public class Consumer {
    2. public static void main(String[] args) throws InterruptedException, MQClientException {
    3. // 实例化消费者
    4. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test_name");
    5. // 设置NameServer的地址
    6. consumer.setNamesrvAddr("localhost:9876");
    7. // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
    8. consumer.subscribe("TopicTest", "*");
    9. // 注册回调实现类来处理从broker拉取回来的消息
    10. consumer.registerMessageListener(new MessageListenerConcurrently() {
    11. @Override
    12. public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    13. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    14. // 标记该消息已经被成功消费
    15. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    16. }
    17. });
    18. // 启动消费者实例
    19. consumer.start();
    20. System.out.printf("Consumer Started.%n");
    21. }
    22. }

    这是一段很常见的代码,主要内容就是创建消费者并制定组名、设置NameServer地址、订阅Topic、注册回调业务处理器、启动消费者。

    这段代码默认使用了DefaultMQPushConsumer,这个也是RocketMQ推荐使用的构建消费者实例。从名称来看Push是一个推类型的消费者,但是内部依赖是采用了拉模式,置于为何内部是拉模式,这一点后面文章中会详细的介绍到。

    当调用DefaultMQPushConsumer的start方法时,消费者开始启动工作了,在获取到消息的时候,会主动回调业务监听器,进行业务上面的处理。

    本篇文章,我们将探讨start方法背后的故事,看一下客户端在拉取消息之前,都做了哪些准备工作,来支撑客户端的运行。

    消费者客户端内部类关系图

    我们先来整体看一下涉及客户端消费几个重要的类以及他们之间的构成关系,以图的形式展示一下,先有个整体的认知印象,然后在拆开来一个一个从源码的角度介绍一下具体的内容。

    图中箭头指引的方向表示存在内部引用关系。

    图片罗列了客户端涉及的重要类以及主要功能介绍,接下来我们以源码的角度,开始从start方法入手,深入分析一下具体的功能实现,一步一步拆解客户端在拉取消息前的准备工作。

    源码解析篇

    DefaultMQPushConsumer的start方法内部调用的其实是DefaultMQPushConsumerImpl的start方法,DefaultMQPushConsumer内部持有一个DefaultMQPushConsumerImpl的实例,在构造方法中创建。

    traceDispatcher这个涉及轨迹的记录,不是本篇分析的内容,知晓一下就可以。

    DefaultMQPushConsumerImpl的start方法涉及的内比较多,内部封装了复杂的业务逻辑,串联了消费者客户端涉及的各个子模块。

    从DefaultMQPushConsumerImpl中衍生出了几个重要的类,比如MQClientInstance、RebalanceImpl、pullAPIWrapper、OffsetStore、ConsumeMessageService。这几个类的创建以及初始化就代表着客户端在拉取消息前做的准备工作。

    我们重点介绍一下MQClientInstance类。

    MQClientInstance介绍

    MQClientInstance的创建是根据客户端的clientId来判断是否应该创建新的实例还有获取已经存在的实例。

    因为clientId的不同,MQClientInstance存在一个或者多个。一个MQClientInstance实例封装的逻辑是很重的,之所以设计成多个可能是考虑到各个消费者或者生产者不互相影响,也可能涉及到不同NameServer集群不同Broker集群在同一客户端实现的可能性。

    factoryTable结构的定义如下,是一个ConcurrentMap。

    clientId的的构成主要是由ClientIP以及InstanceName构成,而在DefaultMQPushConsumerImpl中start方法中介绍了InstanceName的构成。

    MQClientInstance内部也涉及start方法,用于处理更加底层的一些服务,涉及到网络通信的API、定时调度任务、启动消息拉取服务、启动队列分配服务、启动内部生产者。

    分析一下startScheduledTask涉及的调度服务都有哪些,从上到下一次是:

    1. 如果没有在启动客户时设置NameServer地址,则会从一个域名地址以HTTP GET方式请求NameServer地址。
    2. 从NameServer定时更新Topic的路由信息,涉及消费者和生产者。
    3. 清除离线的Broker服务器以及想Broker服务器发送心跳包。
    4. 持久化消费者offset,广播模式存储在本地,集群模式存储在远程Broker上面
    5. 动态调整线程池线程数,暂时没有实现。

    这些调度服务伴随着客户端的启动而启动,也属于是拉取消息前准备工作的一部分

    本篇总结

    本篇主要介绍了消息消费者在拉取消息前做的一些准备工作,准备工作主要是在DefaultMQPushConsumerImpl中来完成的,这个类伴随着DefaultMQPushConsumer的创建而创建,内部封装了消费者客户端涉及的方方面面,要分析消费者实现源码可以着重从这里入手。

    随着内部各个模块均已初始化启动,客户端拉取消息的准备工作完成。

  • 相关阅读:
    vivado Versal 串行 I/O 硬件调试流程、使用 Vivado Serial I/O Analyzer 来调试设计
    YOLOv8_obb预测流程-原理解析[旋转目标检测理论篇]
    opencv-图像平滑
    【C语言刷题】Leetcode268丢失的数字
    web前端框架JS学习之JavaScript类型转换
    【go-zero】go-zero 脚手架 simple-admin 第二章:通过goctls生成api整个项目
    微软开源 windows-drivers-rs, 用 Rust 开发 Windows 驱动程序
    基于单片机智能液位水位监测控制系统设计
    【微服务】Alibaba Cloud Linux环境下Docker以及MySQL安装
    基于SVm和随机森林算法模型的中国黄金价格预测分析与研究
  • 原文地址:https://blog.csdn.net/m0_57042151/article/details/126901761