• RocketMQ第二话 -- RocketMQ事务消息、延时消息实现


    本文主要记录Springboot集成RocketMQ来实现延时消息和事务消息

    1.坏境

    基于springboot2.6.8,客户端RocketMQ4.9.3,rocket依赖如下

    <dependency>
    	<groupId>org.apache.rocketmq</groupId>
    	<artifactId>rocketmq-spring-boot-starter</artifactId>
    	<version>2.2.2</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    yaml配置

    rocketmq:
      name-server: 192.168.0.221:9876
      producer:
        group: producer_group
        #默认3000 过短会报timeout错误
        send-message-timeout: 10000
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1.定时延时消息

    适用场景:对某个数据在多久后做过期或者做通知之类的。
    为什么要选择RocketMQ?

    • RocketMQ存在一种定时的延时队列,功能是将消息丢到该队列中会排在某个时间轮上,时间到了后才会丢给broker转发到消费者。
    • 能够持久化保存
      如果使用Redis、Mysql等中间件需要定时扫描,非常的消耗性能,这个时候延时队列不得不说是一种好的选择了。

    1.1 生产者

    @Autowired
    RocketMQTemplate rocketMQTemplate;
    
    //SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel)
    //delayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 对应定时的延时时间
    SendResult sendResult = rocketMQTemplate.syncSend("test_topic", MessageBuilder.withPayload(
                        "延迟10秒" + new Date().getTime()).build(), 3000, 3);
    log.info("消息延时推送成功,{},{}", sendResult.getSendStatus(), new Date().getTime());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.2 消费者

    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "test_topic", consumerGroup = "consume_group")
    public class RocketmqConsume implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String s) {
            log.info("收到消息:{},{}", s, new Date().getTime());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    1.3 测试日志

    c.e.r.controller.RocketmqController      : 消息延时推送成功,SEND_OK,1656054586384
    c.e.rocketmq.consume.RocketmqConsume     : 收到消息:延迟10秒1656054586381,1656054595694
    
    • 1
    • 2

    可以看到时间误差是在一秒内的,在某些场景非常的实用。

    2.事务消息

    实用场景:常说的二者不可兼得,有这么一个场景:
    应用需要在一条消息传递过程中记录一些什么内容成功后再传给最终的消费者
    那么RocketMQ中使用事务消息操作流程应该是这样的:

    • 发送一条事务消息给MQ
    • 消息发送成功,执行事务模块
    • 根据事务模块给出的结果(Commit、Rollback、UNKNOWN),再发送给MQ
    • 如果是Commit状态再下发给消费者,如果是Rollback直接删掉消息
    • 事务模块出现(UNKNOWN、无响应、MQ服务重启)这几种情况,提供了定时任务回查状态,默认15次,超过次数则丢弃该消息

    2.1 生产者

    @Autowired
    RocketMQTemplate rocketMQTemplate;
    
    
    for (int i = 0; i < 10; i++) {
    	//模拟十个订单入库 计算返利
        rocketMQTemplate.sendMessageInTransaction("transaction_topic",
                MessageBuilder.withPayload(UUID.randomUUID().toString()).build(), i);
        Thread.sleep(1000);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2.2 生产事务执行者

    @Slf4j
    @Component
    //在目前版本中仅能注解一个是我执行者
    @RocketMQTransactionListener
    public class TransactionExcetor implements RocketMQLocalTransactionListener {
    
        Map<String, Boolean> resMap = new ConcurrentHashMap<>();
    
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            String orderId = new String((byte[]) message.getPayload());
            //每三个订单成功一个 即订单成功入库,发送消息到账务进行返利转账
            boolean res = Integer.valueOf(o.toString()) % 3 == 0;
            log.info("开始执行本地事务:{},{}", orderId, o.toString(), res);
            //存储回查状态
            resMap.put(orderId, res);
            //commit成功 UNKNOWN未知状态 ROLLBACK在这一步就直接回滚了
            return res ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.UNKNOWN;
        }
    
    	//UNKNOWN、服务重启、无响应情况下提供事务回查 最终状态自己控制
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            String orderId = new String((byte[]) message.getPayload());
            boolean res = resMap.get(orderId);
            log.info("执行事务检查:{},{}", orderId, res);
            return res ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    2.3 消费者

    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "transaction_topic", consumerGroup = "consume_group1")
    public class RocketmqConsume1 implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String s) {
            log.info("收到返利消息:{},{}", s, new Date().getTime());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    2.4 测试日志

    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:c82fae4d-6203-446e-976d-10de860fff9b,1
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:4cf60f44-f088-48c0-bdb7-c73ee7af53f9,2
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:8b49aefd-1057-44cc-a832-3e6882d31962,3
    c.e.rocketmq.consume.RocketmqConsume1    : 收到返利消息:8b49aefd-1057-44cc-a832-3e6882d31962,1656054589418
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:04fe4c7b-5658-4499-b2ec-087a74db6177,4
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:19ad92dd-8f20-416a-95ae-5ed5247a8df5,5
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:fa77df40-d2ae-4cd5-a947-2e61eb61f62a,6
    c.e.rocketmq.consume.RocketmqConsume1    : 收到返利消息:fa77df40-d2ae-4cd5-a947-2e61eb61f62a,1656054592444
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:8724bd01-4cc9-4ae4-b19a-27caeeacfaff,7
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:b6310015-ae72-4e1b-ba0f-1d8273aa81c1,8
    c.e.rocketmq.consume.TransactionExcetor  : 开始执行本地事务:a81e290a-3611-4a28-b41c-e4a36cf99a4a,9
    c.e.rocketmq.consume.RocketmqConsume1    : 收到返利消息:a81e290a-3611-4a28-b41c-e4a36cf99a4a,1656054595464
    c.e.rocketmq.consume.TransactionExcetor  : 执行事务检查:4cf60f44-f088-48c0-bdb7-c73ee7af53f9,false
    c.e.rocketmq.consume.TransactionExcetor  : 执行事务检查:c82fae4d-6203-446e-976d-10de860fff9b,false
    c.e.rocketmq.consume.TransactionExcetor  : 执行事务检查:04fe4c7b-5658-4499-b2ec-087a74db6177,false
    c.e.rocketmq.consume.TransactionExcetor  : 执行事务检查:19ad92dd-8f20-416a-95ae-5ed5247a8df5,false
    c.e.rocketmq.consume.TransactionExcetor  : 执行事务检查:8724bd01-4cc9-4ae4-b19a-27caeeacfaff,false
    c.e.rocketmq.consume.TransactionExcetor  : 执行事务检查:b6310015-ae72-4e1b-ba0f-1d8273aa81c1,false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3.总结

    特别注意:多个监听者情况下,consumeGroup一定不要是同一个,否则消息会丢失
    其他扩展:

    //多个生产者 可以手写连接代码
    new DefaultMQProducer
    //多个消费者 同上
    new DefaultMQPushConsumer
    
    • 1
    • 2
    • 3
    • 4

    以上就是本章的全部内容了。

    上一篇:RocketMQ第一话 – Docker安装以及Springboot集成RocketMQ
    下一篇:RocketMQ第三话 – RocketMQ高可用集群搭建

    车到山前必有路,柳暗花明又一村

  • 相关阅读:
    C++编程案例讲解-基于结构体的控制台通讯录管理系统
    axios 用formData的方式请求数据
    C语言描述数据结构 —— 常见排序(1)直接插入排序、希尔排序、选择排序、堆排序
    LeetCode //C - 190. Reverse Bits
    GateWay实现负载均衡
    【初阶与进阶C++详解】第十篇:list
    nginx代理本地服务请求,避免跨域;前端图片压缩并上传
    接口测试步骤和场景分析,其实很简单~~
    【每日一题】53. 最大子数组和-2023.11.20
    【零基础学QT】第二章 工程文件内容分析
  • 原文地址:https://blog.csdn.net/qq_35551875/article/details/125428840