• RocketMQ消息批处理与消峰填谷


    前言

    MQ除了具有解除业务耦合的作用之外,还可以消峰填谷,可以将流量拉平,避免短期密集请求压垮系统。

    消息批量获取、消峰填谷

    • 客户端可能在短期内发来大量的请求,我们利用RocketMQ周期性的批量获取数据,可以进行消息的批处理,降低业务对系统的开销。
      在这里插入图片描述

    环境准备

    需要安装RocketMQ,安装教程请见springboot基础(48): RocketMQ的安装

    周期批量处理消息

    public static void startConsumer() throws MQClientException {
            log.info("----开始批量消费消息-----");
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");//设置group
            consumer.setNamesrvAddr("localhost:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//从队列的头部消费
            consumer.subscribe("msgid", "*");
    
            consumer.setConsumeMessageBatchMaxSize(32);//每次指定消费的10条消息,默认1
            consumer.setPullBatchSize(32);//每次从Broker中拉40条消息,默认32
            consumer.setPullInterval(5000);//拉取时间间隔
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    log.info("收到的消息数量:" + list.size());
                    for(MessageExt messageExt:list){
                        String str = new String(messageExt.getBody(), Charset.forName("UTF-8"));
                        System.out.println(str);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回状态是消费成功还是消费失败
                }
            });
    
            consumer.start();
            log.info("consumer started ....");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    测试发送消息

        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        @Override
        public void sendMessage(String id) {
            SendResult result = rocketMQTemplate.syncSend("msgid", "同步测试消息"+id, 1000);
            System.out.println("msgid:"+id+",发送状态:"+result);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    启动消费者后,我们快速的生产消息,供消费者进行消费,当快速发送请求时,RocketMQ消费消息。
    在这里插入图片描述

    模拟消峰填谷

    假设系统的对某业务的处理能力是QPS为n,当QPS超过n时,会超过处理能力。
    那么我们为了方便距离,直接让消费者最大数量为1,也就是只有一个consumer thread,这样所有的请求都需要等待这个consumer来处理。

    这里为了方便模拟MQ消峰填谷效果,所以使用了consumer thread 为1,业务场景当然不会这么设置。

    设置消费者线程最大最小数量为1

            consumer.setConsumeThreadMax(1); 
            consumer.setConsumeThreadMin(1);
    
    • 1
    • 2
    public static void startConsumer() throws MQClientException {
            log.info("----开始批量消费消息-----");
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumergroup");//设置group
            consumer.setNamesrvAddr("localhost:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//从队列的头部消费
            consumer.subscribe("msgid", "*");
    
            consumer.setConsumeMessageBatchMaxSize(5);//每次指定消费的10条消息,默认1
            consumer.setPullBatchSize(12);//每次从Broker中拉40条消息,默认32
            consumer.setPullInterval(5000);//拉取时间间隔
            consumer.setConsumeThreadMax(1);
            consumer.setConsumeThreadMin(1);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @SneakyThrows
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    log.info("收到的消息数量:" + list.size());
                    for(MessageExt messageExt:list){
                        String str = new String(messageExt.getBody(), Charset.forName("UTF-8"));
                        log.info(str);
                        TimeUnit.SECONDS.sleep(1);//模拟业务处理消耗的时间
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回状态是消费成功还是消费失败
                }
            });
    
            consumer.start();
            log.info("consumer started ....");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    此时候for循环,发送mq消费,模拟突增的业务请求。
    在这里插入图片描述

    启动服务器,批量发送生产多条消息。可以看到生产者突然向MQ发送了大量的消息,而观察consumer,则依然按照设定的业务处理能力进行业务处理。
    在这里插入图片描述

    消费方按照1条/秒的处理能力处理消息。
    在这里插入图片描述

    consumer参数值范围

    参数的可取值范围。
    org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl.java
    在这里插入图片描述

  • 相关阅读:
    数据预处理—滑动窗口采样数据
    spark sql详解
    基于Caffeine再次封装的本地缓存工具
    echarts折线图流动特效的实现(非平滑曲线)
    Vant4 List列表组件:资源中有项目合集案列
    隔离出来的“陋室铭”
    TiUP 命令概览
    使用Django框架快速搭建个人网站
    C++第十单元 查找与检索10.1 顺序查找10.2 二分查找
    【个人成长】高效能人士的七个习惯
  • 原文地址:https://blog.csdn.net/u011628753/article/details/126876851