• Spring-Cloud-Alibaba-SEATA源码解析(二)(客户端)


    前言

            上一篇讲到了seata的全局事务模板方法,有8个主要步骤,下面一一分析。

    获取事务信息

    TransactionInfo txInfo = business.getTransactionInfo():

    1. public TransactionInfo getTransactionInfo() {
    2. //全局事务超时时间
    3. int timeout = globalTrxAnno.timeoutMills();
    4. if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
    5. timeout = defaultGlobalTransactionTimeout;
    6. }
    7. //创建事务信息,并填充属性(超时,传播行为,重试间隔/次数等)
    8. TransactionInfo transactionInfo = new TransactionInfo();
    9. transactionInfo.setTimeOut(timeout);
    10. transactionInfo.setName(name());
    11. transactionInfo.setPropagation(globalTrxAnno.propagation());
    12. transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
    13. transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
    14. //回滚规则的集合,来自globalTransactional注解
    15. Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
    16. for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
    17. rollbackRules.add(new RollbackRule(rbRule));
    18. }
    19. for (String rbRule : globalTrxAnno.rollbackForClassName()) {
    20. rollbackRules.add(new RollbackRule(rbRule));
    21. }
    22. for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
    23. rollbackRules.add(new NoRollbackRule(rbRule));
    24. }
    25. for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
    26. rollbackRules.add(new NoRollbackRule(rbRule));
    27. }
    28. transactionInfo.setRollbackRules(rollbackRules);
    29. return transactionInfo;
    30. }

    获取已存在的全局事务(如果存在的话,那么当前事务角色则为参与者):

    1. public static GlobalTransaction getCurrent() {
    2. //从线程变量获取全局事务id
    3. String xid = RootContext.getXID();
    4. if (xid == null) {
    5. //如果没有已存在的事务,返回null
    6. return null;
    7. }
    8. //如果存在事务,则另创建一个事务作为参与者
    9. return new DefaultGlobalTransaction(xid, GlobalStatus.Begin, GlobalTransactionRole.Participant);
    10. }

    处理事务传播行为:

    1. Propagation propagation = txInfo.getPropagation();
    2. SuspendedResourcesHolder suspendedResourcesHolder = null;
    3. try {
    4. switch (propagation) {
    5. case NOT_SUPPORTED:
    6. //NOT_SUPPORTED,如果已存在事务,则挂起(把全局事务id存在suspendedResourcesHolder中),然后执行目标方法
    7. if (existingTransaction(tx)) {
    8. suspendedResourcesHolder = tx.suspend();
    9. }
    10. // Execute without transaction and return.
    11. return business.execute();
    12. case REQUIRES_NEW:
    13. //REQUIRES_NEW,如果已存在事务,则挂起,然后创建一个新的事务
    14. if (existingTransaction(tx)) {
    15. suspendedResourcesHolder = tx.suspend();
    16. tx = GlobalTransactionContext.createNew();
    17. }
    18. // Continue and execute with new transaction
    19. break;
    20. case SUPPORTS:
    21. //SUPPORTS,没有事务则直接执行目标方法,有事务则继续往下执行
    22. if (notExistingTransaction(tx)) {
    23. return business.execute();
    24. }
    25. // Continue and execute with new transaction
    26. break;
    27. case REQUIRED:
    28. // If current transaction is existing, execute with current transaction,
    29. // else continue and execute with new transaction.
    30. break;
    31. case NEVER:
    32. //NEVER,不支持事务
    33. if (existingTransaction(tx)) {
    34. throw new TransactionException(
    35. String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
    36. , tx.getXid()));
    37. } else {
    38. // Execute without transaction and return.
    39. return business.execute();
    40. }
    41. case MANDATORY:
    42. //MANDATORY,必须存在事务
    43. if (notExistingTransaction(tx)) {
    44. throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
    45. }
    46. // Continue and execute with current transaction.
    47. break;
    48. default:
    49. throw new TransactionException("Not Supported Propagation:" + propagation);
    50. }

    如果不存在事务,则创建一个新的全局事务:

    1. if (tx == null) {
    2. //如果之前没有事务,则创建一个新的事务,角色为全局事务发起者
    3. tx = GlobalTransactionContext.createNew();
    4. }

    开始全局事务,像TC发起开始事务的请求:

    1. private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    2. try {
    3. //执行before钩子方法
    4. triggerBeforeBegin();
    5. tx.begin(txInfo.getTimeOut(), txInfo.getName());
    6. //执行after钩子方法
    7. triggerAfterBegin();
    8. } catch (TransactionException txe) {
    9. throw new TransactionalExecutor.ExecutionException(tx, txe,
    10. TransactionalExecutor.Code.BeginFailure);
    11. }
    12. }
    13. public void begin(int timeout, String name) throws TransactionException {
    14. if (role != GlobalTransactionRole.Launcher) {
    15. //如果不是全局事务发起者那么直接返回
    16. assertXIDNotNull();
    17. if (LOGGER.isDebugEnabled()) {
    18. LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
    19. }
    20. return;
    21. }
    22. assertXIDNull();
    23. String currentXid = RootContext.getXID();
    24. if (currentXid != null) {
    25. throw new IllegalStateException("Global transaction already exists," +
    26. " can't begin a new global transaction, currentXid = " + currentXid);
    27. }
    28. //向TC发起开始一个全局事务的请求,TC返回一个全局事务id xid
    29. xid = transactionManager.begin(null, null, name, timeout);
    30. //事务状态为Begin
    31. status = GlobalStatus.Begin;
    32. //xid绑定线程变量
    33. RootContext.bind(xid);
    34. if (LOGGER.isInfoEnabled()) {
    35. LOGGER.info("Begin new global transaction [{}]", xid);
    36. }
    37. }
    38. public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
    39. throws TransactionException {
    40. //构建一个开始全局事务的请求
    41. GlobalBeginRequest request = new GlobalBeginRequest();
    42. request.setTransactionName(name);
    43. request.setTimeout(timeout);
    44. //同步请求TC(基于netty)
    45. GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    46. if (response.getResultCode() == ResultCode.Failed) {
    47. throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    48. }
    49. return response.getXid();
    50. }

    执行业务逻辑(目标方法):

    rs = business.execute();

    回滚(需要的话):

    completeTransactionAfterThrowing(txInfo, tx, ex):

    1. private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException) throws TransactionalExecutor.ExecutionException {
    2. //确定需要回滚的异常
    3. if (txInfo != null && txInfo.rollbackOn(originalException)) {
    4. try {
    5. //回滚
    6. rollbackTransaction(tx, originalException);
    7. } catch (TransactionException txe) {
    8. // Failed to rollback
    9. throw new TransactionalExecutor.ExecutionException(tx, txe,
    10. TransactionalExecutor.Code.RollbackFailure, originalException);
    11. }
    12. } else {
    13. // not roll back on this exception, so commit
    14. commitTransaction(tx);
    15. }
    16. }
    17. private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
    18. //回滚前钩子方法
    19. triggerBeforeRollback();
    20. tx.rollback();
    21. //回滚后钩子方法
    22. triggerAfterRollback();
    23. //如果回滚成功,抛出一个ExecutionException,异常代码为RollbackDone,表示回滚成功,
    24. //GlobalTransactionalInterceptor会抛出原始异常
    25. throw new TransactionalExecutor.ExecutionException(tx, GlobalStatus.RollbackRetrying.equals(tx.getLocalStatus())
    26. ? TransactionalExecutor.Code.RollbackRetrying : TransactionalExecutor.Code.RollbackDone, originalException);
    27. }
    28. public void rollback() throws TransactionException {
    29. if (role == GlobalTransactionRole.Participant) {
    30. //如果是事务参与者,啥都不干
    31. // Participant has no responsibility of rollback
    32. if (LOGGER.isDebugEnabled()) {
    33. LOGGER.debug("Ignore Rollback(): just involved in global transaction [{}]", xid);
    34. }
    35. return;
    36. }
    37. assertXIDNotNull();
    38. int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
    39. try {
    40. while (retry > 0) {
    41. //默认最多重试5次,次数用完后抛出异常,外部catch后会继续重试
    42. try {
    43. //回滚,返回一个事务状态
    44. status = transactionManager.rollback(xid);
    45. break;
    46. } catch (Throwable ex) {
    47. LOGGER.error("Failed to report global rollback [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
    48. retry--;
    49. if (retry == 0) {
    50. throw new TransactionException("Failed to report global rollback", ex);
    51. }
    52. }
    53. }
    54. } finally {
    55. if (xid.equals(RootContext.getXID())) {
    56. suspend();
    57. }
    58. }
    59. if (LOGGER.isInfoEnabled()) {
    60. LOGGER.info("[{}] rollback status: {}", xid, status);
    61. }
    62. }

    提交事务(需要的话):

    commitTransaction(tx)
    1. private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    2. try {
    3. //提交前钩子方法
    4. triggerBeforeCommit();
    5. tx.commit();
    6. //提交后钩子方法
    7. triggerAfterCommit();
    8. } catch (TransactionException txe) {
    9. // 4.1 Failed to commit
    10. throw new TransactionalExecutor.ExecutionException(tx, txe,
    11. TransactionalExecutor.Code.CommitFailure);
    12. }
    13. }

    提交的过程和回滚类似,这里就不看了。

            可以看到整个分布式事务是一个二阶段提交的过程,一阶段事务发起者向TC发起开始一个事务的请求,会生成一个全局事务id:xid,这个id贯穿全局事务,然后各个分支事务执行本地sql并提交或回滚;二阶段TM根据各个分支事务执行情况向TC发起全局提交或回滚的请求,TC通知各个分支事务全局提交或回滚。本地事务在执行sql后会直接提交,如果必要的话在第二阶段根据undo_log回滚,在这期间并不占用数据库本地锁,在整个分布式事务执行过程中,占据全局锁。在此期间,分支事务是如何运行的,全局锁是什么,回滚原理是什么,将在下篇分析。

    未完待续。。。

  • 相关阅读:
    Java新手小白入门篇 SpringBoot项目的构建
    ElasticSearch使用入门及拼音搜索介绍
    XXL-JOB任务有效期支持实现方案
    600w播放,80w涨粉,B站UP主恰饭B站粉丝竟刷屏感谢甲方!
    React中的事件处理
    【React】第十五部分 React-Router6
    【Qt】—— 对象模型的认识
    WebSocket 详细教程
    工业自动化编程与数字图像处理技术
    Edge官方鼠标手势
  • 原文地址:https://blog.csdn.net/w7sss/article/details/125502364