• Seata四大模式之AT模式详解及代码实现


      😊 @ 作者: 一恍过去
      🎊 @ 社区: Java技术栈交流
      🎉 @ 主题: Seata四大模式之AT模式详解及代码实现
      ⏱️ @ 创作时间: 2022年06月20日

      1、实现机制

      1.1 提交阶段

      AT模式是Seata的默认模式,满足两阶段提交协议:

      • 一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
      • 二阶段:
        • 提交异步化,非常快速地完成。
        • 回滚通过一阶段的回滚日志进行反向补偿。

      1.2 实现逻辑

      在这里插入图片描述

      一阶段:
      1、解析 SQL,获取执行的SQL语句的信息;
      2、根据解析得到的条件信息,执行查询语句,获取数据,生成前镜像;
      3、执行SQL语句,并且根据前镜像中的主键查询数据,生成后镜像;
      4、把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中;
      5、在提交事务前,向 TC 注册分支事务,并且根据主键值获取全局锁;
      6、提交本地事务,包含业务SQL、UNDO_LOG日志生成SQL;
      7、将本地事务提交的结果上报给 TC。
      说明:前后镜像实质是一个json串,记录了sql语句中的字段信息,比如:

      	 "beforeImage": {
      			"rows": [{
      				"fields": [{
      					"name": "id",
      					"type": 4,
      					"value": 1
      				}]
      			}],
      			"tableName": "product"
      		}
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10

      二阶-事务提交
      1、分支收到 TC 的事务提交请求,上报提交成功,并且异步的删除全局锁和 UNDO LOG 记录;

      二阶-事务回滚
      1、通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录;
      2、将UNDO LOG 中的后镜与当前数据进行比较,验证数据是否有被第三方篡改;
      3、根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚语句;
      4、提交本地事务,将分支事务回滚的结果上报给 TC;

      1.3 优缺点

      • 优点:
        • 一阶段完成直接提交事务,释放数据库资源,性能比较好。
        • 利用全局锁实现读写隔离。
        • 没有代码侵入,框架自动完成回滚和提交。
      • 缺点:
        • 两阶段之间属于软状态,无法保证数据强一致性,只能是数据最终一致性
        • 需要额外维护undo_log表。

      2、读写隔离

      2.1 写隔离

      在一阶段本地事务提交前,确保要先获取全局锁 ,如果没有获取到全局锁就不能进行事务提交,如果尝试获取全局锁超时,那么回滚本地事务并且释放本地锁;逻辑如下:

      正常提交: tx1先开启本地事务,执行更新操作,获取全局锁并且进行事务提交,二阶段tx1全局提交,释放全局锁 ,其他事务拿到·全局锁·提交本地事务。在tx1 事务提交前,全局锁被 tx1 持有,其他事务需要重试等待获取全局锁
      在这里插入图片描述

      事务回滚: tx1先开启本地事务,执行更新操作,获取全局锁并且进行事务提交,二阶段tx1全局回滚,tx1重新获取该数据的本地锁,实现分支的回滚。在tx1进行回滚时,其他分支事务一直在等待获取全局锁,tx1需要一直重试事务回滚,直到其他事务获取全局锁超时,并且在整个回滚过程中全局锁一直被 tx1所持有的,所以不会发生脏写的情况;
      在这里插入图片描述

      2.2 读隔离

      Seata-AT 模式的默认全局隔离级别是读未提交;目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理实现读已提交SELECT FOR UPDATE 语句的执行会申请全局锁 ,如果全局锁被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询语句一直被锁值 ,直到获取全局锁,此时读取的相关数据是已提交的读已提交

      在这里插入图片描述

      3、代码实现

      创建两个SpringBoot工程,分别为storage-serviceorder-service,模拟从在order-service服务中新增订单,然后调用storage-service服务新增库存扣减记录;核心代码如下,完整代码参考文末github地址

      3.1 建表语句

      -- 数据库名称: seata-at-demo.sql
      
      -- 订单表
      CREATE TABLE `tb_order`
      (
          `id`    int(11) NOT NULL COMMENT '主键',
          `count` int(11) NULL DEFAULT 0 COMMENT '下单数量',
          `money` int(11) NULL DEFAULT 0 COMMENT '金额',
          PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
      
      
      -- 库存表
      CREATE TABLE `tb_storage`
      (
          `id`       int(11) NOT NULL COMMENT '主键',
          `order_id` int(11) NOT NULL COMMENT '订单ID',
          `count`    int(11) NOT NULL DEFAULT 0 COMMENT '库存',
          PRIMARY KEY (`id`) USING BTREE
      ) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = COMPACT;
      
      
      -- undo_log表
      CREATE TABLE `undo_log`
      (
          `branch_id`     bigint(20) NOT NULL COMMENT 'branch transaction id',
          `xid`           varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id',
          `context`       varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization',
          `rollback_info` longblob                                                NOT NULL COMMENT 'rollback info',
          `log_status`    int(11) NOT NULL COMMENT '0:normal status,1:defense status',
          `log_created`   datetime(6) NOT NULL COMMENT 'create datetime',
          `log_modified`  datetime(6) NOT NULL COMMENT 'modify datetime',
          UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE
      ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34

      3.2 order-service服务

      3.2.1 yaml配置

      server:
        port: 8082
      spring:
        application:
          name: order-service
        datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3307/seata-at-demo?useUnicode=true&useSSL=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lhzlx
        cloud:
          nacos:
            discovery:
              server-addr: 127.0.0.1:8848
              namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
              group: test
      
      seata:
        enabled: true
        application-id: ${spring.application.name}
        # 事务组的名称,对应service.vgroupMapping.default_tx_group=xxx中配置的default_tx_group
        tx-service-group: default_tx_group
        # 配置事务组与集群的对应关系
        service:
          vgroup-mapping:
            # default_tx_group为事务组的名称,default为集群名称
            default_tx_group: default
          disable-global-transaction: false
        registry:
          type: nacos
          nacos:
            application: seata-server
            server-addr: 127.0.0.1:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            cluster: default
        config:
          type: nacos
          nacos:
            server-addr: 162.14.115.18:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            data-id: seataServer.properties
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47

      3.2.2 Service

      上游服务通过@GlobalTransactional注解开启全局事务,使用storageClient进行feign调用

      @Slf4j
      @Service
      public class OrderServiceImpl implements OrderService {
      
          @Resource
          private StorageClient storageClient;
      
          @Resource
          private OrderMapper orderMapper;
      
      
          /**
           * 创建订单
           *
           * @param order
           * @return
           */
          @Override
          @GlobalTransactional
          public Long create(Order order) {
              // 创建订单
              long id = new Random().nextInt(999999999);
              order.setId(id);
              orderMapper.insert(order);
              try {
                  // 记录库存信息
                  storageClient.deduct(order.getId(), order.getCount());
      			// 模拟异常
                  // int a = 1 / 0;
              } catch (FeignException e) {
                  log.error("下单失败,原因:{}", e.contentUTF8(), e);
                  throw new RuntimeException(e.contentUTF8(), e);
              }
              return order.getId();
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37

      3.2.3 StorageClient

      @FeignClient("storage-service")
      public interface StorageClient {
          /**
           * 扣减库存
           *
           * @param orderId
           * @param count
           */
          @PostMapping("/storage")
          void deduct(@RequestParam("orderId") Long orderId, @RequestParam("count") Integer count);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11

      3.3 storage-service服务

      3.3.1 yaml配置

      server:
        port: 8081
      spring:
        application:
          name: storage-service
        datasource:
          driver-class-name: com.mysql.cj.jdbc.Driver
          url: jdbc:mysql://127.0.0.1:3307/seata-at-demo?useUnicode=true&useSSL=false&zeroDateTimeBehavior=convertToNull&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai
          username: root
          password: lhzlx
        cloud:
          nacos:
            discovery:
              server-addr: 127.0.0.1:8848
              namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
              group: test
              # 在dev环境进行debug时,可以将时间设置长一些
              #heart-beat-interval: 1000 #心跳间隔。单位为毫秒,默认5*1000
              heart-beat-timeout: 300000 #心跳暂停,收不到心跳,会将实例设为不健康。单位为毫秒,默认15*1000
              ip-delete-timeout: 4000000 #Ip删除超时,收不到心跳,会将实例删除。单位为毫秒,默认30*1000
      
      
      seata:
        enabled: true
        application-id: ${spring.application.name}
        # 事务组的名称,对应service.vgroupMapping.default_tx_group=xxx中配置的default_tx_group
        tx-service-group: default_tx_group
        # 配置事务组与集群的对应关系
        service:
          vgroup-mapping:
            # default_tx_group为事务组的名称,default为集群名称
            default_tx_group: default
          disable-global-transaction: false
        registry:
          type: nacos
          nacos:
            application: seata-server
            server-addr: 162.14.115.18:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            cluster: default
        config:
          type: nacos
          nacos:
            server-addr: 162.14.115.18:8848
            group: SEATA_GROUP
            namespace: 64ed9ca7-d705-4655-b4e4-f824e420a12a
            username: nacos
            password: nacos
            data-id: seataServer.properties
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52

      3.3.2 StorageService

      @Slf4j
      @Service
      public class StorageServiceImpl implements StorageService {
      
          @Resource
          private StorageMapper storageMapper;
      
          /**
           * 扣除存储数量
           *
           * @param orderId
           * @param count
           */
          @Override
          public void deduct(Long orderId, int count) {
              log.info("开始记录库存信息");
              try {
                  long id = new Random().nextInt(999999999);
                  Storage storage = new Storage();
                  storage.setId(id);
                  storage.setOrderId(orderId);
                  storage.setCount(count);
      
                  storageMapper.insert(storage);
      			// 模拟异常
                  // int a = 1 / 0;
              } catch (Exception e) {
                  throw new RuntimeException("扣减库存失败,可能是库存不足!", e);
              }
              log.info("库存信息记录成功");
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

      4 测试

      测试时没有做截图进行演示,只说明了结果,可以运行代码设置异常进行验证

      4.1 下游服务异常

      order-service服务中正常,在storage-service服务的service中抛出异常,观察数据是否成功回滚;如果tb_ordertb_storage都不存在数据,则表示全局事务成功;

      4.2 上游服务异常

      order-service服务在执行storageClient.deduct()方法后抛出异常,在storage-service服务中正常,观察数据是否成功回滚;如果tb_ordertb_storage都不存在数据,则表示全局事务成功;

      4.3 数据最终一致性验证

      我们可以在上游服务执行完orderMapper.insert(order);``后马上进入断点,测试去观察数据库会发现tb_order中存在数据,再放行断点使程序执行异常,再次观察数据库会发现tb_order中的数据已经被删除了;

      5、源码地址

      Seata值AT模式代码实现:《seata-at-demo》

    • 相关阅读:
      Vue(七):vue项目发布到服务器
      【猛地学】vxe-table(支持大数据量的可编辑vue表格插件)
      【PHP手麻系统源码】手术麻醉相关的各项数据的记录、管理和应用
      如何让 Docker 在没有缓存的情况下重建镜像
      总结一些 spark 处理小trick
      深度学习之基于百度飞桨PaddleOCR图像字符检测识别系统
      网络小说作家写手提问常用的ChatGPT通用提示词模板
      Python 爬虫零基础:探索网络数据的神秘世界
      【Eclipse】取消按空格自动补全,以及出现没有src的解决办法
      Docker安装MySql教程步骤
    • 原文地址:https://blog.csdn.net/zhuocailing3390/article/details/125344747