• 分布式事务之Seata TCC


    TCC

    1. TCC是一种资源,实现了Try、Confirm、Cancel三个操作接口。与2PC不同的是,TCC是一种应用层实现的两阶段提交协议。在TCC分布式事务中,对每个业务操作都会分为Try、Confirm和Cancel三个阶段
    2. Try阶段:准备执行业务的阶段,这个阶段尝试执行业务,重点关注如下事项
      • 完成所有的业务检查,确保数据的一致性
      • 预留必要的业务资源,确保数据的隔离性
    3. Confirm阶段:确认执行业务的节点,在这个阶段确认执行的业务,重点关注如下事项
      • Try阶段执行成功后,Confirm真正地执行业务
      • 不做任何业务逻辑检查,直接将数据持久化到数据库(不会做二次检查,直接使用Try预留的业务资源)
      • Confirm操作需满足幂等性,因为Confirm失败后需要进行重试
    4. Cancel阶段:取消执行业务,重点关注如下事项
      • 释放Try阶段预留的业务资源
      • 将数据库中的数据恢复到最初的状态
      • Cancel操作需满足幂等性,因为Cancel失败后需要进行重试
    5. 由于使用TCC分布式事务时,各业务系统的事务未达到最终状态时,会存在短暂的数据不一致现象。因此业务系统需要具备兼容数据最终一致性之前带来的可见性问题的能力
    6. TCC使用场景
      • 具有强隔离性、严格一致性要求的业务场景
      • 执行时间比较短的业务
    7. TCC优点
      • 应用层实现逻辑,锁定资源的粒度变小(不会锁定所有资源),提升系统性能
      • Try、Confirm、Cancel都实现幂等性(Try一般也要实现幂等),能够保证分布式事务执行后的数据一致性
      • 相比与XA规范,无论主业务还是分支业务,都能集群部署,解决单点问题
    8. TCC缺点:对应用侵入性强,实现难度较大。业务逻辑的每个分支都需要实现try、confirm、cancel三个操作,应用侵入性较强,都要实现幂等,改造成本高。

    对比

    1. TCC与XA两阶段提交很类似

      • 在第一阶段,XA是写本地的redo和undo日志,还未提交事务,Try是通过独立的事务在业务上预留资源。

      • 在第二阶段,XA或者提交事务或者回滚事务,TCC或者执行业务或者补偿业务。

      • XA始终持有资源锁,在两个阶段中保持同步阻塞。而TCC是业务层面的分布式事务,每一个阶段都是一个独立的事务,不持有资源锁。TCC没有XA事务的同步阻塞和单点故障的问题。

    2. TCC与Saga相比多了一个Try阶段,显得更重,更加复杂

      • 普通业务要实现TCC,需要修改原有的业务逻辑,而Saga添加一个补偿动作就可以了
      • TCC的补偿是释放锁定的资源,Saga的补偿是回滚到原有的状态。
      • TCC最少通信次数为2n,而Saga为n(n=sub-transaction的数量)
      • TCC与Saga都实现了最终一致性,当Confirm或Cancel失败时,都需要人工处理,但TCC通过锁定资源相对实现了隔离性。

    注意的问题

    1. 在使用TCC时,需要注意空回滚幂等悬挂问题
    2. 空回滚问题
      • 含义:Try未执行,Cancel执行了
      • 出现的原因:Try阶段网络超时。Try阶段事务回滚,触发Cancel。未收到Try,直接收到Cancel
      • 解决方案:判断一阶段try是否执行(一般需要插入流水状态表),如果执行了就正常回滚,如果未执行,则是空回滚。需要一张状态表,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。
    3. 悬挂问题
      • 含义: 二阶段 Cancel 接口比 Try 接口先执行,导致预留资源无法继续处理。因为允许空回滚的原因,Cancel 接口认为 Try 接口没执行,空回滚直接返回成功,对于 Seata 框架来说,认为分布式事务的二阶段接口已经执行成功,整个分布式事务就结束了。Seata认为分布式事务结束,但是此时try才刚刚开始执行,最终导致预留资源不能被处理(Seata框架已经认为结束),这样就导致了悬挂
      • 导致的原因:在 RPC 调用时,先注册分支事务,再执行 RPC 调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,发起方就会通知 TC 回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者,真正执行,从而造成悬挂。
      • 解决方案:保证二阶段执行完成,一阶段就不能执行。执行二阶段后,在状态表中更新记录为已回滚,一阶段执行时,先读取改记录,如果记录已经存在,则认为二阶段已经执行(可以使用悲观锁或者乐观锁重试)
    4. 幂等问题:TCC 的二阶段 Confirm 和 Cancel 接口需要保证幂等,不会重复使用或者释放资源。实际上Try阶段也应该幂等,毕竟网络调用可能会导致重试
      • 导致的原因:提交或回滚是一次 TC 到参与者的网络调用,网络故障、参与者宕机等都有可能造成参与者 TCC 资源实际执行了二阶段,但是 TC 没有收到返回结果的情况,这时,TC 就会重复调用,直至调用成功,整个分布式事务结束。
      • 解决方案:事务控制表增加状态字段和唯一标识字段

    实现细节

    1. Try方法实现
      • Try需要能够告诉二阶段接口,已经预留资源成功(通常使用状态表,并且增加业务唯一标识)
      • 检查第二阶段是否已经执行完成,如果已完成,则不再执行
    2. Confirm方法实现
      • Confirm 方法不允许空回滚,即一定要在Try之后执行
      • 锁定状态记录,如果状态记录为空,则终止执行(抛出异常返回失败)。
      • 如果状态记录不为空,检查状态,如果是初始状态,则正常二阶段提交。如果是状态已经提交,则防止重复提交,直接返回成功。如果状态是已回滚,则终止执行(抛出异常返回失败)
    3. Cancel方法实现
      • 锁定状态记录,如果状态记录为空,表示Try未执行,即空回滚。空回滚需要插入一条事务状态记录(取消状态),确保后续的 Try 方法不会再执行。如果插入失败,则表示当前Try正在执行,直接返回失败,等待重试。如果插入成功,可确保后续Try不会执行
      • 如果一开始读取事务记录不为空,则说明 Try 方法已经执行完毕,再检查状态是否为初始化,如果是,则还没有执行过其他二阶段方法,正常执行 Cancel 逻辑。如果状态为已回滚,则说明这是重复调用,允许幂等,直接返回成功即可。如果状态为已提交,则同样是一个异常,一个已提交的事务,不能再次回滚。

    Seata TCC实战

    1. TCC流程图
      在这里插入图片描述

    2. 业务场景:A银行用户向B银行用户转账1000元,那么对应的TCC三阶段逻辑为

      账户0(A银行)账户1(B银行)
      try方法冻结金额+1000
      余额-1000
      插入转账流水(用于幂等)
      冻结金额+1000
      创建转账流水(用于幂等)
      confirm冻结金额-1000
      更新转账流水状态(用于幂等)
      冻结金额-1000
      金额+1000
      更新流水状态(用于幂等)
      cancel冻结金额-1000
      金额+1000
      更新转账流水状态(用于幂等)
      冻结金额-1000
      更新流水状态(用于幂等)
    3. 项目源码:https://github.com/jannal/transaction/blob/master/seata-tcc

    4. 添加依赖

      //seata
      compile 'io.seata:seata-spring-boot-starter:1.4.2'
      
      • 1
      • 2
    5. 服务模块

      服务描述
      seata-service-account0账户服务(A银行)
      seata-service-account1积分服务(B银行)
      seata-service-aggregation聚合服务

    A银行代码

    1. 数据库设计

      CREATE DATABASE IF NOT EXISTS seata_tcc_bank0 DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin;
      
      DROP TABLE IF EXISTS `t_account`;
      CREATE TABLE `t_account`
      (
          `id`             int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
          `account_id`     varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '账户标识',
          `amount`         decimal(20, 2)                  NOT NULL DEFAULT '0.00' COMMENT '金额',
          `freezed_amount` decimal(20, 2)                  NOT NULL DEFAULT '0.00' COMMENT '冻结金额',
          `create_time`    datetime                        NOT NULL COMMENT '创建时间',
          `update_time`    datetime                        NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
          PRIMARY KEY (`id`),
          UNIQUE KEY `uniq_accout` (`account_id`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='账户表';
      
      DROP TABLE IF EXISTS `t_transfer_serial`;
      CREATE TABLE `t_transfer_serial`
      (
          `id`                     bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
          `transfer_serial_number` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账流水号',
          `account_from_id`        varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账账户',
          `account_to_id`          varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '进账账户',
          `amount`                 decimal(20, 2)                  NOT NULL DEFAULT '0.00' COMMENT '金额',
          `status`                 tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态,1待处理,2处理,3废弃',
          `create_time`            datetime                        NOT NULL COMMENT '创建时间',
          `update_time`            datetime                        NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
          PRIMARY KEY (`id`) USING BTREE,
          UNIQUE KEY `unqi_transfer` (`transfer_serial_number`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
      
      INSERT INTO `seata_tcc_bank0`.`t_account`(`id`, `account_id`, `amount`, `freezed_amount`, `create_time`, `update_time`)
      VALUES (1, 'jannal', 10000.00, 0.00, '2022-05-03 17:23:37', '2022-05-03 17:23:39');
      
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
    2. seata配置

      seata.enabled=true
      seata.application-id=account0-tcc-provider-seata
      seata.registry.type=nacos
      # Server和Client端的值需一致,默认seata-server
      seata.registry.nacos.application=seata-server
      seata.registry.nacos.server-addr=192.168.101.8:8848
      seata.registry.nacos.group=DEFAULT_GROUP
      seata.registry.nacos.cluster=default
      seata.registry.nacos.username=root
      seata.registry.nacos.password=root
      seata.registry.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
      seata.config.type=nacos
      seata.config.nacos.data-id=seataServer.properties
      seata.config.nacos.server-addr=192.168.101.8:8848
      seata.config.nacos.group=DEFAULT_GROUP
      seata.config.nacos.username=root
      seata.config.nacos.password=root
      seata.config.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
      # nacos server默认值是my_test_tx_group
      seata.tx-service-group=my_test_tx_group
      seata.service.vgroup-mapping.my_test_tx_group=default
      seata.service.disable-global-transaction=false
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
    3. 账户的业务逻辑

      public interface AccountTransferService {
          /**
           * 尝试转账
           */
          public void tryPayment(TransferSerial transferSerial);
          /**
           * 确定扣款
           */
          public void confirmPayment(String transferSerialNumber);
          /**
           * 取消扣款
           */
          public void cancelPayment(TransferSerial transferSerial);
      }
      @Service
      @Slf4j
      public class AccountTransferServiceImpl implements AccountTransferService {
          @Autowired
          private AccountMapper accountMapper;
          @Autowired
          private TransferSerialMapper transferSerialMapper;
      
          @Override
          @Transactional(rollbackFor = Exception.class)
          public void tryPayment(TransferSerial transferSerial) {
              final String transactionId = RootContext.getXID();
              String accountFromId = transferSerial.getAccountFromId();
              String transferSerialNumber = transferSerial.getTransferSerialNumber();
              BigDecimal amount = transferSerial.getAmount();
              log.info("事务ID:[{}],tryPayment -> transferSerialNumber:[{}], accountFromId:[{}] , accountToId:[{}]"
                      , transactionId
                      , transferSerialNumber
                      , accountFromId
                      , transferSerial.getAccountToId()
              );
              TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
              if (transferSerialExist != null) {
                  log.warn("事务ID:[{}],{}已经处理,忽略重复提交", transactionId, transferSerialNumber);
                  return;
              }
              Account accountExist = accountMapper.findByAmountIdForUpdate(accountFromId);
              if (accountExist == null) {
                  throw ValidateParamsException.valueOfParamsIllegal(accountFromId + "账户不存在");
              }
              //判断账户余额是否充足
              if (accountExist.getAmount().subtract(amount).compareTo(BigDecimal.ZERO) < 0) {
                  throw ValidateParamsException.valueOfParamsIllegal(accountFromId + "账户余额不足");
              }
              Account accountNew = new Account();
              accountNew.setId(accountExist.getId());
              accountNew.setAmount(accountExist.getAmount().subtract(amount));
              accountNew.setFreezedAmount(accountExist.getFreezedAmount().add(amount));
              accountNew.setUpdateTime(new Date());
              accountMapper.update(accountNew);
      
              Date date = new Date();
              transferSerial.setCreateTime(date);
              transferSerial.setUpdateTime(date);
              transferSerial.setStatus(TransferSerial.Status.PENDING.getStatus());
              transferSerialMapper.insert(transferSerial);
      
          }
      
          @Override
          public void confirmPayment(String transferSerialNumber) {
              final String transactionId = RootContext.getXID();
              TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
              // 发生空提交,这是不允许的,需要终止
              if (transferSerialExist == null) {
                  //log.warn("事务ID:[{}],{}不存在,忽略提交", transactionId, transferSerialNumber);
                  throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "事务状态不存在");
              }
              if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}是回滚状态,已经回滚的事务不能再次提交", transactionId, transferSerialNumber);
                  throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经回滚的事务不能再次提交");
              }
              if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}已经是完成状态,忽略重复提交", transactionId, transferSerialNumber);
                  return;
              }
              //更新流水状态
              TransferSerial transferSerialNew = new TransferSerial();
              transferSerialNew.setId(transferSerialExist.getId());
              transferSerialNew.setStatus(TransferSerial.Status.FINISHED.getStatus());
              transferSerialMapper.update(transferSerialNew);
      
              //这里使用悲观锁(由于try已经预留资源,这里无需再次判断金额是否超出余额)
              Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountFromId());
              //冻结金额释放
              Account accountNew = new Account();
              accountNew.setId(account.getId());
              accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
              accountNew.setUpdateTime(new Date());
              accountMapper.update(accountNew);
          }
      
          @Override
          public void cancelPayment(TransferSerial transferSerial) {
              final String transactionId = RootContext.getXID();
              final String transferSerialNumber = transferSerial.getTransferSerialNumber();
              //非悲观锁,避免锁表
              TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumber(transferSerialNumber);
              Date date = new Date();
              // 如果插入失败(Try正在插入),就等待重试。插入成功,可确保后续Try不会执行
              if (transferSerialExist == null) {
                  log.warn("事务ID:[{}],{}不存在,插入取消状态", transactionId, transferSerialNumber);
                  // 插入一条空记录,确保后续的try不会被执行
                  transferSerial.setCreateTime(date);
                  transferSerial.setUpdateTime(date);
                  transferSerial.setStatus(TransferSerial.Status.DISCARD.getStatus());
                  transferSerialMapper.insert(transferSerial);
                  return;
              }
              //存在后再使用悲观锁,防止悬挂
              transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
              if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}是回滚状态,忽略重复提交", transactionId, transferSerialNumber);
                  return;
              }
      
              if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}是完成状态,已经提交的事务不能进行回滚", transactionId, transferSerialNumber);
                  throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经提交的事务不能进行回滚");
              }
      
              Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountFromId());
              if (account.getFreezedAmount().subtract(transferSerialExist.getAmount())
                      .compareTo(BigDecimal.ZERO) < 0) {
                  // 除非数据库数据被篡改,一般不可能出现这种情况,这里做一个防御
                  String message = String.format("事务ID:[%s],冻结金额[%s]-转账金额[%s]<0,检查数据是否错误",
                          transactionId,
                          account.getFreezedAmount(),
                          transferSerialExist.getAmount());
                  throw ValidateParamsException.valueOfParamsIllegal(message);
              }
              //更新流水状态
              TransferSerial transferSerialNew = new TransferSerial();
              transferSerialNew.setId(transferSerialExist.getId());
              transferSerialNew.setStatus(TransferSerial.Status.DISCARD.getStatus());
              transferSerialMapper.update(transferSerialNew);
      
              //冻结金额释放
              Account accountNew = new Account();
              accountNew.setId(account.getId());
              accountNew.setAmount(account.getAmount().add(transferSerialExist.getAmount()));
              accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
              accountNew.setUpdateTime(new Date());
              accountMapper.update(accountNew);
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
    4. 账户的Dubbo服务接口,整合Seata。

      public interface AccountTransferFacadeService {
          /**
           *   @TwoPhaseBusinessAction: 定义两阶段提交
           *   name = 该tcc的bean名称,全局唯一
           *   commitMethod = confirmTransfer为二阶段确认方法
           *   rollbackMethod = cancelTransfer为二阶段取消方法
           *   BusinessActionContextParameter注解 传递参数到二阶段中
           *   BusinessActionContext TCC事务上下文
           * try阶段,from给to转钱
           */
          @TwoPhaseBusinessAction(name = "accountTransferFacadeService", commitMethod = "confirmTransfer", rollbackMethod = "cancelTransfer")
          boolean prepareTransfer(@BusinessActionContextParameter(paramName = "accountTransferFacadeRequestDTO") AccountTransferFacadeRequestDTO accountTransferRequestDTO);
      
          /**
           * 确认方法,保证与commitMethod一致
           */
          boolean confirmTransfer(BusinessActionContext context);
      
          /**
           * 确认方法,保证与rollbackMethod一致
           */
          boolean cancelTransfer(BusinessActionContext context);
      
      }
      @Slf4j
      @DubboService(version = "1.0.0")
      public class AccountTransferFacadeServiceImpl implements AccountTransferFacadeService {
      
          @Autowired
          private AccountTransferService accountTransferService;
      
          @Override
          public boolean prepareTransfer(AccountTransferFacadeRequestDTO accountTransferFacadeRequestDTO) {
              TransferSerial transferSerial = new TransferSerial();
              BeanUtils.copyProperties(accountTransferFacadeRequestDTO, transferSerial);
              accountTransferService.tryPayment(transferSerial);
              return true;
          }
      
          @Override
          public boolean confirmTransfer(BusinessActionContext context) {
              //返回的是com.alibaba.fastjson.JSONObject,不可直接强制转换为对象
              JSONObject jsonContext = (JSONObject) context.getActionContext("accountTransferFacadeRequestDTO");
              AccountTransferFacadeRequestDTO accountTransferRequestDTO = JSONObject.toJavaObject(jsonContext, AccountTransferFacadeRequestDTO.class);
              accountTransferService.confirmPayment(accountTransferRequestDTO.getTransferSerialNumber());
              return true;
          }
      
          @Override
          public boolean cancelTransfer(BusinessActionContext context) {
              JSONObject jsonContext = (JSONObject) context.getActionContext("accountTransferFacadeRequestDTO");
              AccountTransferFacadeRequestDTO accountTransferRequestDTO = JSONObject.toJavaObject(jsonContext, AccountTransferFacadeRequestDTO.class);
              TransferSerial transferSerial = new TransferSerial();
              BeanUtils.copyProperties(accountTransferRequestDTO, transferSerial);
              accountTransferService.cancelPayment(transferSerial);
              return true;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58

    B银行代码

    1. 数据库设计

      CREATE DATABASE IF NOT EXISTS seata_tcc_bank1 DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin;
      
      DROP TABLE IF EXISTS `t_account`;
      CREATE TABLE `t_account`
      (
          `id`             int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
          `account_id`     varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '账户标识',
          `amount`         decimal(20, 2)                  NOT NULL DEFAULT '0.00' COMMENT '金额',
          `freezed_amount` decimal(20, 2)                  NOT NULL DEFAULT '0.00' COMMENT '冻结金额',
          `create_time`    datetime                        NOT NULL COMMENT '创建时间',
          `update_time`    datetime                        NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
          PRIMARY KEY (`id`),
          UNIQUE KEY `uniq_accout` (`account_id`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='账户表';
      
      DROP TABLE IF EXISTS `t_transfer_serial`;
      CREATE TABLE `t_transfer_serial`
      (
          `id`                     bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
          `transfer_serial_number` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账流水号',
          `account_from_id`        varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账账户',
          `account_to_id`          varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '进账账户',
          `amount`                 decimal(20, 2)                  NOT NULL DEFAULT '0.00' COMMENT '金额',
          `status`                 tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态,1待处理,2处理,3废弃',
          `create_time`            datetime                        NOT NULL COMMENT '创建时间',
          `update_time`            datetime                        NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
          PRIMARY KEY (`id`) USING BTREE,
          UNIQUE KEY `unqi_transfer` (`transfer_serial_number`) USING BTREE
      ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
      
      INSERT INTO `t_account`(`id`, `account_id`, `amount`, `freezed_amount`, `create_time`, `update_time`)
      VALUES (1, 'tom', 2000.00, 0.00, '2022-05-03 17:24:15', '2022-05-03 17:24:17');
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
    2. Seata配置

      seata.enabled=true
      seata.application-id=account1-tcc-provider-seata
      seata.registry.type=nacos
      # Server和Client端的值需一致,默认seata-server
      seata.registry.nacos.application=seata-server
      seata.registry.nacos.server-addr=192.168.101.8:8848
      seata.registry.nacos.group=DEFAULT_GROUP
      seata.registry.nacos.cluster=default
      seata.registry.nacos.username=root
      seata.registry.nacos.password=root
      seata.registry.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
      seata.config.type=nacos
      seata.config.nacos.data-id=seataServer.properties
      seata.config.nacos.server-addr=192.168.101.8:8848
      seata.config.nacos.group=DEFAULT_GROUP
      seata.config.nacos.username=root
      seata.config.nacos.password=root
      seata.config.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
      # nacos server默认值是my_test_tx_group
      seata.tx-service-group=my_test_tx_group
      seata.service.vgroup-mapping.my_test_tx_group=default
      seata.service.disable-global-transaction=false
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
    3. 账户业务逻辑

      public interface AccountReceiveService {
          /**
           * 尝试收款
           */
          public void tryReceive(TransferSerial transferSerial);
          /**
           * 确定收款
           */
          public void confirmReceive(String transferSerialNumber);
          /**
           * 取消收款
           */
          public void cancelReceive(TransferSerial transferSerial);
      }
      @Service
      @Slf4j
      public class AccountReceiveServiceImpl implements AccountReceiveService {
          @Autowired
          private AccountMapper accountMapper;
          @Autowired
          private TransferSerialMapper transferSerialMapper;
      
          @Override
          public void tryReceive(TransferSerial transferSerial) {
              final String transactionId = RootContext.getXID();
              final String accountFromId = transferSerial.getAccountFromId();
              final String accountToId = transferSerial.getAccountToId();
              final String transferSerialNumber = transferSerial.getTransferSerialNumber();
              log.info("事务ID:[{}],tryPayment -> transferSerialNumber:[{}], accountFromId:[{}] , accountToId:[{}]"
                      , transactionId
                      , transferSerialNumber
                      , accountFromId
                      , accountToId
              );
              TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
              if (transferSerialExist != null) {
                  log.warn("事务ID:[{}],{}已经处理,忽略重复提交", transactionId, transferSerialNumber);
                  return;
              }
              Account accountExist = accountMapper.findByAmountIdForUpdate(accountToId);
              if (accountExist == null) {
                  throw ValidateParamsException.valueOfParamsIllegal(accountToId + "账户不存在");
              }
      
              Date date = new Date();
              Account accountNew = new Account();
              accountNew.setId(accountExist.getId());
              accountNew.setFreezedAmount(accountExist.getFreezedAmount().add(transferSerial.getAmount()));
              accountNew.setUpdateTime(date);
              accountMapper.update(accountNew);
      
              transferSerial.setCreateTime(date);
              transferSerial.setUpdateTime(date);
              transferSerial.setStatus(TransferSerial.Status.PENDING.getStatus());
              transferSerialMapper.insert(transferSerial);
      
          }
      
          @Override
          public void confirmReceive(String transferSerialNumber) {
              final String transactionId = RootContext.getXID();
              TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
              // 发生空提交,这是不允许的,需要终止
              if (transferSerialExist == null) {
                  log.error("事务ID:[{}],{}不存在", transactionId, transferSerialNumber);
                  throw ValidateParamsException.valueOfParamsIllegal("事务状态" + transferSerialNumber + "不存在");
              }
              if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}是回滚状态,已经回滚的事务不能再次提交", transactionId, transferSerialNumber);
                  throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经回滚的事务不能再次提交");
              }
              if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}已经是完成状态,忽略重复提交", transactionId, transferSerialNumber);
                  return;
              }
              //更新流水状态
              TransferSerial transferSerialNew = new TransferSerial();
              transferSerialNew.setId(transferSerialExist.getId());
              transferSerialNew.setStatus(TransferSerial.Status.FINISHED.getStatus());
              transferSerialMapper.update(transferSerialNew);
      
              //这里使用悲观锁(由于try已经预留资源,这里无需再次判断金额是否超出余额)
              Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountToId());
              //冻结金额释放
              Account accountNew = new Account();
              accountNew.setId(account.getId());
              accountNew.setAmount(account.getAmount().add(transferSerialExist.getAmount()));
              accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
              accountNew.setUpdateTime(new Date());
              accountMapper.update(accountNew);
          }
      
          @Override
          public void cancelReceive(TransferSerial transferSerial) {
              final String transactionId = RootContext.getXID();
              final String transferSerialNumber = transferSerial.getTransferSerialNumber();
      
              //非悲观锁,避免锁表
              TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumber(transferSerialNumber);
              Date date = new Date();
              // 如果插入失败(Try正在插入),就等待重试。插入成功,可确保后续Try不会执行
              if (transferSerialExist == null) {
                  log.warn("事务ID:[{}],{}不存在,插入取消状态", transactionId, transferSerialNumber);
                  // 插入一条空记录,确保后续的try不会被执行
                  transferSerial.setCreateTime(date);
                  transferSerial.setUpdateTime(date);
                  transferSerial.setStatus(TransferSerial.Status.DISCARD.getStatus());
                  transferSerialMapper.insert(transferSerial);
                  return;
              }
              //存在后再使用悲观锁,防止悬挂
              transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
              if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}是回滚状态,忽略重复提交", transactionId, transferSerialNumber);
                  return;
              }
      
              if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
                  log.warn("事务ID:[{}],{}是完成状态,已经提交的事务不能进行回滚", transactionId, transferSerialNumber);
                  throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经提交的事务不能进行回滚");
              }
      
              Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountToId());
              if (account.getFreezedAmount().subtract(transferSerialExist.getAmount())
                      .compareTo(BigDecimal.ZERO) < 0) {
                  // 除非数据库数据被篡改,不可能出现这种情况,这里做一个防御
                  String message = String.format("事务ID:[%s],冻结金额[%s]-转账金额[%s]<0,检查数据是否错误",
                          transactionId,
                          account.getFreezedAmount(),
                          transferSerialExist.getAmount());
                  throw ValidateParamsException.valueOfParamsIllegal(message);
              }
              //更新流水状态
              TransferSerial transferSerialNew = new TransferSerial();
              transferSerialNew.setId(transferSerialExist.getId());
              transferSerialNew.setStatus(TransferSerial.Status.DISCARD.getStatus());
              transferSerialMapper.update(transferSerialNew);
      
              //冻结金额释放
              Account accountNew = new Account();
              accountNew.setId(account.getId());
              accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
              accountNew.setUpdateTime(date);
              accountMapper.update(accountNew);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
    4. 账户的Dubbo服务接口,整合Seata。

      public interface AccountReceiveFacadeService {
          /**
           *   @TwoPhaseBusinessAction: 定义两阶段提交
           *   name = 该tcc的bean名称,全局唯一
           *   commitMethod = confirmTransfer为二阶段确认方法
           *   rollbackMethod = cancelTransfer为二阶段取消方法
           *   BusinessActionContextParameter注解 传递参数到二阶段中
           * try阶段,from给to转钱
           */
          @TwoPhaseBusinessAction(name = "accountReceiveFacadeService",
                  commitMethod = "confirmReceive",
                  rollbackMethod = "cancelReceive")
          boolean prepareReceive(@BusinessActionContextParameter(paramName = "accountReceiveFacadeRequestDTO")
                                         AccountReceiveFacadeRequestDTO accountReceiveFacadeRequestDTO);
      
          /**
           * 确认方法,保证与commitMethod一致
           */
          boolean confirmReceive(BusinessActionContext context);
      
          /**
           * 确认方法,保证与rollbackMethod一致
           */
          boolean cancelReceive(BusinessActionContext context);
      
      }
      
      @DubboService(version = "1.0.0")
      public class AccountReceiveFacadeServiceImpl implements AccountReceiveFacadeService {
          @Autowired
          private AccountReceiveService accountReceiveService;
      
          @Override
          public boolean prepareReceive(AccountReceiveFacadeRequestDTO accountReceiveRequestDTO) {
              TransferSerial transferSerial = new TransferSerial();
              BeanUtils.copyProperties(accountReceiveRequestDTO, transferSerial);
              accountReceiveService.tryReceive(transferSerial);
              return true;
          }
      
          @Override
          public boolean confirmReceive(BusinessActionContext context) {
              //返回的是com.alibaba.fastjson.JSONObject,不可直接强制转换为对象
              JSONObject jsonContext = (JSONObject) context.getActionContext("accountReceiveFacadeRequestDTO");
              AccountReceiveFacadeRequestDTO accountReceiveRequestDTO = JSONObject.toJavaObject(jsonContext, AccountReceiveFacadeRequestDTO.class);
              accountReceiveService.confirmReceive(accountReceiveRequestDTO.getTransferSerialNumber());
              return true;
          }
      
          @Override
          public boolean cancelReceive(BusinessActionContext context) {
              JSONObject jsonContext = (JSONObject) context.getActionContext("accountReceiveFacadeRequestDTO");
              AccountReceiveFacadeRequestDTO accountReceiveRequestDTO = JSONObject.toJavaObject(jsonContext, AccountReceiveFacadeRequestDTO.class);
              TransferSerial transferSerial = new TransferSerial();
              BeanUtils.copyProperties(accountReceiveRequestDTO, transferSerial);
              accountReceiveService.cancelReceive(transferSerial);
              return true;
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59

    聚合服务代码

    1. 聚合服务代码,在方法上增加@GlobalTransactional

      @Service
      @Slf4j
      public class PaymentAggregationService {
          @DubboReference(version = "1.0.0")
          private AccountTransferFacadeService accountTransferFacadeService;
          @DubboReference(version = "1.0.0")
          private AccountReceiveFacadeService accountReceiveFacadeService;
      
          /**
           * 转账付款
           */
          @GlobalTransactional(rollbackFor = Exception.class, timeoutMills = 200000)
          public void transfer(AccountTransferFacadeRequestDTO accountTransferRequestDTO,
                               AccountReceiveFacadeRequestDTO accountReceiveFacadeRequestDTO) {
              log.info("全局事务id:{},start", RootContext.getXID());
              accountTransferFacadeService.prepareTransfer(accountTransferRequestDTO);
              accountReceiveFacadeService.prepareReceive(accountReceiveFacadeRequestDTO);
              log.info("全局事务id:{},end", RootContext.getXID());
          }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
    2. seata配置

      seata.enabled=true
      seata.application-id=aggregation-tcc-service-seata
      seata.registry.type=nacos
      # Server和Client端的值需一致,默认seata-server
      seata.registry.nacos.application=seata-server
      seata.registry.nacos.server-addr=192.168.101.8:8848
      seata.registry.nacos.group=DEFAULT_GROUP
      seata.registry.nacos.cluster=default
      seata.registry.nacos.username=root
      seata.registry.nacos.password=root
      seata.registry.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
      seata.config.type=nacos
      seata.config.nacos.data-id=seataServer.properties
      seata.config.nacos.server-addr=192.168.101.8:8848
      seata.config.nacos.group=DEFAULT_GROUP
      seata.config.nacos.username=root
      seata.config.nacos.password=root
      seata.config.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
      # nacos server默认值是my_test_tx_group
      seata.tx-service-group=my_test_tx_group
      seata.service.vgroup-mapping.my_test_tx_group=default
      seata.service.disable-global-transaction=false
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
  • 相关阅读:
    3dmax渲染大图有斑点怎么办?
    DDS通信协议与安全实践
    Nodejs中包的介绍及npm安装依赖包的多种方法
    不同的测试技术区分
    maven配置文件(一)Settings配置
    Himall商城Xml帮助类 XML序列化 OSS策略
    leetcode648. 单词替换
    软件测试必备:如何编写测试用例?
    关于#matlab#的问题:地铁5G信号2.6G频段覆盖,人多时怎么设置微基站,基站部署位置如何,怎么根据地铁站的人流量确定小区怎么划分
    新形势下平台转型,环保新能源企业如何应用SCM供应链平台实现利益最大化?
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126162829