• springboot+rocketmq(5):实现批量消息


    一、概述

    1.批量发送消息:

    批量发送消息能显著提高传递小消息的性能限制是这些批量消息应该有相同的 topic,相同的 waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过 4MB

    2.批量接收消息:

    批量接收消息能提高传递小消息的性能,同时与顺序消息配合的情况下,还能根据业务主键对顺序消息进行去重(是否可去重,需要业务来决定),减少消费者对消息的处理

    3.批量消息使用场景

    如果消息过多,每次发送消息都和MQ建立连接,无疑是一种性能开销,批量消息可以把消息打包批量发送,批量发送消息能显著提高传递小消息的性能。

    4.批量消息限制

    批量发送消息能显著提高传递消息的性能。限制是这些批量消息应该有相同的topic,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

    如果超过可以有2种处理方案:

    1.将消息进行切割成多个小于4M的内容进行发送
    2.修改4M的限制改成更大

    可以设置Producer的maxMessageSize属性
    修改配置文件中的maxMessageSize属性

    对于消费者而言Consumer的MessageListenerConcurrently监听接口的consumeMessage()方法的第一个参数为消息列 表,但默认情况下每次只能消费一条消息可以通过:Consumer的pullBatchSize属性设置消息拉取数量(默认32),可以通过设置consumeMessageBatchMaxSize属性设置消息一次消费数量(默认1)。

    [注意]:pullBatchSize 和 consumeMessageBatchMaxSize并不是设置越大越好,一次拉取数据量太大会导致长时间等待,性能降低。而且消息处理失败同一批消息都会失败,然后进行重试,导致消费时长增加。增加没必要的重试次数

    二、应用

    1.创建Springboot项目,添加rockermq 依赖

    <!--rocketMq依赖-->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.2.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.配置rocketmq

    # 端口
    server:
      port: 8083
    
    # 配置 rocketmq
    rocketmq:
      name-server: 127.0.0.1:9876
      #生产者
      producer:
        #生产者组名,规定在一个应用里面必须唯一
        group: group1
        #消息发送的超时时间 默认3000ms
        send-message-timeout: 3000
        #消息达到4096字节的时候,消息就会被压缩。默认 4096
        compress-message-body-threshold: 4096
        #最大的消息限制,默认为128K
        max-message-size: 4194304
        #同步消息发送失败重试次数
        retry-times-when-send-failed: 3
        #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
        retry-next-server: true
        #异步消息发送失败重试的次数
        retry-times-when-send-async-failed: 3
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    3.创建生产者 发送批量消息—小于4MB

    发送批量消息,最主要的区别是在发送消息的send方法入参一个List。

    package com.example.springbootrocketdemo.controller;
    
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * 批量消息
     * @author qzz
     */
    @RestController
    public class RocketMQBatchController {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
    
        /**
         * 发送批量消息
         * 发送批量消息,最主要的区别是在发送消息的send方法入参一个List。
         */
        @RequestMapping("/testBatchSend")
        public void testSyncSend(){
    
            List<Message<String>> messageList = new ArrayList<>();
            for(int i=0;i<10;i++){
                messageList.add(MessageBuilder.withPayload("批量消息"+(i+1)).build());
            }
            //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
            //参数二:消息内容
            SendResult sendResult = rocketMQTemplate.syncSend("test-topic-batch",messageList);
            System.out.println(sendResult);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    SpringBoot给我们提供了RocketMQTemplate模板类,我们利用这个类可以以多种形式发送消息。
    发送方法指定Topic主题test-topic-batch。

    官网也提示:如果批量消息大于1MB就不要用一个批次发送,而要拆分成多个批次消息发送。也就是说,一个批次消息的大小不要超过1MB,实际使用时,这个1MB的限制可以稍微扩大点,实际最大的限制是4194304字节,大概4MB。

    4.新建消息消费者监听RocketMQConsumerListener,监听消息,消费消息

    package com.example.springbootrocketdemo.config;
    
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 消费批量消息
     * 配置RocketMQ监听
     * ConsumeMode.ORDERLY:顺序消费
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-batch",topic = "test-topic-batch",consumeMode = ConsumeMode.ORDERLY)
    public class RocketMQBatchConsumerListener implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String s) {
            System.out.println("consumer 批量消息,收到消息:"+s);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    消费者类要实现RocketMQListener接口,以及动态指定消息类型String。

    类上要加上@RocketMQMessageListener注解,指定topic主题test-topic-batch,以及消费者组test-batch

    5.启动服务,测试效果

    在这里插入图片描述
    从控制台打印可以看出,发送一次批量消息,创建了多了msgId(使用逗号拼接而成),默认消费时也是 一次消费一次。

    那么,可以一次消费多条吗?当然是可以的,不过要进行特殊的设置:

    可以通过设置consumeMessageBatchMaxSize属性设置消息一次消费数量(默认1)

    consumeMessageBatchMaxSize:每次消费(即将多条消息合并为List消费)的最大消息数目,默认值为1,rocketmq-spring-boot-starter目前不支持批量消费(2.1.0版本)

    package com.example.springbootrocketdemo.config;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
    import org.springframework.stereotype.Service;
    
    /**
     * 消费批量消息
     * 配置RocketMQ监听
     * ConsumeMode.ORDERLY:顺序消费
     *
     * RocketMQPushConsumerLifecycleListener:当@RocketMQMessageListener中的配置不⾜以满⾜我们的需求时,可以实现该接⼝直接更改消费者类DefaultMQPushConsumer的配置
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "test-batch",topic = "test-topic-batch",consumeMode = ConsumeMode.ORDERLY)
    public class RocketMQBatchConsumerListener implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {
    
        /**
         * 客户端收到的消息
         * @param s
         */
        @Override
        public void onMessage(String s) {
            System.out.println("consumer 批量消息,收到消息:"+s);
        }
    
        /**
         * 对消费者客户端的一些配置
         * 重写prepareStart方法
         * @param defaultMQPushConsumer
         */
        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            //设置每次消息拉取的时间间隔 单位 毫秒
            defaultMQPushConsumer.setPullInterval(1000);
            //最小消费线程池数
            defaultMQPushConsumer.setConsumeThreadMin(1);
            //最大消费线程池数
            defaultMQPushConsumer.setConsumeThreadMax(10);
            //设置消费者单次批量消费的消息数目上限    默认1
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(3);
            //设置每个队列每次拉取的最大消费数
            defaultMQPushConsumer.setPullBatchSize(16);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    消息监听实现RocketMQPushConsumerLifecycleListener ,重写prepareStart方法,进行配置ConsumeMessageBatchMaxSize。

    我集成的rocketmq-spring-boot-starter 版本是 2.2.1,测试效果 发现 没有起效果。暂时没有找打解决方法。

    5.批量消息的限制使用

    我们需要做什么:

    • 定义消息切割器切割消息
    • 发送消息把消息切割之后,进行多次批量发送

    5.1定义消息切割器

    package com.example.springbootrocketdemo.splitter;
    
    import org.apache.rocketmq.common.message.Message;
    
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    /**
     * 消息切割器  按照4M大小切割
     * @author qzz
     */
    public class ListSplitter implements Iterator<List<Message>> {
    
        private final int SIZE_LIMIT = 1024 * 1024 * 4;
    
        private final List<Message> messages;
    
        private int currIndex;
    
        public ListSplitter(List<Message> messages) {
            this.messages = messages;
        }
    
        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }
    
        @Override
        public List<Message> next() {
            int startIndex = getStartIndex();
            int nextIndex = startIndex;
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                Message message = messages.get(nextIndex);
                int tmpSize = calcMessageSize(message);
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
            }
            List<Message> subList = messages.subList(startIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
        private int getStartIndex() {
            Message currMessage = messages.get(currIndex);
            int tmpSize = calcMessageSize(currMessage);
            while(tmpSize > SIZE_LIMIT) {
                currIndex += 1;
                Message message = messages.get(currIndex);
                tmpSize = calcMessageSize(message);
            }
            return currIndex;
        }
        private int calcMessageSize(Message message) {
            int tmpSize = message.getTopic().length() + message.getBody().length;
            Map<String, String> properties = message.getProperties();
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                tmpSize += entry.getKey().length() + entry.getValue().length();
            }
            // 增加⽇日志的开销20字节
            tmpSize = tmpSize + 20;
            return tmpSize;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70

    5.2 改造 批量发送方法

    
        /**
         * 发送批量消息(大于4MB)
         * 发送批量消息,最主要的区别是在发送消息的send方法入参一个List。
         */
        @RequestMapping("/testBatchSendSplitter")
        public void testBatchSendSplitter(){
    
            List<Message> messageList = new ArrayList<>();
            for(int i=0;i<1000;i++){
                //添加内容
                byte[] bytes = (("批量消息"+i).getBytes(CharsetUtil.UTF_8));
                messageList.add(new Message("message-topic-batch","message-tag-batch",bytes));
            }
    
            //切割消息
            //把大的消息分裂传给你若干个小的消息
            ListSplitter splitter = new ListSplitter(messageList);
            while(splitter.hasNext()){
                List<Message> listItem = splitter.next();
                //发送消息
                //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
                //参数二:消息内容
                SendResult sendResult = rocketMQTemplate.syncSend("message-topic-batch",messageList,6000);
                System.out.println(sendResult);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    5.3 改造 批量消费监听

    package com.example.springbootrocketbatchdemo.config;
    
    import io.netty.util.CharsetUtil;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
    import org.springframework.stereotype.Service;
    
    import java.util.List;
    
    /**
     * 消费批量消息
     * 配置RocketMQ监听
     * ConsumeMode.ORDERLY:顺序消费
     *
     * RocketMQPushConsumerLifecycleListener:当@RocketMQMessageListener中的配置不⾜以满⾜我们的需求时,可以实现该接⼝直接更改消费者类DefaultMQPushConsumer的配置
     * @author qzz
     */
    @Service
    @RocketMQMessageListener(consumerGroup = "message-batch",topic = "message-topic-batch",consumeMode = ConsumeMode.ORDERLY)
    public class RocketMQBatchSplitterConsumerListener implements RocketMQListener<List<MessageExt>> , RocketMQPushConsumerLifecycleListener {
    
        /**
         * 客户端收到的消息
         * @param s
         */
        @Override
        public void onMessage(List<MessageExt> s) {
            System.out.println("---------------consumer 批量消息,总个数:"+s.size());
            s.forEach(message->{
                System.out.println("consumer 批量消息 body:"+new String(message.getBody(), CharsetUtil.UTF_8));
            });
        }
    
        /**
         * 对消费者客户端的一些配置
         * 重写prepareStart方法
         * @param defaultMQPushConsumer
         */
        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
            //设置每次消息拉取的时间间隔 单位 毫秒
            defaultMQPushConsumer.setPullInterval(1000);
            //最小消费线程池数
            defaultMQPushConsumer.setConsumeThreadMin(1);
            //最大消费线程池数
            defaultMQPushConsumer.setConsumeThreadMax(10);
            //设置消费者单次批量消费的消息数目上限    默认1
            defaultMQPushConsumer.setConsumeMessageBatchMaxSize(3);
            //设置每个队列每次拉取的最大消费数
            defaultMQPushConsumer.setPullBatchSize(16);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    54.启动服务,测试效果

    在这里插入图片描述

  • 相关阅读:
    1、javaweb学习知识简析
    IDEA设置背景为自定义照片
    软考高级系统架构师_计算机组成与结构02_高速缓存_磁盘结构_输入输出技术_总线结构_可靠性_---软考高级系统架构师005
    <C++>类和对象——引入
    redis学习六redis的集群:主从复制、CAP、PAXOS、Cluster分片集群(一)
    Tomcat配置域名和端口
    XSS详解
    Aeraki Mesh 正式成为CNCF沙箱项目,腾讯云携手合作伙伴加速服务网格成熟商用
    strerror函数详解 看这一篇就够了-C语言(函数讲解、 使用用法举例、作用)
    走进Redis-扯扯集群
  • 原文地址:https://blog.csdn.net/qq_26383975/article/details/125440014