• Seata AT模式下的源码解析(二)


    6. 一阶段

    在一阶段的调用流程是

    请添加图片描述

    6.1 DataSource

    Seata最重要的一个功能就是对 DataSource 进行了代理,在用户执行插入 sql 时会在插入之间根据 sql 构建一个前置镜像出来,如果出现异常了,就可以通过 undolog 日志里面的镜像语句进行回滚;

    seata中代理对数据进行代理的方式以及调用联调大致如下,seata对 数据源、连接对象、预编译对象 都进行了代理,最后通过 ExecuteTemplate 对象来执行解析 sql创建镜像等操作

    在这里插入图片描述

    6.2 SeataAutoDataSourceProxyCreator

    步骤跟上面扫描 @GloableTransactional 一样对 DataSource 数据源进行代理,代理对象为 SeataDataSourceProxy 类型,根据代理模式创建不同的实现类:

    • DataSourceProxy:AT模式
    • DataSourceProxyXA:XA模式
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        // 只对 DataSource 进行代理
        if (!(bean instanceof DataSource)) {
            return bean;
        }
        if (!(bean instanceof SeataDataSourceProxy)) {
            //调用父类对 bean 进行代理
            Object enhancer = super.wrapIfNecessary(bean, beanName, cacheKey);
            //要么已经被代理了,要么是被排除了
            if (bean == enhancer) {
                return bean;
            }
            //否者构建代理对象
            DataSource origin = (DataSource) bean;
            SeataDataSourceProxy proxy = buildProxy(origin, dataSourceProxyMode);
            DataSourceProxyHolder.put(origin, proxy);
            return enhancer;
        }
        SeataDataSourceProxy proxy = (SeataDataSourceProxy) bean;
        DataSource origin = proxy.getTargetDataSource();
        Object originEnhancer = super.wrapIfNecessary(origin, beanName, cacheKey);
        if (origin == originEnhancer) {
            return origin;
        }
        DataSourceProxyHolder.put(origin, proxy);
        return originEnhancer;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    6.3 SeataAutoDataSourceProxyAdvice

    数据源通知类,通知类中没有做什么特别的事,就是判断了当前执行的方法在 DataSource 中是否也具有相同的方法,如果存在相同的方法,直接调用 代理类 就可以了

    public Object invoke(MethodInvocation invocation) throws Throwable {
        // check whether current context is expected
        if (!inExpectedContext()) {
            return invocation.proceed();
        }
        //获取需要执行的方法
        Method method = invocation.getMethod();
        String name = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        //获取到DataSource中是否具有当前方法,如果抛出异常那么直接执行本体方法
        Method declared;
        try {
            declared = DataSource.class.getDeclaredMethod(name, parameterTypes);
        } catch (NoSuchMethodException e) {
            return invocation.proceed();
        }
        //调用代理对象的方法
        DataSource origin = (DataSource) invocation.getThis();
        SeataDataSourceProxy proxy = DataSourceProxyHolder.get(origin);
        Object[] args = invocation.getArguments();
        return declared.invoke(proxy, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    6.4 SeataDataSourceProxy

    public interface SeataDataSourceProxy extends DataSource {
    
        /**
         * 获取到代理的源对象
         */
        DataSource getTargetDataSource();
    
        /**
         * 获取当前分支事务采用的模式
         */
        BranchType getBranchType();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    在这里插入图片描述

    6.5 DataSourceProxy

    AT模式的数据源代理对象,通过 DataSource 的使用方式,一般是,先通过 getConnection() 获取到一个数据库的连接,然后通过 Connection 对象再创建一个 Statement 的对象来操作 sql 语句,所以这里我们可以先看 getConnection() 方法是如何实现的

    getConnection()

    直接创建了一个 ConnectionProxy 连接代理对象

    public ConnectionProxy getConnection() throws SQLException {
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection);
    }
    
    • 1
    • 2
    • 3
    • 4

    6.6 ConnectionProxy

    在这里插入图片描述

    createStatement()

    又对 Statement 对象创建了一个代理对象

    public Statement createStatement() throws SQLException {
        Statement targetStatement = getTargetConnection().createStatement();
        return new StatementProxy(this, targetStatement);
    }
    
    • 1
    • 2
    • 3
    • 4
    commit()
    1)写隔离实现机制

    提交事务时,会先向 TC 注册一个全局锁,表名+行记录 构成的锁;Seata 中对于写隔离的实现就是采用全局锁实现,写的过程:

    • 一阶段本地事务提交前,申请全局锁,拿不到全局锁不能提交
    • 超出了限制将放弃拿锁,回滚本地事务,释放本地锁

    例如:t1和t2 两个事务,t1将 1000 修改为 900,t1先拿到全局锁,提交本地事务释放了本地锁,然后t2拿到本地锁将900修改为800,但是t1全局锁还没有释放,t2拿不到全局锁就根据策略进行重试,这时t1收到了 TC 的回滚请求,t1开始根据undolog进行回滚,发现t2还持有本地锁,就会一直进行重试回滚,t2持有本地锁重试次数超过了限制,放弃提交开始回滚数据释放问题锁,t1拿到本地锁开始执行回滚任务;因为整个过程 全局锁 在 t1结束前一直是被 t1 持有,所以不会发生脏写的问题。

    2)读隔离实现机制

    在数据库本地事务隔离级别 读已提交(Read Committed) 或以上的基础上,Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted)

    如果应用在特定场景下,必需要求全局的 读已提交 ,目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。

    为什么要默认采用 读未提交

    猜想:如果使用默认数据库的 可重复读,会出现的问题就是 t1开启事务修改1000 - 100 = 900但是本地事务还没有提交,t2查询的数据出来还是1000 - 100 = 900,t1拿到锁开始提交事务并且释放本地锁以及全局锁,而t2也开始提交事务,这时候 t1和t2 都将数据改成了900,就导致了数据异常

    public void commit() throws SQLException {
        try {
            //采用重试机制,反复的获取锁
            lockRetryPolicy.execute(() -> {
                //执行提交任务
                doCommit();
                return null;
            });
        } catch (SQLException e) {
            //判断是否是自动提交 并且并没有被改变过
            if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
                rollback();
            }
            throw e;
        } catch (Exception e) {
            throw new SQLException(e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    doCommit()
    private void doCommit() throws SQLException {
        //查看是否存在全局事务的配置,如果存在全局事务需要注册当前分支
        if (context.inGlobalTransaction()) {
            //如果分支注册成功了,直接提交事务
            processGlobalTransactionCommit();
        } else if (context.isGlobalLockRequire()) {
            //如果不存在 XID 先去判断是否存在全局锁
            processLocalCommitWithGlobalLocks();
        } else {
            //提交事务
            targetConnection.commit();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    processGlobalTransactionCommit()
    private void processGlobalTransactionCommit() throws SQLException {
        try {
            //向 TC 设置一个由 表名:id 组成的全局锁,返回一个 branchId
            register();
        } catch (TransactionException e) {
            //如果锁已经存在,那么构建一个锁冲突的异常 LockConflictException
            recognizeLockKeyConflictException(e, context.buildLockKeys());
        }
        try {
            //刷新undolog日志,提交事务
            UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
            targetConnection.commit();
        } catch (Throwable ex) {
            LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
            //上报异常事务状态
            report(false);
            throw new SQLException(ex);
        }
        if (IS_REPORT_SUCCESS_ENABLE) {
            report(true);
        }
        context.reset();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    6.7 PreparedStatementProxy

    在这里插入图片描述

    PreparedStatementProxy 覆写的 execute() 方法并没有做什么事,直接通过 ExecuteTemplate 来执行的

    @Override
    public boolean execute() throws SQLException {
        //执行完之后,直接调用回调函数执行源sql
        return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6.8 ExecuteTemplate

    • SQLRecognizerFactory : sql识别工厂,引的哪个包就使用哪个识别器,这里使用的是 druid的识别器
      • DruidSQLRecognizerFactoryImpl
      • AntlrMySQLRecognizerFactory
    • SQLRecognizer:sql识别器将对应的sql语句解析成不同的类型,下面是实现类,数据库不同实现的类型也不同

    在这里插入图片描述

    public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            //对其进行全局锁的检查
            if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
                //如果没有全局锁的标识并且也不是AT模式,直接执行源sql
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }
            //创建sql识别器
            String dbType = statementProxy.getConnectionProxy().getDbType();
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                /**
                 *  根据sql识别器工厂创建对应的识别器来创建 SQLRecognizer 对象,目前提供了两个实现类
                 *  DruidSQLRecognizerFactoryImpl 和 AntlrMySQLRecognizerFactory 两个子类进行识别,两个子类通过两个包进行引用
                 */
                sqlRecognizers = SQLVisitorFactory.get(
                        statementProxy.getTargetSQL(),
                        dbType);
            }
            Executor<T> executor;
            /**
             * 根据对应的sql类型创建出对应的执行器
             * 默认:PlainExecutor
             * 插入:InsertExecutor (会根据数据库的类型进行创建,这里只是指定接口名称)
             * 修改:UpdateExecutor
             * 删除:DeleteExecutor
             * select for update:SelectForUpdateExecutor
             * insert_on_duplicate_update:MySQLInsertOrUpdateExecutor
             * 多个sql执行:MultiExecutor
             */
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                executor = new PlainExecutor<>(statementProxy, statementCallback);
            } else {
                if (sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    switch (sqlRecognizer.getSQLType()) {
                        case INSERT:
                            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                        new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                        new Object[]{statementProxy, statementCallback, sqlRecognizer});
                            break;
                        case UPDATE:
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case DELETE:
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case SELECT_FOR_UPDATE:
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case INSERT_ON_DUPLICATE_UPDATE:
                            switch (dbType) {
                                case JdbcConstants.MYSQL:
                                case JdbcConstants.MARIADB:
                                    executor =
                                        new MySQLInsertOrUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                                    break;
                                default:
                                    throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                            }
                            break;
                        default:
                            executor = new PlainExecutor<>(statementProxy, statementCallback);
                            break;
                    }
                } else {
                    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                }
            }
            T rs;
            try {
                /**
                 * 执行时默认时执行 PlainExecutor中的方法
                 * 其他类型执行 BaseTransactionalExecutor 中的方法
                 */
                rs = executor.execute(args);
            } catch (Throwable ex) {
                if (!(ex instanceof SQLException)) {
                    // Turn other exception into SQLException
                    ex = new SQLException(ex);
                }
                throw (SQLException) ex;
            }
            return rs;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    6.9 Executor

    根据sql识别器,识别出sql的类型是 insert、update还是delete等类型,根据不同的类型创建不同的sql执行器

    在这里插入图片描述

    ExecuteTemplate 中对sql进行解析之后创建出不同类型的 Eexcutor 实现类,以 Insert sql为例子,解析出的类型是 MysqlInsertOrUpdateExecutor 该类型并没有实现 execute(args) 方法,往上找调用的方法在 BaseTrasactionalExecutor 类中

    execute()

    从上下文中获取了 XID 进行绑定

    public T execute(Object... args) throws Throwable {
        //获取到 XID
        String xid = RootContext.getXID();
        if (xid != null) {
            //如果XID不为空的话绑定到当前获取的连接代理对象中,证明当前连接已经绑定了全局事务,如果后续连接来绑定新全局事务抛出异常
            statementProxy.getConnectionProxy().bind(xid);
        }
        //判断是否需要全局锁,这里根据方法是否打了 @GlobalLock注解,GlobalTransactionalInterceptor中会根据 @GlobalLock 和 @GlobalTransactional注解进行不同的处理,如果是事务这里应该是false
        statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
        //AbstractDMLBaseExecutor
        return doExecute(args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    doExecute()
    public T doExecute(Object... args) throws Throwable {
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        //根据是否自动提交事务来进行区分执行的代码,如果是自动提交,那么需要把自动提交关闭,改成手动提交
        if (connectionProxy.getAutoCommit()) {
            return executeAutoCommitTrue(args);
        } else {
            //不需要提交留给二阶段通知提交时开始提交
            return executeAutoCommitFalse(args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    executeAutoCommitFalse()

    先根据sql语句构建出前置镜像 TableRecords ,然后执行sql语句,根据前置镜像的主键id构建出执行sql之后的查询sql语句,根据修改之后记录构建后置镜像

    public class TableRecords implements java.io.Serializable {
        private static final long serialVersionUID = 4441667803166771721L;
    	//表的元数据信息
        private transient TableMeta tableMeta;
    	//表名称
        private String tableName;
    	//每一行数据对应的记录,里面又包括了每个字段的key和value
        private List<Row> rows = new ArrayList<Row>();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在后续提交时会根据构建的 tableName:{主键值1},{主键值2}…{主键值N} 记录锁对行加上锁,防止其他事务进行修改

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
        if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
            throw new NotSupportYetException("multi pk only support mysql!");
        }
        //构建前置镜像
        TableRecords beforeImage = beforeImage();
        //执行sql语句
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        //获取到影响的行数
        int updateCount = statementProxy.getUpdateCount();
        if (updateCount > 0) {
            //构建后置镜像根据前置镜像的id查询出执行之后的sql记录
            TableRecords afterImage = afterImage(beforeImage);
            //构建undolog日志:构建锁,锁的结构是 表名:主键值1....{值键值n},并且将锁添加到 lockKeysBuffer 在执行提交事务时会根据前面的锁对表数据上锁
            prepareUndoLog(beforeImage, afterImage);
        }
        return result;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    executeAutoCommitTrue()

    这里需要注意一点的是 LockRetryPolicy 对象,这里用到的是 AbstractDMLBaseExecutor.LockRetryPolicy

    • AbstractDMLBaseExecutor.LockRetryPolicy:继承至 ConnectionProxy.LockRetryPolicy,覆写了 onException(),抛出了异常会进行回滚
    • ConnectionProxy.LockRetryPolicy :onException() 方法什么事都没有做
    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        try {
            //改变是否自动提交
            connectionProxy.changeAutoCommit();
            //锁重试的机制,如果一直拿不到锁就执行回滚的任务
            return new LockRetryPolicy(connectionProxy).execute(() -> {
                //构建前后镜像undolog日志
                T result = executeAutoCommitFalse(args);
                //执行事务的提交,看上面 ConnectionProxy.commit()
                connectionProxy.commit();
                return result;
            });
        } catch (Exception e) {
            // when exception occur in finally,this exception will lost, so just print it here
            LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
            //根据配置中 在锁冲突时的重试回滚机制,默认是true
            if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                connectionProxy.getTargetConnection().rollback();
            }
            throw e;
        } finally {
            //清除事务相关的属性
            connectionProxy.getContext().reset();
            //自动提交打开
            connectionProxy.setAutoCommit(true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    7. 网络请求

    http://t.csdn.cn/jZSs2

    8. 二阶段

    8.1 分支提交

    在二阶段 TC 向 TM 发起分支提交的请求时,通过 BranchCommitRequest 构建请求体,前面在网络请求里面提到了 seata 中是如何对请求体进行处理的,下面代码是 处理器 调用到 AbstractRMHandler 的代码;对应的处理器是 RmBranchCommitProcessor

    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
            if (!(request instanceof AbstractTransactionRequestToRM)) {
                throw new IllegalArgumentException();
            }
            //设置当前类为handler处理器
            AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
            transactionRequest.setRMInboundMessageHandler(this);
            //这里通过调用,会调用到 DefaultRMHandler.handle(BranchCommitRequest) 的方法中,然后再通过对应的模式来获取处理器执行
            return transactionRequest.handle(context);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    AbstractCallback 一个抽象的回调函数,实现了自定义的 Callback

    • onSuccess:成功之后回调
    • onTransactionException:出现事务异常的回调
    • onException:出现执行异常的回调
    public BranchCommitResponse handle(BranchCommitRequest request) {
            //处理分支提交的请求
            BranchCommitResponse response = new BranchCommitResponse();
            //自定义的回调抽象类 AbstractCallback,其中实现了异常捕获的方法
            exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
                @Override
                public void execute(BranchCommitRequest request, BranchCommitResponse response)
                    throws TransactionException {
                    //执行分支提交
                    doBranchCommit(request, response);
                }
            }, request, response);
            return response;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    执行分支提交的代码比较简单,获取到对应模式的资源管理器,然后调用对应的方法,默认是AT模式,这里就获取的是 DataSourceManager

    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
            throws TransactionException {
            //获取到对应的RM管理器,AT模式的执行器是 io.seata.rm.datasource.DataSourceManager.branchCommit() 开启一个异步的worker线程
            BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
                applicationData);
        .........
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    可以看到下面源码中 DataSourceManager.brancheCommit() 方法就调用了一个异步工作线程,执行分支提交 ,里面将任务添加到一个队列中等待执行,执行的任务就是删除对应的的 undolog 逻辑比较简单,就不贴详细的源码了

    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                         String applicationData) throws TransactionException {
            return asyncWorker.branchCommit(xid, branchId, resourceId);
        }
    
    • 1
    • 2
    • 3
    • 4

    8.2 分支回滚

    对应处理器 RmBranchRollbackProcessor,前面的处理逻辑都一样,之后调用的分支回滚方法不一样,上面分支提交是 branchCommit(),而这里是调用 branchRollback()

    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                           String applicationData) throws TransactionException {
            //根据资源id,获取到对应的数据源代理
            DataSourceProxy dataSourceProxy = get(resourceId);
            if (dataSourceProxy == null) {
                throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
            }
            try {
                //根据db类型获取到undolog的管理器,这里的undo方法,调用的 io.seata.rm.datasource.undo.AbstractUndoLogManager.undo
                UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
            } catch (TransactionException te) {
                //对应对应的异常码,设置对应的分支回滚状态
                    new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
                if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
                    return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
                } else {
                    return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
                }
            }
            return BranchStatus.PhaseTwo_Rollbacked;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    io.seata.rm.datasource.undo.AbstractUndoLogManager#undo:

    public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
            Connection conn = null;
            ResultSet rs = null;
            PreparedStatement selectPST = null;
            boolean originalAutoCommit = true;
            for (; ; ) {
                try {
                    //获取到真实的连接对象
                    conn = dataSourceProxy.getPlainConnection();
                    // The entire undo process should run in a local transaction.
                    if (originalAutoCommit = conn.getAutoCommit()) {
                        //修改自动提交为手动提交
                        conn.setAutoCommit(false);
                    }
                    // 查询undo log日志出来
                    selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
                    //设置分支id
                    selectPST.setLong(1, branchId);
                    //xid
                    selectPST.setString(2, xid);
                    //执行查询语句
                    rs = selectPST.executeQuery();
                    boolean exists = false;
                    //遍历查询出来的数据
                    while (rs.next()) {
                        exists = true;
                        //如果服务端发送了多次回滚请求,这里只需要确保回滚了正确状态的日志
                        int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                        if (!canUndo(state)) {
                            if (LOGGER.isInfoEnabled()) {
                                LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                            }
                            return;
                        }
                        //解析出context字段
                        String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                        //解析出context为map
                        Map<String, String> context = parseContext(contextString);
                        //回去到rollback_info信息
                        byte[] rollbackInfo = getRollbackInfo(rs);
                        //获取到解析器名称,按照解析器进行序列化,默认使用jackson
                        String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                        UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                            : UndoLogParserFactory.getInstance(serializer);
                        //解析出回滚日志对象
                        BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
    
                        try {
                            // 设置当前线程采用的解析器的名称
                            setCurrentSerializer(parser.getName());
                            //获取到undolog 前后镜像数据信息
                            List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                            if (sqlUndoLogs.size() > 1) {
                                Collections.reverse(sqlUndoLogs);
                            }
                            //遍历sql
                            for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                                //获取到表的元数据信息
                                TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                                    conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                                sqlUndoLog.setTableMeta(tableMeta);
                                //根据数据库类型以及sql类型获取到对应的 undo执行器
                                AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                                    dataSourceProxy.getDbType(), sqlUndoLog);
                                undoExecutor.executeOn(conn);
                            }
                        } finally {
                            // 移除当前线程的序列化器
                            removeCurrentSerializer();
                        }
                    }
                    //如果存在undolog日志,需要删除undolog日志,跟业务代码一起提交事务,需要保证 undolog和日志回滚sql的一致性
                    if (exists) {
                        //删除undolog日志
                        deleteUndoLog(xid, branchId, conn);
                        //执行事务提交
                        conn.commit();
                    } else {
                        //如果不存在undolog日志,说明视图提交出现了异常导致undolog日志没有被存储上,这里插入了一个 GlobalFinished 状态的日志防止视图被正确的提交了
                        insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                        //执行事务提交
                        conn.commit();
                    }
                    return;
                } catch (SQLIntegrityConstraintViolationException e) {
                    
                } catch (Throwable e) {
                    //抛出了异常就执行undolog日志回滚
                } finally {
                    //关闭各个流和管道
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93

    方法中真正执行日志回滚的是 AbstractUndoExecutor 类型,这是一个抽象类,通过工厂方法进行创建,根据数据库类型以及 sql的类型 获取到对应的执行器

    在这里插入图片描述

    io.seata.rm.datasource.undo.AbstractUndoExecutor#executeOn:

    executeOn 作为一个抽象类的公关方法,其中一些比较有特点的函数方法都交给子类实现,例如:buildUndoSQL(构建回滚的sql,如果是插入构建的回滚sql就是删除)、getUndoRows(获取到后置镜像的数据信息)

    public void executeOn(Connection conn) throws SQLException {
            /**
             * 是否开启镜像数据的验证,如果开启了需要对后置镜像和当前数据进行数据的对比,如果不相同说明被其他事务改变需要根据对应的策略进行处理
             * 对比策略:
             *      前后镜像相比较是否一样,一样的就不需要执行后续了
             *      后镜与当前数据进行比较是否一样
             *          如果一样直接后续回滚
             *          如果不一样再判断前镜与当前数据是否一样,如果不一样说明出现了脏数据,如果一样就不需要执行后续了
             */
            if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
                return;
            }
            PreparedStatement undoPST = null;
            try {
                //buildUndoSQL() 抽象方法用于子类进行实现,sql不同子类实现方法也不同;例如:insert,构建回滚函数就是delete函数删除对应的语句
                String undoSQL = buildUndoSQL();
                //根据前置镜像的sql构建出sql语句
                undoPST = conn.prepareStatement(undoSQL);
                //根据后置镜像的记录获取到对应需要的数据
                TableRecords undoRows = getUndoRows();
                for (Row undoRow : undoRows.getRows()) {
                    ArrayList<Field> undoValues = new ArrayList<>();
                    //获取到记录的主键值
                    List<Field> pkValueList = getOrderedPkList(undoRows, undoRow, getDbType(conn));
                    for (Field field : undoRow.getFields()) {
                        if (field.getKeyType() != KeyType.PRIMARY_KEY) {
                            undoValues.add(field);
                        }
                    }
                    //设置value
                    undoPrepare(undoPST, undoValues, pkValueList);
                    //执行sql语句
                    undoPST.executeUpdate();
                }
    
            } 
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    8.3 undolog删除

    undolog日志的删除就比较简单了,调用处理 io.seata.rm.RMHandlerAT#handle 方法,通过undolog管理器删除执行,由 TC 指定删除对应的期限的 undolog日志,默认是删除过去 7 天的 3000条数据

    io.seata.rm.RMHandlerAT#handle

    public void handle(UndoLogDeleteRequest request) {
            String resourceId = request.getResourceId();
            DataSourceManager dataSourceManager = (DataSourceManager)getResourceManager();
            DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
            boolean hasUndoLogTable = undoLogTableExistRecord.computeIfAbsent(resourceId, id -> checkUndoLogTableExist(dataSourceProxy));
            //根据存储天数进行数据的删除
            Date division = getLogCreated(request.getSaveDays());
            //获取到undolog日志管理器
            UndoLogManager manager = getUndoLogManager(dataSourceProxy);
            try (Connection conn = getConnection(dataSourceProxy)) {
                if (conn == null) {
                    LOGGER.warn("Failed to get connection to delete expired undo_log for {}", resourceId);
                    return;
                }
                int deleteRows;
                do {
                    //根据日期进行删除,默认 7 天之内的 3000条数据
                    deleteRows = deleteUndoLog(manager, conn, division);
                } while (deleteRows == LIMIT_ROWS);
            } catch (Exception e) {
                // should never happen, deleteUndoLog method had catch all Exception
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
  • 相关阅读:
    Express项目
    Mysql:设置主键自动增长起始值
    大屏UI设计-看这一篇就够了
    Android:Binder思考笔记
    三十八、Fluent融化凝固模型参数设置依据
    MySQL笔记(九):存储引擎
    【用户画像】在ClickHouse中将宽表转换为bitmap表(源码实现)、用户分群架构设计、SpringBoot概述及使用
    spring的一个properties配置文件,引用另外一个properties配置的变量
    【Kubernetes 系列】一文带你吃透 K8S 应用pod结点
    yolo系列之yolov5(3)
  • 原文地址:https://blog.csdn.net/weixin_43915643/article/details/127964070