• springcloudalibaba架构(25):RocketMQ事务消息


    前言

    RocketMQ有特有的事物消息机制,事务消息是其它所有消息中间件所不具备的。
    RocketMQ提供事务消息,通过事务消息就能达到分布式事务的最终一致性。

    环境说明

    使用的rocketmq-spring-boot-starter版本

    <dependency>
                <groupId>org.apache.rocketmqgroupId>
                <artifactId>rocketmq-spring-boot-starterartifactId>
                <version>2.2.2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    springboot版本2.3.2.RELEASE
    springcloud版本Hoxton.SR8
    springcloud-alibaba版本2.2.5.RELEASE

    本章代码已分享至Gitee: https://gitee.com/lengcz/springcloudalibaba01.git

    事务消息概述

    事务消息可以分为两个流程: 正常事务消息的发送及提交流程、事务消息的补偿流程,分别解析如下:
    在这里插入图片描述

    概念解释

    • 半事务消息: 暂不能投递的消息,发送方已经成功地将消息发送到RocketMQ服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记为“暂不能投递”状态,处于这种状态下的消息即为半事务消息。
    • 消息回查:由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,RocketMQ服务端通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或者Rollback),该询问过程即消息回查。

    (1)正常的事务消息的发送及提交流程

    • 发送半消息
    • 服务端响应消息写入结果
    • 根据发送结果执行本地事务(如果写入失败,此时半消息对业务不可见,本地逻辑不执行)
    • 根据本地事务状态执行Commit或者Rollback(Commint操作生成消息索引,消息对消费者可见)。

    (2)事务消息的补偿流程

    • 对没有Commit或Rollback的事务消息(Pending状态的消息),从服务端发起一次”回查“。
    • Producer收到回查消息,检查回查消息对应的本地事务的状态。
    • 根据本地事务的状态,重新Commit或Rollback

    代码示例

    1. 因为使用到事务,首先在common模块新建一个记录事务的实体类
    package com.lcz.pojo;
    
    import lombok.Data;
    
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import java.util.Date;
    
    //消息事务状态记录
    @Entity(name="tx_log")
    @Data
    public class TxLog {
    
        @Id
        private String txId;
    
        private Date date;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述

    1. 发送半事务消息,使用本地事务创建订单(以下环节均在order模块里实现)
    import com.lcz.dao.OrderDao;
    import com.lcz.dao.TxLogDao;
    import com.lcz.pojo.Order;
    import com.lcz.pojo.TxLog;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Service;
    import org.springframework.transaction.annotation.Transactional;
    
    import java.util.Date;
    import java.util.UUID;
    
    @Service
    @Slf4j
    public class OrderServiceImpl4 {
    
        @Autowired
        private OrderDao orderDao;
    
        @Autowired
        private TxLogDao txLogDao;
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        public void createOrderBefore(Order order) {
            String uuid = UUID.randomUUID().toString();//设置唯一的事务id
            log.info("创建订单:"+uuid+",发送半事务消息");
            //发送半消息
            rocketMQTemplate.sendMessageInTransaction(
                    "tx_topic",
                    MessageBuilder.withPayload(order).setHeader("txId",uuid).build(),
                    order);
        }
    
        @Transactional //本地事务,要么同时成功,要么同时失败
        public void createOrder(String txId,Order order) {
            log.info("创建订单:"+txId+",创建成功");
            //保存订单
            orderDao.save(order);
            TxLog txLog = new TxLog();
            txLog.setTxId(txId);
            txLog.setDate(new Date());
            //记录事务日志
            this.txLogDao.save(txLog);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    1. 实现RocketMQLocalTransactionListener 的两个方法,一个执行本地事务,一个负责回查。
    import com.lcz.dao.TxLogDao;
    import com.lcz.pojo.Order;
    import com.lcz.pojo.TxLog;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
    import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.Message;
    import org.springframework.stereotype.Service;
    
    
    @Service
    @RocketMQTransactionListener
    @Slf4j
    public class OrderServiceImpl4Listener implements RocketMQLocalTransactionListener {
    
        @Autowired
        private OrderServiceImpl4 orderServiceImpl4;
    
        @Autowired
        private TxLogDao txLogDao;
    
        //执行本地事务
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
            //第一个参数message对应的消息,第二个参数对应的arg
            String txId = (String)message.getHeaders().get("txId");
            try{
                log.info("执行事务,txId:"+txId);
                Order order=(Order) o;
                orderServiceImpl4.createOrder(txId,order);
                log.info("执行事务成功,txId:"+txId);
                return RocketMQLocalTransactionState.COMMIT; //本地事务成功
            }catch (Exception e){
                log.info("执行事务失败,txId:"+txId);
                return RocketMQLocalTransactionState.ROLLBACK; //本地事务失败
            }
        }
    
        //消息回查
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
            String txId = (String)message.getHeaders().get("txId");
            log.info("事务回查,txId:"+txId);
           TxLog txLog= txLogDao.findById(txId).get();
           if(null!=txLog){
               //本地事务成功了
               return RocketMQLocalTransactionState.COMMIT;
           }
           return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    1. 在controller里调用创建订单的方法
        @Autowired
        private OrderServiceImpl4 orderService;//用的这个service
        @Autowired
        private ProductService productService;
        
      @GetMapping("/order/prod/{pid}")
        public Order order(@PathVariable Integer pid) {
            log.info("调用商品服务,调用商品微服务查询此商品");
    
            /**
             * Ribbon负载均衡
             */
            Product product = productService.findById(pid);
    
            if(product.getPid()<1){
                Order order=new Order();
                order.setOid(-1L);
                order.setPname("下单失败");
                return order;
            }
    
            log.info("查询到的商品内容:" + JSON.toJSONString(product));
    
            Order order = new Order();
            order.setUid(1);
            order.setUsername("测试");
            order.setPid(pid);
            order.setPrice(product.getPrice());
            order.setPname(product.getPname());
            order.setNumber(1);
    
            orderService.createOrderBefore(order);
            log.info("用户下单成功,订单信息为:" + JSON.toJSON(order));
    
            return order;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    1. 以debug模式启动order模块,同时启动product模块,发送创建订单请求(注意先清空控制台日志),可以用户发送半事务消息,创单成功。
      在这里插入图片描述

    模拟回查

    上面我们的事务是成功的,我们通过debug模式,在执行事务和消息回查添加断点,发送下单请求,当请求进入执行事务时,直接杀死线程(模拟宕机),然后重启应用,将会进行消息回查。
    解释:
    txId 事务id,因为我们在创建订单时,使用@Transactional 本地事务,所以当创建订单成功时,事务表里也会有这条对应的txId,如果查不到这个txId,则表示事务是失败的,需要回滚,半事务消息就不会被投递。
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    事务消息的使用限制

    • 事务消息不支持延时消息和批量消息
    • 为了避免单个消息被检查太多次而导致队列中消息积累,默认将单个消息的检查次数限制为15次,但是用户可以根据Broker配置文件里的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次(N=transactionCheckMax),则Broker将丢弃此消息,并在默认情况下打印错误日志。用户可以通过重写AbstractTransactionCheckListener类来修改这个行为。
    • Broker配置文件中的参数transactionMsgTimeout指定了事务消息的回查时间。当发送事务消息时,用户还可以通过设置用户属性(CHECKIMMUNITYTIMEINSECONDS来修改这个限制,该参数由于transactionMsgTimeout参数。
    • 事务性消息可能不止一次被检查或消费,需要保证幂等性。
    • 提交给用户的目标主题可能会失败,这以日志的记录而定。它的高可用性通过RocketMQ本身的高可用性机制来保证,如果希望确保事务消息不丢弃并且事务的完整性得到保证,建议使用同步的双重写入机制。
    • 事务消息的生产者ID不能与其它类型消息的生产者ID共享。与其它类型的消息不同,事务消息允许反向查询,MQ服务器能够通过它们的生产者ID查询消费者。
  • 相关阅读:
    web端三维重建算法-colmap++
    总结springboot项目中一些后端接收前端传参的方法
    Django REST Framework完整教程-RESTful规范-序列化和反序列数据-数据视图
    分布式事务专题-基本概念
    如何使用责任链默认优雅地进行参数校验?
    【狂神说Java】redis
    Leetcode—137.只出现一次的数字II【中等】
    VUE基础快速入门
    OpenAI新推出的Sora是什么?如何注册使用?
    实用的 “edge://flags“
  • 原文地址:https://blog.csdn.net/u011628753/article/details/126343030