分布式事务:https://www.processon.com/view/link/61cd52fb0e3e7441570801ab
Connection conn = ... //获取数据库连接
conn.setAutoCommit(false); //开启事务
try{
//...执行增删改查sql
conn.commit(); //提交事务
}catch (Exception e) {
conn.rollback();//事务回滚
}finally{
conn.close();//关闭链接
}
在微服务架构中,完成某一个业务功能可能需要横跨多个服务,操作多个数据库。这就涉及到到了分布式事务,需要操作的资源位于多个资源服务器上,而应用需要保证对于多个资源服务器的数据操作,要么全部成功,要么全部失败。本质上来说,分布式事务就是为了保证不同资源服务器的数据一致性。
跨库事务
跨库事务指的是,一个应用某个功能需要操作多个库,不同的库中存储不同的业务数据。下图演示了一个服务同时操作2个库的情况:

分库分表
通常一个库数据量比较大或者预期未来的数据量比较大,都会进行分库分表。如下图,将数据库B拆分成了2个库:
对于分库分表的情况,一般开发人员都会使用一些数据库中间件来降低sql操作的复杂性。如,对于sql:insert into user(id,name) values (1,“张三”),(2,“李四”)。这条sql是操作单库的语法,单库情况下,可以保证事务的一致性。 但是由于现在进行了分库分表,开发人员希望将1号记录插入分库1,2号记录插入分库2。所以数据库中间件要将其改写为2条sql,分别插入两个不同的分库,此时要保证两个库要不都成功,要不都失败,因此基本上所有的数据库中间件都面临着分布式事务的问题。

Service A完成某个功能需要直接操作数据库,同时需要调用Service B和Service C,而Service B又同时操作了2个数据库,Service C也操作了一个库。需要保证这些跨服务调用对多个数据库的操作要么都成功,要么都失败,实际上这可能是最典型的分布式事务场景。
小结:上述讨论的分布式事务场景中,无一例外的都直接或者间接的操作了多个数据库。如何保证事务的ACID特性,对于分布式事务实现方案而言,是非常大的挑战。同时,分布式事务实现方案还必须要考虑性能的问题,如果为了严格保证ACID特性,导致性能严重下降,那么对于一些要求快速响应的业务,是无法接受的。

两阶段提交协议(2PC),就是将提交(commit)过程划分为2个阶段(Phase)
TM通知各个RM准备提交它们的事务分支。如果RM判断自己进行的工作可以被提交,那就对工作内容进行持久化,再给TM肯定答复;要是发生了其他情况,那给TM的都是否定答复。
以mysql数据库为例,在第一阶段,事务管理器向所有涉及到的数据库服务器发出prepare"准备提交"请求,数据库收到请求后执行数据修改和日志记录等处理,处理完成后只是把事务的状态改成"可以提交",然后把结果返回给事务管理器。
TM根据阶段1各个RM prepare的结果,决定是提交还是回滚事务。如果所有的RM都prepare成功,那么TM通知所有的RM进行提交;如果有RM prepare失败的话,则TM通知所有RM回滚自己的事务分支。
以mysql数据库为例,如果第一阶段中所有数据库都prepare成功,那么事务管理器向数据库服务器发出"确认提交"请求,数据库服务器把事务的"可以提交"状态改为"提交完成"状态,然后返回应答。如果在第一阶段内有任何一个数据库的操作发生了错误,或者事务管理器收不到某个数据库的回应,则认为事务失败,回撤所有数据库的事务。数据库服务器收不到第二阶段的确认提交请求,也会把"可以提交"的事务回撤。

两阶段提交方案下全局事务的ACID特性,是依赖于RM的。一个全局事务内部包含了多个独立的事务分支,这一组事务分支要么都成功,要么都失败。各个事务分支的ACID特性共同构成了全局事务的ACID特性。也就是将单个事务分支支持的ACID特性提升一个层次到分布式事务的范畴。
https://seata.io/zh-cn/index.html
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。
Seata 提供的事务模式:
网站:
TC 为单独部署的 Server 服务端,TM 和 RM 为嵌入到应用中的 Client 客户端

Seata AT模式的核心是对业务无侵入,是一种改进后的两阶段提交,其设计思路如下:
业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。核心在于对业务sql进行解析,转换成undolog,并同时入库,这是怎么做的呢?

