• 【消息中间件】默认的RocketMQ消息消费者是如何启动的?


    前言

    大家好,在当下的分布式服务中,消息队列中间件是一个解决服务之间耦合的利器,今天我们来瞧一瞧开源的RocketMQ消息中间件,他的消费端是如何启动的,以及在使用他的过程中有哪些配置。

    启动流程图

    Push和Pull的区别

    Apache RocketMQ在消费者服务中,为我们提供了Push模式也提供了Pull模式

    那他们主要有什么区别呢?

    • Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

    • Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

    BROADCASTING 和 CLUSTERING 模式的区别

    BROADCASTING(广播模式):当使用广播消费模式时,RocketMQ 会将每条消息推送给消费组所有的消费者,保证消息至少被每个消费者消费一次。

    CLUSTERING(集群模式):当使用集群消费模式时,RocketMQ 认为任意一条消息只需要被消费组内的任意一个消费者处理即可。

    Consumer启动流程

    我们直接跑一个官方提供的Demo,大家也可以去官网上去下载源码

    1. public static void main(String[] args) throws InterruptedException, MQClientException {
    2. /*
    3. * Instantiate with specified consumer group name.
    4. * 消费者模式有两种 推和拉
    5. */
    6. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    7. consumer.setNamesrvAddr("127.0.0.1:9876");
    8. /*
    9. * Specify where to start in case the specific consumer group is a brand-new one.
    10. * 指定消费从哪里开始
    11. */
    12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    13. /*
    14. * Subscribe one more topic to consume.
    15. * 设置监听主题以及过滤条件
    16. */
    17. consumer.subscribe("TopicTest999", "*");
    18. /*
    19. * Register callback to execute on arrival of messages fetched from brokers.
    20. * 注册消息监听器
    21. */
    22. consumer.registerMessageListener(new MessageListenerConcurrently() {
    23. @Override
    24. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    25. ConsumeConcurrentlyContext context) {
    26. //System.out.println("待消费条数:"+ msgs.size());
    27. LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
    28. /*try {
    29. Thread.sleep(10000);
    30. } catch (InterruptedException e) {
    31. e.printStackTrace();
    32. }*/
    33. LOGGER.info("success");
    34. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    35. }
    36. });
    37. /*
    38. * Launch the consumer instance.
    39. */
    40. consumer.start();
    41. System.out.printf("Consumer Started.%n");
    42. }
    43. 复制代码

    前置设置

    1. 指定namesrvAddr地址
    1. consumer.setNamesrvAddr("127.0.0.1:9876");
    2. 复制代码
    1. 指定消费从哪里开始
    1. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    2. 复制代码
    • CONSUME_FROM_LAST_OFFSET 从最新的消息开始消费
    • CONSUME_FROM_FIRST_OFFSET 从最新的位点开始消费
    • CONSUME_FROM_TIMESTAMP 从指定的时间戳开始消费
    1. 指定负载均衡策略
    • AllocateMessageQueueAveragely:平均连续分配算法。
    • AllocateMessageQueueAveragelyByCircle:平均轮流分配算法。
    • AllocateMachineRoomNearby:机房内优先就近分配。
    • AllocateMessageQueueByConfig:手动指定,这个通常需要配合配置中心,在消费者启动时,首先先创建 AllocateMessageQueueByConfig 对象,然后根据配置中心的配置,再根据当前的队列信息,进行分配,即该方法不具备队列的自动负载,在 Broker 端进行队列扩容时,无法自动感知,需要手动变更配置。
    1. // AllocateMessageQueueByConfig
    2. AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
    3. MessageQueue messageQueue = new MessageQueue();
    4. messageQueue.setBrokerName("broker-a");
    5. messageQueue.setQueueId(2);
    6. messageQueue.setTopic("TopicTest");
    7. allocateMessageQueueByConfig.setMessageQueueList(Collections.singletonList(messageQueue));
    8. consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
    9. 复制代码
    • AllocateMessageQueueByMachineRoom:消费指定机房中的队列,该分配算法首先需要调用该策略的 setConsumeridcs(Set consumerIdCs) 方法,用于设置需要消费的机房,将刷选出来的消息按平均连续分配算法进行队列负载。
    1. // AllocateMessageQueueByMachineRoom
    2. AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new AllocateMachineRoomNearby.MachineRoomResolver() {
    3. // Broker部署
    4. @Override
    5. public String brokerDeployIn(MessageQueue messageQueue) {
    6. System.out.println(messageQueue.getBrokerName().split("-")[0]);
    7. return messageQueue.getBrokerName().split("-")[0];
    8. }
    9. // 消费端部署
    10. @Override
    11. public String consumerDeployIn(String clientID) {
    12. System.out.println(clientID.split("-")[0]);
    13. return clientID.split("-")[0];
    14. }
    15. };
    16. consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(new AllocateMessageQueueAveragely(), machineRoomResolver));
    17. 复制代码
    1. 设置监听主题以及过滤条件

    Tag过滤,用于对某个Topic下的消息进行分类,

    消息发送到名称为TopicTest999的Topic中,被各个不同的系统所订阅,我们可以利用Tag来区分

    1. consumer.subscribe("TopicTest999", "order");
    2. consumer.subscribe("TopicTest999", "user");
    3. 复制代码
    1. 注册消息监听器

    注册消息监听器的目的就是为了接收消息,RocketMQ本身为我们提供了两种模式

    • 并发消费
    1. consumer.registerMessageListener(new MessageListenerConcurrently() {
    2. @Override
    3. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    4. ConsumeConcurrentlyContext context) {
    5. LOGGER.info("Receive New Messages : {}", Thread.currentThread().getName());
    6. /*try {
    7. Thread.sleep(10000);
    8. } catch (InterruptedException e) {
    9. e.printStackTrace();
    10. }*/
    11. LOGGER.info("success");
    12. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    13. }
    14. });
    15. 复制代码
    • 顺序消费
    1. consumer.registerMessageListener(new MessageListenerOrderly() {
    2. AtomicLong consumeTimes = new AtomicLong(0);
    3. @Override
    4. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    5. System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
    6. this.consumeTimes.incrementAndGet();
    7. if ((this.consumeTimes.get() % 2) == 0) {
    8. return ConsumeOrderlyStatus.SUCCESS;
    9. } else if ((this.consumeTimes.get() % 5) == 0) {
    10. context.setSuspendCurrentQueueTimeMillis(3000);
    11. return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
    12. }
    13. return ConsumeOrderlyStatus.SUCCESS;
    14. }
    15. });
    16. 复制代码

    他们主要的区别继承MessageListener接口的实现

    除了这一些重要的参数以外,RocketMQ为我们提供了其他非常丰富的配置,我总结在了下图

    启动流程源码跟踪

    需要注意的是,在配置后我们才能去调用启动方法

    1. 设置消费者分组后,DefaultMQPushConsumer调用start()启动消费者

    入口:org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#start

    1. @Override
    2. public void start() throws MQClientException {
    3. // step 1 设置消费者分组
    4. setConsumerGroup(NamespaceUtil.wrapNamespace(this.getNamespace(), this.consumerGroup));
    5. this.defaultMQPushConsumerImpl.start();
    6. if (null != traceDispatcher) {
    7. try {
    8. traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
    9. } catch (MQClientException e) {
    10. log.warn("trace dispatcher start failed ", e);
    11. }
    12. }
    13. }
    14. 复制代码

    2. 根据serviceState状态启动消费者,当服务未创建时,才能启动成功

    在这里主要做了五件事

    1. 检查核心参数是否都配置了
    1. private void checkConfig() throws MQClientException {
    2. // 检查消费者组,是否满足条件
    3. Validators.checkGroup(this.defaultMQPushConsumer.getConsumerGroup());
    4. if (null == this.defaultMQPushConsumer.getConsumerGroup()) {
    5. ...
    6. }
    7. if (this.defaultMQPushConsumer.getConsumerGroup().equals(MixAll.DEFAULT_CONSUMER_GROUP)) {
    8. ...
    9. }
    10. if (null == this.defaultMQPushConsumer.getMessageModel()) {
    11. ...
    12. }
    13. if (null == this.defaultMQPushConsumer.getConsumeFromWhere()) {
    14. ...
    15. }
    16. // allocateMessageQueueStrategy
    17. // subscription
    18. // messageListener
    19. // consumeThreadMin
    20. // consumeThreadMax
    21. // consumeConcurrentlyMaxSpan
    22. // pullThresholdForQueue
    23. // pullThresholdForTopic
    24. // pullThresholdSizeForQueue
    25. // pullInterval
    26. // consumeMessageBatchMaxSize
    27. // pullBatchSize
    28. 复制代码

    主要是进行了参数配置的校验,如果一些参数设置不合理的,在这里就会抛出异常,终止了消费者服务的启动,这里的配置对后面的使用会产生一定的影响,所以我们在配置的时候需要更加的谨慎

    1. 复制订阅信息,生成重试主题
    1. private void copySubscription() throws MQClientException {
    2. try {
    3. Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
    4. if (sub != null) {
    5. for (final Map.Entry<String, String> entry : sub.entrySet()) {
    6. final String topic = entry.getKey();
    7. final String subString = entry.getValue();
    8. SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(topic, subString);
    9. // 更新内部订阅关系
    10. this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
    11. }
    12. }
    13. if (null == this.messageListenerInner) {
    14. this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
    15. }
    16. // 默认情况下我们是CLUSTERING模式
    17. switch (this.defaultMQPushConsumer.getMessageModel()) {
    18. case BROADCASTING:
    19. break;
    20. case CLUSTERING:
    21. // 创建重试主题
    22. final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
    23. SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
    24. // 将重试主题放入订阅关系容器中
    25. this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
    26. break;
    27. default:
    28. break;
    29. }
    30. } catch (Exception e) {
    31. throw new MQClientException("subscription exception", e);
    32. }
    33. }
    34. 复制代码

    获取配置的订阅关系,因为setSubscription()方法已经被作废,subscription都是为空的,在下面他会去维护一个subscriptionInner

    1. protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
    2. new ConcurrentHashMap<String, SubscriptionData>();
    3. 复制代码

    subscriptionInner保存了我们在前置配置的时候插入的订阅关系

    1. 消息模型设置为集群,则修改实例名称
    1. if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    2. this.defaultMQPushConsumer.changeInstanceNameToPID();
    3. }
    4. public void changeInstanceNameToPID() {
    5. if (this.instanceName.equals("DEFAULT")) {
    6. this.instanceName = UtilAll.getPid() + "#" + System.nanoTime();
    7. }
    8. }
    9. 复制代码

    将实例名称修改为 pid + 时间戳

    1. // format: "pid@hostname"
    2. final static String HOST_NAME = ManagementFactory.getRuntimeMXBean().getName();
    3. public static int getPid() {
    4. try {
    5. return Integer.parseInt(HOST_NAME.substring(0, HOST_NAME.indexOf('@')));
    6. } catch (Exception e) {
    7. return -1;
    8. }
    9. }
    10. 复制代码
    1. 获取全局唯一的实例
    1. this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
    2. // 一个clientId对应一个实例
    3. if (null == instance) {
    4. instance =
    5. new MQClientInstance(clientConfig.cloneClientConfig(),
    6. this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
    7. // 一个clientId对应一个实例
    8. MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
    9. if (prev != null) {
    10. instance = prev;
    11. log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
    12. } else {
    13. log.info("Created new MQClientInstance for clientId:[{}]", clientId);
    14. }
    15. }
    16. 复制代码

    在一个JVM中所有消费者、生产者持有同一个MQClientInstance实例,采用了双重检查的方式,来确定有且只有一个MQClientInstance实例。

    1. 负载均衡参数设置
    1. this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
    2. this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
    3. this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
    4. this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    5. 复制代码

    3. 注册过滤消息的钩子函数

    这里注册过滤消息的钩子函数有什么用呢,我们可以思考一下 ?

    1. private final ArrayList filterMessageHookList = new ArrayList();
    2. this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
    3. // 注册过滤消息 钩子函数
    4. this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
    5. 复制代码

    4. 根据不同模式,将消息进度存储在不同的地方

    1. switch (this.defaultMQPushConsumer.getMessageModel()) {
    2. // 广播模式
    3. case BROADCASTING:
    4. // 消息进度存储在本地文件
    5. this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    6. break;
    7. // 集群模式
    8. case CLUSTERING:
    9. // 消息进度存储在Broker 服务器上
    10. this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
    11. break;
    12. default:
    13. break;
    14. this.offsetStore.load();
    15. }
    16. 复制代码

    目的是初始化消息进度,以及确定消息存储的位置。

    5. 根据不同的消息监听器初始化消费消息线程池和扫描过期消息清除线程池

    顺序消费模式与并发消费模式有一些的不同

    根据不同的消息监听器初始化消费消息线程池、扫描过期消息清除线程池

    顺序消息模式,不初始化扫描过期消息清除线程池,只初始化消费消息线程池

    1. if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    2. this.consumeOrderly = true;
    3. this.consumeMessageService =
    4. new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
    5. }
    6. 复制代码

    初始化消费消息线程池、扫描过期线程池

    1. if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    2. this.consumeOrderly = false;
    3. this.consumeMessageService =
    4. new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
    5. }
    6. 复制代码

    6. 启动消费消息服务

    入口:org.apache.rocketmq.client.impl.consumer.ConsumeMessageService#start

    1. // 启动消费消息服务
    2. this.consumeMessageService.start();
    3. 复制代码
    1. 顺序消费模式

    顺序消费模式,启动的是线程池名称为ConsumeMessageScheduledThread_开头的定时线程池,每秒扫描一次

    1. this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    2. @Override
    3. public void run() {
    4. try {
    5. ConsumeMessageOrderlyService.this.lockMQPeriodically();
    6. } catch (Throwable e) {
    7. log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
    8. }
    9. }
    10. }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    11. 复制代码

    lockMQPeriodically()这个方法就是给当前客户端所消费的所有队列去borker进行上锁。

    1. public synchronized void lockMQPeriodically() {
    2. if (!this.stopped) {
    3. this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
    4. }
    5. }
    6. 复制代码
    1. 并发消费模式

    启动的线程池是清除过期消息定时线程池,每15分钟扫描一次

    1. // 开启过期消息清除,定时器
    2. this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    3. @Override
    4. public void run() {
    5. try {
    6. cleanExpireMsg();
    7. } catch (Throwable e) {
    8. log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
    9. }
    10. }
    11. }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    12. 复制代码

    7. 将消费者组注入消费者容器

    入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#registerConsumer

    1. // 将消费者组注入消费者容器
    2. boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
    3. 复制代码

    消费者容器

    1. private final ConcurrentMap<String/* group */, MQConsumerInner> consumerTable = new ConcurrentHashMap<String, MQConsumerInner>();
    2. MQConsumerInner prev = this.consumerTable.putIfAbsent(group, consumer);
    3. if (prev != null) {
    4. log.warn("the consumer group[" + group + "] exist already.");
    5. return false;
    6. }
    7. 复制代码

    使用consumerTable来保存消费者组与消费者关系

    在后续更新主题路由信息,检查客户端与Broker的关系等

    8. 启动消息监听

    入口:org.apache.rocketmq.client.impl.factory.MQClientInstance#start

    1. mQClientFactory.start();
    2. 复制代码

    这一个环节的内容过多,我们将在另外写一篇文章中,去了解RocketMQ是如何做消息监听的

    9. 最后向Broker设置心跳检测

    更新主题发布详情,当发布者变动时

    1. this.updateTopicSubscribeInfoWhenSubscriptionChanged();
    2. Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
    3. if (subTable != null) {
    4. for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
    5. final String topic = entry.getKey();
    6. this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
    7. }
    8. }
    9. 复制代码

    检查Broker状态

    1. this.mQClientFactory.checkClientInBroker();
    2. 复制代码

    向Broker设置心跳检测

    1. this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    2. 复制代码

    Consumer跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。

    总结

    我们主要了解的RocketMQ消息消费的启动流程,还是看得懂的,抓住这一条主线,我们在接下去的过程中,我们可以给自己留下一些问题,比如他是如何做负载均衡的?如何监听消息?在这过程中他使用到了线程池,我们能够去监听线程池的情况等等问题。

    我用思维导图整理了今天分享的内容,如果有不对的地方欢迎大家指出~

  • 相关阅读:
    getenv、setenv和putenv实践
    01. Kubernetes基础入门
    iOS 在OC旧项目中使用Swift进行混编
    基于I2C协议的OLED显示(利用U82G库)
    三方检测-服务及服务器扫描问题及处理方案
    ArcGIS:如何利用栅格数据进行路径网络分析-可达性分析?
    数据治理-数据管理角色
    最新版k8s 1.25版本安装
    react-router v6使用createHashHistory进行history.push时,url改变页面不渲染
    【毕业设计】基于SSM的商城系统
  • 原文地址:https://blog.csdn.net/m0_73311735/article/details/127864222