• Seata 源码篇之AT模式启动流程 - 下 - 04



    本系列文章:

    上一篇文章,我们看了Seata AT模式一阶段提交流程,本文我们来看看AT模式的二阶段流程和全局事务提交回滚逻辑的实现。


    全局事务提交

    当某个分支事务执行完本地业务SQL语句后,下一步就进入全局事务提交环节了,此处我们可以回顾TransactionalTemplate模版类的execute方法,如下所示:

        public Object execute(TransactionalExecutor business) throws Throwable {
            // 1. 获取当前全局事务的上下文信息
            TransactionInfo txInfo = business.getTransactionInfo();
            ...
            // 1.1 判断当前是否已经存在一个全局事务
            GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    
            // 1.2 根据不同的全局事务传播行为进行处理
            Propagation propagation = txInfo.getPropagation();
            SuspendedResourcesHolder suspendedResourcesHolder = null;
            try {
                switch (propagation) {
                     ...
                }
    
                // 1.3 将当前全局锁配置设置到本地线程缓存中,然后返回先前的配置
                GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
    
                try {
                    // 2. 如果当前线程是全局事务的发起者,即TM,则给TC发送一个开启全局事务的请求,否则只是简单回调相关钩子方法
                    beginTransaction(txInfo, tx);
    
                    Object rs;
                    try {
                        // 3. 执行当前分支事务对应的本地事务
                        rs = business.execute();
                    } catch (Throwable ex) {
                        // 4. 分支事务执行发生异常,判断对应异常是否需要回滚,如果需要则回滚当前全局事务
                        completeTransactionAfterThrowing(txInfo, tx, ex);
                        throw ex;
                    }
    
                    // 5. 当前分支事务执行正常,由TM发送提交全局事务的请求
                    commitTransaction(tx, txInfo);
    
                    return rs;
                } finally {
                    // 6. 资源清理和恢复,同时触发钩子回调
                    resumeGlobalLockConfig(previousConfig);
                    triggerAfterCompletion();
                    cleanUp();
                }
            } finally {
                // 7. 如果存在被挂起的全局事务,则进行恢复
                if (suspendedResourcesHolder != null) {
                    tx.resume(suspendedResourcesHolder);
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    本节我们来看一下全局事务提交的commitTransaction方法实现:

        private void commitTransaction(GlobalTransaction tx, TransactionInfo txInfo)
                throws TransactionalExecutor.ExecutionException, TransactionException {
            // 1. 判断全局事务是否超时
            if (isTimeout(tx.getCreateTime(), txInfo)) {
                // business execution timeout
                Exception exx = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
                    String.format("client detected transaction timeout before commit, so change to rollback, xid = %s", tx.getXid()));
                rollbackTransaction(tx, exx);
                return;
            }
    
            try {
            // 2. 触发回调埋点,同时指向事务提交动作
                triggerBeforeCommit();
                tx.commit();
            // 3. 记录事务执行提交动作后的状态
                GlobalStatus afterCommitStatus = tx.getLocalStatus();
                TransactionalExecutor.Code code = TransactionalExecutor.Code.Unknown;
                switch (afterCommitStatus) {
                    case TimeoutRollbacking:
                        code = TransactionalExecutor.Code.Rollbacking;
                        break;
                    case TimeoutRollbacked:
                        code = TransactionalExecutor.Code.RollbackDone;
                        break;
                    case Finished:
                        code = TransactionalExecutor.Code.CommitFailure;
                        break;
                    default:
                }
            // 4. 如果事务提交失败或者超时,则抛出对应的异常信息
                Exception statusException = null;
                if (GlobalStatus.isTwoPhaseHeuristic(afterCommitStatus)) {
                    statusException = new TmTransactionException(TransactionExceptionCode.CommitHeuristic,
                        String.format("Global transaction[%s] not found, may be rollbacked.", tx.getXid()));
                } else if (GlobalStatus.isOnePhaseTimeout(afterCommitStatus)) {
                    statusException = new TmTransactionException(TransactionExceptionCode.TransactionTimeout,
                        String.format("Global transaction[%s] is timeout and will be rollback[TC].", tx.getXid()));
                }
                if (null != statusException) {
                    throw new TransactionalExecutor.ExecutionException(tx, statusException, code);
                }
            //  5. 正常提交后,触发对应的回调埋点
                triggerAfterCommit();
            } catch (TransactionException txe) {
                // 4.1 Failed to commit
                throw new TransactionalExecutor.ExecutionException(tx, txe,
                        TransactionalExecutor.Code.CommitFailure);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    DefaultGlobalTransaction 的 commit 方法负责完成全局事务的提交,当然全局事务提交由TM执行,如果当前分支事务角色是RM,这里直接返回,啥也不干:

        @Override
        public void commit() throws TransactionException {
            // 1. 如果是RM角色,直接返回
            if (role == GlobalTransactionRole.Participant) {
                return;
            }
            // 2. 如果是TM角色,则尝试执行全局事务提交,如果提交失败了,则进行多轮尝试
            int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
            try {
                while (retry > 0) {
                    try {
                        retry--;
                        status = transactionManager.commit(xid);
                        break;
                    } catch (Throwable ex) {
                        if (retry == 0) {
                            throw new TransactionException("Failed to report global commit", ex);
                        }
                    }
                }
            } finally {
                // 3. TM全局事务提交成功后,执行XID解绑
                if (xid.equals(RootContext.getXID())) {
                    suspend(true);
                }
            }
            ...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    DefaultTransactionManager 在Seata中的职责主要作为防腐层存在,负责屏蔽与TC的通信过程,下面我们看看其commit方法实现:

        @Override
        public GlobalStatus commit(String xid) throws TransactionException {
            GlobalCommitRequest globalCommit = new GlobalCommitRequest();
            globalCommit.setXid(xid);
            GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
            return response.getGlobalStatus();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    当TM完成全局事务提交后,下面便是由TC通知其他分支事务执行全局提交了,也就是二阶段提交,二阶段提交的主要任务就是异步删除本地的undo日志


    分支事务全局提交

    各个分支事务的全局提交由TC异步回调通知完成,如下图所示:

    在这里插入图片描述

    具体代码存在于DefaultRMHandler的handle方法中:

        @Override
        public BranchCommitResponse handle(BranchCommitRequest request) {
            MDC.put(RootContext.MDC_KEY_XID, request.getXid());
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
            // 利用分支事务类型,获取对应的处理器,然后调用处理器的handle方法,处理分支事务提交请求
            return getRMHandler(request.getBranchType()).handle(request);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里获取的处理器类型为RMHandlerAT,所以最终会调用RMHandlerAT的handle方法处理分支事务提交请求:

    public abstract class AbstractRMHandler extends AbstractExceptionHandler
        implements RMInboundHandler, TransactionMessageHandler {
    
        @Override
        public BranchCommitResponse handle(BranchCommitRequest request) {
            BranchCommitResponse response = new BranchCommitResponse();
            exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
                @Override
                public void execute(BranchCommitRequest request, BranchCommitResponse response)
                    throws TransactionException {
                    // 真正执行分支事务提交的方法
                    doBranchCommit(request, response);
                }
            }, request, response);
            return response;
        }
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
        protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            String xid = request.getXid();
            long branchId = request.getBranchId();
            String resourceId = request.getResourceId();
            String applicationData = request.getApplicationData();
            // 调用资源管理器的branchCommit方法,完成分支事务提交
            BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
                applicationData);
            response.setXid(xid);
            response.setBranchId(branchId);
            response.setBranchStatus(status);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    由于这里我们使用的是AT模式,所以最终会调用DataSourceManager的branchCommit方法完成分支事务的提交:

        @Override
        public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                         String applicationData) throws TransactionException {
            return asyncWorker.branchCommit(xid, branchId, resourceId);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    asyncWorker 通过名字可以猜到,此处采用的是异步提交方式,所以下面我们来看看asyncWorker是如何进行异步提交的:

        public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
            // 准备两阶段提交上下文信息,然后加入分支事务提交队列中
            Phase2Context context = new Phase2Context(xid, branchId, resourceId);
            addToCommitQueue(context);
            return BranchStatus.PhaseTwo_Committed;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    不难看出,此处采用的是典型的生产者-消费者模式,下面看看具体是谁会从提交队列中取出任务执行:

        private void addToCommitQueue(Phase2Context context) {
            // 直接将任务加入队列中去,然后返回
            if (commitQueue.offer(context)) {
                return;
            }
            // 如果队列满了,则立即让线程池处理一波任务,然后再尝试将当前任务加入队列
            // 此处的thenRun方法是异步执行的
            CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor)
                    .thenRun(() -> addToCommitQueue(context));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    队列和定时任务线程池初始化过程可以在AsyncWorker类的构造函数中寻见:

        public AsyncWorker(DataSourceManager dataSourceManager) {
            this.dataSourceManager = dataSourceManager;
            // 默认队列大小为10000 
            commitQueue = new LinkedBlockingQueue<>(ASYNC_COMMIT_BUFFER_LIMIT);
            // 启动定时任务线程池,每秒执行一次任务
            ThreadFactory threadFactory = new NamedThreadFactory("AsyncWorker", 2, true);
            scheduledExecutor = new ScheduledThreadPoolExecutor(2, threadFactory);
            scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    下面可以来看看具体执行的是什么样的任务:

        void doBranchCommitSafely() {
            doBranchCommit();
        }
     
        private void doBranchCommit() {
            if (commitQueue.isEmpty()) {
                return;
            }
    
            // 1. 取出队列中所有任务
            List<Phase2Context> allContexts = new LinkedList<>();
            commitQueue.drainTo(allContexts);
            // 2. 按照资源ID对分支事务提交任务进行分组
            Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
            // 3. 依次处理每个任务
            groupedContexts.forEach(this::dealWithGroupedContexts);
        }  
    
        private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
            ...
            // 1. 通过资源ID获取对应的数据源代理器
            DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
            // 2. 删除undo日志
            Connection conn = null;
            conn = dataSourceProxy.getPlainConnection();
            ...
            UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
            List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
            for (List<Phase2Context> partition : splitByLimit) {
                deleteUndoLog(conn, undoLogManager, partition);
            }
            ...
        } 
    
        private void deleteUndoLog(final Connection conn, UndoLogManager undoLogManager, List<Phase2Context> contexts) {
            Set<String> xids = new LinkedHashSet<>(contexts.size());
            Set<Long> branchIds = new LinkedHashSet<>(contexts.size());
            contexts.forEach(context -> {
                xids.add(context.xid);
                branchIds.add(context.branchId);
            });
           ...
           // 删除undo_log表中的undo_log日志
           // 这里提交的事务是为了确保批量删除undo_log日志这一过程的原子性 
           undoLogManager.batchDeleteUndoLog(xids, branchIds, conn);
           if (!conn.getAutoCommit()) {
                conn.commit();
           }
           ...
        } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    可以看到分支事务全局提交逻辑很简单,就是借助asyncWorker完成undo日志的异步删除。


    全局事务回滚

    全局事务回滚逻辑存在于TransactionalTemplate的completeTransactionAfterThrowing方法中:

        private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable originalException)
                throws TransactionalExecutor.ExecutionException, TransactionException {
            // 如果需要对当前异常执行回滚,则执行全局事务回滚操作,否则还是执行全局事务提交
            if (txInfo != null && txInfo.rollbackOn(originalException)) {
                rollbackTransaction(tx, originalException);
            } else {
                // not roll back on this exception, so commit
                commitTransaction(tx, txInfo);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
        private void rollbackTransaction(GlobalTransaction tx, Throwable originalException) throws TransactionException, TransactionalExecutor.ExecutionException {
            try {
                // 执行全局事务回滚逻辑和前后回调埋点
                triggerBeforeRollback();
                tx.rollback();
                triggerAfterRollback();
            }
            ...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    DefaultGlobalTransaction 的 rollback 方法负责完成全局事务的回滚,当然全局事务回滚也由TM执行,如果当前分支事务角色是RM,这里直接返回,啥也不干:

       @Override
        public void rollback() throws TransactionException {
            // 1. 如果当前分支事务的角色是RM,则直接返回
            if (role == GlobalTransactionRole.Participant) {
                ...
                return;
            }
            ...
            // 2. 如果当前分支事务角色是TM,则通知TC负责发起全局事务回滚请求
            int retry = ROLLBACK_RETRY_COUNT <= 0 ? DEFAULT_TM_ROLLBACK_RETRY_COUNT : ROLLBACK_RETRY_COUNT;
            try {
                while (retry > 0) {
                    try {
                        retry--;
                        status = transactionManager.rollback(xid);
                        break;
                    } catch (Throwable ex) {
                        ...
                    }
                }
            } finally {
                if (xid.equals(RootContext.getXID())) {
                    suspend(true);
                }
            }
            ...
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    DefaultTransactionManager 在Seata中的职责主要作为防腐层存在,负责屏蔽与TC的通信过程,下面我们看看其rollback方法实现:

        @Override
        public GlobalStatus rollback(String xid) throws TransactionException {
            GlobalRollbackRequest globalRollback = new GlobalRollbackRequest();
            globalRollback.setXid(xid);
            GlobalRollbackResponse response = (GlobalRollbackResponse) syncCall(globalRollback);
            return response.getGlobalStatus();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    分支事务全局回滚

    各个分支事务的本地由TC异步回调通知完成,如下图所示:

    在这里插入图片描述

    具体代码存在于DefaultRMHandler的handle方法中:

        @Override
        public BranchRollbackResponse handle(BranchRollbackRequest request) {
            MDC.put(RootContext.MDC_KEY_XID, request.getXid());
            MDC.put(RootContext.MDC_KEY_BRANCH_ID, String.valueOf(request.getBranchId()));
             // 利用分支事务类型,获取对应的处理器,然后调用处理器的handle方法,处理分支事务提交请求
            return getRMHandler(request.getBranchType()).handle(request);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这里获取的处理器类型为RMHandlerAT,所以最终会调用RMHandlerAT的handle方法处理分支事务提交请求:

    public abstract class AbstractRMHandler extends AbstractExceptionHandler
        implements RMInboundHandler, TransactionMessageHandler {
    
        @Override
        public BranchRollbackResponse handle(BranchRollbackRequest request) {
            BranchRollbackResponse response = new BranchRollbackResponse();
            exceptionHandleTemplate(new AbstractCallback<BranchRollbackRequest, BranchRollbackResponse>() {
                @Override
                public void execute(BranchRollbackRequest request, BranchRollbackResponse response)
                    throws TransactionException {
                    // 执行全局回滚
                    doBranchRollback(request, response);
                }
            }, request, response);
            return response;
        }
    
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
        protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)
            throws TransactionException {
            String xid = request.getXid();
            long branchId = request.getBranchId();
            String resourceId = request.getResourceId();
            String applicationData = request.getApplicationData();
            // 调用资源管理器的branchRollback方法,完成分支事务回滚
            BranchStatus status = getResourceManager().branchRollback(request.getBranchType(), xid, branchId, resourceId,
                applicationData);
            response.setXid(xid);
            response.setBranchId(branchId);
            response.setBranchStatus(status);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    由于这里我们使用的是AT模式,所以最终会调用DataSourceManager的branchRollback方法完成分支事务的回滚:

        @Override
        public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                           String applicationData) throws TransactionException {
            DataSourceProxy dataSourceProxy = get(resourceId);
            ...
            // 利用undo日志完成回滚
            UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
            ...
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    关于如何利用undo日志完成回滚,这块内容将在本系列后面进行讲解,本文暂时不做深究。


    小结

    到目前为止,我们大体浏览了Seata AT模式整体实现流程。后面,我们将深入Server模块进行研究,以及Seata的RPC模块,感兴趣的童鞋可以持续关注。

  • 相关阅读:
    力扣-232.用栈实现队列
    spring cache (默认方式)
    Go基础语法:函数+
    Day43-Session(重点)、JSP原理
    《算法通关村——双指针妙用》
    拼多多季报图解:营收355亿同比增65% 研发投入达27亿
    Windows 11又双叒出现Bug,导致截图工具崩溃
    魔改xxl-job,彻底告别手动配置任务!
    Gen4Gen:多概念个性化图像生成的数据驱动革新
    【Kafka专题】Kafka收发消息核心参数详解
  • 原文地址:https://blog.csdn.net/m0_53157173/article/details/133583006