前提是搞清在AT模式下,事务是阶段提交,然后全局事务再提交。在阶段提交(某个RM提交本地事务),全局事务未提交时,虽然其他的分布式事务无法修改该事务锁定的数据,但是能查询到阶段提交的数据,因此这么看 AT默认的隔离级别是读未提交,也就说会发生脏读
目标:全局事务整体提交后,其数据才能被其他事务查询到。
方式:实现方法很简单,那就是在select语句后加 for update;
原理
io.seata.rm.datasource.exec.ExecuteTemplate#execute(java.util.List, io.seata.rm.datasource.exec.StatementCallback
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
seata为 for update语句单独写了一个执行器SelectForUpdateExecutor
实现
io.seata.rm.datasource.exec.SelectForUpdateExecutor#doExecute
public T doExecute(Object... args) throws Throwable {
Connection conn = statementProxy.getConnection();
DatabaseMetaData dbmd = conn.getMetaData();
T rs;
Savepoint sp = null;
boolean originalAutoCommit = conn.getAutoCommit();
try {
if (originalAutoCommit) {
/*
* In order to hold the local db lock during global lock checking
* set auto commit value to false first if original auto commit was true
*/
conn.setAutoCommit(false);
} else if (dbmd.supportsSavepoints()) {
/*
* In order to release the local db lock when global lock conflict
* create a save point if original auto commit was false, then use the save point here to release db
* lock during global lock checking if necessary
*/
sp = conn.setSavepoint();
} else {
throw new SQLException("not support savepoint. please check your db version");
}
LockRetryController lockRetryController = new LockRetryController();
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
String selectPKSQL = buildSelectSQL(paramAppenderList);
while (true) {
try {
// #870
// execute return Boolean
// executeQuery return ResultSet
rs = statementCallback.execute(statementProxy.getTargetStatement(), args);
// Try to get global lock of those rows selected
TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
String lockKeys = buildLockKey(selectPKRows);
if (StringUtils.isNullOrEmpty(lockKeys)) {
break;
}
if (RootContext.inGlobalTransaction() || RootContext.requireGlobalLock()) {
// Do the same thing under either @GlobalTransactional or @GlobalLock,
// that only check the global lock here.
statementProxy.getConnectionProxy().checkLock(lockKeys);
} else {
throw new RuntimeException("Unknown situation!");
}
break;
} catch (LockConflictException lce) {
if (sp != null) {
conn.rollback(sp);
} else {
conn.rollback();
}
// trigger retry
lockRetryController.sleep(lce);
}
}
} finally {
if (sp != null) {
try {
if (!JdbcConstants.ORACLE.equalsIgnoreCase(getDbType())) {
conn.releaseSavepoint(sp);
}
} catch (SQLException e) {
LOGGER.error("{} release save point error.", getDbType(), e);
}
}
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
}
return rs;
}
这个实现也很简单,执行查询语句,对查询结果构建lockKeys,然后向seata服务器查询此记录当前有没有上锁.如果此时全局事务仍然锁住这条记录,那么根据锁的尝试次数和尝试间隔,会先尝试等待全局锁释放。如果超出重试次数,那么会抛出异常。
在分布式事务中实现读已提交的代价是很高的,如果对查询加上了for update,那么首先,数据库会上写锁,在rm事务未提交前,如果对锁定记录做写操作,会阻塞线程。另外,全局锁的判断不仅需要额外的网络io,还可能会阻塞线程,对性能的影响还是比较大。对于非必要场景还是要尽量避免去使用。