TCC与XA两阶段提交很类似
在第一阶段,XA是写本地的redo和undo日志,还未提交事务,Try是通过独立的事务在业务上预留资源。
在第二阶段,XA或者提交事务或者回滚事务,TCC或者执行业务或者补偿业务。
XA始终持有资源锁,在两个阶段中保持同步阻塞。而TCC是业务层面的分布式事务,每一个阶段都是一个独立的事务,不持有资源锁。TCC没有XA事务的同步阻塞和单点故障的问题。
TCC与Saga相比多了一个Try阶段,显得更重,更加复杂
TCC流程图

业务场景:A银行用户向B银行用户转账1000元,那么对应的TCC三阶段逻辑为
| 账户0(A银行) | 账户1(B银行) | |
|---|---|---|
| try方法 | 冻结金额+1000 余额-1000 插入转账流水(用于幂等) | 冻结金额+1000 创建转账流水(用于幂等) |
| confirm | 冻结金额-1000 更新转账流水状态(用于幂等) | 冻结金额-1000 金额+1000 更新流水状态(用于幂等) |
| cancel | 冻结金额-1000 金额+1000 更新转账流水状态(用于幂等) | 冻结金额-1000 更新流水状态(用于幂等) |
项目源码:https://github.com/jannal/transaction/blob/master/seata-tcc
添加依赖
//seata
compile 'io.seata:seata-spring-boot-starter:1.4.2'
服务模块
| 服务 | 描述 |
|---|---|
| seata-service-account0 | 账户服务(A银行) |
| seata-service-account1 | 积分服务(B银行) |
| seata-service-aggregation | 聚合服务 |
数据库设计
CREATE DATABASE IF NOT EXISTS seata_tcc_bank0 DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin;
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`
(
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`account_id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '账户标识',
`amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '金额',
`freezed_amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '冻结金额',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_accout` (`account_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='账户表';
DROP TABLE IF EXISTS `t_transfer_serial`;
CREATE TABLE `t_transfer_serial`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`transfer_serial_number` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账流水号',
`account_from_id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账账户',
`account_to_id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '进账账户',
`amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '金额',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态,1待处理,2处理,3废弃',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `unqi_transfer` (`transfer_serial_number`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO `seata_tcc_bank0`.`t_account`(`id`, `account_id`, `amount`, `freezed_amount`, `create_time`, `update_time`)
VALUES (1, 'jannal', 10000.00, 0.00, '2022-05-03 17:23:37', '2022-05-03 17:23:39');
seata配置
seata.enabled=true
seata.application-id=account0-tcc-provider-seata
seata.registry.type=nacos
# Server和Client端的值需一致,默认seata-server
seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=192.168.101.8:8848
seata.registry.nacos.group=DEFAULT_GROUP
seata.registry.nacos.cluster=default
seata.registry.nacos.username=root
seata.registry.nacos.password=root
seata.registry.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
seata.config.type=nacos
seata.config.nacos.data-id=seataServer.properties
seata.config.nacos.server-addr=192.168.101.8:8848
seata.config.nacos.group=DEFAULT_GROUP
seata.config.nacos.username=root
seata.config.nacos.password=root
seata.config.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
# nacos server默认值是my_test_tx_group
seata.tx-service-group=my_test_tx_group
seata.service.vgroup-mapping.my_test_tx_group=default
seata.service.disable-global-transaction=false
账户的业务逻辑
public interface AccountTransferService {
/**
* 尝试转账
*/
public void tryPayment(TransferSerial transferSerial);
/**
* 确定扣款
*/
public void confirmPayment(String transferSerialNumber);
/**
* 取消扣款
*/
public void cancelPayment(TransferSerial transferSerial);
}
@Service
@Slf4j
public class AccountTransferServiceImpl implements AccountTransferService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private TransferSerialMapper transferSerialMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void tryPayment(TransferSerial transferSerial) {
final String transactionId = RootContext.getXID();
String accountFromId = transferSerial.getAccountFromId();
String transferSerialNumber = transferSerial.getTransferSerialNumber();
BigDecimal amount = transferSerial.getAmount();
log.info("事务ID:[{}],tryPayment -> transferSerialNumber:[{}], accountFromId:[{}] , accountToId:[{}]"
, transactionId
, transferSerialNumber
, accountFromId
, transferSerial.getAccountToId()
);
TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
if (transferSerialExist != null) {
log.warn("事务ID:[{}],{}已经处理,忽略重复提交", transactionId, transferSerialNumber);
return;
}
Account accountExist = accountMapper.findByAmountIdForUpdate(accountFromId);
if (accountExist == null) {
throw ValidateParamsException.valueOfParamsIllegal(accountFromId + "账户不存在");
}
//判断账户余额是否充足
if (accountExist.getAmount().subtract(amount).compareTo(BigDecimal.ZERO) < 0) {
throw ValidateParamsException.valueOfParamsIllegal(accountFromId + "账户余额不足");
}
Account accountNew = new Account();
accountNew.setId(accountExist.getId());
accountNew.setAmount(accountExist.getAmount().subtract(amount));
accountNew.setFreezedAmount(accountExist.getFreezedAmount().add(amount));
accountNew.setUpdateTime(new Date());
accountMapper.update(accountNew);
Date date = new Date();
transferSerial.setCreateTime(date);
transferSerial.setUpdateTime(date);
transferSerial.setStatus(TransferSerial.Status.PENDING.getStatus());
transferSerialMapper.insert(transferSerial);
}
@Override
public void confirmPayment(String transferSerialNumber) {
final String transactionId = RootContext.getXID();
TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
// 发生空提交,这是不允许的,需要终止
if (transferSerialExist == null) {
//log.warn("事务ID:[{}],{}不存在,忽略提交", transactionId, transferSerialNumber);
throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "事务状态不存在");
}
if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}是回滚状态,已经回滚的事务不能再次提交", transactionId, transferSerialNumber);
throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经回滚的事务不能再次提交");
}
if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}已经是完成状态,忽略重复提交", transactionId, transferSerialNumber);
return;
}
//更新流水状态
TransferSerial transferSerialNew = new TransferSerial();
transferSerialNew.setId(transferSerialExist.getId());
transferSerialNew.setStatus(TransferSerial.Status.FINISHED.getStatus());
transferSerialMapper.update(transferSerialNew);
//这里使用悲观锁(由于try已经预留资源,这里无需再次判断金额是否超出余额)
Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountFromId());
//冻结金额释放
Account accountNew = new Account();
accountNew.setId(account.getId());
accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
accountNew.setUpdateTime(new Date());
accountMapper.update(accountNew);
}
@Override
public void cancelPayment(TransferSerial transferSerial) {
final String transactionId = RootContext.getXID();
final String transferSerialNumber = transferSerial.getTransferSerialNumber();
//非悲观锁,避免锁表
TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumber(transferSerialNumber);
Date date = new Date();
// 如果插入失败(Try正在插入),就等待重试。插入成功,可确保后续Try不会执行
if (transferSerialExist == null) {
log.warn("事务ID:[{}],{}不存在,插入取消状态", transactionId, transferSerialNumber);
// 插入一条空记录,确保后续的try不会被执行
transferSerial.setCreateTime(date);
transferSerial.setUpdateTime(date);
transferSerial.setStatus(TransferSerial.Status.DISCARD.getStatus());
transferSerialMapper.insert(transferSerial);
return;
}
//存在后再使用悲观锁,防止悬挂
transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}是回滚状态,忽略重复提交", transactionId, transferSerialNumber);
return;
}
if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}是完成状态,已经提交的事务不能进行回滚", transactionId, transferSerialNumber);
throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经提交的事务不能进行回滚");
}
Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountFromId());
if (account.getFreezedAmount().subtract(transferSerialExist.getAmount())
.compareTo(BigDecimal.ZERO) < 0) {
// 除非数据库数据被篡改,一般不可能出现这种情况,这里做一个防御
String message = String.format("事务ID:[%s],冻结金额[%s]-转账金额[%s]<0,检查数据是否错误",
transactionId,
account.getFreezedAmount(),
transferSerialExist.getAmount());
throw ValidateParamsException.valueOfParamsIllegal(message);
}
//更新流水状态
TransferSerial transferSerialNew = new TransferSerial();
transferSerialNew.setId(transferSerialExist.getId());
transferSerialNew.setStatus(TransferSerial.Status.DISCARD.getStatus());
transferSerialMapper.update(transferSerialNew);
//冻结金额释放
Account accountNew = new Account();
accountNew.setId(account.getId());
accountNew.setAmount(account.getAmount().add(transferSerialExist.getAmount()));
accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
accountNew.setUpdateTime(new Date());
accountMapper.update(accountNew);
}
}
账户的Dubbo服务接口,整合Seata。
public interface AccountTransferFacadeService {
/**
* @TwoPhaseBusinessAction: 定义两阶段提交
* name = 该tcc的bean名称,全局唯一
* commitMethod = confirmTransfer为二阶段确认方法
* rollbackMethod = cancelTransfer为二阶段取消方法
* BusinessActionContextParameter注解 传递参数到二阶段中
* BusinessActionContext TCC事务上下文
* try阶段,from给to转钱
*/
@TwoPhaseBusinessAction(name = "accountTransferFacadeService", commitMethod = "confirmTransfer", rollbackMethod = "cancelTransfer")
boolean prepareTransfer(@BusinessActionContextParameter(paramName = "accountTransferFacadeRequestDTO") AccountTransferFacadeRequestDTO accountTransferRequestDTO);
/**
* 确认方法,保证与commitMethod一致
*/
boolean confirmTransfer(BusinessActionContext context);
/**
* 确认方法,保证与rollbackMethod一致
*/
boolean cancelTransfer(BusinessActionContext context);
}
@Slf4j
@DubboService(version = "1.0.0")
public class AccountTransferFacadeServiceImpl implements AccountTransferFacadeService {
@Autowired
private AccountTransferService accountTransferService;
@Override
public boolean prepareTransfer(AccountTransferFacadeRequestDTO accountTransferFacadeRequestDTO) {
TransferSerial transferSerial = new TransferSerial();
BeanUtils.copyProperties(accountTransferFacadeRequestDTO, transferSerial);
accountTransferService.tryPayment(transferSerial);
return true;
}
@Override
public boolean confirmTransfer(BusinessActionContext context) {
//返回的是com.alibaba.fastjson.JSONObject,不可直接强制转换为对象
JSONObject jsonContext = (JSONObject) context.getActionContext("accountTransferFacadeRequestDTO");
AccountTransferFacadeRequestDTO accountTransferRequestDTO = JSONObject.toJavaObject(jsonContext, AccountTransferFacadeRequestDTO.class);
accountTransferService.confirmPayment(accountTransferRequestDTO.getTransferSerialNumber());
return true;
}
@Override
public boolean cancelTransfer(BusinessActionContext context) {
JSONObject jsonContext = (JSONObject) context.getActionContext("accountTransferFacadeRequestDTO");
AccountTransferFacadeRequestDTO accountTransferRequestDTO = JSONObject.toJavaObject(jsonContext, AccountTransferFacadeRequestDTO.class);
TransferSerial transferSerial = new TransferSerial();
BeanUtils.copyProperties(accountTransferRequestDTO, transferSerial);
accountTransferService.cancelPayment(transferSerial);
return true;
}
}
数据库设计
CREATE DATABASE IF NOT EXISTS seata_tcc_bank1 DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_bin;
DROP TABLE IF EXISTS `t_account`;
CREATE TABLE `t_account`
(
`id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`account_id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '账户标识',
`amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '金额',
`freezed_amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '冻结金额',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_accout` (`account_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin COMMENT='账户表';
DROP TABLE IF EXISTS `t_transfer_serial`;
CREATE TABLE `t_transfer_serial`
(
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`transfer_serial_number` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账流水号',
`account_from_id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '转账账户',
`account_to_id` varchar(50) COLLATE utf8mb4_bin NOT NULL COMMENT '进账账户',
`amount` decimal(20, 2) NOT NULL DEFAULT '0.00' COMMENT '金额',
`status` tinyint(4) NOT NULL DEFAULT '1' COMMENT '状态,1待处理,2处理,3废弃',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_time` datetime NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `unqi_transfer` (`transfer_serial_number`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
INSERT INTO `t_account`(`id`, `account_id`, `amount`, `freezed_amount`, `create_time`, `update_time`)
VALUES (1, 'tom', 2000.00, 0.00, '2022-05-03 17:24:15', '2022-05-03 17:24:17');
Seata配置
seata.enabled=true
seata.application-id=account1-tcc-provider-seata
seata.registry.type=nacos
# Server和Client端的值需一致,默认seata-server
seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=192.168.101.8:8848
seata.registry.nacos.group=DEFAULT_GROUP
seata.registry.nacos.cluster=default
seata.registry.nacos.username=root
seata.registry.nacos.password=root
seata.registry.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
seata.config.type=nacos
seata.config.nacos.data-id=seataServer.properties
seata.config.nacos.server-addr=192.168.101.8:8848
seata.config.nacos.group=DEFAULT_GROUP
seata.config.nacos.username=root
seata.config.nacos.password=root
seata.config.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
# nacos server默认值是my_test_tx_group
seata.tx-service-group=my_test_tx_group
seata.service.vgroup-mapping.my_test_tx_group=default
seata.service.disable-global-transaction=false
账户业务逻辑
public interface AccountReceiveService {
/**
* 尝试收款
*/
public void tryReceive(TransferSerial transferSerial);
/**
* 确定收款
*/
public void confirmReceive(String transferSerialNumber);
/**
* 取消收款
*/
public void cancelReceive(TransferSerial transferSerial);
}
@Service
@Slf4j
public class AccountReceiveServiceImpl implements AccountReceiveService {
@Autowired
private AccountMapper accountMapper;
@Autowired
private TransferSerialMapper transferSerialMapper;
@Override
public void tryReceive(TransferSerial transferSerial) {
final String transactionId = RootContext.getXID();
final String accountFromId = transferSerial.getAccountFromId();
final String accountToId = transferSerial.getAccountToId();
final String transferSerialNumber = transferSerial.getTransferSerialNumber();
log.info("事务ID:[{}],tryPayment -> transferSerialNumber:[{}], accountFromId:[{}] , accountToId:[{}]"
, transactionId
, transferSerialNumber
, accountFromId
, accountToId
);
TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
if (transferSerialExist != null) {
log.warn("事务ID:[{}],{}已经处理,忽略重复提交", transactionId, transferSerialNumber);
return;
}
Account accountExist = accountMapper.findByAmountIdForUpdate(accountToId);
if (accountExist == null) {
throw ValidateParamsException.valueOfParamsIllegal(accountToId + "账户不存在");
}
Date date = new Date();
Account accountNew = new Account();
accountNew.setId(accountExist.getId());
accountNew.setFreezedAmount(accountExist.getFreezedAmount().add(transferSerial.getAmount()));
accountNew.setUpdateTime(date);
accountMapper.update(accountNew);
transferSerial.setCreateTime(date);
transferSerial.setUpdateTime(date);
transferSerial.setStatus(TransferSerial.Status.PENDING.getStatus());
transferSerialMapper.insert(transferSerial);
}
@Override
public void confirmReceive(String transferSerialNumber) {
final String transactionId = RootContext.getXID();
TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
// 发生空提交,这是不允许的,需要终止
if (transferSerialExist == null) {
log.error("事务ID:[{}],{}不存在", transactionId, transferSerialNumber);
throw ValidateParamsException.valueOfParamsIllegal("事务状态" + transferSerialNumber + "不存在");
}
if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}是回滚状态,已经回滚的事务不能再次提交", transactionId, transferSerialNumber);
throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经回滚的事务不能再次提交");
}
if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}已经是完成状态,忽略重复提交", transactionId, transferSerialNumber);
return;
}
//更新流水状态
TransferSerial transferSerialNew = new TransferSerial();
transferSerialNew.setId(transferSerialExist.getId());
transferSerialNew.setStatus(TransferSerial.Status.FINISHED.getStatus());
transferSerialMapper.update(transferSerialNew);
//这里使用悲观锁(由于try已经预留资源,这里无需再次判断金额是否超出余额)
Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountToId());
//冻结金额释放
Account accountNew = new Account();
accountNew.setId(account.getId());
accountNew.setAmount(account.getAmount().add(transferSerialExist.getAmount()));
accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
accountNew.setUpdateTime(new Date());
accountMapper.update(accountNew);
}
@Override
public void cancelReceive(TransferSerial transferSerial) {
final String transactionId = RootContext.getXID();
final String transferSerialNumber = transferSerial.getTransferSerialNumber();
//非悲观锁,避免锁表
TransferSerial transferSerialExist = transferSerialMapper.findBySerialNumber(transferSerialNumber);
Date date = new Date();
// 如果插入失败(Try正在插入),就等待重试。插入成功,可确保后续Try不会执行
if (transferSerialExist == null) {
log.warn("事务ID:[{}],{}不存在,插入取消状态", transactionId, transferSerialNumber);
// 插入一条空记录,确保后续的try不会被执行
transferSerial.setCreateTime(date);
transferSerial.setUpdateTime(date);
transferSerial.setStatus(TransferSerial.Status.DISCARD.getStatus());
transferSerialMapper.insert(transferSerial);
return;
}
//存在后再使用悲观锁,防止悬挂
transferSerialExist = transferSerialMapper.findBySerialNumberForUpdate(transferSerialNumber);
if (TransferSerial.Status.DISCARD.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}是回滚状态,忽略重复提交", transactionId, transferSerialNumber);
return;
}
if (TransferSerial.Status.FINISHED.getStatus() == transferSerialExist.getStatus()) {
log.warn("事务ID:[{}],{}是完成状态,已经提交的事务不能进行回滚", transactionId, transferSerialNumber);
throw ValidateParamsException.valueOfParamsIllegal(transferSerialNumber + "已经提交的事务不能进行回滚");
}
Account account = accountMapper.findByAmountIdForUpdate(transferSerialExist.getAccountToId());
if (account.getFreezedAmount().subtract(transferSerialExist.getAmount())
.compareTo(BigDecimal.ZERO) < 0) {
// 除非数据库数据被篡改,不可能出现这种情况,这里做一个防御
String message = String.format("事务ID:[%s],冻结金额[%s]-转账金额[%s]<0,检查数据是否错误",
transactionId,
account.getFreezedAmount(),
transferSerialExist.getAmount());
throw ValidateParamsException.valueOfParamsIllegal(message);
}
//更新流水状态
TransferSerial transferSerialNew = new TransferSerial();
transferSerialNew.setId(transferSerialExist.getId());
transferSerialNew.setStatus(TransferSerial.Status.DISCARD.getStatus());
transferSerialMapper.update(transferSerialNew);
//冻结金额释放
Account accountNew = new Account();
accountNew.setId(account.getId());
accountNew.setFreezedAmount(account.getFreezedAmount().subtract(transferSerialExist.getAmount()));
accountNew.setUpdateTime(date);
accountMapper.update(accountNew);
}
}
账户的Dubbo服务接口,整合Seata。
public interface AccountReceiveFacadeService {
/**
* @TwoPhaseBusinessAction: 定义两阶段提交
* name = 该tcc的bean名称,全局唯一
* commitMethod = confirmTransfer为二阶段确认方法
* rollbackMethod = cancelTransfer为二阶段取消方法
* BusinessActionContextParameter注解 传递参数到二阶段中
* try阶段,from给to转钱
*/
@TwoPhaseBusinessAction(name = "accountReceiveFacadeService",
commitMethod = "confirmReceive",
rollbackMethod = "cancelReceive")
boolean prepareReceive(@BusinessActionContextParameter(paramName = "accountReceiveFacadeRequestDTO")
AccountReceiveFacadeRequestDTO accountReceiveFacadeRequestDTO);
/**
* 确认方法,保证与commitMethod一致
*/
boolean confirmReceive(BusinessActionContext context);
/**
* 确认方法,保证与rollbackMethod一致
*/
boolean cancelReceive(BusinessActionContext context);
}
@DubboService(version = "1.0.0")
public class AccountReceiveFacadeServiceImpl implements AccountReceiveFacadeService {
@Autowired
private AccountReceiveService accountReceiveService;
@Override
public boolean prepareReceive(AccountReceiveFacadeRequestDTO accountReceiveRequestDTO) {
TransferSerial transferSerial = new TransferSerial();
BeanUtils.copyProperties(accountReceiveRequestDTO, transferSerial);
accountReceiveService.tryReceive(transferSerial);
return true;
}
@Override
public boolean confirmReceive(BusinessActionContext context) {
//返回的是com.alibaba.fastjson.JSONObject,不可直接强制转换为对象
JSONObject jsonContext = (JSONObject) context.getActionContext("accountReceiveFacadeRequestDTO");
AccountReceiveFacadeRequestDTO accountReceiveRequestDTO = JSONObject.toJavaObject(jsonContext, AccountReceiveFacadeRequestDTO.class);
accountReceiveService.confirmReceive(accountReceiveRequestDTO.getTransferSerialNumber());
return true;
}
@Override
public boolean cancelReceive(BusinessActionContext context) {
JSONObject jsonContext = (JSONObject) context.getActionContext("accountReceiveFacadeRequestDTO");
AccountReceiveFacadeRequestDTO accountReceiveRequestDTO = JSONObject.toJavaObject(jsonContext, AccountReceiveFacadeRequestDTO.class);
TransferSerial transferSerial = new TransferSerial();
BeanUtils.copyProperties(accountReceiveRequestDTO, transferSerial);
accountReceiveService.cancelReceive(transferSerial);
return true;
}
}
聚合服务代码,在方法上增加@GlobalTransactional
@Service
@Slf4j
public class PaymentAggregationService {
@DubboReference(version = "1.0.0")
private AccountTransferFacadeService accountTransferFacadeService;
@DubboReference(version = "1.0.0")
private AccountReceiveFacadeService accountReceiveFacadeService;
/**
* 转账付款
*/
@GlobalTransactional(rollbackFor = Exception.class, timeoutMills = 200000)
public void transfer(AccountTransferFacadeRequestDTO accountTransferRequestDTO,
AccountReceiveFacadeRequestDTO accountReceiveFacadeRequestDTO) {
log.info("全局事务id:{},start", RootContext.getXID());
accountTransferFacadeService.prepareTransfer(accountTransferRequestDTO);
accountReceiveFacadeService.prepareReceive(accountReceiveFacadeRequestDTO);
log.info("全局事务id:{},end", RootContext.getXID());
}
}
seata配置
seata.enabled=true
seata.application-id=aggregation-tcc-service-seata
seata.registry.type=nacos
# Server和Client端的值需一致,默认seata-server
seata.registry.nacos.application=seata-server
seata.registry.nacos.server-addr=192.168.101.8:8848
seata.registry.nacos.group=DEFAULT_GROUP
seata.registry.nacos.cluster=default
seata.registry.nacos.username=root
seata.registry.nacos.password=root
seata.registry.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
seata.config.type=nacos
seata.config.nacos.data-id=seataServer.properties
seata.config.nacos.server-addr=192.168.101.8:8848
seata.config.nacos.group=DEFAULT_GROUP
seata.config.nacos.username=root
seata.config.nacos.password=root
seata.config.nacos.namespace=e489e0de-8001-41b8-83a6-3241d426a9f7
# nacos server默认值是my_test_tx_group
seata.tx-service-group=my_test_tx_group
seata.service.vgroup-mapping.my_test_tx_group=default
seata.service.disable-global-transaction=false