• RocketMQ 分布式事务消息实战指南:确保数据一致性的关键设计


    在这里插入图片描述

    🔭 嗨,您好 👋 我是 vnjohn,在互联网企业担任 Java 开发,CSDN 优质创作者
    📖 推荐专栏:Spring、MySQL、Nacos、Java,后续其他专栏会持续优化更新迭代
    🌲文章所在专栏:RocketMQ
    🤔 我当前正在学习微服务领域、云原生领域、消息中间件等架构、原理知识
    💬 向我询问任何您想要的东西,ID:vnjohn
    🔥觉得博主文章写的还 OK,能够帮助到您的,感谢三连支持博客🙏
    😄 代词: vnjohn
    ⚡ 有趣的事实:音乐、跑步、电影、游戏

    目录

    前言

    在上一篇文章:保护数据完整性:探索 RocketMQ 分布式事务消息的力量 详细分析了「事务消息设计方面及源码相关层面」讲解,事务半消息的发送及提交、事务消息的补偿过程

    在 RocketMQ 中由于网络故障原因或业务应用程序异常宕机导致事务消息未及时的完成处理,提供了事务消息补偿机制>检查本地事务执行状态的方法,为整个流程二阶段提交完成了不可忽视的异常消息补偿机制。

    接下来,会通过以下两个链路中的第一条链路进行实战演练,确保在 RocketMQ 事务消息处理过程中,这两者的事务状态能够确保一致完成.

    1、创建订单完成、预扣减库存
    2、订单支付完成、实扣减库存

    业务设计流程

    在这里插入图片描述

    1、事务生产者发送半消息到 Broker 服务端的 Half Topic 中,实际发送半消息的 Topic 是真实的 Topic,在这里会被替换为「RMQ_SYS_TRANS_HALF_TOPIC」存储到日志文件中

    2、在 Broker 服务端将半消息存储到日志文件以后,若发送半消息的结果是成功的,那么就会执行「订单服务客户端」本地事务方法「executeLocalTransaction」

    3、会同步等待本地事务方法执行的结果,再根据执行结果、消息体投递消息类型「EndTransactionRequestHeader」给到 Broker 服务端进行处理,该请求由 Broker 服务端「EndTransactionProcessor」进行处理.

    4、在 EndTransactionProcessor 中会根据本地事务处理的结果,进行判别

    1、若本地事务执行成功,在 Broker 服务端会将半消息对应的 Topic 调整为真实的 Topic 消息进行存储到日志文件中,随即在库存服务的消费者才能消费到这条消息,从而再对库存进行扣减,同时标记好半消息,确保在定时检查事务消息时不会再次被扫描到进行处理
    2、若本地事务执行失败,在 Broker 服务端会将半消息标记为「已处理」不会让定时触发的事务消息检查机制进行扫描到

    当然,定时任务的数据处理,不能确保它有时间的误差性,所以说执行成功或执行失败的事务消息,会在补偿机制进行再一次的校验

    业务设计源码

    基础 SQL 脚本

    CREATE TABLE `order` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
      `order_no` varchar(64) COLLATE utf8mb4_unicode_ci DEFAULT NULL COMMENT '订单编号',
      `amount` bigint DEFAULT NULL COMMENT '订单金额',
      `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
      `user_id` bigint DEFAULT NULL COMMENT '用户id',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    
    CREATE TABLE `stock` (
      `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键',
      `sku_id` bigint DEFAULT NULL COMMENT '商品skuId',
      `lock_stock` int DEFAULT NULL COMMENT '锁定库存',
      `stock` int DEFAULT NULL COMMENT '真实库存',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    基础依赖

    <properties>
        <maven.compiler.source>8maven.compiler.source>
        <maven.compiler.target>8maven.compiler.target>
        <spring-boot.version>2.6.7spring-boot.version>
        <jackson.version>2.11.0jackson.version>
        <mysql.version>8.0.17mysql.version>
        <alibaba-druid.version>1.2.8alibaba-druid.version>
        <mybatis-plus.version>3.4.2mybatis-plus.version>
    properties>
    <dependencies>
        <dependency>
            <groupId>com.baomidougroupId>
            <artifactId>mybatis-plus-boot-starterartifactId>
        dependency>
        <dependency>
            <groupId>com.alibabagroupId>
            <artifactId>druid-spring-boot-starterartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
        <dependency>
            <groupId>org.vnjohngroupId>
            <artifactId>blog-commonartifactId>
            <version>1.0-SNAPSHOTversion>
        dependency>
        <dependency>
            <groupId>mysqlgroupId>
            <artifactId>mysql-connector-javaartifactId>
            <scope>runtimescope>
        dependency>
        <dependency>
            <groupId>org.apache.rocketmqgroupId>
            <artifactId>rocketmq-clientartifactId>
        dependency>
        <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    基础配置

    订单服务、库存服务在实际的生产过程中,会各自有各自的库,在本地环境中演练中采用一个库进行模拟.

    订单服务,作为事务消息生产者

    server:
      port: 8088
    
    spring:
      datasource:
        url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
        username: root
        password: 12345678
        driver-class-name: com.mysql.cj.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
        druid:
          filters: stat
          maxActive: 30
          initialSize: 1
          maxWait: 10000
          # 保持连接活跃
          keep-alive: true
          minIdle: 1
          timeBetweenEvictionRunsMillis: 60000
          minEvictableIdleTimeMillis: 300000
          validationQuery: select 'x'
          testWhileIdle: true
          testOnBorrow: false
          testOnReturn: false
          poolPreparedStatements: true
          maxOpenPreparedStatements: 20
    
    mybatis-plus:
      configuration:
        log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
      type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径
    
    rocketmq:
      transaction:
        producer: order_transaction
      bootstraps:
      namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    库存服务,作为事务消息消费者

    server:
      port: 8085
    
    spring:
      datasource:
        url: jdbc:mysql://127.0.0.1:3306/rocketmq_transaction_test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false
        username: root
        password: 12345678
        driver-class-name: com.mysql.cj.jdbc.Driver
        type: com.alibaba.druid.pool.DruidDataSource
        druid:
          filters: stat
          maxActive: 30
          initialSize: 1
          maxWait: 10000
          # 保持连接活跃
          keep-alive: true
          minIdle: 1
          timeBetweenEvictionRunsMillis: 60000
          minEvictableIdleTimeMillis: 300000
          validationQuery: select 'x'
          testWhileIdle: true
          testOnBorrow: false
          testOnReturn: false
          poolPreparedStatements: true
          maxOpenPreparedStatements: 20
    
    mybatis-plus:
      configuration:
        log-impl: org.apache.ibatis.logging.stdout.StdOutImpl # 开启sql日志
      type-aliases-package: org.vnjohn.*.*.model # 注意:对应实体类的路径
    
    rocketmq:
      bootstraps:
      namesrv-addr: 172.16.249.10:9876;172.16.249.11:9876;172.16.249.12:9876
      consumer-group: order_transaction_group
      order:
        create:
          topic: order_transaction
          tag: withholding_stock
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    基础依赖代码库

    /**
     * Spring Context 工具类
     *
     * @author vnjohn
     */
    @Component
    public class SpringContextUtils implements ApplicationContextAware {
        public final static String SPRING_CONTEXT_UTILS_COMPONENT = "springContextUtils";
        public static ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            SpringContextUtils.applicationContext = applicationContext;
        }
    
        /**
         * 获取HttpServletRequest
         */
        public static HttpServletRequest getHttpServletRequest() {
            return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
        }
    
        public static HttpServletResponse getHttpServletResponse() {
            return ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getResponse();
        }
    
        public static Object getBean(String name) {
            return applicationContext.getBean(name);
        }
    
        public static <T> T getBean(Class<T> c) {
            return applicationContext.getBean(c);
        }
    
        public static <T> Map<String, T> getBeanOfType(Class<T> c) {
            return applicationContext.getBeansOfType(c);
        }
    
        public static <T> T getBean(String name, Class<T> requiredType) {
            return applicationContext.getBean(name, requiredType);
        }
    
        public static boolean containsBean(String name) {
            return applicationContext.containsBean(name);
        }
    
        public static boolean isSingleton(String name) {
            return applicationContext.isSingleton(name);
        }
    
        public static Class<? extends Object> getType(String name) {
            return applicationContext.getType(name);
        }
    
        /**
         * 获取当前环境
         */
        public static String getActiveProfile() {
            return applicationContext.getEnvironment().getActiveProfiles()[0];
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    模块代码

    订单服务

    订单 DO 实体

    @Data
    @TableName("`order`")
    @EqualsAndHashCode(callSuper = false)
    public class OrderDO {
        @TableId(type = IdType.AUTO)
        private Long id;
    
        /**
         * 订单编号
         */
        private String orderNo;
    
        /**
         * 订单金额
         */
        private Long amount;
    
        /**
         * 商品id
         */
        private Long skuId;
    
        /**
         * 用户id
         */
        private Long userId;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    订单数据库映射层

    /**
     * @author vnjohn
     * @since 2023/11/15
     */
    public interface OrderMapper extends BaseMapper<OrderDO> {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    订单仓储层

    /**
     * @author vnjohn
     * @since 2023/11/2
     */
    @Component
    public class OrderRepository {
        @Resource
        private OrderMapper orderMapper;
    
        public Order queryByOrderNo(String orderNo) {
            OrderDO orderDO = orderMapper.selectOne(new QueryWrapper<OrderDO>()
                    .lambda().eq(OrderDO::getOrderNo, orderNo));
            return BeanUtils.copy(orderDO, Order.class);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    库存服务

    库存 DO 实体

    /**
     * @author vnjohn
     * @since 2023/11/15
     */
    @Data
    @TableName("`stock`")
    @EqualsAndHashCode(callSuper = false)
    public class StockDO {
        @TableId(type = IdType.AUTO)
        private Long id;
    
        private Long skuId;
    
        private Integer lockStock;
    
        private Integer stock;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    库存数据库映射层

    /**
     * @author vnjohn
     * @since 2023/11/15
     */
    public interface StockMapper extends BaseMapper<StockDO> {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    库存仓储层

    /**
     * @author vnjohn
     * @since 2023/11/15
     */
    @Slf4j
    @Component
    public class StockRepository {
        @Resource
        private StockMapper stockMapper;
    
        public void preDecreaseStock(String orderNo, Long skuId) {
            // 订单号与 SkuId 可用于做日志记录,这里默认给它数量记为 1
            log.info("订单号:{}", orderNo);
            StockDO stockDO = stockMapper.selectOne(new QueryWrapper<StockDO>()
                    .lambda().eq(StockDO::getSkuId, skuId));
            if (null == stockDO) {
                return;
            }
            int currentLockStock = stockDO.getLockStock() + 1;
            stockDO.setLockStock(currentLockStock);
            // 此处最好采用乐观锁+ CAS 方式进行更新
            stockMapper.updateById(stockDO);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    生成订单

    订单领域实体

    /**
     * @author vnjohn
     * @since 2023/11/2
     */
    @Data
    public class Order {
        /**
         * id
         */
        private Long id;
    
        /**
         * 订单编号
         */
        private String orderNo;
    
        /**
         * 订单金额
         */
        private Long amount;
    
        /**
         * 商品id
         */
        private Long skuId;
    
        /**
         * 用户id
         */
        private Long userId;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    创建订单实体

    /**
     * @author vnjohn
     * @since 2023/11/2
     */
    @Data
    public class CreateOrder {
        /**
         * 订单编号
         */
        private String orderNo;
    
        /**
         * 订单金额
         */
        private Long amount;
    
        /**
         * 商品id
         */
        private Long skuId;
    
        /**
         * 用户id
         */
        private Long userId;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    创建订单领域执行器

    /**
     * @author vnjohn
     * @since 2023/11/2
     */
    @Component
    public class OrderCreateHandler {
        @Resource
        private OrderMapper orderMapper;
    
        public Boolean handle(CreateOrder order) {
            // 在这里模拟订单异常或创建系统
            // 1、订单创建逻辑,涉及到表结构及数据会比较多,这里不做多阐述
            String orderNo = order.getOrderNo();
            return orderMapper.insert(BeanUtils.copy(order, OrderDO.class)) > 0;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    RocketMQ 事务消息

    订单服务生产者

    首先要在订单服务能够投递事务消息,应该先实例化一个事务生产者

    事务生产者实例

    /**
     * 订单服务专门用来提供事务消息的生产者
     *
     * @author vnjohn
     * @since 2023/11/2
     */
    @Slf4j
    @Component
    public class OrderTransactionProducer {
        @Value("${rocketmq.transaction.producer}")
        private String transactionProducerName;
    
        @Value("${rocketmq.namesrv-addr}")
        private String namesrvAddr;
    
        private static TransactionMQProducer PRODUCER;
    
        /**
         * 获取事务生产者实例对象
         *
         * @return 事务生产者实例
         */
        public static TransactionMQProducer getInstance() {
            return PRODUCER;
        }
    
        /**
         * 若未定义线程,检测事务半消息的线程默认只有一个,当同时出现多条事务半消息需要检测时,就退化为队列的方式进行入队,要进行排队处理,从而降低了并发、并行数
         *
         * 

    * public TransactionMQProducer(String namespace, String producerGroup, RPCHook rpcHook) { * super(namespace, producerGroup, rpcHook); * this.checkThreadPoolMinSize = 1; * this.checkThreadPoolMaxSize = 1; * this.checkRequestHoldMax = 2000; * } *

    */
    @PostConstruct public void initTransactionProducer() { try { PRODUCER = new TransactionMQProducer(transactionProducerName); PRODUCER.setNamesrvAddr(namesrvAddr); PRODUCER.setTransactionListener(new OrderTransactionListener()); // 自定义线程池处理,用于追踪消息投递时的日志,能够追踪到具体的投放线程,所必要参数,不设置采用的是默认的线程池 // producer.setExecutorService(); PRODUCER.start(); } catch (MQClientException e) { e.printStackTrace(); log.error("创建事务生产者异常,", e); } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    事务生产者监听器

    与事务生产者必须绑定好的一个关键>监听器,用这个监听器来判别如何做事务消息的后续处理

    /**
     * 订单服务「本地事务消息-半消息」处理
     *
     * @author vnjohn
     * @since 2023/11/2
     */
    @Slf4j
    public class OrderTransactionListener implements TransactionListener {
        /**
         * unknow 消息最大重试次数设置为 3,当超过 3 次以后进行默认成功且记录到本地日志表中
         */
        private static Integer MAX_RETRY_TIME = 3;
    
        /**
         * 用于存储本地事务执行的结果:事务id->订单id
         */
        private ConcurrentHashMap<String, String> TRANSACTION_ORDER_MAP = new ConcurrentHashMap<>();
    
        /**
         * 用于存储本地事务检查次数的结果:事务id->check 次数
         * 源码中会默认检查为 15 次,一次的时间间隔为 6s,由于那个属于全局的配置
         * 在这里可自定义适配次数,时间还是按照默认的配置来进行处理
         */
        private ConcurrentHashMap<String, Integer> UNKNOW_TRANSACTION_CHECK_MAP = new ConcurrentHashMap<>();
    
    
        /**
         * 执行本地事务的处理逻辑
         *
         * @param message 待发送的消息内容
         * @param o
         * @return
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            // 当前执行的事务 id
            String transactionId = message.getTransactionId();
            CreateOrder createOrder = (CreateOrder) o;
            TRANSACTION_ORDER_MAP.put(transactionId, createOrder.getOrderNo());
            // TODO 捕获创建订单后的执行结果
            try {
                // 伪代码创建订单
                OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
                Order order = orderRepository.queryByOrderNo(createOrder.getOrderNo());
                // 订单已存在,不再做处理
                if (Objects.nonNull(order)) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                OrderCreateHandler orderCreateHandler = SpringContextUtils.getBean(OrderCreateHandler.class);
                Boolean handleResult = orderCreateHandler.handle(createOrder);
                if (handleResult) {
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            } catch (Exception orderException) {
                // 消息进行回滚,在消费者那一侧是无法观察此消息的
                log.error("创建订单出现异常,", orderException);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            // 由于网络问题,导致非业务异常,可能是订单已经写入数据库成功了没有及时地去处理事务消息状态
            // 在这里需要去进行 check 检查本地事务是否执行有误
            return LocalTransactionState.UNKNOW;
        }
    
        /**
         * 事务 producer 会从 broker 获取到未处理的事务消息列表,进行依次处理
         * 检查本地事务状态,当出现「事务消息生产者」宕机时,该方法仍然会对未处理的事务消息进行检测
         * 对于 unknow 消息,会 1s 进行一次定期处理,该参数可调整
         *
         * @param messageExt 消息扩展类信息
         * @return 本地事务执行状态
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            // 检查本地事务表是否存在订单号
            String transactionId = messageExt.getTransactionId();
            log.info("checkLocalTransaction transactionId:{}", transactionId);
            String orderNo = TRANSACTION_ORDER_MAP.get(transactionId);
            // unknow 消息进行重试三次,超出后不再做处理,不然 unknow 消息会一直在控制台打印进行处理,视为无效的工作
            // 增加补偿机制,用于处理 unknow 重试的消息,当重试的消息是提交或回滚状态,则调用相关的方法进行处理
            Integer checkCount = UNKNOW_TRANSACTION_CHECK_MAP.get(transactionId);
            if (Objects.isNull(checkCount) || checkCount < MAX_RETRY_TIME) {
                checkCount = Objects.isNull(checkCount) ? 1 : ++checkCount;
                log.info("transactionId-{},check 检查次数:{}", transactionId, checkCount);
                UNKNOW_TRANSACTION_CHECK_MAP.put(transactionId, checkCount);
                return LocalTransactionState.UNKNOW;
            }
            // 检查次数超出预定阈值,可记录到日志
            if (checkCount.equals(MAX_RETRY_TIME)) {
                log.info("transactionId-{},check 检查次数超出", transactionId);
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
            // 反查订单数据当前订单是否已经创建
            OrderRepository orderRepository = SpringContextUtils.getBean(OrderRepository.class);
            Order order = orderRepository.queryByOrderNo(orderNo);
            if (Objects.nonNull(order)) {
                return LocalTransactionState.COMMIT_MESSAGE;
            }
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    事务消息实体

    /**
     * 订单库存消息体
     *
     * @author vnjohn
     * @since 2023/11/2
     */
    @Data
    public class OrderWithStockMessage {
        /**
         * 订单编号
         */
        private String orderNo;
    
        /**
         * 商品id
         */
        private Long skuId;
    
        /**
         * 数量
         */
        private Integer quantity;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    事务消息投递

    /**
     * @author vnjohn
     * @since 2023/11/2
     */
    @Slf4j
    @Component
    public class OrderUnifiedCommandHandler {
    
        public void handler(String orderNo) {
            // 1、订单数据幂等校验成功
            // 2、调用生产者发送事务半消息
            // 假设接收到订单支付已完成的标识
            OrderWithStockMessage stockMessageBody = new OrderWithStockMessage();
            stockMessageBody.setSkuId(1001L);
            stockMessageBody.setQuantity(2);
            stockMessageBody.setOrderNo(orderNo);
            Message stockMessage = new Message(
                    "order_transaction",
                    "withholding_stock",
                    UUID.randomUUID().toString(),
                    JsonUtils.objToJsonStr(stockMessageBody).getBytes()
            );
            CreateOrder order = new CreateOrder();
            order.setSkuId(1001L);
            order.setAmount(100L);
            order.setUserId(888L);
            order.setOrderNo(orderNo);
    
            // putUserProperty 该方法,通过网络传递给 consumer 一些用户自定义参数,可以用来校验做其他的业务逻辑处理
            // stockMessage.putUserProperty("action", );
    
            // 发送的是半消息
            // Message msg, Object arg
            // 第一个参数:本地事务处理成功以后,需要进行发送的消息体内容
            // 第二个参数:作为本地事务用于检测或执行本地事务时的对象体
            try {
                TransactionSendResult transactionSendResult = OrderTransactionProducer.getInstance().sendMessageInTransaction(stockMessage, order);
                log.info("事务执行结果:{}", JsonUtils.objToJsonStr(transactionSendResult.getLocalTransactionState()));
            } catch (MQClientException e) {
                log.error("订单预生成,事务半消息 send fail,", e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    库存服务消费者

    /**
     * 订单消费者,消费来自订单服务投递的消息
     *
     * @author vnjohn
     * @since 2023/11/2
     */
    @Slf4j
    @Component
    public class CreateOrderPreStockConsumer {
        @Value("${rocketmq.consumer-group}")
        private String consumerGroup;
    
        @Value("${rocketmq.namesrv-addr}")
        private String namesrvAddr;
    
        @Value("${rocketmq.order.create.topic}")
        private String orderCreateTopic;
    
        @Value("${rocketmq.order.create.tag}")
        private String orderCreateTag;
    
        @PostConstruct
        public void initCreateOrderConsumer() {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            try {
                consumer.subscribe(orderCreateTopic, orderCreateTag);
                consumer.registerMessageListener(new CreateOrderStockMessageListener());
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
    
        public static class CreateOrderStockMessageListener implements MessageListenerConcurrently {
    
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                MessageExt messageExt = list.get(0);
                String msgId = messageExt.getMsgId();
                byte[] body = messageExt.getBody();
                String bodyValue = new String(body);
                log.info("当前订单创建成功,预扣减库存信息:{}", bodyValue);
                String orderNo = JSON.parseObject(bodyValue).getString("orderNo");
                Long skuId = JSON.parseObject(bodyValue).getLong("skuId");
                StockRepository stockRepository = SpringContextUtils.getBean(StockRepository.class);
                stockRepository.preDecreaseStock(orderNo, skuId);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    //            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    Tips

    采用两阶段提交的事务消息,先提交一个半消息,然后执行本地事务,再发送一个 commit 的半消息;若这个 commit 半消息失败了,MQ 是基于第一个半消息不断的反查本地事务执行状态来进行后续流程的推进的,这样只有当本地事务提交成功,最终 MQ 消息也会发送成功,若本地事务 rollback,那么 MQ 消息不会再进行发送,会标记这条半消息的状态为「已处理」从而保证了两者之间的一致性

    执行本地事务方法+本地事务定时检查,结合起来来保证事务消息执行的一致性

    总结

    该篇博文主要是通过实际的业务代码来进行 RocketMQ 事务消息实战,上一篇博文从 RocketMQ 事务消息的整体设计以及相关的源码的讲解,这篇通过订单生成、库存预扣减的简单例子来对事务消息的这块流程进行细粒化的业务设计,事务消息生产者的本地事务消息与补偿事务消息结合起来保证订单创建成功以后,库存才进行预扣减,希望这篇简单的 RocketMQ 事务消息实战博文能够帮助到您理解事务消息的实际应用,期待三连支持!

    🌟🌟🌟愿你我都能够在寒冬中相互取暖,互相成长,只有不断积累、沉淀自己,后面有机会自然能破冰而行!

    博文放在 微信体系 专栏里,欢迎订阅,会持续更新!

    如果觉得博文不错,关注我 vnjohn,后续会有更多实战、源码、架构干货分享!

    推荐专栏:Spring、MySQL,订阅一波不再迷路

    大家的「关注❤️ + 点赞👍 + 收藏⭐」就是我创作的最大动力!谢谢大家的支持,我们下文见!

  • 相关阅读:
    利用Openssl写一个简陋的https劫持
    ASUS华硕ZenBook灵耀X逍遥UXF3000E_UX363EA原装出厂预装Win11系统工厂模式安装包
    菱形(曼哈顿距离) C++实现
    java毕业设计项目pring boot项目源代码+mysql+freemark校园竞赛报名管理平台
    unity学习笔记-有关打包到安卓开机黑屏时间过长的心得
    C++从静态类型到单例模式
    Latex+论文+工具+邮箱客户端不能登录
    高手过招不用鼠标,一款超好用的跨平台命令行界面库
    C#,排列组合的堆生成法(Heap’s Algorithm for generating permutations)算法与源代码
    软件工程导论第六版 第五章 总体设计
  • 原文地址:https://blog.csdn.net/vnjohn/article/details/134443109