分布式事务操作成功,则TC通知RM异步删除undolog

分布式事务操作失败,TM向TC发送回滚请求,RM 收到协调器TC发来的回滚请求,通过 XID 和 Branch ID 找到相应的回滚日志记录,通过回滚记录生成反向的更新 SQL 并执行,以完成分支的回滚。

Seata分TC、TM和RM三个角色,TC(Server端)为单独服务端部署,TM和RM(Client端)由业务系统集成
Server端存储模式(store.mode)支持三种:
资源目录:https://github.com/seata/seata/tree/v1.5.1/script
Seata的注册中心是作用于Seata自身的,和Spring Cloud的注册中心无关Seata的配置中心是作用于Seata自身的,和Spring Cloud的配置中心无关
seata:
registry:
# support: nacos, eureka, redis, zk, consul, etcd3, sofa
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
cluster: default
username:
password:
config:
# support: nacos, consul, apollo, zk, etcd3
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
group: SEATA_GROUP
data-id: seataServer.properties
username:
password:
store.mode=db
store.lock.mode=db
store.session.mode=db
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=root
store.mode=db,由于seata是通过jdbc的executeBatch来批量插入全局锁的,根据MySQL官网的说明,连接参数中的rewriteBatchedStatements为true时,在执行executeBatch,并且操作类型为insert时,jdbc驱动会把对应的SQL优化成insert into () values (), ()的形式来提升批量插入的性能。事务分组如何找到后端Seata集群(TC)?
bin/seata-server.sh -p 8091 -h 127.0.0.1 -m db

首先需要在客户端Mysql库中建表
https://github.com/seata/seata/blob/develop/script/client/at/db/mysql.sql
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
<dependency>
<groupId>com.alibaba.cloudgroupId>
<artifactId>spring-cloud-starter-alibaba-seataartifactId>
dependency>
seata:
application-id: ${spring.application.name}
# seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应
tx-service-group: default_tx_group
registry:
# 指定nacos作为注册中心
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
config:
# 指定nacos作为配置中心
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
group: SEATA_GROUP
data-id: seataServer.properties
@Override
@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
public Order saveOrder(OrderVo orderVo){
log.info("=============用户下单=================");
log.info("当前 XID: {}", RootContext.getXID());
// 保存订单
Order order = new Order();
order.setUserId(orderVo.getUserId());
order.setCommodityCode(orderVo.getCommodityCode());
order.setCount(orderVo.getCount());
order.setMoney(orderVo.getMoney());
order.setStatus(OrderStatus.INIT.getValue());
Integer saveOrderRecord = orderMapper.insert(order);
log.info("保存订单{}", saveOrderRecord > 0 ? "成功" : "失败");
//扣减库存
storageFeignService.deduct(orderVo.getCommodityCode(),orderVo.getCount());
//扣减余额
accountFeignService.debit(orderVo.getUserId(),orderVo.getMoney());
//更新订单
Integer updateOrderRecord = orderMapper.updateOrderStatus(order.getId(),OrderStatus.SUCCESS.getValue());
log.info("更新订单id:{} {}", order.getId(), updateOrderRecord > 0 ? "成功" : "失败");
return order;
}
XA 模型会一直占用资源,直到第二阶段结束才会释放数据库资源
AT 有前置镜像和后置镜像
XA 是一直占有资源
https://seata.io/zh-cn/docs/dev/mode/xa-mode.html

