• SeataAT模式如何达到读已提交的隔离级别


    前提是搞清在AT模式下,事务是阶段提交,然后全局事务再提交。在阶段提交(某个RM提交本地事务),全局事务未提交时,虽然其他的分布式事务无法修改该事务锁定的数据,但是能查询到阶段提交的数据,因此这么看 AT默认的隔离级别是读未提交,也就说会发生脏读

    如何实现读已提交隔离界别

    目标:全局事务整体提交后,其数据才能被其他事务查询到。
    方式:实现方法很简单,那就是在select语句后加 for update;
    原理

    1. 首先加上for update,数据库本身会对查询记录上写锁,如果在某个rm正在更新这条记录,而没还没有提交其的本地事务,那么此时的写锁会加不上,select ····· for update 会阻塞等待。
    2. 如果 for update查询时,rm事务已经提交,但是全局事务未提交。那么此时的 select for update应该也需要查询失败。那么依赖数据库的写锁已经达不到这个效果了,那么这个时候就需要seata利用全局锁来实现这个功能了。

    针对select for update的特殊处理

    io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List, io.seata.rm.datasource.StatementProxy, io.seata.rm.datasource.exec.StatementCallback, java.lang.Object…)

       switch (sqlRecognizer.getSQLType()) {
                        case INSERT:
                            executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                    new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                    new Object[]{statementProxy, statementCallback, sqlRecognizer});
                            break;
                        case UPDATE:
                            executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case DELETE:
                            executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        case SELECT_FOR_UPDATE:
                            executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            executor = new PlainExecutor<>(statementProxy, statementCallback);
                            break;
                    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    seata为 for update语句单独写了一个执行器SelectForUpdateExecutor
    实现
    io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute

       public T doExecute(Object... args) throws Throwable {
            Connection conn = statementProxy.getConnection();
            DatabaseMetaData dbmd = conn.getMetaData();
            T rs;
            Savepoint sp = null;
            boolean originalAutoCommit = conn.getAutoCommit();
            try {
                if (originalAutoCommit) {
                    /*
                     * In order to hold the local db lock during global lock checking
                     * set auto commit value to false first if original auto commit was true
                     */
                    conn.setAutoCommit(false);
                } else if (dbmd.supportsSavepoints()) {
                    /*
                     * In order to release the local db lock when global lock conflict
                     * create a save point if original auto commit was false, then use the save point here to release db
                     * lock during global lock checking if necessary
                     */
                    sp = conn.setSavepoint();
                } else {
                    throw new SQLException("not support savepoint. please check your db version");
                }
    
                LockRetryController lockRetryController = new LockRetryController();
                ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
                String selectPKSQL = buildSelectSQL(paramAppenderList);
                while (true) {
                    try {
                        // #870
                        // execute return Boolean
                        // executeQuery return ResultSet
                        rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
    
                        // Try to get global lock of those rows selected
                        TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
                        String lockKeys = buildLockKey(selectPKRows);
                        if (StringUtils.isNullOrEmpty(lockKeys)) {
                            break;
                        }
    
                        if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
                            // Do the same thing under either @GlobalTransactional or @GlobalLock, 
                            // that only check the global lock  here.
                            statementProxy.getConnectionProxy().checkLock(lockKeys);
                        } else {
                            throw new RuntimeException("Unknown situation!");
                        }
                        break;
                    } catch (LockConflictException lce) {
                        if (sp != null) {
                            conn.rollback(sp);
                        } else {
                            conn.rollback();
                        }
                        // trigger retry
                        lockRetryController.sleep(lce);
                    }
                }
            } finally {
                if (sp != null) {
                    try {
                        if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {
                            conn.releaseSavepoint(sp);
                        }
                    } catch (SQLException e) {
                        LOGGER.error("{} release save point error.", getDbType(), e);
                    }
                }
                if (originalAutoCommit) {
                    conn.setAutoCommit(true);
                }
            }
            return rs;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    这个实现也很简单,执行查询语句,对查询结果构建lockKeys,然后向seata服务器查询此记录当前有没有上锁.如果此时全局事务仍然锁住这条记录,那么根据锁的尝试次数和尝试间隔,会先尝试等待全局锁释放。如果超出重试次数,那么会抛出异常。

    代价

    分布式事务中实现读已提交的代价是很高的,如果对查询加上了for update,那么首先,数据库会上写锁,在rm事务未提交前,如果对锁定记录做写操作,会阻塞线程。另外,全局锁的判断不仅需要额外的网络io,还可能会阻塞线程,对性能的影响还是比较大。对于非必要场景还是要尽量避免去使用。

  • 相关阅读:
    软件设计不是CRUD(12):低耦合模块设计理论——业务抽象:模块分层操作
    TOMCAT8.0 配置
    【新知实验室-TRTC开发】实时音视频之欢度世界杯
    10个最受欢迎的HDR环境贴图下载站
    python经典百题之交换数组元素
    Nerstudio 相机优化代码理解
    Revit中土建模块【精准生梁】快速生成
    简要解析盒子模型
    看一眼就会的k8s权限管理手把手教学
    《清单革命》内容梳理&随笔
  • 原文地址:https://blog.csdn.net/qq_37436172/article/details/126919593