• RocketMQ实战之Consumer


    环境以及版本

    1. 双主双从,版本4.3.1
      在这里插入图片描述

    概念

    1. Push方式是broker接收到消息后,主动把消息推送到Consumer

      • 优点:实时性高
      • 缺点: 1)加大broker的工作量,影响broker性能. 2) Consumer的处理能力各不相同,Consumer的状态不受broker控制,如果Consumer不能及时处理broker推送过来的消息,会造成各种问题,比如缓冲区溢出、内存溢出等
    2. Pull方式是Consumer循环地从broker拉取消息,拉取多少消息,什么时候拉取都是由Consumer决定,处理完毕再继续拉取,这样可以达到限流的目的,不会出现处理不过来的情况。

      • 优点:Consumer自己控制流量
      • 缺点: 拉取消息的时间间隔不好控制,间隔太短就处在一个忙等的状态,浪费资源,时间间隔太长,broker的消息不能及时处理
    3. 长轮询:既有Pull的优点,又能达到实时性的目的。长轮询的局限性是在HOLD住Consumer请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。

    4. consumerGroup:用于把多个Consumer组织在一起,提高处理能力,consumerGroup需要和消息模式(MessageModel)配合使用

    5. RocketMQ支持两种消息模式:ClusteringBroadcasting

      • Clustering(集群)模式下,同一个ConsumerGroup(消费组)(GroupName相同)里的Consumer负载均衡消费部分信息
      • Broadcasting(广播)模式下,同一个ConsumerGroup(消费组)里的每个Consumer都能消费到所订阅Topic的全部信息,即广播模式
    6. TAG:通过指定消息的Tag进行消息过滤(在发送消息时设置Tag)

       // 订阅testTopic下TagA的消息
        consumer.subscribe(topic, "TagA");
        // 订阅testTopic下TagA或TagC或TagD的消息
        consumer.subscribe("testTopic", "TagA || TagC || TagD");
        // 订阅testTopic下所有消息
        consumer.subscribe("testTopic", "*");
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6

    Push方式

    1. DefaultMQPushConsumer:系统收到消息后自动调用监听器处理,自动保存offset,加入新的DefaultMQPushConsumer会自动做负载均衡。一个应用创建一个DefaultMQPushConsumer,由应用来维护此对象,可以设置为全局对象或者单例。

    2. 并行消费使用MessageListenerConcurrently适用于不考虑消息顺序。PushConsumer为了保证消息肯定消费成功,消费方需要明确表示消费成功,Broker才会认为消息消费成功。

      • 当回调监听器返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,Broker才会认为这批消息(默认是1条)是消费完成的。
      • 如果消息消费失败,需要重试消费,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup
      • 如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预
    3. push模式并发消费案例

      
      @Test
      public void testPushConsumer() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
          String consumerGroup = "ConsumerGroupName3";
      
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
          try {
      
              //从上一次消费位置开始消费
              consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
              consumer.setNamesrvAddr(namesrvAddr);
              consumer.setInstanceName("Consumber");
      
              //设置集群模式
              consumer.setMessageModel(MessageModel.CLUSTERING);
      
              //批量接收20条
              consumer.setConsumeMessageBatchMaxSize(20);
              //默认64
              consumer.setConsumeThreadMax(4);
              //默认20
              consumer.setConsumeThreadMin(1);
              consumer.setVipChannelEnabled(false);
      
              String topic = "testTopic";
              String tag = "TagA";
              consumer.subscribe(topic, tag);
      
              consumer.registerMessageListener(new MessageListenerConcurrently() {
                  @Override
                  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
      
                      for (MessageExt messageExt : msgs) {
                          try {
      
                              String topic = messageExt.getTopic();
                              String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                              String tags = messageExt.getTags();
                              logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), topic, tags, msg);
                          } catch (Exception e) {
                              logger.error(e.getMessage(), e);
                              // 重试消费,重发到Broker的RETRY TOPIC。 10s后Broker默认重新投递
                              return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                          }
                      }
                      //表示此批消息消费完成
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  }
              });
              //Consumer对象在使用之前必须要调用start初始化,初始化一次即可
              consumer.start();
              LockSupport.park();
          } catch (MQClientException e) {
              logger.error(e.getMessage(), e);
          } finally {
              consumer.shutdown();
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
    4. push顺序消费案例:MessageListenerOrderly适用于顺序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费。所以顺序消息消费失败,不会再继续推进,直到失败的消息消费成功为止

      @Test
      public void testMsgOrder() {
          String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
          String consumerGroup = "ConsumerGroupNameOrder";
      
          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
          try {
              consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
              consumer.setNamesrvAddr(namesrvAddr);
              consumer.setInstanceName("ConsumberOrder");
              consumer.setMessageModel(MessageModel.CLUSTERING);
      
              consumer.setConsumeMessageBatchMaxSize(20);
              consumer.setVipChannelEnabled(false);
              consumer.setConsumeThreadMax(4);
              consumer.setConsumeThreadMin(1);
      
              consumer.subscribe("testTopic", "TagA");
              consumer.registerMessageListener(new MessageListenerOrderly() {
      
                  @Override
                  public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
                      //设置自动提交,默认true
                      context.setAutoCommit(true);
      
                      for (MessageExt messageExt : list) {
                          String msgStr = null;
                          try {
                              msgStr = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                              logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), messageExt.getTopic(), messageExt.getTags(), msgStr);
                          } catch (Exception e) {
                              logger.error(e.getMessage(), e);
                              return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                          }
      
                      }
      
                      return ConsumeOrderlyStatus.SUCCESS;
                  }
              });
      
              consumer.start();
              LockSupport.park();
          } catch (MQClientException e) {
              logger.error(e.getMessage(), e);
          } finally {
              consumer.shutdown();
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50

    Pull方式

    1. Pull方式:Consumer自己控制消费

       @Test
       public void testPullConsumer() {
           String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9877";
           final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("ConsumerGroupName11");
           DefaultMQPullConsumer defaultMQPullConsumer = scheduleService.getDefaultMQPullConsumer();
           try {
      
               defaultMQPullConsumer.setNamesrvAddr(namesrvAddr);
               defaultMQPullConsumer.setVipChannelEnabled(false);
               scheduleService.setMessageModel(MessageModel.CLUSTERING);
      
               String topic = "testTopic";
      
               scheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
                   @Override
                   public void doPullTask(MessageQueue mq, PullTaskContext context) {
                       MQPullConsumer consumer = context.getPullConsumer();
                       logger.info("brokerName:{},topic:{},queueId:{}", mq.getBrokerName(), mq.getTopic(), mq.getQueueId());
                       try {
                           // 获取消费偏移量,其中fromStore为true表示从存储端(即Broker端)获取消费进度;
                           // 若fromStore为false表示从本地内存获取消费进度
                           long offset = consumer.fetchConsumeOffset(mq, true);
                           if (offset < 0)
                               offset = 0;
      
                           PullResult pullResult = consumer.pull(mq, "TagA", offset, 32);
                           switch (pullResult.getPullStatus()) {
                               case FOUND:
                                   List<MessageExt> list = pullResult.getMsgFoundList();
                                   for (MessageExt messageExt : list) {
                                       logger.info("msg:{}", new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                                   }
                                   break;
                               case NO_MATCHED_MSG:
                                   logger.info("没有匹配的消息");
                                   break;
                               case NO_NEW_MSG:
                                   logger.info("没有新消息");
                                   break;
                               case OFFSET_ILLEGAL:
                                   logger.info("偏移量非法");
                                   break;
                               default:
                                   break;
                           }
      
                           // 存储Offset,客户端每隔5s会定时刷新到Broker
                           consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
      
                           // 设置再过1000ms后重新拉取
                           context.setPullNextDelayTimeMillis(1000);
                       } catch (Exception e) {
                           logger.error(e.getMessage(), e);
                       }
                   }
               });
               scheduleService.start();
               LockSupport.park();
           } catch (MQClientException e) {
               logger.error(e.getMessage(), e);
           } finally {
               scheduleService.shutdown();
           }
      
       }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
  • 相关阅读:
    1.44寸OLED的Linux驱动
    高等数学基础概念的Python开发实现
    番外8.1 配置+管理文件系统
    Python的魔术方法
    恭喜!龙蜥获得 2023 大学生操作系统设计赛二等奖及特殊贡献奖
    22 相交链表
    Leetcode 198. House Robber
    智能穿戴终端设备安卓主板方案_MTK平台智能手表PCBA定制开发
    限量,阿里百万级Redis小册开源,原理应用拓展一键完成
    39 WEB漏洞-XXE&XML之利用检测绕过全解
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126355131