seata.data-source-proxy-mode = XA
seata:
# 是否开启spring-boot自动装配,默认true,包括数据源的自动代理以及GlobalTransactionScanner初始化
enabled: true
# 数据源代理模式 默认AT ******
data-source-proxy-mode: XA
application-id: ${spring.application.name}
# seata 服务分组,要与服务端配置service.vgroup_mapping的后缀对应
tx-service-group: default_tx_group
registry:
# 指定nacos作为注册中心
type: nacos
nacos:
application: seata-server
server-addr: 127.0.0.1:8848
namespace:
group: SEATA_GROUP
config:
# 指定nacos作为配置中心
type: nacos
nacos:
server-addr: 127.0.0.1:8848
namespace: 7e838c12-8554-4231-82d5-6d93573ddf32
group: SEATA_GROUP
data-id: seataServer.properties
Mysql XA事务Demo
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>5.1.48version>
dependency>
public class MysqlXADemo {
public static void main(String[] args) throws SQLException {
//true表示打印XA语句,,用于调试
boolean logXaCommands = true;
// 获得资源管理器操作接口实例 RM1
Connection conn1 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_order", "root", "root");
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn1, logXaCommands);
XAResource rm1 = xaConn1.getXAResource();
// 获得资源管理器操作接口实例 RM2
Connection conn2 = DriverManager.getConnection("jdbc:mysql://localhost:3306/db_storage", "root", "root");
XAConnection xaConn2 = new MysqlXAConnection((com.mysql.jdbc.Connection) conn2, logXaCommands);
XAResource rm2 = xaConn2.getXAResource();
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
// 执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);//One of TMNOFLAGS, TMJOIN, or TMRESUME.
PreparedStatement ps1 = conn1.prepareStatement("INSERT into order_tbl(user_id,commodity_code,count,money,status) VALUES (1001,2001,2,10,1)");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = conn2.prepareStatement("update storage_tbl set count=count-2 where commodity_code=2001");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
boolean onePhase = false;
//TM判断有2个事务分支,所以不能优化为一阶段提交
if (rm1_prepare == XAResource.XA_OK && rm2_prepare == XAResource.XA_OK) {
//所有事务分支都prepare成功,提交所有事务分支
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {
//如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm2.rollback(xid2);
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace();
}
}
}
TCC 基于分布式事务中的二阶段提交协议实现,它的全称为 Try-Confirm-Cancel,即资源预留(Try)、确认操作(Confirm)、取消操作(Cancel),他们的具体含义如下:
Try:对业务资源的检查并预留;
Confirm:对业务处理进行提交,即 commit 操作,只要 Try 成功,那么该步骤一定成功;
Cancel:对业务处理进行取消,即回滚操作,该步骤回对 Try 预留的资源进行释放。
TCC 是一种侵入式的分布式事务解决方案,以上三个操作都需要业务系统自行实现,对业务系统有着非常大的入侵性,设计相对复杂,但优点是 TCC 完全不依赖数据库,能够实现跨数据库、跨应用资源管理,对这些不同数据访问通过侵入式的编码方式实现一个原子操作,更好地解决了在各种复杂业务场景下的分布式事务问题。
try-commit

try-cancel

