在上一篇,分析了seata at模式分布式事务的执行流程,可以看到它和spring声明式事务的过程非常相似,剩下的问题是分支事务是如何提交和回滚的,seata中的全局锁是什么?答案是seata代理了我们的数据源,这个代理数据源对数据源的某些方法进行了增强,加入了申请全局锁,生成undo_log,注册分支事务等逻辑。
全局锁主要用来实现全局事务的写隔离:
以一个示例来说明:
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。
tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。
以上内容截取自seata官网。
下面进入代理数据源的源码,入口是一个springboot自动配置类:SeataDataSourceAutoConfiguration
- @ConditionalOnBean(DataSource.class)
- @ConditionalOnExpression("${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
- public class SeataDataSourceAutoConfiguration {
-
- /**
- * The bean seataDataSourceBeanPostProcessor.
- */
- @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
- @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
- public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
- return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
- }
-
- /**
- * The bean seataAutoDataSourceProxyCreator.
- */
- @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
- @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
- public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
- return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
- seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
- }
- }
首先是SeataDataSourceBeanPostProcessor,bean的后置处理器,重写了bean的初始化后置处理方法:postProcessAfterInitialization,在这个方法中seata会为Spring容器中的数据源bean创建代理数据源。
- public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
- if (bean instanceof DataSource) {
- //不属于排除的数据源(可在配置文件中设置)
- if (!excludes.contains(bean.getClass().getName())) {
- //创建代理数据源,并和原数据源做映射
- DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
- }
-
- //If is SeataDataSourceProxy, return the original data source.
- if (bean instanceof SeataDataSourceProxy) {
- LOGGER.info("Unwrap the bean of the data source," +
- " and return the original data source to replace the data source proxy.");
- return ((SeataDataSourceProxy) bean).getTargetDataSource();
- }
- }
- return bean;
- }
-
-
- public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
- DataSource originalDataSource;
- if (dataSource instanceof SeataDataSourceProxy) {
- SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
-
- //只会创建一次代理
- if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
- return (SeataDataSourceProxy) dataSource;
- }
-
- //Get the original data source.
- originalDataSource = dataSourceProxy.getTargetDataSource();
- } else {
- originalDataSource = dataSource;
- }
- //从映射map中获取代理数据源
- SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
- //没有则创建
- if (dsProxy == null) {
- synchronized (dataSourceProxyMap) {
- dsProxy = dataSourceProxyMap.get(originalDataSource);
- if (dsProxy == null) {
- //创建代理数据源
- dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
- dataSourceProxyMap.put(originalDataSource, dsProxy);
- }
- }
- }
- return dsProxy;
- }
-
-
- private SeataDataSourceProxy createDsProxyByMode(BranchType mode, DataSource originDs) {
- //默认是AT模式,创建DataSourceProxy对象
- return BranchType.XA == mode ? new DataSourceProxyXA(originDs) : new DataSourceProxy(originDs);
- }
在创建代理数据源的过程中,seata客户端会向TC注册RM:
- public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
- if (targetDataSource instanceof SeataDataSourceProxy) {
- LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
- targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
- }
- this.targetDataSource = targetDataSource;
- init(targetDataSource, resourceGroupId);
- }
-
-
- private void init(DataSource dataSource, String resourceGroupId) {
- this.resourceGroupId = resourceGroupId;
- try (Connection connection = dataSource.getConnection()) {
- //获取数据库url,类型,并填充DataSourceProxy属性
- jdbcUrl = connection.getMetaData().getURL();
- dbType = JdbcUtils.getDbType(jdbcUrl);
- if (JdbcConstants.ORACLE.equals(dbType)) {
- userName = connection.getMetaData().getUserName();
- }
- } catch (SQLException e) {
- throw new IllegalStateException("can not init dataSource", e);
- }
- //向TC注册RM,包含了本地数据库连接信息
- //依然是通过netty客户端发送
- DefaultResourceManager.get().registerResource(this);
- if (ENABLE_TABLE_META_CHECKER_ENABLE) {
- tableMetaExcutor.scheduleAtFixedRate(() -> {
- try (Connection connection = dataSource.getConnection()) {
- TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
- .refresh(connection, DataSourceProxy.this.getResourceId());
- } catch (Exception ignore) {
- }
- }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
- }
-
- //设置默认分支事务类型为AT模式
- RootContext.setDefaultBranchType(this.getBranchType());
- }
然后是SeataAutoDataSourceProxyCreator这个bean,它继承了AbstractAutoProxyCreator,这个类大家应该很熟悉了,sping aop的核心类,用来为bean创建动态代理。SeataAutoDataSourceProxyCreator重写了父类的getAdvicesAndAdvisorsForBean和shouldSkip方法,分别用来从容器中寻找对应数据源bean Datasource的通知类和在创建动态代理时判断是否跳过:
- public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
- private final List<String> excludes;
- private final Advisor advisor;
-
- public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
- this.excludes = Arrays.asList(excludes);
- this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
- setProxyTargetClass(!useJdkProxy);
- }
-
- @Override
- protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Auto proxy of [{}]", beanName);
- }
- //获取能够匹配到的通知器,这里直接返回自己的Advisor也就是AdvisorDefaultIntroductionAdvisor,省去匹配过程
- return new Object[]{advisor};
- }
-
- @Override
- protected boolean shouldSkip(Class<?> beanClass, String beanName) {
- //判断是否要跳过创建代理的过程,这里直接判断是否是DataSource类型
- //如果是DataSource类型则需要为止创建代理
- return !DataSource.class.isAssignableFrom(beanClass) ||
- SeataProxy.class.isAssignableFrom(beanClass) ||
- excludes.contains(beanClass.getName());
- }
- }
最后orm框架在执行sql时,会拿到spring容器中的数据源,并执行获取数据库连接connection的方法,会来到拦截器SeataAutoDataSourceProxyAdvice的invoke:
- public Object invoke(MethodInvocation invocation) throws Throwable {
- if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
- return invocation.proceed();
- }
-
- Method method = invocation.getMethod();
- Object[] args = invocation.getArguments();
- //要执行的目标方法
- Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
- //判断是否是datasource的方法
- if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
- //从DataSourceProxyHolder中获取对应数据源的映射过的代理数据源DataSourceProxy
- //这里代理数据源DataSourceProxy在spinrg加载容器时已经创建完毕
- SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
- //执行代理数据源DataSourceProxy的相对应的方法
- return m.invoke(dataSourceProxy, args);
- } else {
- return invocation.proceed();
- }
- }
DataSourceProxy代理过程:
DataSourceProxy-->ConnectionProxy-->PreparedStatementProxy
最终在执行sql时,PreparedStatementProxy对执行sql方法execute进行代理PreparedStatementProxy#execute:
- public boolean execute() throws SQLException {
- return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
- }
-
- public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
- StatementCallback<T, S> statementCallback,
- Object... args) throws SQLException {
- return execute(null, statementProxy, statementCallback, args);
- }
-
-
- public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
- StatementProxy<S> statementProxy,
- StatementCallback<T, S> statementCallback,
- Object... args) throws SQLException {
- if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
- // Just work as original statement
- return statementCallback.execute(statementProxy.getTargetStatement(), args);
- }
-
- String dbType = statementProxy.getConnectionProxy().getDbType();
- if (CollectionUtils.isEmpty(sqlRecognizers)) {
- //获取sql类型的识别器
- sqlRecognizers = SQLVisitorFactory.get(
- statementProxy.getTargetSQL(),
- dbType);
- }
- Executor<T> executor;
- if (CollectionUtils.isEmpty(sqlRecognizers)) {
- executor = new PlainExecutor<>(statementProxy, statementCallback);
- } else {
- if (sqlRecognizers.size() == 1) {
- SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
- //根据不同的sql类型进入相应的case
- switch (sqlRecognizer.getSQLType()) {
- case INSERT:
- executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
- new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
- new Object[]{statementProxy, statementCallback, sqlRecognizer});
- break;
- case UPDATE:
- //以update操作为例,得到一个UpdateExecutor
- executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
- break;
- case DELETE:
- executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
- break;
- case SELECT_FOR_UPDATE:
- executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
- break;
- default:
- executor = new PlainExecutor<>(statementProxy, statementCallback);
- break;
- }
- } else {
- executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
- }
- }
- T rs;
- try {
- //执行UpdateExecutor的execute方法执行sql
- rs = executor.execute(args);
- } catch (Throwable ex) {
- if (!(ex instanceof SQLException)) {
- // Turn other exception into SQLException
- ex = new SQLException(ex);
- }
- throw (SQLException) ex;
- }
- return rs;
- }
来到父类BaseTransactionalExecutor#execute:
- public T execute(Object... args) throws Throwable {
- //从本地线程变量获取绑定了的全局事务id
- String xid = RootContext.getXID();
- if (xid != null) {
- //把xid绑定连接ConnectionProxy
- statementProxy.getConnectionProxy().bind(xid);
- }
- //设置GlobalLockRequire为true,意为在本地事务提交时需要获取全局锁
- statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
- return doExecute(args);
- }
-
-
- public T doExecute(Object... args) throws Throwable {
- AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
- if (connectionProxy.getAutoCommit()) {
- return executeAutoCommitTrue(args);
- } else {
- return executeAutoCommitFalse(args);
- }
- }
-
-
- protected T executeAutoCommitFalse(Object[] args) throws Exception {
- if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
- throw new NotSupportYetException("multi pk only support mysql!");
- }
- //获取前置镜像,其实就是select for updata查询语句,查询修改前对应的数据
- TableRecords beforeImage = beforeImage();
- //执行目标sql
- T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
- //获取后置镜像
- TableRecords afterImage = afterImage(beforeImage);
- //用前置镜像与后置镜像构建undo_log回滚日志,并设置到connectionProxy上
- prepareUndoLog(beforeImage, afterImage);
- return result;
- }
然后执行提交操作ConnectionProxy#commit:
- public void commit() throws SQLException {
- try {
- LOCK_RETRY_POLICY.execute(() -> {
- doCommit();
- return null;
- });
- } catch (SQLException e) {
- if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
- rollback();
- }
- throw e;
- } catch (Exception e) {
- throw new SQLException(e);
- }
- }
-
-
- private void doCommit() throws SQLException {
- if (context.inGlobalTransaction()) {
- //如果时分布式事务的话
- processGlobalTransactionCommit();
- } else if (context.isGlobalLockRequire()) {
- processLocalCommitWithGlobalLocks();
- } else {
- targetConnection.commit();
- }
- }
-
-
- private void processGlobalTransactionCommit() throws SQLException {
- try {
- //向TC注册分支事务
- register();
- } catch (TransactionException e) {
- recognizeLockKeyConflictException(e, context.buildLockKeys());
- }
- try {
- //插入undo_log日志到undo_log表
- UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
- //本地数据库提交
- targetConnection.commit();
- } catch (Throwable ex) {
- LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
- //提交异常会向TC上报
- report(false);
- throw new SQLException(ex);
- }
- if (IS_REPORT_SUCCESS_ENABLE) {
- report(true);
- }
- context.reset();
- }
-
-
- private void register() throws TransactionException {
- if (!context.hasUndoLog() || !context.hasLockKey()) {
- return;
- }
- //RM向TC注册分支事务,返回分支事务id,依然基于netty
- Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
- null, context.getXid(), null, context.buildLockKeys());
- context.setBranchId(branchId);
- }
值得注意的是,如果在执行目标sql发生异常,那么本地RM就不会去注册分支事务了,直接本地就会滚,从全局来看,全局事务发起者TM通过捕获rpc调用异常发起全局事务回滚。在注册分支事务时,可能会抛出TransactionException,意为获取全局锁失败,有可能是另一个全局事务拿到了全局锁,本地重试策略会不断睡眠+重试,最终抛出LockWaitTimeoutException超时异常,本地事务回滚,进而导致全局回滚。
以上只是分布式事务第一阶段,一阶段本地事务顺利提交,等待二阶段TC的异步通知,二阶段本地RM根据TC的反馈,如果全局回滚,则执行本地的undo_log回滚日志,如果全局提交,则删除undo_log即可,最后RM上报这一步的结果,TC释放全局锁。