上一篇讲到了seata的全局事务模板方法,有8个主要步骤,下面一一分析。
TransactionInfo txInfo = business.getTransactionInfo():
- public TransactionInfo getTransactionInfo() {
- //全局事务超时时间
- int timeout = globalTrxAnno.timeoutMills();
- if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
- timeout = defaultGlobalTransactionTimeout;
- }
- //创建事务信息,并填充属性(超时,传播行为,重试间隔/次数等)
- TransactionInfo transactionInfo = new TransactionInfo();
- transactionInfo.setTimeOut(timeout);
- transactionInfo.setName(name());
- transactionInfo.setPropagation(globalTrxAnno.propagation());
- transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
- transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
- //回滚规则的集合,来自globalTransactional注解
- Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
- for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
- rollbackRules.add(new RollbackRule(rbRule));
- }
- for (String rbRule : globalTrxAnno.rollbackForClassName()) {
- rollbackRules.add(new RollbackRule(rbRule));
- }
- for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
- rollbackRules.add(new NoRollbackRule(rbRule));
- }
- for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
- rollbackRules.add(new NoRollbackRule(rbRule));
- }
- transactionInfo.setRollbackRules(rollbackRules);
- return transactionInfo;
- }
获取已存在的全局事务(如果存在的话,那么当前事务角色则为参与者):
- public static GlobalTransaction getCurrent() {
- //从线程变量获取全局事务id
- String xid = RootContext.getXID();
- if (xid == null) {
- //如果没有已存在的事务,返回null
- return null;
- }
- //如果存在事务,则另创建一个事务作为参与者
- return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
- }
处理事务传播行为:
- Propagation propagation = txInfo.getPropagation();
- SuspendedResourcesHolder suspendedResourcesHolder = null;
- try {
- switch (propagation) {
- case NOT_SUPPORTED:
- //NOT_SUPPORTED,如果已存在事务,则挂起(把全局事务id存在suspendedResourcesHolder中),然后执行目标方法
- if (existingTransaction(tx)) {
- suspendedResourcesHolder = tx.suspend();
- }
- // Execute without transaction and return.
- return business.execute();
- case REQUIRES_NEW:
- //REQUIRES_NEW,如果已存在事务,则挂起,然后创建一个新的事务
- if (existingTransaction(tx)) {
- suspendedResourcesHolder = tx.suspend();
- tx = GlobalTransactionContext.createNew();
- }
- // Continue and execute with new transaction
- break;
- case SUPPORTS:
- //SUPPORTS,没有事务则直接执行目标方法,有事务则继续往下执行
- if (notExistingTransaction(tx)) {
- return business.execute();
- }
- // Continue and execute with new transaction
- break;
- case REQUIRED:
- // If current transaction is existing, execute with current transaction,
- // else continue and execute with new transaction.
- break;
- case NEVER:
- //NEVER,不支持事务
- if (existingTransaction(tx)) {
- throw new TransactionException(
- String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
- , tx.getXid()));
- } else {
- // Execute without transaction and return.
- return business.execute();
- }
- case MANDATORY:
- //MANDATORY,必须存在事务
- if (notExistingTransaction(tx)) {
- throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
- }
- // Continue and execute with current transaction.
- break;
- default:
- throw new TransactionException("Not Supported Propagation:" + propagation);
- }
如果不存在事务,则创建一个新的全局事务:
- if (tx == null) {
- //如果之前没有事务,则创建一个新的事务,角色为全局事务发起者
- tx = GlobalTransactionContext.createNew();
- }
开始全局事务,像TC发起开始事务的请求:
- private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
- try {
- //执行before钩子方法
- triggerBeforeBegin();
- tx.begin(txInfo.getTimeOut(), txInfo.getName());
- //执行after钩子方法
- triggerAfterBegin();
- } catch (TransactionException txe) {
- throw new TransactionalExecutor.ExecutionException(tx, txe,
- TransactionalExecutor.Code.BeginFailure);
-
- }
- }
-
-
- public void begin(int timeout, String name) throws TransactionException {
- if (role != GlobalTransactionRole.Launcher) {
- //如果不是全局事务发起者那么直接返回
- assertXIDNotNull();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
- }
- return;
- }
- assertXIDNull();
- String currentXid = RootContext.getXID();
- if (currentXid != null) {
- throw new IllegalStateException("Global transaction already exists," +
- " can't begin a new global transaction, currentXid = " + currentXid);
- }
- //向TC发起开始一个全局事务的请求,TC返回一个全局事务id xid
- xid = transactionManager.begin(null, null, name, timeout);
- //事务状态为Begin
- status = GlobalStatus.Begin;
- //xid绑定线程变量
- RootContext.bind(xid);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Begin new global transaction [{}]", xid);
- }
- }
-
-
- public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
- throws TransactionException {
- //构建一个开始全局事务的请求
- GlobalBeginRequest request = new GlobalBeginRequest();
- request.setTransactionName(name);
- request.setTimeout(timeout);
- //同步请求TC(基于netty)
- GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
- if (response.getResultCode() == ResultCode.Failed) {
- throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
- }
- return response.getXid();
- }
执行业务逻辑(目标方法):
rs = business.execute();
回滚(需要的话):
completeTransactionAfterThrowing(txInfo, tx, ex):
- private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
- //确定需要回滚的异常
- if (txInfo != null && txInfo.rollbackOn(originalException)) {
- try {
- //回滚
- rollbackTransaction(tx, originalException);
- } catch (TransactionException txe) {
- // Failed to rollback
- throw new TransactionalExecutor.ExecutionException(tx, txe,
- TransactionalExecutor.Code.RollbackFailure, originalException);
- }
- } else {
- // not roll back on this exception, so commit
- commitTransaction(tx);
- }
- }
-
-
- private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
- //回滚前钩子方法
- triggerBeforeRollback();
- tx.rollback();
- //回滚后钩子方法
- triggerAfterRollback();
- //如果回滚成功,抛出一个ExecutionException,异常代码为RollbackDone,表示回滚成功,
- //GlobalTransactionalInterceptor会抛出原始异常
- throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
- ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
- }
-
-
- public void rollback() throws TransactionException {
- if (role == GlobalTransactionRole.Participant) {
- //如果是事务参与者,啥都不干
- // Participant has no responsibility of rollback
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
- }
- return;
- }
- assertXIDNotNull();
-
- int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
- try {
- while (retry > 0) {
- //默认最多重试5次,次数用完后抛出异常,外部catch后会继续重试
- try {
- //回滚,返回一个事务状态
- status = transactionManager.rollback(xid);
- break;
- } catch (Throwable ex) {
- LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
- retry--;
- if (retry == 0) {
- throw new TransactionException("Failed to report global rollback", ex);
- }
- }
- }
- } finally {
- if (xid.equals(RootContext.getXID())) {
- suspend();
- }
- }
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("[{}] rollback status: {}", xid, status);
- }
- }
提交事务(需要的话):
commitTransaction(tx)
- private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
- try {
- //提交前钩子方法
- triggerBeforeCommit();
- tx.commit();
- //提交后钩子方法
- triggerAfterCommit();
- } catch (TransactionException txe) {
- // 4.1 Failed to commit
- throw new TransactionalExecutor.ExecutionException(tx, txe,
- TransactionalExecutor.Code.CommitFailure);
- }
- }
提交的过程和回滚类似,这里就不看了。
可以看到整个分布式事务是一个二阶段提交的过程,一阶段事务发起者向TC发起开始一个事务的请求,会生成一个全局事务id:xid,这个id贯穿全局事务,然后各个分支事务执行本地sql并提交或回滚;二阶段TM根据各个分支事务执行情况向TC发起全局提交或回滚的请求,TC通知各个分支事务全局提交或回滚。本地事务在执行sql后会直接提交,如果必要的话在第二阶段根据undo_log回滚,在这期间并不占用数据库本地锁,在整个分布式事务执行过程中,占据全局锁。在此期间,分支事务是如何运行的,全局锁是什么,回滚原理是什么,将在下篇分析。
未完待续。。。