TCC相关注解如下:
TCC 幂等、悬挂和空回滚问题如何解决?
TCC 模式中存在的三大问题是幂等、悬挂和空回滚。在 Seata1.5.1 版本中,增加了一张事务控制表,表名是 tcc_fence_log 来解决这个问题。而在@TwoPhaseBusinessAction 注解中提到的属性 useTCCFence 就是来指定是否开启这个机制,这个属性值默认是 false。
微服务增加tcc_fence_log日志表 https://github.com/seata/seata/tree/develop/script/client/tcc/db
-- -------------------------------- The script use tcc fence --------------------------------
CREATE TABLE IF NOT EXISTS `tcc_fence_log`
(
`xid` VARCHAR(128) NOT NULL COMMENT 'global id',
`branch_id` BIGINT NOT NULL COMMENT 'branch id',
`action_name` VARCHAR(64) NOT NULL COMMENT 'action name',
`status` TINYINT NOT NULL COMMENT 'status(tried:1;committed:2;rollbacked:3;suspended:4)',
`gmt_create` DATETIME(3) NOT NULL COMMENT 'create time',
`gmt_modified` DATETIME(3) NOT NULL COMMENT 'update time',
PRIMARY KEY (`xid`, `branch_id`),
KEY `idx_gmt_modified` (`gmt_modified`),
KEY `idx_status` (`status`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
@LocalTCC
public interface StorageService {
/**
* Try: 库存-扣减数量,冻结库存+扣减数量
*
* 定义两阶段提交,在try阶段通过@TwoPhaseBusinessAction注解定义了分支事务的 resourceId,commit和 cancel 方法
* name = 该tcc的bean名称,全局唯一
* commitMethod = commit 为二阶段确认方法
* rollbackMethod = rollback 为二阶段取消方法
* BusinessActionContextParameter注解 传递参数到二阶段中
*
* @param commodityCode 商品编号
* @param count 扣减数量
* @return
*/
@TwoPhaseBusinessAction(name = "deduct", commitMethod = "commit", rollbackMethod = "rollback", useTCCFence = true)
boolean deduct(@BusinessActionContextParameter(paramName = "commodityCode") String commodityCode,
@BusinessActionContextParameter(paramName = "count") int count);
/**
*
* Confirm: 冻结库存-扣减数量
* 二阶段确认方法可以另命名,但要保证与commitMethod一致
* context可以传递try方法的参数
*
* @param actionContext
* @return
*/
boolean commit(BusinessActionContext actionContext);
/**
* Cancel: 库存+扣减数量,冻结库存-扣减数量
* 二阶段取消方法可以另命名,但要保证与rollbackMethod一致
*
* @param actionContext
* @return
*/
boolean rollback(BusinessActionContext actionContext);
}
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Autowired
private StorageMapper storageMapper;
@Transactional
@Override
public boolean deduct(String commodityCode, int count){
log.info("=============冻结库存=================");
log.info("当前 XID: {}", RootContext.getXID());
// 检查库存
checkStock(commodityCode,count);
log.info("开始冻结 {} 库存", commodityCode);
//冻结库存
Integer record = storageMapper.freezeStorage(commodityCode,count);
log.info("冻结 {} 库存结果:{}", commodityCode, record > 0 ? "操作成功" : "扣减库存失败");
return true;
}
@Override
public boolean commit(BusinessActionContext actionContext) {
log.info("=============扣减冻结库存=================");
String commodityCode = actionContext.getActionContext("commodityCode").toString();
int count = (int) actionContext.getActionContext("count");
//扣减冻结库存
storageMapper.reduceFreezeStorage(commodityCode,count);
return true;
}
@Override
public boolean rollback(BusinessActionContext actionContext) {
log.info("=============解冻库存=================");
String commodityCode = actionContext.getActionContext("commodityCode").toString();
int count = (int) actionContext.getActionContext("count");
//扣减冻结库存
storageMapper.unfreezeStorage(commodityCode,count);
return true;
}
private void checkStock(String commodityCode, int count){
log.info("检查 {} 库存", commodityCode);
Storage storage = storageMapper.findByCommodityCode(commodityCode);
if (storage.getCount() < count) {
log.warn("{} 库存不足,当前库存:{}", commodityCode, count);
throw new RuntimeException("库存不足");
}
}
}
@Override
@GlobalTransactional(name="createOrder",rollbackFor=Exception.class)
public Order saveOrder(OrderVo orderVo) {
log.info("=============用户下单=================");
log.info("当前 XID: {}", RootContext.getXID());
//获取全局唯一订单号 测试使用
Long orderId = UUIDGenerator.generateUUID();
//阶段一: 创建订单
Order order = orderService.prepareSaveOrder(orderVo,orderId);
//扣减库存
storageFeignService.deduct(orderVo.getCommodityCode(), orderVo.getCount());
//扣减余额
accountFeignService.debit(orderVo.getUserId(), orderVo.getMoney());
return order;
}
要想防止空回滚,那么必须在 Cancel 方法中识别这是一个空回滚,Seata 是如何做的呢?
Seata 的做法是新增一个 TCC 事务控制表,包含事务的 XID 和 BranchID 信息,在 Try 方法执行时插入一条记录,表示一阶段执行了,执行 Cancel 方法时读取这条记录,如果记录不存在,说明 Try 方法没有执行。
Seata 是如何处理幂等问题的呢?
同样的也是在 TCC 事务控制表中增加一个记录状态的字段 status,该字段有 3 个值,分别为:
二阶段 Confirm/Cancel 方法执行后,将状态改为 committed 或 rollbacked 状态。当重复调用二阶段 Confirm/Cancel 方法时,判断事务状态即可解决幂等问题。
Seata 是怎么处理悬挂的呢?
在 TCC 事务控制表记录状态的字段 status 中增加一个状态:
当执行二阶段 Cancel 方法时,如果发现 TCC 事务控制表有相关记录,说明二阶段 Cancel 方法优先一阶段 Try 方法执行,因此插入一条 status=4 状态的记录,当一阶段 Try 方法后面执行时,判断 status=4 ,则说明有二阶段 Cancel 已执行,并返回 false 以阻止一阶段 Try 方法执行成功。
设计流程 https://www.processon.com/view/link/6311bfda1e0853187c0ecd8c
工作流程 https://www.processon.com/view/link/6007f5c00791294a0e9b611a
源码 https://www.processon.com/view/link/5f743063e0b34d0711f001d2

1. Spring Boot TC
@Component
public class ServerRunner implements CommandLineRunner, DisposableBean {
...
@Override
public void run(String... args) {
Server.start(args);
}
...
}
2. seata client 入口
SeataFeignClient#execute =》 headers.put("TX_XID", seataXid)
SeataHandlerInterceptor#preHandle =》 RootContext.bind(request.getHeader("TX_XID"))
3. GlobalTransactional TM TC RM
GlobalTransactionalInterceptor#invoke => MethodInterceptor
TM TransactionManager
TC DefaultCoordinator
RM ResourceManager
TransactionManager (TM)
DefaultTransactionManager
TransactionManagerHolder为创建单例TransactionManager的工厂,可以使用EnhancedServiceLoader的spi机制加载用户自定义的类,默认为DefaultTransactionManager。
GlobalTransaction 接口提供给用户开启事务,提交,回滚,获取状态等方法。
DefaultGlobalTransaction是GlobalTransaction接口的默认实现,它持有TransactionManager对象,默认开启事务超时时间为60秒,默认名称为default,因为调用者的业务方法可能多重嵌套创建多个GlobalTransaction对象开启事务方法,因此GlobalTransaction有GlobalTransactionRole角色属性,只有Launcher角色的才有开启、提交、回滚事务的权利。
GlobalTransactionContext为操作GlobalTransaction的工具类,提供创建新的GlobalTransaction,获取当前线程有的GlobalTransaction等方法。
GlobalTransactionScanner继承AbstractAutoProxyCreator类,即实现了SmartInstantiationAwareBeanPostProcessor接口,会在spring容器启动初始化bean的时候,对bean进行代理操作。wrapIfNecessary为继承父类代理bean的核心方法,如果用户配置了service.disableGlobalTransaction为false属性则注解不生效直接返回,否则对GlobalTransactional或GlobalLock的方法进行拦截代理。
GlobalTransactionalInterceptor实现aop的MethodInterceptor接口,对有@GlobalTransactional或GlobalLock注解的方法进行代理。
TransactionalTemplate模板类提供了一个开启事务,执行业务,成功提交和失败回滚的模板方法execute(TransactionalExecutor business)。
DefaultCoordinator (TC)
DefaultCoordinator即为TC,全局事务默认的事务协调器。它继承AbstractTCInboundHandler接口,为TC接收RM和TM的request请求数据,是进行相应处理的处理器。实现TransactionMessageHandler接口,去处理收到的RPC信息。实现ResourceManagerInbound接口,发送至RM的branchCommit,branchRollback请求。
Core接口为seata处理全球事务协调器TC的核心处理器,它继承ResourceManagerOutbound接口,接受来自RM的rpc网络请求(branchRegister,branchReport,lockQuery)。同时继承TransactionManager接口,接受来自TM的rpc网络请求(begin,commit,rollback,getStatus),另外提供提供3个接口方法。
GlobalSession是seata协调器DefaultCoordinator管理维护的重要部件,当用户开启全局分布式事务,TM调用begin方法请求至TC,TC则创建GlobalSession实例对象,返回唯一的xid。它实现SessionLifecycle接口,提供begin,changeStatus,changeBranchStatus,addBranch,removeBranch等操作session和branchSession的方法。
BranchSession为分支session,管理分支数据,受globalSession统一调度管理,它的lock和unlock方法由lockManger实现。
DefaultLockManager是LockManager的默认实现,它获取branchSession的lockKey,转换成List,委派Locker进行处理。
Locker接口提供根据行数据获取锁,释放锁,是否锁住和清除所有锁的方法。
ResourceManager (RM)
ResourceManager是seata的重要组件之一,RM负责管理分支数据资源的事务。
AbstractResourceManager实现ResourceManager提供模板方法。DefaultResourceManager适配所有的ResourceManager,所有方法调用都委派给对应负责的ResourceManager处理。
DataSourceManager 此为AT模式核心管理器,DataSourceManager继承AbstractResourceManager,管理数据库Resouce的注册,提交以及回滚等
AsyncWorker DataSourceManager事务提交委派给AsyncWorker进行提交的,因为都成功了,无需回滚成功的数据,只需要删除生成的操作日志就行,采用异步方式,提高效率。
UndoLogManager
Resource能被ResourceManager管理并且能够关联GlobalTransaction。
DataSourceProxy实现Resource接口,BranchType为AT自动模式。它继承AbstractDataSourceProxy代理类,所有的DataSource相关的方法调用传入的targetDataSource代理类的方法,除了创建connection方法为创建ConnectionProxy代理类。对象初始化时获取连接的jdbcUrl作为resourceId,并注册至DefaultResourceManager进行管理。同时还提供获取原始连接不被代理的getPlainConnection方法。
ExecuteTemplate为具体statement的execute,executeQuery和executeUpdate执行提供模板方法
https://shardingsphere.apache.org/document/current/cn/reference/transaction/base-transaction-seata/

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
`branch_id` BIGINT NOT NULL COMMENT 'branch transaction id',
`xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id',
`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',
`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',
`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',
`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',
UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB
AUTO_INCREMENT = 1
DEFAULT CHARSET = utf8mb4 COMMENT ='AT transaction mode undo table';
<dependency>
<groupId>com.alibaba.cloudgroupId>
<artifactId>spring-cloud-starter-alibaba-seataartifactId>
<version>2.2.8.RELEASEversion>
<exclusions>
<exclusion>
<groupId>io.seatagroupId>
<artifactId>seata-spring-boot-starterartifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>io.seatagroupId>
<artifactId>seata-spring-boot-starterartifactId>
<version>1.5.1version>
dependency>
<dependency>
<groupId>org.apache.shardingspheregroupId>
<artifactId>sharding-jdbc-spring-boot-starterartifactId>
<version>4.1.1version>
dependency>
<dependency>
<groupId>org.apache.shardingspheregroupId>
<artifactId>sharding-transaction-base-seata-atartifactId>
<version>4.1.1version>
dependency>
resources/seata.confclient {
application.id = demo-order-curr
transaction.service.group = default_tx_group
}
resources/application.ymlseata:
application-id: demo-order-curr
tx-service-group: default_tx_group
#关闭数据源自动代理,交给sharding-jdbc
enable-auto-data-source-proxy: false
registry:
type: nacos
nacos:
application: seata-server
server-addr: nacos.localhost.com:8848
group: SEATA_GROUP
config:
nacos:
server-addr: nacos.localhost.com:8848
namespace: seata-config
group: SEATA_GROUP
data-id: seataServer.properties
service:
vgroup-mapping:
default_tx_group: default
//注意:@GlobalTransactional 和 @ShardingTransactionType 不能同时出现,此处不能使用 @GlobalTransactional
//@GlobalTransactional(name = "generateOrder",rollbackFor = Exception.class)
//全局事务交给@SeataATShardingTransactionManager管理
@ShardingTransactionType(TransactionType.BASE)
@Transactional
public CommonResult generateOrder(OrderParam orderParam, Long memberId)



public interface RocketMQLocalTransactionListener {
/**
发送prepare消息成功此方法被回调,该方法用于执行本地事务
@param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
@param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
@return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
/**
@param msg 通过获取transactionId来判断这条消息的本地事务执行状态
@return 返回事务状态,COMMIT :提交 ROLLBACK :回滚 UNKNOW :回调
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
调用
/*实际进行真实库存的扣减*/
// todo 分布式事务
// PO :可以使用MQ进行异步扣减
// 使用事务消息机制发送扣减库存消息
reduceStockMsgSender.sendReduceStockMsg(orderId,payType,orderDetail)
定义扩建库存生产者
@ExtRocketMQTemplateConfiguration
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
发送
@Component
public class ReduceStockMsgSender {
@Autowired
private ExtRocketMQTemplate extRocketMQTemplate;
/**
* 使用事务消息机制发送扣减库存消息
* @param orderId
* @param payType
* @param orderDetail
* @return
*/
public boolean sendReduceStockMsg(Long orderId, Integer payType, OmsOrderDetail orderDetail){
List<StockChanges> stockChangesList = new ArrayList<>();
for(OmsOrderItem omsOrderItem : orderDetail.getOrderItemList()){
stockChangesList.add(new StockChanges(omsOrderItem.getProductSkuId(),omsOrderItem.getProductQuantity()));
}
String destination = "reduce-stock";
StockChangeEvent stockChangeEvent = new StockChangeEvent();
stockChangeEvent.setPayType(payType);
stockChangeEvent.setOrderId(orderId);
stockChangeEvent.setStockChangesList(stockChangesList);
//TODO 全局事务id 可以用于幂等校验
String transactionId = UUID.randomUUID().toString();
stockChangeEvent.setTransactionId(transactionId);
Message<StockChangeEvent> message = MessageBuilder.withPayload(stockChangeEvent)
.setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
.setHeader("orderId",orderId)
.setHeader("payType",payType)
.build();
//destination:目的地(主题),这里发送给reduce-stock这个topic
//message:发送给消费者的消息体,需要使用MessageBuilder.withPayload() 来构建消息
//arg:参数
TransactionSendResult sendResult = extRocketMQTemplate.sendMessageInTransaction(destination,message,orderId);
return SendStatus.SEND_OK == sendResult.getSendStatus();
}
}
监听事务
@Slf4j
@RocketMQTransactionListener(rocketMQTemplateBeanName="extRocketMQTemplate") //一个事物监听器对应一个事物流程
public class ReduceStockMsgListener implements RocketMQLocalTransactionListener {
@Autowired
private OmsOrderMapper omsOrderMapper;
@Autowired
private OmsPortalOrderService portalOrderService;
/**
* 事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
try {
//解析message
Long orderId = Long.parseLong(String.valueOf(arg));
String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
Integer payType = Integer.valueOf((String)message.getHeaders().get("payType"));
//修改订单状态
portalOrderService.updateOrderStatus(orderId,payType,transactionId);
//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务状态回查
* @param message
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String transactionId = (String) message.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
int existTx = omsOrderMapper.isExistTx(transactionId);
if (existTx > 0) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
消息体
@Data
public class StockChangeEvent {
/**
* 事务id
*/
private String transactionId;
private List<StockChanges> stockChangesList;
private Long orderId;
/**
* 支付方式:0->未支付;1->支付宝;2->微信
*/
private Integer payType;
}
表SQL
CREATE TABLE IF NOT EXISTS `local_transaction_log`
(
`tx_no` VARCHAR(128) NOT NULL COMMENT '分布式事务ID',
`created` DATETIME(6) NOT NULL COMMENT 'create datetime',
UNIQUE KEY `local_transaction_log_key` (`tx_no`)
) ENGINE = INNODB
DEFAULT CHARSET = utf8mb4 COMMENT ='分布式事务控制表';
消费者
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}",topic = "${rocketmq.consumer.topic}")
public class ReduceStockMsgConsumer implements RocketMQListener<StockChangeEvent> {
@Autowired
private StockManageService stockManageService;
/**
* 接收消息
*/
@Override
public void onMessage(StockChangeEvent stockChangeEvent) {
log.info("开始消费消息:{}",stockChangeEvent);
stockManageService.reduceStock(stockChangeEvent);
}
}
消费者幂等性实现
@Override
@Transactional
public void reduceStock(StockChangeEvent stockChangeEvent) {
//幂等性校验
if(skuStockMapper.isExistTx(stockChangeEvent.getTransactionId())>0){
return ;
}
List<StockChanges> stockChangesList = stockChangeEvent.getStockChangesList();
//扣减冻结库存
skuStockMapper.updateSkuStock(stockChangesList);
//添加事务记录,用于幂等
skuStockMapper.addTx(stockChangeEvent.getTransactionId());
}
校验、插入
@Select("select count(1) from local_transaction_log where tx_no = #{txNo}")
int isExistTx(String txNo);
@Insert("insert into local_transaction_log values(#{txNo},now());")
int addTx(String txNo);
配置
rocketmq:
name-server: rocketmq.localhost.com:9876
consumer:
group: stock_consumer_group
topic: reduce-stock