• Spring Cloud(八):Spring Cloud Alibaba Seata 2PC、AT、XA、TCC


    事务简介

    分布式事务:https://www.processon.com/view/link/61cd52fb0e3e7441570801ab

    本地事务 JDBC 事务模式

    Connection conn = ... //获取数据库连接
    conn.setAutoCommit(false); //开启事务
    try{
       //...执行增删改查sql
       conn.commit(); //提交事务
    }catch (Exception e) {
      conn.rollback();//事务回滚
    }finally{
       conn.close();//关闭链接
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    分布式事务

    在微服务架构中,完成某一个业务功能可能需要横跨多个服务,操作多个数据库。这就涉及到到了分布式事务,需要操作的资源位于多个资源服务器上,而应用需要保证对于多个资源服务器的数据操作,要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同资源服务器的数据一致性。

    典型的分布式事务应用场景

    1. 跨库事务
      跨库事务指的是,一个应用某个功能需要操作多个库,不同的库中存储不同的业务数据。下图演示了一个服务同时操作2个库的情况:
      在这里插入图片描述

    2. 分库分表
      通常一个库数据量比较大或者预期未来的数据量比较大,都会进行分库分表。如下图,将数据库B拆分成了2个库:

    对于分库分表的情况,一般开发人员都会使用一些数据库中间件来降低sql操作的复杂性。如,对于sql:insert into user(id,name) values (1,“张三”),(2,“李四”)。这条sql是操作单库的语法,单库情况下,可以保证事务的一致性。 但是由于现在进行了分库分表,开发人员希望将1号记录插入分库1,2号记录插入分库2。所以数据库中间件要将其改写为2条sql,分别插入两个不同的分库,此时要保证两个库要不都成功,要不都失败,因此基本上所有的数据库中间件都面临着分布式事务的问题。
    在这里插入图片描述

    1. 微服务架构

    Service A完成某个功能需要直接操作数据库,同时需要调用Service B和Service C,而Service B又同时操作了2个数据库,Service C也操作了一个库。需要保证这些跨服务调用对多个数据库的操作要么都成功,要么都失败,实际上这可能是最典型的分布式事务场景。
    小结:上述讨论的分布式事务场景中,无一例外的都直接或者间接的操作了多个数据库。如何保证事务的ACID特性,对于分布式事务实现方案而言,是非常大的挑战。同时,分布式事务实现方案还必须要考虑性能的问题,如果为了严格保证ACID特性,导致性能严重下降,那么对于一些要求快速响应的业务,是无法接受的。
    在这里插入图片描述

    2PC

    两阶段提交协议(2PC),就是将提交(commit)过程划分为2个阶段(Phase)

    阶段1

    TM通知各个RM准备提交它们的事务分支。如果RM判断自己进行的工作可以被提交,那就对工作内容进行持久化,再给TM肯定答复;要是发生了其他情况,那给TM的都是否定答复。

    以mysql数据库为例,在第一阶段,事务管理器向所有涉及到的数据库服务器发出prepare"准备提交"请求,数据库收到请求后执行数据修改和日志记录等处理,处理完成后只是把事务的状态改成"可以提交",然后把结果返回给事务管理器。

    阶段2

    TM根据阶段1各个RM prepare的结果,决定是提交还是回滚事务。如果所有的RM都prepare成功,那么TM通知所有的RM进行提交;如果有RM prepare失败的话,则TM通知所有RM回滚自己的事务分支。

    以mysql数据库为例,如果第一阶段中所有数据库都prepare成功,那么事务管理器向数据库服务器发出"确认提交"请求,数据库服务器把事务的"可以提交"状态改为"提交完成"状态,然后返回应答。如果在第一阶段内有任何一个数据库的操作发生了错误,或者事务管理器收不到某个数据库的回应,则认为事务失败,回撤所有数据库的事务。数据库服务器收不到第二阶段的确认提交请求,也会把"可以提交"的事务回撤。
    在这里插入图片描述

    两阶段提交方案下全局事务的ACID特性,是依赖于RM的。一个全局事务内部包含了多个独立的事务分支,这一组事务分支要么都成功,要么都失败。各个事务分支的ACID特性共同构成了全局事务的ACID特性。也就是将单个事务分支支持的ACID特性提升一个层次到分布式事务的范畴。

    2PC存在的问题

    • 同步阻塞问题
      2PC 中的参与者是阻塞的。在第一阶段收到请求后就会预先锁定资源,一直到 commit 后才会释放。
    • 单点故障
      由于协调者的重要性,一旦协调者TM发生故障,参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。
    • 数据不一致
      若协调者第二阶段发送提交请求时崩溃,可能部分参与者收到commit请求提交了事务,而另一部分参与者未收到commit请求而放弃事务,从而造成数据不一致的问题。

    Seata

    https://seata.io/zh-cn/index.html

    Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

    Seata 提供的事务模式:

    • AT(阿里首推)
    • TCC
    • SAGA
    • XA
    • GTS (Global Transaction Service 全局事务服务商用版)

    网站:

    • 官网:https://seata.io/zh-cn/index.html
    • 源码: https://github.com/seata/seata
    • seata版本:v1.5.1

    Seata三大角色

    • TC (Transaction Coordinator) - 事务协调者
      维护全局和分支事务的状态,驱动全局事务提交或回滚。
    • TM (Transaction Manager) - 事务管理器
      定义全局事务的范围:开始全局事务、提交或回滚全局事务。
    • RM (Resource Manager) - 资源管理器
      管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

    TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端

    在 Seata 中,一个分布式事务的生命周期如下:

    • TM 请求 TC 开启一个全局事务。TC 会生成一个 XID 作为该全局事务的编号。XID会在微服务的调用链路中传播,保证将多个微服务的子事务关联在一起。
    • RM 请求 TC 将本地事务注册为全局事务的分支事务,通过全局事务的 XID 进行关联。
    • TM 请求 TC 告诉 XID 对应的全局事务是进行提交还是回滚。
    • TC 驱动 RM 们将 XID 对应的自己的本地事务进行提交还是回滚。

    在这里插入图片描述

    Seata AT模式的设计思路

    Seata AT模式的核心是对业务无侵入,是一种改进后的两阶段提交,其设计思路如下:

    • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
    • 二阶段:
      • 提交异步化,非常快速地完成。
      • 回滚通过一阶段的回滚日志进行反向补偿。

    一阶段

    业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。核心在于对业务sql进行解析,转换成undolog,并同时入库,这是怎么做的呢?
    在这里插入图片描述

    二阶段

    • 分布式事务操作成功,则TC通知RM异步删除undolog
      在这里插入图片描述

    • 分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。
      在这里插入图片描述

    Seata快速开始

    Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成

    1. Seata Server(TC)环境搭建

    Server端存储模式(store.mode)支持三种:

    • file:单机模式,全局事务会话信息内存中读写并持久化本地文件root.data,性能较高
    • db:高可用模式,全局事务会话信息通过db共享,相应性能差些
    • redis:1.3及以上版本支持,性能较高,存在事务信息丢失风险,请提前配置适合当前场景的redis持久化配置

    资源目录:https://github.com/seata/seata/tree/v1.5.1/script

    • client:存放client端sql脚本,参数配置
    • config-center:各个配置中心参数导入脚本,config.txt(包含server和client)为通用参数文件
    • server:server端数据库脚本及各个容器配置

    db存储模式+Nacos(注册&配置中心)方式部署

    1. 下载安装包
    2. 建表(db模式)
    3. 配置Nacos注册中心 注意:Seata的注册中心是作用于Seata自身的,和Spring Cloud的注册中心无关
    4. 配置Nacos配置中心 注意:Seata的配置中心是作用于Seata自身的,和Spring Cloud的配置中心无关
    5. 启动Seata Server
      在这里插入图片描述

    配置Seata Server

    1. 配置将Seata Server注册到Nacos,修改conf/application.yml文件
    seata:
      registry:
    	   # support: nacos, eureka, redis, zk, consul, etcd3, sofa
    	   type: nacos
    	   nacos:
    	     application: seata-server
    	     server-addr: 127.0.0.1:8848
    	     group: SEATA_GROUP
    	     namespace:
    	     cluster: default
    	     username:
    	     password:
      config:
        # support: nacos, consul, apollo, zk, etcd3
        type: nacos
        nacos:
          server-addr: 127.0.0.1:8848
          namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
          group: SEATA_GROUP
          data-id: seataServer.properties
         username:
         password:
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 上传配置至Nacos配置中心 https://github.com/seata/seata/tree/v1.5.1/script/config-center
    • a) 获取/seata/script/config-center/config.txt,修改为db存储模式,并修改mysql连接配置
      store.mode=db
      store.lock.mode=db
      store.session.mode=db
      store.db.driverClassName=com.mysql.jdbc.Driver
      store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
      store.db.user=root
      store.db.password=root
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      store.mode=db,由于seata是通过jdbc的executeBatch来批量插入全局锁的,根据MySQL官网的说明,连接参数中的rewriteBatchedStatements为true时,在执行executeBatch,并且操作类型为insert时,jdbc驱动会把对应的SQL优化成insert into () values (), ()的形式来提升批量插入的性能。
      根据实际的测试,该参数设置为true后,对应的批量插入性能为原来的10倍多,因此在数据源为MySQL时,建议把该参数设置为true。
    • b) 配置事务分组, 要与client配置的事务分组一致
      • 事务分组:seata的资源逻辑,可以按微服务的需要,在应用程序(客户端)对自行定义事务分组,每组取一个名字。
      • 集群:seata-server服务端一个或多个节点组成的集群cluster。 应用程序(客户端)使用时需要指定事务逻辑分组与Seata服务端集群的映射关系。
      • 事务分组如何找到后端Seata集群(TC)?
        1. 首先应用程序(客户端)中配置了事务分组(GlobalTransactionScanner 构造方法的txServiceGroup参数)。若应用程序是SpringBoot则通过seata.tx-service-group 配置。
        2. 应用程序(客户端)会通过用户配置的配置中心去寻找service.vgroupMapping .[事务分组配置项],取得配置项的值就是TC集群的名称。若应用程序是SpringBoot则通过seata.service.vgroup-mapping.事务分组名=集群名称 配置
        3. 拿到集群名称程序通过一定的前后缀+集群名称去构造服务名,各配置中心的服务名实现不同(前提是Seata-Server已经完成服务注册,且Seata-Server向注册中心报告cluster名与应用程序(客户端)配置的集群名称一致)
          拿到服务名去相应的注册中心去拉取相应服务名的服务列表,获得后端真实的TC服务列表(即Seata-Server集群节点列表)
    • c) 在nacos配置中心中新建配置,dataId为seataServer.properties,配置内容为上面修改后的config.txt中的配置信息
      • 从v1.4.2版本开始,seata已支持从一个Nacos dataId中获取所有配置信息,你只需要额外添加一个dataId配置项。
    1. 启动Seata Server
      bin/seata-server.sh -p 8091 -h 127.0.0.1 -m db
      在这里插入图片描述

    Seata 客户端

    在这里插入图片描述

    首先需要在客户端Mysql库中建表
    https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql

    -- for AT mode you must to init this sql for you business database. the seata server not need it.
    CREATE TABLE IF NOT EXISTS `undo_log`
    (
        `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
        `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
        `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
        `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
        `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
        `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
        `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
        UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
    ) ENGINE = InnoDB
      AUTO_INCREMENT = 1
      DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. pom.xml
    
    <dependency>
        <groupId>com.alibaba.cloudgroupId>
        <artifactId>spring-cloud-starter-alibaba-seataartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. applicaiton.yml
    seata:
      application-id: ${spring.application.name}
      # seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应
      tx-service-group: default_tx_group
      registry:
        # 指定nacos作为注册中心
        type: nacos
        nacos:
          application: seata-server
          server-addr: 127.0.0.1:8848
          namespace:
          group: SEATA_GROUP
    
      config:
        # 指定nacos作为配置中心
        type: nacos
        nacos:
          server-addr: 127.0.0.1:8848
          namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
          group: SEATA_GROUP
          data-id: seataServer.properties
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    1. 在全局事务发起者中添加@GlobalTransactional注解
    @Override
    @GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
    public Order saveOrder(OrderVo orderVo){
        log.info("=============用户下单=================");
        log.info("当前 XID: {}", RootContext.getXID());
        
        // 保存订单
        Order order = new Order();
        order.setUserId(orderVo.getUserId());
        order.setCommodityCode(orderVo.getCommodityCode());
        order.setCount(orderVo.getCount());
        order.setMoney(orderVo.getMoney());
        order.setStatus(OrderStatus.INIT.getValue());
    
        Integer saveOrderRecord = orderMapper.insert(order);
        log.info("保存订单{}", saveOrderRecord > 0 ? "成功" : "失败");
        
        //扣减库存
        storageFeignService.deduct(orderVo.getCommodityCode(),orderVo.getCount());
        
        //扣减余额
        accountFeignService.debit(orderVo.getUserId(),orderVo.getMoney());
    
        //更新订单
        Integer updateOrderRecord = orderMapper.updateOrderStatus(order.getId(),OrderStatus.SUCCESS.getValue());
        log.info("更新订单id:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败");
        
        return order;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    2. Seata XA模式

    XA 模型会一直占用资源,直到第二阶段结束才会释放数据库资源

    AT 有前置镜像和后置镜像
    XA 是一直占有资源

    https://seata.io/zh-cn/docs/dev/mode/xa-mode.html

    在这里插入图片描述
    seata.data-source-proxy-mode = XA

    seata:
      # 是否开启spring-boot自动装配,默认true,包括数据源的自动代理以及GlobalTransactionScanner初始化
      enabled: true
      # 数据源代理模式 默认AT  ******
      data-source-proxy-mode: XA
      application-id: ${spring.application.name}
      # seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应
      tx-service-group: default_tx_group
      registry:
        # 指定nacos作为注册中心
        type: nacos
        nacos:
          application: seata-server
          server-addr: 127.0.0.1:8848
          namespace:
          group: SEATA_GROUP
    
      config:
        # 指定nacos作为配置中心
        type: nacos
        nacos:
          server-addr: 127.0.0.1:8848
          namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
          group: SEATA_GROUP
          data-id: seataServer.properties
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    Mysql XA事务Demo

    
    <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>5.1.48version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    public class MysqlXADemo {
        public static void main(String[] args) throws SQLException {
            //true表示打印XA语句,,用于调试
            boolean logXaCommands = true;
    
            // 获得资源管理器操作接口实例 RM1
            Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_order", "root", "root");
            XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
            XAResource rm1 = xaConn1.getXAResource();
            
            // 获得资源管理器操作接口实例 RM2
            Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_storage", "root", "root");
            XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
            XAResource rm2 = xaConn2.getXAResource();
    
            // AP请求TM执行一个分布式事务,TM生成全局事务id
            byte[] gtrid = "g12345".getBytes();
            int formatId = 1;
            try {
                // ==============分别执行RM1和RM2上的事务分支====================
                // TM生成rm1上的事务分支id
                byte[] bqual1 = "b00001".getBytes();
                Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
                // 执行rm1上的事务分支
                rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
                PreparedStatement ps1 = conn1.prepareStatement("INSERT into order_tbl(user_id,commodity_code,count,money,status) VALUES (1001,2001,2,10,1)");
                ps1.execute();
                rm1.end(xid1, XAResource.TMSUCCESS);
    
                // TM生成rm2上的事务分支id
                byte[] bqual2 = "b00002".getBytes();
                Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
                // 执行rm2上的事务分支
                rm2.start(xid2, XAResource.TMNOFLAGS);
                PreparedStatement ps2 = conn2.prepareStatement("update storage_tbl set count=count-2 where commodity_code=2001");
                ps2.execute();
                rm2.end(xid2, XAResource.TMSUCCESS);
                
                // ===================两阶段提交================================
                // phase1:询问所有的RM 准备提交事务分支
                int rm1_prepare = rm1.prepare(xid1);
                int rm2_prepare = rm2.prepare(xid2);
                // phase2:提交所有事务分支
                boolean onePhase = false;
                //TM判断有2个事务分支,所以不能优化为一阶段提交
                if (rm1_prepare == XAResource.XA_OK && rm2_prepare == XAResource.XA_OK) {
                    //所有事务分支都prepare成功,提交所有事务分支
                    rm1.commit(xid1, onePhase);
                    rm2.commit(xid2, onePhase);
                } else {
                    //如果有事务分支没有成功,则回滚
                    rm1.rollback(xid1);
                    rm2.rollback(xid2);
                }
            } catch (XAException e) {
                // 如果出现异常,也要进行回滚
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    3. Seata TCC 模式 (最终一致性)

    TCC 基于分布式事务中的二阶段提交协议实现,它的全称为 Try-Confirm-Cancel,即资源预留(Try)、确认操作(Confirm)、取消操作(Cancel),他们的具体含义如下:
    Try:对业务资源的检查并预留;
    Confirm:对业务处理进行提交,即 commit 操作,只要 Try 成功,那么该步骤一定成功;
    Cancel:对业务处理进行取消,即回滚操作,该步骤回对 Try 预留的资源进行释放。

    • XA是资源层面的分布式事务,强一致性,在两阶段提交的整个过程中,一直会持有资源的锁。
    • TCC是业务层面的分布式事务,最终一致性,不会一直持有资源的锁。

    TCC 是一种侵入式的分布式事务解决方案,以上三个操作都需要业务系统自行实现,对业务系统有着非常大的入侵性,设计相对复杂,但优点是 TCC 完全不依赖数据库,能够实现跨数据库、跨应用资源管理,对这些不同数据访问通过侵入式的编码方式实现一个原子操作,更好地解决了在各种复杂业务场景下的分布式事务问题。

    以用户下单为例
    • try-commit
      在这里插入图片描述

    • try-cancel
      在这里插入图片描述

    Seata TCC模式接口如何改造
    • TCC相关注解如下:

      • @LocalTCC 适用于SpringCloud+Feign模式下的TCC,@LocalTCC一定需要注解在接口上,此接口可以是寻常的业务接口,只要实现了TCC的两阶段提交对应方法便可
      • @TwoPhaseBusinessAction 注解try方法,其中name为当前tcc方法的bean名称,写方法名便可(全局唯一),commitMethod指向提交方法,rollbackMethod指向事务回滚方法。指定好三个方法之后,seata会根据全局事务的成功或失败,去帮我们自动调用提交方法或者回滚方法。
      • @BusinessActionContextParameter 注解可以将参数传递到二阶段(commitMethod/rollbackMethod)的方法。
      • BusinessActionContext 便是指TCC事务上下文
    • TCC 幂等、悬挂和空回滚问题如何解决?
      TCC 模式中存在的三大问题是幂等、悬挂和空回滚。在 Seata1.5.1 版本中,增加了一张事务控制表,表名是 tcc_fence_log 来解决这个问题。而在@TwoPhaseBusinessAction 注解中提到的属性 useTCCFence 就是来指定是否开启这个机制,这个属性值默认是 false。

      微服务增加tcc_fence_log日志表 https://github.com/seata/seata/tree/develop/script/client/tcc/db

    	-- -------------------------------- The script use tcc fence  --------------------------------
    CREATE TABLE IF NOT EXISTS `tcc_fence_log`
    (
        `xid`           VARCHAR(128)  NOT NULL COMMENT 'global id',
        `branch_id`     BIGINT        NOT NULL COMMENT 'branch id',
        `action_name`   VARCHAR(64)   NOT NULL COMMENT 'action name',
        `status`        TINYINT       NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
        `gmt_create`    DATETIME(3)   NOT NULL COMMENT 'create time',
        `gmt_modified`  DATETIME(3)   NOT NULL COMMENT 'update time',
        PRIMARY KEY (`xid`, `branch_id`),
        KEY `idx_gmt_modified` (`gmt_modified`),
        KEY `idx_status` (`status`)
    ) ENGINE = InnoDB
    DEFAULT CHARSET = utf8mb4;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    @LocalTCC
    public interface StorageService {
    
        /**
         * Try: 库存-扣减数量,冻结库存+扣减数量
         *
         * 定义两阶段提交,在try阶段通过@TwoPhaseBusinessAction注解定义了分支事务的 resourceId,commit和 cancel 方法
         *  name = 该tcc的bean名称,全局唯一
         *  commitMethod = commit 为二阶段确认方法
         *  rollbackMethod = rollback 为二阶段取消方法
         *  BusinessActionContextParameter注解 传递参数到二阶段中
         *
         * @param commodityCode 商品编号
         * @param count 扣减数量
         * @return
         */
        @TwoPhaseBusinessAction(name = "deduct", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
        boolean deduct(@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
                       @BusinessActionContextParameter(paramName = "count") int count);
    
        /**
         *
         * Confirm: 冻结库存-扣减数量
         * 二阶段确认方法可以另命名,但要保证与commitMethod一致
         * context可以传递try方法的参数
         *
         * @param actionContext
         * @return
         */
        boolean commit(BusinessActionContext actionContext);
    
        /**
         * Cancel: 库存+扣减数量,冻结库存-扣减数量
         * 二阶段取消方法可以另命名,但要保证与rollbackMethod一致
         *
         * @param actionContext
         * @return
         */
        boolean rollback(BusinessActionContext actionContext);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    @Service
    @Slf4j
    public class StorageServiceImpl implements StorageService {
        
        @Autowired
        private StorageMapper storageMapper;
        
        @Transactional
        @Override
        public boolean deduct(String commodityCode, int count){
            log.info("=============冻结库存=================");
            log.info("当前 XID: {}", RootContext.getXID());
    
            // 检查库存
            checkStock(commodityCode,count);
            
            log.info("开始冻结 {} 库存", commodityCode);
            //冻结库存
            Integer record = storageMapper.freezeStorage(commodityCode,count);
            log.info("冻结 {} 库存结果:{}", commodityCode, record > 0 ? "操作成功" : "扣减库存失败");
            return true;
        }
    
        @Override
        public boolean commit(BusinessActionContext actionContext) {
            log.info("=============扣减冻结库存=================");
    
            String commodityCode = actionContext.getActionContext("commodityCode").toString();
            int count = (int) actionContext.getActionContext("count");
            //扣减冻结库存
            storageMapper.reduceFreezeStorage(commodityCode,count);
    
            return true;
        }
    
        @Override
        public boolean rollback(BusinessActionContext actionContext) {
            log.info("=============解冻库存=================");
    
            String commodityCode = actionContext.getActionContext("commodityCode").toString();
            int count = (int) actionContext.getActionContext("count");
            //扣减冻结库存
            storageMapper.unfreezeStorage(commodityCode,count);
    
            return true;
        }
    
        private void checkStock(String commodityCode, int count){
            
            log.info("检查 {} 库存", commodityCode);
            Storage storage = storageMapper.findByCommodityCode(commodityCode);
            if (storage.getCount() < count) {
                log.warn("{} 库存不足,当前库存:{}", commodityCode, count);
                throw new RuntimeException("库存不足");
            }
            
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    @Override
    @GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
    public Order saveOrder(OrderVo orderVo) {
        log.info("=============用户下单=================");
        log.info("当前 XID: {}", RootContext.getXID());
    
        //获取全局唯一订单号  测试使用
        Long orderId = UUIDGenerator.generateUUID();
    
        //阶段一: 创建订单
        Order order = orderService.prepareSaveOrder(orderVo,orderId);
    
        //扣减库存
        storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount());
        //扣减余额
        accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney());
    
        return order;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    在 TCC 模型执行的过程中,还可能会出现各种异常,其中最为常见的有空回滚、幂等、悬挂等
    1. 要想防止空回滚,那么必须在 Cancel 方法中识别这是一个空回滚,Seata 是如何做的呢?
      Seata 的做法是新增一个 TCC 事务控制表,包含事务的 XID 和 BranchID 信息,在 Try 方法执行时插入一条记录,表示一阶段执行了,执行 Cancel 方法时读取这条记录,如果记录不存在,说明 Try 方法没有执行。

    2. Seata 是如何处理幂等问题的呢?
      同样的也是在 TCC 事务控制表中增加一个记录状态的字段 status,该字段有 3 个值,分别为:

      • tried:1
      • committed:2
      • rollbacked:3

      二阶段 Confirm/Cancel 方法执行后,将状态改为 committed 或 rollbacked 状态。当重复调用二阶段 Confirm/Cancel 方法时,判断事务状态即可解决幂等问题。

    3. Seata 是怎么处理悬挂的呢?
      在 TCC 事务控制表记录状态的字段 status 中增加一个状态:

      • suspended:4

      当执行二阶段 Cancel 方法时,如果发现 TCC 事务控制表有相关记录,说明二阶段 Cancel 方法优先一阶段 Try 方法执行,因此插入一条 status=4 状态的记录,当一阶段 Try 方法后面执行时,判断 status=4 ,则说明有二阶段 Cancel 已执行,并返回 false 以阻止一阶段 Try 方法执行成功。

    Seata 源码分析

    设计流程 https://www.processon.com/view/link/6311bfda1e0853187c0ecd8c

    工作流程 https://www.processon.com/view/link/6007f5c00791294a0e9b611a

    源码 https://www.processon.com/view/link/5f743063e0b34d0711f001d2

    在这里插入图片描述

    1. Spring Boot TC
    @Component
    public class ServerRunner implements CommandLineRunner, DisposableBean {
        ...
        @Override
        public void run(String... args) {
            Server.start(args);
        }
        ...
    }
    
    2. seata client 入口
    SeataFeignClient#execute =》 headers.put("TX_XID", seataXid)
    SeataHandlerInterceptor#preHandle =RootContext.bind(request.getHeader("TX_XID"))
    
    3. GlobalTransactional TM TC RM 
    GlobalTransactionalInterceptor#invoke => MethodInterceptor
    
    TM TransactionManager
    
    TC DefaultCoordinator
    
    RM ResourceManager
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    关键类

    • TransactionManager (TM)

      • DefaultTransactionManager

      • TransactionManagerHolder为创建单例TransactionManager的工厂,可以使用EnhancedServiceLoader的spi机制加载用户自定义的类,默认为DefaultTransactionManager。

      • GlobalTransaction 接口提供给用户开启事务,提交,回滚,获取状态等方法。

      • DefaultGlobalTransaction是GlobalTransaction接口的默认实现,它持有TransactionManager对象,默认开启事务超时时间为60秒,默认名称为default,因为调用者的业务方法可能多重嵌套创建多个GlobalTransaction对象开启事务方法,因此GlobalTransaction有GlobalTransactionRole角色属性,只有Launcher角色的才有开启、提交、回滚事务的权利。

      • GlobalTransactionContext为操作GlobalTransaction的工具类,提供创建新的GlobalTransaction,获取当前线程有的GlobalTransaction等方法。

      • GlobalTransactionScanner继承AbstractAutoProxyCreator类,即实现了SmartInstantiationAwareBeanPostProcessor接口,会在spring容器启动初始化bean的时候,对bean进行代理操作。wrapIfNecessary为继承父类代理bean的核心方法,如果用户配置了service.disableGlobalTransaction为false属性则注解不生效直接返回,否则对GlobalTransactional或GlobalLock的方法进行拦截代理。

      • GlobalTransactionalInterceptor实现aop的MethodInterceptor接口,对有@GlobalTransactional或GlobalLock注解的方法进行代理。

      • TransactionalTemplate模板类提供了一个开启事务,执行业务,成功提交和失败回滚的模板方法execute(TransactionalExecutor business)。

    • DefaultCoordinator (TC)

      • DefaultCoordinator即为TC,全局事务默认的事务协调器。它继承AbstractTCInboundHandler接口,为TC接收RM和TM的request请求数据,是进行相应处理的处理器。实现TransactionMessageHandler接口,去处理收到的RPC信息。实现ResourceManagerInbound接口,发送至RM的branchCommit,branchRollback请求。

      • Core接口为seata处理全球事务协调器TC的核心处理器,它继承ResourceManagerOutbound接口,接受来自RM的rpc网络请求(branchRegister,branchReport,lockQuery)。同时继承TransactionManager接口,接受来自TM的rpc网络请求(begin,commit,rollback,getStatus),另外提供提供3个接口方法。

      • GlobalSession是seata协调器DefaultCoordinator管理维护的重要部件,当用户开启全局分布式事务,TM调用begin方法请求至TC,TC则创建GlobalSession实例对象,返回唯一的xid。它实现SessionLifecycle接口,提供begin,changeStatus,changeBranchStatus,addBranch,removeBranch等操作session和branchSession的方法。

      • BranchSession为分支session,管理分支数据,受globalSession统一调度管理,它的lock和unlock方法由lockManger实现。

      • DefaultLockManager是LockManager的默认实现,它获取branchSession的lockKey,转换成List,委派Locker进行处理。

      • Locker接口提供根据行数据获取锁,释放锁,是否锁住和清除所有锁的方法。

    • ResourceManager (RM)

      • ResourceManager是seata的重要组件之一,RM负责管理分支数据资源的事务。

      • AbstractResourceManager实现ResourceManager提供模板方法。DefaultResourceManager适配所有的ResourceManager,所有方法调用都委派给对应负责的ResourceManager处理。

      • DataSourceManager 此为AT模式核心管理器,DataSourceManager继承AbstractResourceManager,管理数据库Resouce的注册,提交以及回滚等

      • AsyncWorker DataSourceManager事务提交委派给AsyncWorker进行提交的,因为都成功了,无需回滚成功的数据,只需要删除生成的操作日志就行,采用异步方式,提高效率。

      • UndoLogManager

      • Resource能被ResourceManager管理并且能够关联GlobalTransaction。

      • DataSourceProxy实现Resource接口,BranchType为AT自动模式。它继承AbstractDataSourceProxy代理类,所有的DataSource相关的方法调用传入的targetDataSource代理类的方法,除了创建connection方法为创建ConnectionProxy代理类。对象初始化时获取连接的jdbcUrl作为resourceId,并注册至DefaultResourceManager进行管理。同时还提供获取原始连接不被代理的getPlainConnection方法。

      • ExecuteTemplate为具体statement的execute,executeQuery和executeUpdate执行提供模板方法

    Apache ShardingSphere 整合seata

    • Local 本地事务
    • XA 事务
    • BASE 柔性事务(Seata AT)

    https://shardingsphere.apache.org/document/current/cn/reference/transaction/base-transaction-seata/

    在这里插入图片描述

    -- for AT mode you must to init this sql for you business database. the seata server not need it.
    CREATE TABLE IF NOT EXISTS `undo_log`
    (
        `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
        `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
        `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
        `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
        `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
        `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
        `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
        UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
    ) ENGINE = InnoDB
      AUTO_INCREMENT = 1
      DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. 引入依赖
    <dependency>
    	<groupId>com.alibaba.cloudgroupId>
    	<artifactId>spring-cloud-starter-alibaba-seataartifactId>
    	<version>2.2.8.RELEASEversion>
    	<exclusions>
    		<exclusion>
    			<groupId>io.seatagroupId>
    			<artifactId>seata-spring-boot-starterartifactId>
    		exclusion>
    	exclusions>
    dependency>
    
    <dependency>
    	<groupId>io.seatagroupId>
    	<artifactId>seata-spring-boot-starterartifactId>
    	<version>1.5.1version>
    dependency>
    
    
    <dependency>
    	<groupId>org.apache.shardingspheregroupId>
    	<artifactId>sharding-jdbc-spring-boot-starterartifactId>
    	<version>4.1.1version>
    dependency>
    
     <dependency>
         <groupId>org.apache.shardingspheregroupId>
         <artifactId>sharding-transaction-base-seata-atartifactId>
         <version>4.1.1version>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    1. 配置 resources/seata.conf
      包含 Seata 柔性事务的应用启动时,用户配置的数据源会根据 seata.conf 的配置,适配为 Seata 事务所需的 DataSourceProxy,并且注册至 RM 中。
    client {
        application.id = demo-order-curr
        transaction.service.group = default_tx_group
    }
    
    • 1
    • 2
    • 3
    • 4
    1. resources/application.yml
    • enable-auto-data-source-proxy false
    • service.vgroup-mapping.default_tx_group default
    seata:
      application-id: demo-order-curr
      tx-service-group: default_tx_group
      #关闭数据源自动代理,交给sharding-jdbc
      enable-auto-data-source-proxy: false
      registry:
        type: nacos
        nacos:
          application: seata-server
          server-addr: nacos.localhost.com:8848
          group: SEATA_GROUP
      config:
        nacos:
          server-addr: nacos.localhost.com:8848
          namespace: seata-config
          group: SEATA_GROUP
          data-id: seataServer.properties
      service:
        vgroup-mapping:
          default_tx_group: default
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    1. 开启全局事务配置
    //注意:@GlobalTransactional 和 @ShardingTransactionType 不能同时出现,此处不能使用 @GlobalTransactional
    //@GlobalTransactional(name = "generateOrder",rollbackFor = Exception.class)
    //全局事务交给@SeataATShardingTransactionManager管理
    @ShardingTransactionType(TransactionType.BASE)
    @Transactional
    public CommonResult generateOrder(OrderParam orderParam, Long memberId)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    柔性事务:可靠消息最终一致性方案实现

    方法一 定时任务

    在这里插入图片描述

    方法二 RocketMQ 事务消息

    在这里插入图片描述

    public interface RocketMQLocalTransactionListener {
    	/**
    	发送prepare消息成功此方法被回调,该方法用于执行本地事务
    	@param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
    	@param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
    	@return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调
    	*/
    	RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
    	/**
    	@param msg 通过获取transactionId来判断这条消息的本地事务执行状态
    	@return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调
    	*/
    	RocketMQLocalTransactionState checkLocalTransaction(Message msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    定义不同的监听器绑定对应的生产者

    调用

    /*实际进行真实库存的扣减*/
    // todo 分布式事务
    // PO :可以使用MQ进行异步扣减
    // 使用事务消息机制发送扣减库存消息
    reduceStockMsgSender.sendReduceStockMsg(orderId,payType,orderDetail)
    
    • 1
    • 2
    • 3
    • 4
    • 5

    定义扩建库存生产者

    @ExtRocketMQTemplateConfiguration
    public class ExtRocketMQTemplate extends RocketMQTemplate {
    }
    
    • 1
    • 2
    • 3

    发送

    @Component
    public class ReduceStockMsgSender {
    
        @Autowired
        private ExtRocketMQTemplate extRocketMQTemplate;
    
        /**
         * 使用事务消息机制发送扣减库存消息
         * @param orderId
         * @param payType
         * @param orderDetail
         * @return
         */
        public boolean sendReduceStockMsg(Long orderId, Integer payType, OmsOrderDetail orderDetail){
    
            List<StockChanges> stockChangesList = new ArrayList<>();
            for(OmsOrderItem omsOrderItem : orderDetail.getOrderItemList()){
                stockChangesList.add(new StockChanges(omsOrderItem.getProductSkuId(),omsOrderItem.getProductQuantity()));
            }
            String destination = "reduce-stock";
    
            StockChangeEvent stockChangeEvent = new StockChangeEvent();
            stockChangeEvent.setPayType(payType);
            stockChangeEvent.setOrderId(orderId);
            stockChangeEvent.setStockChangesList(stockChangesList);
    
            //TODO  全局事务id   可以用于幂等校验
            String transactionId = UUID.randomUUID().toString();
    
            stockChangeEvent.setTransactionId(transactionId);
            Message<StockChangeEvent> message = MessageBuilder.withPayload(stockChangeEvent)
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("orderId",orderId)
                    .setHeader("payType",payType)
                    .build();
            //destination:目的地(主题),这里发送给reduce-stock这个topic
            //message:发送给消费者的消息体,需要使用MessageBuilder.withPayload() 来构建消息
            //arg:参数
            TransactionSendResult sendResult = extRocketMQTemplate.sendMessageInTransaction(destination,message,orderId);
            return SendStatus.SEND_OK == sendResult.getSendStatus();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    监听事务

    @Slf4j
    @RocketMQTransactionListener(rocketMQTemplateBeanName="extRocketMQTemplate") //一个事物监听器对应一个事物流程
    public class ReduceStockMsgListener implements RocketMQLocalTransactionListener {
    
        @Autowired
        private OmsOrderMapper omsOrderMapper;
    
        @Autowired
        private OmsPortalOrderService portalOrderService;
    
        /**
         * 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
         */
        @Override
        public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
            
            try {
                //解析message
                Long orderId = Long.parseLong(String.valueOf(arg));
                String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
                Integer payType = Integer.valueOf((String)message.getHeaders().get("payType"));
    
                //修改订单状态
                portalOrderService.updateOrderStatus(orderId,payType,transactionId);
    
                //当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
                return RocketMQLocalTransactionState.COMMIT;
            } catch (Exception e) {
                e.printStackTrace();
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        }
        
        /**
         * 事务状态回查
         * @param message
         * @return
         */
        @Override
        public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
    
            String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
            int existTx = omsOrderMapper.isExistTx(transactionId);
            if (existTx > 0) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    消息体

    @Data
    public class StockChangeEvent {
    
        /**
         * 事务id
         */
        private String transactionId;
    
        private List<StockChanges> stockChangesList;
    
        private Long orderId;
        /**
         * 支付方式:0->未支付;1->支付宝;2->微信
         */
        private Integer payType;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    表SQL

    CREATE TABLE IF NOT EXISTS `local_transaction_log`
    (
    `tx_no` VARCHAR(128) NOT NULL COMMENT '分布式事务ID',
    `created` DATETIME(6)  NOT NULL COMMENT 'create datetime',
    UNIQUE KEY `local_transaction_log_key` (`tx_no`)
    ) ENGINE = INNODB 
    DEFAULT CHARSET = utf8mb4 COMMENT ='分布式事务控制表';
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    消费者

    @Slf4j
    @Component
    @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}",topic = "${rocketmq.consumer.topic}")
    public class ReduceStockMsgConsumer implements RocketMQListener<StockChangeEvent> {
    
        @Autowired
        private StockManageService stockManageService;
        
        /**
         * 接收消息
         */
        @Override
        public void onMessage(StockChangeEvent stockChangeEvent) {
            log.info("开始消费消息:{}",stockChangeEvent);
    
            stockManageService.reduceStock(stockChangeEvent);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    消费者幂等性实现

    @Override
    @Transactional
    public void reduceStock(StockChangeEvent stockChangeEvent) {
        //幂等性校验
        if(skuStockMapper.isExistTx(stockChangeEvent.getTransactionId())>0){
            return ;
        }
        List<StockChanges> stockChangesList = stockChangeEvent.getStockChangesList();
        //扣减冻结库存
        skuStockMapper.updateSkuStock(stockChangesList);
        //添加事务记录,用于幂等
        skuStockMapper.addTx(stockChangeEvent.getTransactionId());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    校验、插入

    @Select("select count(1) from local_transaction_log where tx_no = #{txNo}")
    int isExistTx(String txNo);
    
    @Insert("insert into local_transaction_log values(#{txNo},now());")
    int addTx(String txNo);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    配置

    rocketmq:
      name-server: rocketmq.localhost.com:9876
      consumer:
        group: stock_consumer_group
        topic: reduce-stock
    
    • 1
    • 2
    • 3
    • 4
    • 5
  • 相关阅读:
    LabVIEW开发虚拟与现实融合的数字电子技术渐进式实验系统
    【Linux】权限管理-权限的概念,umask,粘滞位
    Uniapp小程序 时间段选择限制(开始时间 结束时间相互限制)
    《中国垒球》:跨界互动·全明星赛
    FFplay文档解读-50-多媒体过滤器四
    1.15 - 输入输出系统
    489 - Hangman Judge (UVA)
    如果你要去拜访国外客户需要做哪些准备
    (77)MIPI DSI LLP介绍(十七)
    从零到一完成Midway.js登录、注册、鉴权功能
  • 原文地址:https://blog.csdn.net/menxu_work/article/details/127288148