• Spring-Cloud-Alibaba-SEATA源码解析(三)(客户端)


    前言

            在上一篇,分析了seata at模式分布式事务的执行流程,可以看到它和spring声明式事务的过程非常相似,剩下的问题是分支事务是如何提交和回滚的,seata中的全局锁是什么?答案是seata代理了我们的数据源,这个代理数据源对数据源的某些方法进行了增强,加入了申请全局锁,生成undo_log,注册分支事务等逻辑。

    全局锁

    全局锁主要用来实现全局事务的写隔离:

    • 一阶段本地事务提交前,需要确保先拿到 全局锁 。
    • 拿不到 全局锁 ,不能提交本地事务。
    • 拿 全局锁 的尝试被限制在一定范围内,超出范围将放弃,并回滚本地事务,释放本地锁。

    以一个示例来说明:

            两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。

            tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的 全局锁 ,本地提交释放本地锁。 tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的 全局锁 ,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待 全局锁 。

    tx1 二阶段全局提交,释放 全局锁 。tx2 拿到 全局锁 提交本地事务。

            如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。

            此时,如果 tx2 仍在等待该数据的 全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的 全局锁 等锁超时,放弃 全局锁 并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。

    因为整个过程 全局锁 在 tx1 结束前一直是被 tx1 持有的,所以不会发生 脏写 的问题。

    以上内容截取自seata官网。

    下面进入代理数据源的源码,入口是一个springboot自动配置类:SeataDataSourceAutoConfiguration

    1. @ConditionalOnBean(DataSource.class)
    2. @ConditionalOnExpression("${seata.enable:true} && ${seata.enableAutoDataSourceProxy:true} && ${seata.enable-auto-data-source-proxy:true}")
    3. public class SeataDataSourceAutoConfiguration {
    4. /**
    5. * The bean seataDataSourceBeanPostProcessor.
    6. */
    7. @Bean(BEAN_NAME_SEATA_DATA_SOURCE_BEAN_POST_PROCESSOR)
    8. @ConditionalOnMissingBean(SeataDataSourceBeanPostProcessor.class)
    9. public SeataDataSourceBeanPostProcessor seataDataSourceBeanPostProcessor(SeataProperties seataProperties) {
    10. return new SeataDataSourceBeanPostProcessor(seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    11. }
    12. /**
    13. * The bean seataAutoDataSourceProxyCreator.
    14. */
    15. @Bean(BEAN_NAME_SEATA_AUTO_DATA_SOURCE_PROXY_CREATOR)
    16. @ConditionalOnMissingBean(SeataAutoDataSourceProxyCreator.class)
    17. public SeataAutoDataSourceProxyCreator seataAutoDataSourceProxyCreator(SeataProperties seataProperties) {
    18. return new SeataAutoDataSourceProxyCreator(seataProperties.isUseJdkProxy(),
    19. seataProperties.getExcludesForAutoProxying(), seataProperties.getDataSourceProxyMode());
    20. }
    21. }

            首先是SeataDataSourceBeanPostProcessor,bean的后置处理器,重写了bean的初始化后置处理方法:postProcessAfterInitialization,在这个方法中seata会为Spring容器中的数据源bean创建代理数据源。

    1. public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    2. if (bean instanceof DataSource) {
    3. //不属于排除的数据源(可在配置文件中设置)
    4. if (!excludes.contains(bean.getClass().getName())) {
    5. //创建代理数据源,并和原数据源做映射
    6. DataSourceProxyHolder.get().putDataSource((DataSource) bean, dataSourceProxyMode);
    7. }
    8. //If is SeataDataSourceProxy, return the original data source.
    9. if (bean instanceof SeataDataSourceProxy) {
    10. LOGGER.info("Unwrap the bean of the data source," +
    11. " and return the original data source to replace the data source proxy.");
    12. return ((SeataDataSourceProxy) bean).getTargetDataSource();
    13. }
    14. }
    15. return bean;
    16. }
    17. public SeataDataSourceProxy putDataSource(DataSource dataSource, BranchType dataSourceProxyMode) {
    18. DataSource originalDataSource;
    19. if (dataSource instanceof SeataDataSourceProxy) {
    20. SeataDataSourceProxy dataSourceProxy = (SeataDataSourceProxy) dataSource;
    21. //只会创建一次代理
    22. if (dataSourceProxyMode == dataSourceProxy.getBranchType()) {
    23. return (SeataDataSourceProxy) dataSource;
    24. }
    25. //Get the original data source.
    26. originalDataSource = dataSourceProxy.getTargetDataSource();
    27. } else {
    28. originalDataSource = dataSource;
    29. }
    30. //从映射map中获取代理数据源
    31. SeataDataSourceProxy dsProxy = dataSourceProxyMap.get(originalDataSource);
    32. //没有则创建
    33. if (dsProxy == null) {
    34. synchronized (dataSourceProxyMap) {
    35. dsProxy = dataSourceProxyMap.get(originalDataSource);
    36. if (dsProxy == null) {
    37. //创建代理数据源
    38. dsProxy = createDsProxyByMode(dataSourceProxyMode, originalDataSource);
    39. dataSourceProxyMap.put(originalDataSource, dsProxy);
    40. }
    41. }
    42. }
    43. return dsProxy;
    44. }
    45. private SeataDataSourceProxy createDsProxyByMode(BranchType mode, DataSource originDs) {
    46. //默认是AT模式,创建DataSourceProxy对象
    47. return BranchType.XA == mode ? new DataSourceProxyXA(originDs) : new DataSourceProxy(originDs);
    48. }

    在创建代理数据源的过程中,seata客户端会向TC注册RM:

    1. public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) {
    2. if (targetDataSource instanceof SeataDataSourceProxy) {
    3. LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName());
    4. targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource();
    5. }
    6. this.targetDataSource = targetDataSource;
    7. init(targetDataSource, resourceGroupId);
    8. }
    9. private void init(DataSource dataSource, String resourceGroupId) {
    10. this.resourceGroupId = resourceGroupId;
    11. try (Connection connection = dataSource.getConnection()) {
    12. //获取数据库url,类型,并填充DataSourceProxy属性
    13. jdbcUrl = connection.getMetaData().getURL();
    14. dbType = JdbcUtils.getDbType(jdbcUrl);
    15. if (JdbcConstants.ORACLE.equals(dbType)) {
    16. userName = connection.getMetaData().getUserName();
    17. }
    18. } catch (SQLException e) {
    19. throw new IllegalStateException("can not init dataSource", e);
    20. }
    21. //向TC注册RM,包含了本地数据库连接信息
    22. //依然是通过netty客户端发送
    23. DefaultResourceManager.get().registerResource(this);
    24. if (ENABLE_TABLE_META_CHECKER_ENABLE) {
    25. tableMetaExcutor.scheduleAtFixedRate(() -> {
    26. try (Connection connection = dataSource.getConnection()) {
    27. TableMetaCacheFactory.getTableMetaCache(DataSourceProxy.this.getDbType())
    28. .refresh(connection, DataSourceProxy.this.getResourceId());
    29. } catch (Exception ignore) {
    30. }
    31. }, 0, TABLE_META_CHECKER_INTERVAL, TimeUnit.MILLISECONDS);
    32. }
    33. //设置默认分支事务类型为AT模式
    34. RootContext.setDefaultBranchType(this.getBranchType());
    35. }

            然后是SeataAutoDataSourceProxyCreator这个bean,它继承了AbstractAutoProxyCreator,这个类大家应该很熟悉了,sping aop的核心类,用来为bean创建动态代理。SeataAutoDataSourceProxyCreator重写了父类的getAdvicesAndAdvisorsForBeanshouldSkip方法,分别用来从容器中寻找对应数据源bean Datasource的通知类和在创建动态代理时判断是否跳过:

    1. public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator {
    2. private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class);
    3. private final List<String> excludes;
    4. private final Advisor advisor;
    5. public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes, String dataSourceProxyMode) {
    6. this.excludes = Arrays.asList(excludes);
    7. this.advisor = new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice(dataSourceProxyMode));
    8. setProxyTargetClass(!useJdkProxy);
    9. }
    10. @Override
    11. protected Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
    12. if (LOGGER.isInfoEnabled()) {
    13. LOGGER.info("Auto proxy of [{}]", beanName);
    14. }
    15. //获取能够匹配到的通知器,这里直接返回自己的Advisor也就是AdvisorDefaultIntroductionAdvisor,省去匹配过程
    16. return new Object[]{advisor};
    17. }
    18. @Override
    19. protected boolean shouldSkip(Class<?> beanClass, String beanName) {
    20. //判断是否要跳过创建代理的过程,这里直接判断是否是DataSource类型
    21. //如果是DataSource类型则需要为止创建代理
    22. return !DataSource.class.isAssignableFrom(beanClass) ||
    23. SeataProxy.class.isAssignableFrom(beanClass) ||
    24. excludes.contains(beanClass.getName());
    25. }
    26. }

            最后orm框架在执行sql时,会拿到spring容器中的数据源,并执行获取数据库连接connection的方法,会来到拦截器SeataAutoDataSourceProxyAdvice的invoke:

    1. public Object invoke(MethodInvocation invocation) throws Throwable {
    2. if (!RootContext.requireGlobalLock() && dataSourceProxyMode != RootContext.getBranchType()) {
    3. return invocation.proceed();
    4. }
    5. Method method = invocation.getMethod();
    6. Object[] args = invocation.getArguments();
    7. //要执行的目标方法
    8. Method m = BeanUtils.findDeclaredMethod(dataSourceProxyClazz, method.getName(), method.getParameterTypes());
    9. //判断是否是datasource的方法
    10. if (m != null && DataSource.class.isAssignableFrom(method.getDeclaringClass())) {
    11. //从DataSourceProxyHolder中获取对应数据源的映射过的代理数据源DataSourceProxy
    12. //这里代理数据源DataSourceProxy在spinrg加载容器时已经创建完毕
    13. SeataDataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) invocation.getThis(), dataSourceProxyMode);
    14. //执行代理数据源DataSourceProxy的相对应的方法
    15. return m.invoke(dataSourceProxy, args);
    16. } else {
    17. return invocation.proceed();
    18. }
    19. }

    DataSourceProxy代理过程:

    DataSourceProxy-->ConnectionProxy-->PreparedStatementProxy

    最终在执行sql时,PreparedStatementProxy对执行sql方法execute进行代理PreparedStatementProxy#execute:

    1. public boolean execute() throws SQLException {
    2. return ExecuteTemplate.execute(this, (statement, args) -> statement.execute());
    3. }
    4. public static <T, S extends Statement> T execute(StatementProxy<S> statementProxy,
    5. StatementCallback<T, S> statementCallback,
    6. Object... args) throws SQLException {
    7. return execute(null, statementProxy, statementCallback, args);
    8. }
    9. public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
    10. StatementProxy<S> statementProxy,
    11. StatementCallback<T, S> statementCallback,
    12. Object... args) throws SQLException {
    13. if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
    14. // Just work as original statement
    15. return statementCallback.execute(statementProxy.getTargetStatement(), args);
    16. }
    17. String dbType = statementProxy.getConnectionProxy().getDbType();
    18. if (CollectionUtils.isEmpty(sqlRecognizers)) {
    19. //获取sql类型的识别器
    20. sqlRecognizers = SQLVisitorFactory.get(
    21. statementProxy.getTargetSQL(),
    22. dbType);
    23. }
    24. Executor<T> executor;
    25. if (CollectionUtils.isEmpty(sqlRecognizers)) {
    26. executor = new PlainExecutor<>(statementProxy, statementCallback);
    27. } else {
    28. if (sqlRecognizers.size() == 1) {
    29. SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
    30. //根据不同的sql类型进入相应的case
    31. switch (sqlRecognizer.getSQLType()) {
    32. case INSERT:
    33. executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
    34. new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
    35. new Object[]{statementProxy, statementCallback, sqlRecognizer});
    36. break;
    37. case UPDATE:
    38. //以update操作为例,得到一个UpdateExecutor
    39. executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
    40. break;
    41. case DELETE:
    42. executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
    43. break;
    44. case SELECT_FOR_UPDATE:
    45. executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
    46. break;
    47. default:
    48. executor = new PlainExecutor<>(statementProxy, statementCallback);
    49. break;
    50. }
    51. } else {
    52. executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
    53. }
    54. }
    55. T rs;
    56. try {
    57. //执行UpdateExecutor的execute方法执行sql
    58. rs = executor.execute(args);
    59. } catch (Throwable ex) {
    60. if (!(ex instanceof SQLException)) {
    61. // Turn other exception into SQLException
    62. ex = new SQLException(ex);
    63. }
    64. throw (SQLException) ex;
    65. }
    66. return rs;
    67. }

    来到父类BaseTransactionalExecutor#execute:

    1. public T execute(Object... args) throws Throwable {
    2. //从本地线程变量获取绑定了的全局事务id
    3. String xid = RootContext.getXID();
    4. if (xid != null) {
    5. //把xid绑定连接ConnectionProxy
    6. statementProxy.getConnectionProxy().bind(xid);
    7. }
    8. //设置GlobalLockRequire为true,意为在本地事务提交时需要获取全局锁
    9. statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock());
    10. return doExecute(args);
    11. }
    12. public T doExecute(Object... args) throws Throwable {
    13. AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
    14. if (connectionProxy.getAutoCommit()) {
    15. return executeAutoCommitTrue(args);
    16. } else {
    17. return executeAutoCommitFalse(args);
    18. }
    19. }
    20. protected T executeAutoCommitFalse(Object[] args) throws Exception {
    21. if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) {
    22. throw new NotSupportYetException("multi pk only support mysql!");
    23. }
    24. //获取前置镜像,其实就是select for updata查询语句,查询修改前对应的数据
    25. TableRecords beforeImage = beforeImage();
    26. //执行目标sql
    27. T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
    28. //获取后置镜像
    29. TableRecords afterImage = afterImage(beforeImage);
    30. //用前置镜像与后置镜像构建undo_log回滚日志,并设置到connectionProxy上
    31. prepareUndoLog(beforeImage, afterImage);
    32. return result;
    33. }

    然后执行提交操作ConnectionProxy#commit:

    1. public void commit() throws SQLException {
    2. try {
    3. LOCK_RETRY_POLICY.execute(() -> {
    4. doCommit();
    5. return null;
    6. });
    7. } catch (SQLException e) {
    8. if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) {
    9. rollback();
    10. }
    11. throw e;
    12. } catch (Exception e) {
    13. throw new SQLException(e);
    14. }
    15. }
    16. private void doCommit() throws SQLException {
    17. if (context.inGlobalTransaction()) {
    18. //如果时分布式事务的话
    19. processGlobalTransactionCommit();
    20. } else if (context.isGlobalLockRequire()) {
    21. processLocalCommitWithGlobalLocks();
    22. } else {
    23. targetConnection.commit();
    24. }
    25. }
    26. private void processGlobalTransactionCommit() throws SQLException {
    27. try {
    28. //向TC注册分支事务
    29. register();
    30. } catch (TransactionException e) {
    31. recognizeLockKeyConflictException(e, context.buildLockKeys());
    32. }
    33. try {
    34. //插入undo_log日志到undo_log表
    35. UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
    36. //本地数据库提交
    37. targetConnection.commit();
    38. } catch (Throwable ex) {
    39. LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
    40. //提交异常会向TC上报
    41. report(false);
    42. throw new SQLException(ex);
    43. }
    44. if (IS_REPORT_SUCCESS_ENABLE) {
    45. report(true);
    46. }
    47. context.reset();
    48. }
    49. private void register() throws TransactionException {
    50. if (!context.hasUndoLog() || !context.hasLockKey()) {
    51. return;
    52. }
    53. //RM向TC注册分支事务,返回分支事务id,依然基于netty
    54. Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
    55. null, context.getXid(), null, context.buildLockKeys());
    56. context.setBranchId(branchId);
    57. }

            值得注意的是,如果在执行目标sql发生异常,那么本地RM就不会去注册分支事务了,直接本地就会滚,从全局来看,全局事务发起者TM通过捕获rpc调用异常发起全局事务回滚。在注册分支事务时,可能会抛出TransactionException,意为获取全局锁失败,有可能是另一个全局事务拿到了全局锁,本地重试策略会不断睡眠+重试,最终抛出LockWaitTimeoutException超时异常,本地事务回滚,进而导致全局回滚。

    总结

            以上只是分布式事务第一阶段,一阶段本地事务顺利提交,等待二阶段TC的异步通知,二阶段本地RM根据TC的反馈,如果全局回滚,则执行本地的undo_log回滚日志,如果全局提交,则删除undo_log即可,最后RM上报这一步的结果,TC释放全局锁。

  • 相关阅读:
    17.RedHat认证-Ansible自动化运维(下)
    Linux:进程概念的引入和理解
    高通导航器软件开发包使用指南(10)
    【optimtool.unconstrain】无约束优化工具箱
    Linux下企业级夜莺监控分析工具的远程访问设置【内网穿透】
    C++排序函数sort()和qsort()的参数比较函数的统一记忆方法
    spring面试题
    计算机保研er历程分享(浙软、厦大、华师、东南网安、东北、西电、中南......)
    商业计划书PPT怎么做?这个AI软件一键在线生成,做PPT再也不求人!
    动态规划4(Leetcode746使用最小花费爬楼梯)
  • 原文地址:https://blog.csdn.net/w7sss/article/details/125509717