• Seata流程源码梳理下篇-TC


    我们上篇简单梳理了下TM、RM的一些流程(离现在过得挺久的了,这篇我们这篇来梳理下TC的内容。

    TC (Transaction Coordinator) - 事务协调者

    维护全局和分支事务的状态,驱动全局事务提交或回滚

    TM (Transaction Manager) - 事务管理器

    定义全局事务的范围:开始全局事务、提交或回滚全局事务。

    RM (Resource Manager) - 资源管理器

    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

    一、TC的定义

    TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚

    ​ 这个是官网给出的定义,就是说其实全局事务的协调。具体来说例如:当某个TM(事务管理器)需要开启事务的时候,就需要一个协调者来驱动全局事务或提交,或回滚,这个概念我们开始的时候很容易与TM混淆。我们看到这两个一个是开始,一个是驱动,具体通过demo来说。

    1、一些基本逻辑信息

    用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

    • 仓储服务(Storage):对给定的商品扣除仓储数量。
    • 订单服务(Order):根据采购需求创建订单。
    • 帐户服务(Account):从用户帐户中扣除余额。
    • 业务服务(Business):发起整个业务逻辑。

    在这里插入图片描述
    在这里插入图片描述

    这个官网给出的例子,就是Business服务发起本次业务逻辑,也就是通过@GlobalTransactional来开始全局事务,然后后面的也就是TM的一些逻辑处理,这个我们上篇有说过,就是去调TC表示自己要开始全局事务,然后TC就会生成全局事务的XID,之后其调其他的分支事务Stock、Order的话就带上这个全局事务XID。然后这里TC就是一个协调者的角色,其他的TM、RM都想TC注册。例如如果后面如果@GlobalTransactional方法正常执行的话,就事务提交,其就向TC发送消息,然后TC去协调分支事务的提交。

    ​ 下面我们就来具体看下在TC中这些注册、提交、回滚等的操作。

    二、TC的一些逻辑处理

    1、前置内容

    ​ 首先在TC这边,关于对应请求的处理,入口都是RemotingProcessor接口的具体实现

    在这里插入图片描述

    ​ 其的子类:

    在这里插入图片描述

    2、RM通道注册

    ​ RM(资源管理器)注册。我们上面有介绍过,Seata会代理服务关于数据库的操作。然后其数据源初始化的时候,就会向TC注册。

    在这里插入图片描述

    但我们需要注意,这个目前还只是channel的通道连接,不是具体的RM资源注册,具体的RM事务资源注册是在全局事务开始然后操作数据库的时候。

    ​ 对应到TC这边就是RegRmProcessor在处理,这个主要是保存Channel通道,与Netty相关的内容,目前还没具体研究,但影响不大。先不深入这个。

    在这里插入图片描述

    3、TM通道注册

    在这里插入图片描述

    ​ 这个与前面的RM是类似的。

    4、ServerOnRequestProcessor

    ​ 然后这个是一个核心的处理类,其用来处理各种类型的信息。

    在这里插入图片描述

    ​ 我们可以看到,像分支注册、全局事务的开始、回滚都是这个Process处理的。

    public class ServerOnRequestProcessor implements RemotingProcessor {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServerOnRequestProcessor.class);
    
        private RemotingServer remotingServer;
    
        private TransactionMessageHandler transactionMessageHandler;
    
        public ServerOnRequestProcessor(RemotingServer remotingServer, TransactionMessageHandler transactionMessageHandler) {
            this.remotingServer = remotingServer;
            this.transactionMessageHandler = transactionMessageHandler;
        }
    
        @Override
        public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
            if (ChannelManager.isRegistered(ctx.channel())) {
                onRequestMessage(ctx, rpcMessage);
            } else {
               ........
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
                }
            }
        }
    
        private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
            Object message = rpcMessage.getBody();
            RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
           	...........
            if (message instanceof MergedWarpMessage) {
                AbstractResultMessage[] results = new AbstractResultMessage[((MergedWarpMessage) message).msgs.size()];
                for (int i = 0; i < results.length; i++) {
                    final AbstractMessage subMessage = ((MergedWarpMessage) message).msgs.get(i);
                    results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);
                }
                MergeResultMessage resultMessage = new MergeResultMessage();
                resultMessage.setMsgs(results);
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), resultMessage);
            } else {
                // the single send request message
                final AbstractMessage msg = (AbstractMessage) message;
                AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
                remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    ​ 然后处理的核心就是DefaultCoordinator,上面的AbstractRMHandler是RM的处理

    在这里插入图片描述

    5、DefaultCoordinator

    ​ 这里的主要用来处理TC的逻辑就是TCInboundHandler接口:

    在这里插入图片描述

    ​ 通过这些Request我们也能找到其主要是用来处理全局事务的开启、分支事务的注册、全局事务的提交、回滚、状态这些。下面我们就来主要看下看下GlobalBeginRequestBranchRegisterRequestGlobalCommitRequest的处理。

    1、GlobalBeginRequest的处理

    在这里插入图片描述

    在这里插入图片描述

    ​ 通过这里我们可以找到,在GlobalBegin开启全局事务的主要处理就是生成全局事务Xid返回。

    在这里插入图片描述

    public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName,
                                                        int timeout) {
          GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout);
          return session;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
        public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout) {
            this.transactionId = UUIDGenerator.generateUUID();
            this.status = GlobalStatus.Begin;
    
            this.applicationId = applicationId;
            this.transactionServiceGroup = transactionServiceGroup;
            this.transactionName = transactionName;
            this.timeout = timeout;
            this.xid = XID.generateXID(transactionId);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这里是全局事务的session,然后再通过session.begin来进行对应处理,例如将全局事务session保存到数据库:

    在这里插入图片描述

    在这里插入图片描述

    ​ 我们可以看到这里有多种处理方式、例如保存到文件、redis这些。目前我们是保存到数据库中,处理的就是DataBaseSessionManager

    public class DataBaseSessionManager extends AbstractSessionManager
        implements Initialize {
    		............
        @Override
        public void addGlobalSession(GlobalSession session) throws TransactionException {
            if (StringUtils.isBlank(taskName)) {
                boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
                if (!ret) {
                    throw new StoreException("addGlobalSession failed.");
                }
            } else {
                boolean ret = transactionStoreManager.writeSession(LogOperation.GLOBAL_UPDATE, session);
                if (!ret) {
                    throw new StoreException("addGlobalSession failed.");
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    ​ 这里之后就是DataBaseTransactionStoreManager处理入库逻辑

    在这里插入图片描述

    在这里插入图片描述

    2、BranchRegisterRequest分支事务注册的处理

    ​ 再之后我们来看下分支事务的注册处理逻辑。分支事务的注册就是每个RM对应开始操作DB的时候(这个RM相关的内容上篇有提到过),向TC发起分支事务注册,代码逻辑就是放在seata代理的jdbc的ConnectionProxy

    在这里插入图片描述

    在这里插入图片描述

    然后TC这边就会处理这个请求,可以看到这里就是生成并返回分支事务ID

    在这里插入图片描述

    在这里插入图片描述

    同时会将当前分支事务保存到数据库中

    在这里插入图片描述

    3、GlobalCommitRequest全局事务提交的处理

    ​ 我们上篇有提到,再TM发起全局事务后,如果各个分支都执行成功后,其会向TC发起全局事务的提交。

    对于我们官方给到的demo。我们看下关于事务相关的表的数据。

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述
    然后各个分支都执行完成后,由TM来提交全局事务

    在这里插入图片描述

    在这里插入图片描述

    ​ 在TM提交后,我们来看下TC的处理:

    在这里插入图片描述

    ![外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传](https://img-home.csdnimg.cn/images/20230724024159.png?origin_url=F%3A%5CMarkDown-File%5C%E7%9F%A5%E8%AF%86%E7%82%B9%E8%B5%84%E6%96%99%5CSeata%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-TC.assets%5C在这里插入图片描述
    &pos_id=img-hmn2nU2b-1695536657221)

    ​ 这里我们主要需要注意SessionHelper.forEach(globalSession.getSortedBranches(), branchSession对于每个branchSession的循环调用,也就是每个分支事务的session,在这里就会循环,然后调用每个分支进行分支事务的本地提交,也就是RM本地提交的处理。

    在这里插入图片描述

    然后RM的处理主要就是RMHandlerAT这个类在处理,因为我们当前是AT模式

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    然后ResourceManager就是DataSourceManager

    在这里插入图片描述

    ​ 这里主要是添加到异步队列中。

    在这里插入图片描述

    然后主要处理就是删除数据库的undo记录,就我们全局事务成功了,不需要使用undo来回滚本地事务了。

    在这里插入图片描述

  • 相关阅读:
    【python笔记】第五节 流程控制
    【Nginx】学习及相关题目整理
    【SpringCloud】API网关(Spring Cloud Gateway)
    linux下的文件重命名
    webapi开发框架实践
    获取随机id的api接口
    LinuxC/C++ 实现HTTP Request
    【YOLO系列】YOLOv1学习(PyTorch)原理加代码
    轻量云服务器租用好在哪
    JAVA毕业设计BS架构考研交流学习平台设计与实现计算机源码+lw文档+系统+调试部署+数据库
  • 原文地址:https://blog.csdn.net/qq_25179481/article/details/133239825