• @GlobalLock注解作用与原理解析


    GlobalLock的作用

    对于某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改。防止的本地事务对全局事务的数据脏写。如果和select for update组合使用,还可以起到防止脏读的效果。

    全局锁

    首先我们知道,seata的AT模式是二段提交的,而且AT模式能够做到事务ACID四种特性中的A原子性和D持久性,默认情况下隔离级别也只能保证在读未提交
    那么为了保证原子性,在全局事务未提交之前,其中被修改的数据会被加上全局锁,保证不再会被其他全局事务修改。

    为什么要使用GlobalLock

    但是全局锁仅仅能防止全局事务对一个上锁的数据再次进行修改,在很多业务场景中我们是没有跨系统的rpc调用的,通常是不会加分布式事务的。

    例如有分布式事务执行完毕A系统的业务逻辑,正在继续执行B系统逻辑,并且A系统事务已经提交。此时A系统一个本地的spring事务去与分布式事务修改同一行数据,是可以正常修改的
    由于本地的spring事务并不受seata的全局锁控制容易导致脏写,即全局事务修改数据后,还未提交,数据又被本地事务改掉了。这很容易发生数据出错的问题,而且十分有可能导致全局事务回滚时发现 数据已经dirty(与uodoLog中的beforeImage不同)。那么就会回滚失败,进而导致全局锁无法释放,后续的操作无法进行下去。也是比较严重的问题。
    一种解决办法就是,针对所有相关操作都加上AT全局事务,但这显然是没必要的,因为全局事务意味者需要与seata-server进行通信,创建全局事务,注册分支事务,记录undoLog,判断锁冲突,注册锁。
    那么对于不需要跨系统,跨库的的业务来说,使用GlobalTransactional实在是有点浪费了
    那么更加轻量的GlobalLock就能够发挥作用了,其只需要判断本地的修改是否与全局锁冲突就够了

    工作原理

    加上@GlobalLock之后,会进入切面
    io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
    进而进入这个方法,处理全局锁

        Object handleGlobalLock(final MethodInvocation methodInvocation,
            final GlobalLock globalLockAnno) throws Throwable {
            return globalLockTemplate.execute(new GlobalLockExecutor() {
                @Override
                public Object execute() throws Throwable {
                    return methodInvocation.proceed();
                }
    
                @Override
                public GlobalLockConfig getGlobalLockConfig() {
                    GlobalLockConfig config = new GlobalLockConfig();
                    config.setLockRetryInternal(globalLockAnno.lockRetryInternal());
                    config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
                    return config;
                }
            });
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    进入execute方法

    public Object execute(GlobalLockExecutor executor) throws Throwable {
            boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
            if (!alreadyInGlobalLock) {
                RootContext.bindGlobalLockFlag();
            }
    
            // set my config to config holder so that it can be access in further execution
            // for example, LockRetryController can access it with config holder
            GlobalLockConfig myConfig = executor.getGlobalLockConfig();
            GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
    
            try {
                return executor.execute();
            } finally {
                // only unbind when this is the root caller.
                // otherwise, the outer caller would lose global lock flag
                if (!alreadyInGlobalLock) {
                    RootContext.unbindGlobalLockFlag();
                }
    
                // if previous config is not null, we need to set it back
                // so that the outer logic can still use their config
                if (previousConfig != null) {
                    GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
                } else {
                    GlobalLockConfigHolder.remove();
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    先判断当前是否已经在globalLock范围之内,如果已经在范围之内,那么把上层的配置取出来,用新的配置替换,并在方法执行完毕时候,释放锁,或者将配置替换成之前的上层配置
    如果开启全局锁,会在threadLocal put一个标记

        //just put something not null
    CONTEXT_HOLDER.put(KEY_GLOBAL_LOCK_FLAG, VALUE_GLOBAL_LOCK_FLAG);
    
    • 1
    • 2

    开始执行业务方法
    那么加上相关GlobalLock标记的和普通方法的区别在哪里?

    我们都知道,seata会对数据库连接做代理,在生成PreparedStatement时会进入

    io.seata.rm.datasource.AbstractConnectionProxy#prepareStatement(java.lang.String)

      @Override
        public PreparedStatement prepareStatement(String sql) throws SQLException {
            String dbType = getDbType();
            // support oracle 10.2+
            PreparedStatement targetPreparedStatement = null;
            if (BranchType.AT == RootContext.getBranchType()) {
                List<SQLRecognizer> sqlRecognizers = SQLVisitorFactory.get(sql, dbType);
                if (sqlRecognizers != null && sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    if (sqlRecognizer != null && sqlRecognizer.getSQLType() == SQLType.INSERT) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dbType).getTableMeta(getTargetConnection(),
                                sqlRecognizer.getTableName(), getDataSourceProxy().getResourceId());
                        String[] pkNameArray = new String[tableMeta.getPrimaryKeyOnlyName().size()];
                        tableMeta.getPrimaryKeyOnlyName().toArray(pkNameArray);
                        targetPreparedStatement = getTargetConnection().prepareStatement(sql,pkNameArray);
                    }
                }
            }
            if (targetPreparedStatement == null) {
                targetPreparedStatement = getTargetConnection().prepareStatement(sql);
            }
            return new PreparedStatementProxy(this, targetPreparedStatement, sql);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    这里显然不会进入AT模式的逻辑,那么直接通过真正的数据库连接,生成PreparedStatement,再使用PreparedStatementProxy进行包装,代理增强
    在使用PreparedStatementProxy执行sql时,会进入seata定义的一些逻辑

     public boolean execute() throws SQLException {
            return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
        }
    
    • 1
    • 2
    • 3

    最终来到
    io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback, java.lang.Object…)

       public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                         StatementProxy<S> statementProxy,
                                                         StatementCallback<T, S> statementCallback,
                                                         Object... args) throws SQLException {
            if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
                // Just work as original statement
                return statementCallback.execute(statementProxy.getTargetStatement(), args);
            }
    
            String dbType = statementProxy.getConnectionProxy().getDbType();
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                sqlRecognizers = SQLVisitorFactory.get(
                        statementProxy.getTargetSQL(),
                        dbType);
            }
            Executor<T> executor;
            if (CollectionUtils.isEmpty(sqlRecognizers)) {
                executor = new PlainExecutor<>(statementProxy, statementCallback);
            } else {
                if (sqlRecognizers.size() == 1) {
                    SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
                    switch (sqlRecognizer.getSQLType()) {
                        case INSERT:
                            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                            break;
                        case UPDATE:
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case DELETE:
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case SELECT_FOR_UPDATE:
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            executor = new PlainExecutor<>(statementProxy, statementCallback);
                            break;
                    }
                } else {
                    executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
                }
            }
            T rs;
            try {
                rs = executor.execute(args);
            } catch (Throwable ex) {
                if (!(ex instanceof SQLException)) {
                    // Turn other exception into SQLException
                    ex = new SQLException(ex);
                }
                throw (SQLException) ex;
            }
            return rs;
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    如果当前线程不需要锁并且不不在AT模式的分支事务下,直接使用原生的preparedStatement执行就好了
    这里四种操作,通过不同的接口去执行,接口又有多种不同的数据库类型实现

    插入分为不同的数据库类型,通过spi获取
    在这里插入图片描述

    seata提供了三种数据库的实现,
    update,delete,select三种没有多个实现类
    他们在执行时都会执行父类的方法
    io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitTrue

       protected T executeAutoCommitTrue(Object[] args) throws Throwable {
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
            try {
                connectionProxy.changeAutoCommit();
                return new LockRetryPolicy(connectionProxy).execute(() -> {
                    T result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    return result;
                });
            } catch (Exception e) {
                // when exception occur in finally,this exception will lost, so just print it here
                LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e);
                if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) {
                    connectionProxy.getTargetConnection().rollback();
                }
                throw e;
            } finally {
                connectionProxy.getContext().reset();
                connectionProxy.setAutoCommit(true);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    全局锁的策略, 是在一个while(true)循环里不断执行

     protected <T> T doRetryOnLockConflict(Callable<T> callable) throws Exception {
                LockRetryController lockRetryController = new LockRetryController();
                while (true) {
                    try {
                        return callable.call();
                    } catch (LockConflictException lockConflict) {
                        onException(lockConflict);
                        lockRetryController.sleep(lockConflict);
                    } catch (Exception e) {
                        onException(e);
                        throw e;
                    }
                }
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    如果出现异常是LockConflictException,进入sleep

    public void sleep(Exception e) throws LockWaitTimeoutException {
            if (--lockRetryTimes < 0) {
                throw new LockWaitTimeoutException("Global lock wait timeout", e);
            }
    
            try {
                Thread.sleep(lockRetryInternal);
            } catch (InterruptedException ignore) {
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这两个变量就是@GlobalLock注解的两个配置,一个是重试次数,一个重试之间的间隔时间。

    继续就是执行数据库更新操作
    io.seata.rm.datasource.exec.AbstractDMLBaseExecutor#executeAutoCommitFalse
    发现这里也会生成,undoLog,beforeImage和afterImage,其实想想,在GlobalLock下,是没必要生成undoLog的。但是现有逻辑确实要生成,这个seata后续应该会优化。

    protected T executeAutoCommitFalse(Object[] args) throws Exception {
            if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
                throw new NotSupportYetException("multi pk only support mysql!");
            }
            TableRecords beforeImage = beforeImage();
            T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
            TableRecords afterImage = afterImage(beforeImage);
            prepareUndoLog(beforeImage, afterImage);
            return result;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    生成beforeImage和aferImage的逻辑也比较简单。分别在执行更新前,查询数据库,和更新后查询数据库
    可见记录undoLog是十分影响性能的,查询就多了两次,如果undoLog入库还要再多一次入库操作。
    再看prepareUndoLog

     protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
            if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
                return;
            }
            if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
                if (beforeImage.getRows().size() != afterImage.getRows().size()) {
                    throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
                }
            }
            ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    
            TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
            String lockKeys = buildLockKey(lockKeyRecords);
            if (null != lockKeys) {
                connectionProxy.appendLockKey(lockKeys);
    
                SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
                connectionProxy.appendUndoLog(sqlUndoLog);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    将lockKeys,和undoLog,暂时记录在connectionProxy中,也就是说至此还没有将uodoLog记录到数据库,也没有判断全局锁,这些事情都留到了事务提交
    io.seata.rm.datasource.ConnectionProxy#doCommit

     private void doCommit() throws SQLException {
            if (context.inGlobalTransaction()) {
                processGlobalTransactionCommit();
            } else if (context.isGlobalLockRequire()) {
                processLocalCommitWithGlobalLocks();
            } else {
                targetConnection.commit();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    进入io.seata.rm.datasource.ConnectionProxy#processLocalCommitWithGlobalLocks
    这个 方法很简单就是首先进行锁的检查,并没有我想象中的加索全局事务。

     private void processLocalCommitWithGlobalLocks() throws SQLException {
            checkLock(context.buildLockKeys());
            try {
                targetConnection.commit();
            } catch (Throwable ex) {
                throw new SQLException(ex);
            }
            context.reset();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    也就是说,使用GlobalLock会对全局锁检测,但是并不会对记录加全局锁。但是配合全局事务这样已经能够保证全局事务的原子性了。可见GlobalLock还是要和本地事务组合一起使用的,这样才能保证,GlobalLock执行完毕本地事务未提交的数据不会被别的本地事务/分布式事务修改掉。

  • 相关阅读:
    Ragnar-lothbrok 靶机
    HarmonyOS--状态管理--装饰器
    chatGPT教你算法(2)——常用的查找算法
    基于nodejs的在线跑腿管理系统
    从命令行管理文件
    【高性能计算】C++多线程计算与线程池
    warning LNK4017: DESCRIPTION 语句不支持目标平台;已忽略
    docker入门加实战—项目部署之DockrCompose
    【JDK 8-集合框架】5.3 limit 和 sorted 函数
    ModelCoder状态机:对柴油机工况判断策略进行建模
  • 原文地址:https://blog.csdn.net/qq_37436172/article/details/126689639