• Springboot整合RokectMQ


    Springboot整合RokectMQ

    1.开发环境搭建

    1.创建Springboot工程引入相关依赖

    
    <dependency>
       <groupId>org.apache.rocketmqgroupId>
       <artifactId>rocketmq-spring-boot-starterartifactId>
       <version>2.0.3version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.修改配置文件

    rocketmq:
      #指定nameserver地址,如果是集群多个服务器之间用分号隔开
      name-server: 192.168.79.203:9876;192.168.79.204:9876
      producer:
        #指定消费者组名
        group: llp
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.发送普通消息

    1.生产者

    ProducerController.java

    @RestController
    public class ProducerController {
        //模板,帮助我们去获取连接;感情自然流露:redisTemplate kafkaTemplate rabbitTemplate  jdbcTemplate
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
        
        @RequestMapping("/convertAndSend")
        public String convertAndSend() {
            User user = new User("llp", "110");
            rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
            //rocketmq-spring-boot-starter依赖中包含了fastJson
            return JSON.toJSONString(user);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    User.java

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    //在实际开发中生产者和消费者在不通的服务器上,数据的传输需要通过远程调用的方式,自然就涉及到对象的序列化问题了
    public class User implements Serializable {
        private static final long serialVersionUID = 4894770668175892723L;
        String userName;
        String userId;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.消费者

    /**
     * 1.通过实现RocketMQListener接口标识这个类是一个消费者
     * public interface RocketMQListener {
     *     void onMessage(T message);
     * }
     * 2.RocketMQListener接口可以指定泛型,比如我们生产者发送的是user对象,则可以指定泛型为User
     * 当然指定了泛型,onMessage方法参数类型也是明确的(User类型)
     * 3.除了标识这是一个消费者外,我们还需补充一些参数,比如:
     * consumerGroup 消费者组
     * topic 主题
     * messageModel 消息模式 BROADCASTING("BROADCASTING"),广播 CLUSTERING("CLUSTERING"); 均发
     * consumeThreadMax 最大消费线程数
     * selectorType 选择消息类型
     * selectorExpression 选择消息表达式
     * 4.如果RocketMQListener没有指定泛型则onmessage接收object对象,需要自行处理
     * 当然如果指定了泛型也就限定了消费者所能够接收的消息类型
     *
     */
    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic")
    public class ConsumerListener implements RocketMQListener<User> {
    
        /**
         * 接收消息
         * @param message 消息对象
         */
        @Override
        public void onMessage(User message) {
            System.out.println(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    3.运行测试

    image-20220911192102107

    3.发同步消息

    1.生产者

        @RequestMapping("/syncSend")
        public String syncSend() {
            User user = new User("llp", "110");
            /**
             * 从下面这段代码可以看到,底层会根据冒号进行拆分
             * 第一个元素作为Topic
             * 如果拆分后的数组长度大于1则第二个元素作为Tag
             *
             *String[] tempArr = destination.split(":", 2);
             *  String topic = tempArr[0];
             *  String tags = "";
             *  if (tempArr.length > 1) {
             *      tags = tempArr[1];
             *  }
             */
            rocketMQTemplate.syncSend("syncSendTopic:syncSendTag", user);
            return JSON.toJSONString(user);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2.消费者

    /**
     * SelectorType selectorType() default SelectorType.TAG;
     * String selectorExpression()default "*";
     */
    @Component
    @RocketMQMessageListener(consumerGroup = "llp", topic = "syncSendTopic", selectorType = SelectorType.TAG, selectorExpression = "syncSendTag")
    //@RocketMQMessageListener(consumerGroup = "llp", topic = "syncSendTopic")
    public class SyncConsumerListener implements RocketMQListener<User> {
    
        @Override
        public void onMessage(User message) {
            System.out.println(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    4.发送异步消息

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    1.生产者

    /**
     * 发送异步消息
     * @return
     */
    public String asyncSend() {
        User user = new User("llp", "110");
    
        rocketMQTemplate.asyncSend("asyncTopic", user, new SendCallback() {
            //发送成功回调方法
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功,发送结果:" + sendResult);
            }
            //发送失败回调方法
            @Override
            public void onException(Throwable e) {
                System.out.println("发送失败:" + e.getMessage());
            }
        });
        return JSON.toJSONString(user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.消费者

    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "asyncTopic")
    public class AsyncConsumerListener implements RocketMQListener<User> {
        @Override
        public void onMessage(User message) {
            System.out.println("消费者接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.运行测试

    发送成功,发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801025F0818B4AAC2382C3BB10008, offsetMsgId=C0A84FCC00002A9F000000000003F5F8, messageQueue=MessageQueue [topic=asyncTopic, brokerName=broker-b, queueId=1], queueOffset=0]
    消费者接收消息:User(userName=llp, userId=110)
    
    • 1
    • 2

    5.发送单向消息

    这种方式主要用在不特别关心发送结果的场景,例如日志发送。

    1.生产者

    @RequestMapping("/sendOneWay")
    public String sendOneWay() {
        User user = new User("llp", "110");
        rocketMQTemplate.sendOneWay("sendOneWayTopic",user);
        return JSON.toJSONString(user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.消费者

    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "sendOneWayTopic")
    public class SendOneWayConsumerListener implements RocketMQListener<User> {
    
        @Override
        public void onMessage(User message) {
            System.out.println("消费者接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    6.两种消费消息模式

    1.负载均衡模式

    消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。这也是rocketmq默认的消费模式

    1.生产者
    @RequestMapping("/convertAndSend")
    public String convertAndSend() {
        User user = new User("llp", "110");
        rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
        //rocketmq-spring-boot-starter依赖中包含了fastJson
        return JSON.toJSONString(user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2.消费者1
    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic")
    public class LoadBalanceConsumerListener implements RocketMQListener<User> {
    
        /**
         * 接收消息
         * @param message 消息对象
         */
        @Override
        public void onMessage(User message) {
            System.out.println("消费者接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    3.消费者2
    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.CLUSTERING)
    public class LoadBalanceConsumerListener2 implements RocketMQListener<User> {
    
        /**
         * 接收消息
         * @param message 消息对象
         */
        @Override
        public void onMessage(User message) {
            System.out.println("消费者接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    4.测试结果

    image-20220911220119354

    2.广播模式

    1.生产者
    @RequestMapping("/convertAndSend")
    public String convertAndSend() {
        User user = new User("llp", "110");
        rocketMQTemplate.convertAndSend("convertAndSendTopic", user);
        //rocketmq-spring-boot-starter依赖中包含了fastJson
        return JSON.toJSONString(user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    2.消费者1
    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.BROADCASTING)
    public class SubscribeConsumerListener implements RocketMQListener<User> {
        @Override
        public void onMessage(User message) {
            System.out.println("消费者1接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    3.消费者2
    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "convertAndSendTopic",messageModel = MessageModel.BROADCASTING)
    public class SubscribeConsumerListener2 implements RocketMQListener<User> {
        @Override
        public void onMessage(User message) {
            System.out.println("消费者2接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    7.延时消息

    1.生产者

    @RequestMapping("/delaySend")
    public String delaySend() {
        User user = new User("llp", "110");
        //延迟等级,这里2级对应延迟5秒
        // org/apache/rocketmq/store/config/MessageStoreConfig.java
        //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
        rocketMQTemplate.syncSend("delaySendTopic", MessageBuilder.withPayload(user).build(), 2000, 2);
        return JSON.toJSONString(user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.消费者

    @Component
    @RocketMQMessageListener(consumerGroup = "llp",topic = "delaySendTopic")
    public class DelayConsumerListener implements RocketMQListener<User> {
        @Override
        public void onMessage(User message) {
            System.out.println("消费者接收消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    8.顺序消息

    1.先看一个问题:消息错乱

    原因

    消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

    顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的

    queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

    但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,

    则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分

    区有序,即相对每个queue,消息都是有序的。

    下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消

    息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

    image-20220911224918007

    image-20220911225029164

    1.生产者

    @RequestMapping("/orderSend")
    public void orderSend() {
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        for (OrderStep orderStep : orderSteps) {
            //SendResult sendResult = rocketMQTemplate.syncSendOrderly("orderSendTopic", orderStep, String.valueOf(2000));
            Message<OrderStep> message = MessageBuilder.withPayload(orderStep).build();
            SendResult sendResult = rocketMQTemplate.syncSendOrderly("orderSendTopic", message, String.valueOf(orderStep.getOrderId()));
            System.out.println("发送结果:" + sendResult);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    /**
     * 订单的步骤
     */
    @Data
    public class OrderStep {
        private long orderId;
        private String desc;
    
        /**
         * 生成模拟订单数据
         */
        public static List<OrderStep> buildOrders() {
            List<OrderStep> orderList = new ArrayList<OrderStep>();
    
            OrderStep orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111039L);
            orderDemo.setDesc("创建");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111065L);
            orderDemo.setDesc("创建");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111039L);
            orderDemo.setDesc("付款");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103117235L);
            orderDemo.setDesc("创建");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111065L);
            orderDemo.setDesc("付款");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103117235L);
            orderDemo.setDesc("付款");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111065L);
            orderDemo.setDesc("完成");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111039L);
            orderDemo.setDesc("推送");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103117235L);
            orderDemo.setDesc("完成");
            orderList.add(orderDemo);
    
            orderDemo = new OrderStep();
            orderDemo.setOrderId(15103111039L);
            orderDemo.setDesc("完成");
            orderList.add(orderDemo);
    
            return orderList;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    2.消费者

    @Component
    @RocketMQMessageListener(consumerGroup = "llp2",topic = "orderSendTopic")
    public class OrderConsumerListener implements RocketMQListener {
        @Override
        public void onMessage(Object message) {
            System.out.println("消费者接收到消息:"+message.toString());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.测试结果

    消费者接收到消息:{orderId=15103111039, desc=创建}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC238866135002A, offsetMsgId=C0A84FCB00002A9F000000000007BD60, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=147]
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886613C002E, offsetMsgId=C0A84FCB00002A9F000000000007BE65, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=63]
    消费者接收到消息:{orderId=15103111065, desc=创建}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886613E0030, offsetMsgId=C0A84FCB00002A9F000000000007BF6A, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=148]
    消费者接收到消息:{orderId=15103111039, desc=付款}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661410035, offsetMsgId=C0A84FCB00002A9F000000000007C06F, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=149]
    消费者接收到消息:{orderId=15103117235, desc=创建}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661430038, offsetMsgId=C0A84FCB00002A9F000000000007C174, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=64]
    消费者接收到消息:{orderId=15103111065, desc=付款}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC238866145003C, offsetMsgId=C0A84FCB00002A9F000000000007C279, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=150]
    消费者接收到消息:{orderId=15103117235, desc=付款}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC2388661470040, offsetMsgId=C0A84FCB00002A9F000000000007C37E, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=2], queueOffset=65]
    消费者接收到消息:{orderId=15103111065, desc=完成}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614A0044, offsetMsgId=C0A84FCB00002A9F000000000007C483, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=151]
    消费者接收到消息:{orderId=15103111039, desc=推送}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614D0048, offsetMsgId=C0A84FCB00002A9F000000000007C588, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=152]
    消费者接收到消息:{orderId=15103117235, desc=完成}
    发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A801027C2418B4AAC23886614F004C, offsetMsgId=C0A84FCB00002A9F000000000007C68D, messageQueue=MessageQueue [topic=orderSendTopic, brokerName=broker-a, queueId=3], queueOffset=153]
    消费者接收到消息:{orderId=15103111039, desc=完成}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    9.事务消息

    1. 正常事务过程

    2. 事务补偿过程

    image-20220911232840280

    事务消息状态

    1. 提交状态:允许进入队列,此消息与非事务消息无区别
    2. 回滚状态:不允许进入队列,此消息等同于未发送过
    3. 中间状态:完成了half消息的发送,未对MQ进行二次状态确认
    4. 注意:事务消息仅与生产者有关,与消费者无关

    1.生产者

    application.yml

    transaction:
      group: llp-transaction
    
    • 1
    • 2

    生产者代码

    @Value(value = "${transaction.group}")
    private String transactionGroup;
    
    @RequestMapping("/sendTransactionMsg")
    public void sendTransactionMsg() {
        User user = new User("llp", "110");
        Message<User> message = MessageBuilder.withPayload(user).build();
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(transactionGroup, "transactionTopic", message, null);
        System.out.println("发送事务消息,发送结果:"+sendResult);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    事务消息监听

    /**
     * 事务消息Listener
     *
     */
    @RocketMQTransactionListener(txProducerGroup = "llp-transaction")
    public class TransactionMsgListener implements RocketMQLocalTransactionListener {
    
        /**
         * 执行本地事务
         * 如果本地事务返回UNKNOWN,会进行事务补偿,自动执行下面的checkLocalTransaction方法
         *
         * @param msg
         * @param arg
         * @return
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println("执行本地事务=====");
            System.out.println(msg.getPayload());
            //模拟提交事务
            //return RocketMQLocalTransactionState.COMMIT;
            //模拟回滚事务
            //return RocketMQLocalTransactionState.ROLLBACK;
            //让去check本地事务状态 进行事务补偿
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    
        /**
         * 检测本地事务状态
         * 事务补偿过程
         * 当消息服务器没有收到消息生产者的事务提交或者回滚确认时,会主动要求消息生产者进行确认,
         * 消息生产者便会去检测本地事务状态,该过程称为事务补偿过程
         *
         * @param msg
         * @return
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
            System.out.println("执行事务补偿======");
            //事务补偿提交
            return RocketMQLocalTransactionState.COMMIT;
            //事务补偿回滚
            //return RocketMQLocalTransactionState.ROLLBACK;
            //如果事务补偿过程还是UNKNOWN 就会一直进行事务补偿,60s一次
            //return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    2.消费者

    @Component
    @RocketMQMessageListener(consumerGroup = "llp-transaction",topic = "transactionTopic")
    public class TransactionConsumerListener implements RocketMQListener {
        @Override
        public void onMessage(Object message) {
            System.out.println("消费者接收到消息:"+message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    3.测试结果
    执行本地事务=====
    [B@4174e858
    发送事务消息,发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A80102506818B4AAC2389ADEBA0000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionTopic, brokerName=broker-b, queueId=2], queueOffset=2]
    执行事务补偿======
    消费者接收到消息:{userName=llp, userId=110}
    
    • 1
    • 2
    • 3
    • 4
    • 5

    10.批量消息

    如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:

    List<Message> msgList = new ArrayList<>(); 
    msgList.add(new Message("topic6", "tag1", "msg1".getBytes())); 
    msgList.add(new Message("topic6", "tag1", "msg2".getBytes()));
    msgList.add(new Message("topic6", "tag1", "msg3".getBytes()));
    rocketMQTemplate.syncSend("topic8",msgList,1000);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    # 发送时间超时时间
    rocketmq.producer.send-message-timeout=300000
    #异步消息发送失败重试次数
    rocketmq.producer.retry-times-when-send-async-failed=0
    #消息发送失败后的最大重试次数
    rocketmq.producer.retry-times-when-send-failed=2
    #消息Body超过多大开始压缩(Consumer收到消息会自动解压缩),单位字节
    rocketmq.producer.compress-message-body-threshold=4096
    #消息最大容量
    rocketmq.producer.max-message-size=4194304
    rocketmq.producer.retry-next-server=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

    public class ListSplitter implements Iterator<List<Message>> {
       private final int SIZE_LIMIT = 1024 * 1024 * 4;
       private final List<Message> messages;
       private int currIndex;
       public ListSplitter(List<Message> messages) {
               this.messages = messages;
       }
        @Override 
        public boolean hasNext() {
           return currIndex < messages.size();
       }
       	@Override 
        public List<Message> next() {
           int nextIndex = currIndex;
           int totalSize = 0;
           for (; nextIndex < messages.size(); nextIndex++) {
               Message message = messages.get(nextIndex);
               int tmpSize = message.getTopic().length() + message.getBody().length;
               Map<String, String> properties = message.getProperties();
               for (Map.Entry<String, String> entry : properties.entrySet()) {
                   tmpSize += entry.getKey().length() + entry.getValue().length();
               }
               tmpSize = tmpSize + 20; // 增加日志的开销20字节
               if (tmpSize > SIZE_LIMIT) {
                   //单个消息超过了最大的限制
                   //忽略,否则会阻塞分裂的进程
                   if (nextIndex - currIndex == 0) {
                      //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                      nextIndex++;
                   }
                   break;
               }
               if (tmpSize + totalSize > SIZE_LIMIT) {
                   break;
               } else {
                   totalSize += tmpSize;
               }
    
           }
           List<Message> subList = messages.subList(currIndex, nextIndex);
           currIndex = nextIndex;
           return subList;
       }
    }
    //把大的消息分裂成若干个小的消息
    ListSplitter splitter = new ListSplitter(messages);
    while (splitter.hasNext()) {
      try {
          List<Message>  listItem = splitter.next();
        rocketMQTemplate.syncSend("topic8",listItem,1000);
      } catch (Exception e) {
          e.printStackTrace();
          //处理error
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
  • 相关阅读:
    提升Mac运行速度的三大方法
    Verilog模块的结构、数据类型、变量和基本运算符号
    云计算 2月26号 (进程管理和常用命令)
    【AGC】增长服务1-远程配置示例
    Windows 事件查看器记录到 MYSQL
    数据库之MySQL查询去重数据
    万宾科技管网水位监测预警,管网水位的特点有哪些?
    ShardingSphere实现数据库读写分离,主从库分离,docker详细教程
    C++ - map 和 set 的模拟实现 - 红黑树当中的仿函数 - 红黑树的迭代器实现
    王道数据结构2(线性表)
  • 原文地址:https://blog.csdn.net/qq_44981526/article/details/126811458