• Spring-Cloud-Alibaba-SEATA源码解析(一)(客户端)


    前言

            seata作为Spring-Cloud-Alibaba的一员,目前正逐渐成为分布式事务的一个事实标准。它是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。seata支持多种分布式事务实现,如TCC,SAGA,XA,以及独有的AT模式,AT模式是阿里官方推荐的模式,提供无侵入自动补偿的事务模式,目前已支持MySQL、Oracle、PostgreSQL、TiDB 和 MariaDB。下面结合Spring-Cloud-Alibaba介绍SEATA  AT模式源码。

    前置知识:SEATA的基本使用,熟悉SEATA客户端和服务端配置和注册中心配置。

    @GlobalTransactional

            seata的使用非常简单,几乎无侵入,只要在事务方法上添加@GlobalTransactional注解,使用方式和spring声明式事务很像,不仅如此,seata的AT模式在执行流程上也和spring声明式事务很像,只不过是把事务范围扩大到了跨库/跨应用/跨数据库连接。

    像这样在一个类或方法上添加@GlobalTransactional注解:

    1. @GlobalTransactional
    2. public void testSeata(){
    3. resourceMapper.insert(new Resource("资源1","001"));
    4. String res=feignService.addResource(new Resource("资源2","002"));
    5. System.out.println(res);
    6. }

    源码入口:SeataAutoConfiguration

    1. @Bean(BEAN_NAME_FAILURE_HANDLER)
    2. @ConditionalOnMissingBean(FailureHandler.class)
    3. public FailureHandler failureHandler() {
    4. return new DefaultFailureHandlerImpl();
    5. }
    6. @Bean
    7. @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    8. @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    9. public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
    10. if (LOGGER.isInfoEnabled()) {
    11. LOGGER.info("Automatically configure Seata");
    12. }
    13. return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    14. }

            主要看GlobalTransactionScanner这个类,一个扫描器,它能够扫描@GlobalTransactional注解的方法,并为之创建aop切面,和声明式事务一个套路。不同的是GlobalTransactionScanner这个类一个人干了全部是的事儿,看一下它的继承情况:

    public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
    

    AbstractAutoProxyCreator在spring aop中是一个非常重要的类,它是一个BeanPostProcessor,在bean的初始化后置调用中为bean创建动态代理。

    首先看一下GlobalTransactionScanner的初始化方法:

    1. public void afterPropertiesSet() {
    2. if (disableGlobalTransaction) {
    3. if (LOGGER.isInfoEnabled()) {
    4. LOGGER.info("Global transaction is disabled.");
    5. }
    6. ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
    7. (ConfigurationChangeListener)this);
    8. return;
    9. }
    10. if (initialized.compareAndSet(false, true)) {
    11. //初始化TM,RM客户端
    12. initClient();
    13. }
    14. }
    15. private void initClient() {
    16. if (LOGGER.isInfoEnabled()) {
    17. LOGGER.info("Initializing Global Transaction Clients ... ");
    18. }
    19. if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
    20. throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
    21. }
    22. //init TM
    23. TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
    24. //init RM
    25. RMClient.init(applicationId, txServiceGroup);
    26. registerSpringShutdownHook();
    27. }

    TC (Transaction Coordinator) - 事务协调者

    维护全局和分支事务的状态,驱动全局事务提交或回滚。

    TM (Transaction Manager) - 事务管理器

    定义全局事务的范围:开始全局事务、提交或回滚全局事务。

    RM (Resource Manager) - 资源管理器

    管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

    seata服务端就是TC,TM和RM在客户端,它们底层的通信使用netty实现。 

     熟悉spring aop的小伙伴应该知道,在bean的初始化的后置处理中,spring会为bean创建动态代理:

    1. public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
    2. if (bean != null) {
    3. Object cacheKey = getCacheKey(bean.getClass(), beanName);
    4. if (this.earlyProxyReferences.remove(cacheKey) != bean) {
    5. return wrapIfNecessary(bean, beanName, cacheKey);
    6. }
    7. }
    8. return bean;
    9. }

    重点是wrapIfNecessary方法,GlobalTransactionScanner重写了这个方法:

    1. protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    2. try {
    3. synchronized (PROXYED_SET) {
    4. //PROXYED_SET为已经处理过的bean集合,处理过了就可以跳过
    5. if (PROXYED_SET.contains(beanName)) {
    6. return bean;
    7. }
    8. interceptor = null;
    9. //如果是TCC模式会进入下面的if,这里只看AT模式,跳过
    10. if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
    11. //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
    12. interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
    13. ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
    14. (ConfigurationChangeListener)interceptor);
    15. } else {
    16. //获取bean的类型及其接口类型(如果有的话)
    17. Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
    18. Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
    19. //判断类上或是方法上是否有GlobalTransactional注解,没有则直接返回
    20. if (!existsAnnotation(new Class[]{serviceInterface})
    21. && !existsAnnotation(interfacesIfJdk)) {
    22. return bean;
    23. }
    24. //初始化globalTransactionalInterceptor,spring aop的拦截器
    25. //在执行目标方法时,globalTransactionalInterceptor会对其拦截
    26. if (globalTransactionalInterceptor == null) {
    27. globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
    28. ConfigurationCache.addConfigListener(
    29. ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
    30. (ConfigurationChangeListener)globalTransactionalInterceptor);
    31. }
    32. interceptor = globalTransactionalInterceptor;
    33. }
    34. LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
    35. if (!AopUtils.isAopProxy(bean)) {
    36. //如果不是代理类,则调用父类方法创建动态代理,正常aop逻辑
    37. bean = super.wrapIfNecessary(bean, beanName, cacheKey);
    38. } else {
    39. //如果已经是代理类了,则只需
    40. //拿到该类的拦截器链,然后构建Advisor数组,加入拦截器链
    41. AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
    42. Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
    43. for (Advisor avr : advisor) {
    44. advised.addAdvisor(0, avr);
    45. }
    46. }
    47. //放入已处理集合
    48. PROXYED_SET.add(beanName);
    49. return bean;
    50. }
    51. } catch (Exception exx) {
    52. throw new RuntimeException(exx);
    53. }
    54. }

    创建动态代理的过程就不进去分析了,属于spring aop的基础知识。

    在真正执行方法时,被globalTransactionalInterceptor拦截,看下拦截器的invoke方法:

    1. public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
    2. Class<?> targetClass =
    3. methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
    4. //获取目标方法
    5. Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
    6. if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
    7. //找到桥接方法(如果有的话)
    8. final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
    9. //获取GlobalTransactional或GlobalLock注解
    10. final GlobalTransactional globalTransactionalAnnotation =
    11. getAnnotation(method, targetClass, GlobalTransactional.class);
    12. final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
    13. boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
    14. if (!localDisable) {
    15. if (globalTransactionalAnnotation != null) {
    16. //进入globalTransactional拦截逻辑
    17. return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
    18. } else if (globalLockAnnotation != null) {
    19. //进入globalLock拦截逻辑,如果不需要分布式事务,只需要获取全局锁,可以用globalLock注解
    20. return handleGlobalLock(methodInvocation, globalLockAnnotation);
    21. }
    22. }
    23. }
    24. return methodInvocation.proceed();
    25. }

    进入handleGlobalTransaction处理全局事务:

    1. Object handleGlobalTransaction(final MethodInvocation methodInvocation,
    2. final GlobalTransactional globalTrxAnno) throws Throwable {
    3. boolean succeed = true;
    4. try {
    5. //使用事务模板执行全局事务
    6. return transactionalTemplate.execute(new TransactionalExecutor() {
    7. @Override
    8. public Object execute() throws Throwable {
    9. return methodInvocation.proceed();
    10. }
    11. public String name() {
    12. String name = globalTrxAnno.name();
    13. if (!StringUtils.isNullOrEmpty(name)) {
    14. return name;
    15. }
    16. return formatMethod(methodInvocation.getMethod());
    17. }
    18. @Override
    19. public TransactionInfo getTransactionInfo() {
    20. // reset the value of timeout
    21. int timeout = globalTrxAnno.timeoutMills();
    22. if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
    23. timeout = defaultGlobalTransactionTimeout;
    24. }
    25. TransactionInfo transactionInfo = new TransactionInfo();
    26. transactionInfo.setTimeOut(timeout);
    27. transactionInfo.setName(name());
    28. transactionInfo.setPropagation(globalTrxAnno.propagation());
    29. transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
    30. transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
    31. Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
    32. for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
    33. rollbackRules.add(new RollbackRule(rbRule));
    34. }
    35. for (String rbRule : globalTrxAnno.rollbackForClassName()) {
    36. rollbackRules.add(new RollbackRule(rbRule));
    37. }
    38. for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
    39. rollbackRules.add(new NoRollbackRule(rbRule));
    40. }
    41. for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
    42. rollbackRules.add(new NoRollbackRule(rbRule));
    43. }
    44. transactionInfo.setRollbackRules(rollbackRules);
    45. return transactionInfo;
    46. }
    47. });
    48. } catch (TransactionalExecutor.ExecutionException e) {
    49. TransactionalExecutor.Code code = e.getCode();
    50. //如果全局事务异常,会根据异常代码进入相应的失败策略
    51. //对于提交或回滚失败,seata会不断重试,到达一定次数还是失败则退出并打印错误日志
    52. //不断重试基于netty的HashedWheelTimer
    53. switch (code) {
    54. case RollbackDone:
    55. throw e.getOriginalException();
    56. case BeginFailure:
    57. succeed = false;
    58. failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
    59. throw e.getCause();
    60. case CommitFailure:
    61. succeed = false;
    62. failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
    63. throw e.getCause();
    64. case RollbackFailure:
    65. failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
    66. throw e.getOriginalException();
    67. case RollbackRetrying:
    68. failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
    69. throw e.getOriginalException();
    70. default:
    71. throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
    72. }
    73. } finally {
    74. if (degradeCheck) {
    75. EVENT_BUS.post(new DegradeCheckEvent(succeed));
    76. }
    77. }
    78. }

    下面进入事务模板方法:

    1. public Object execute(TransactionalExecutor business) throws Throwable {
    2. // 1. Get transactionInfo
    3. TransactionInfo txInfo = business.getTransactionInfo();
    4. if (txInfo == null) {
    5. throw new ShouldNeverHappenException("transactionInfo does not exist");
    6. }
    7. // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
    8. GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    9. // 1.2 Handle the transaction propagation.
    10. Propagation propagation = txInfo.getPropagation();
    11. SuspendedResourcesHolder suspendedResourcesHolder = null;
    12. try {
    13. switch (propagation) {
    14. case NOT_SUPPORTED:
    15. // If transaction is existing, suspend it.
    16. if (existingTransaction(tx)) {
    17. suspendedResourcesHolder = tx.suspend();
    18. }
    19. // Execute without transaction and return.
    20. return business.execute();
    21. case REQUIRES_NEW:
    22. // If transaction is existing, suspend it, and then begin new transaction.
    23. if (existingTransaction(tx)) {
    24. suspendedResourcesHolder = tx.suspend();
    25. tx = GlobalTransactionContext.createNew();
    26. }
    27. // Continue and execute with new transaction
    28. break;
    29. case SUPPORTS:
    30. // If transaction is not existing, execute without transaction.
    31. if (notExistingTransaction(tx)) {
    32. return business.execute();
    33. }
    34. // Continue and execute with new transaction
    35. break;
    36. case REQUIRED:
    37. // If current transaction is existing, execute with current transaction,
    38. // else continue and execute with new transaction.
    39. break;
    40. case NEVER:
    41. // If transaction is existing, throw exception.
    42. if (existingTransaction(tx)) {
    43. throw new TransactionException(
    44. String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
    45. , tx.getXid()));
    46. } else {
    47. // Execute without transaction and return.
    48. return business.execute();
    49. }
    50. case MANDATORY:
    51. // If transaction is not existing, throw exception.
    52. if (notExistingTransaction(tx)) {
    53. throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
    54. }
    55. // Continue and execute with current transaction.
    56. break;
    57. default:
    58. throw new TransactionException("Not Supported Propagation:" + propagation);
    59. }
    60. // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
    61. if (tx == null) {
    62. tx = GlobalTransactionContext.createNew();
    63. }
    64. // set current tx config to holder
    65. GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
    66. try {
    67. // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
    68. // else do nothing. Of course, the hooks will still be triggered.
    69. beginTransaction(txInfo, tx);
    70. Object rs;
    71. try {
    72. // Do Your Business
    73. rs = business.execute();
    74. } catch (Throwable ex) {
    75. // 3. The needed business exception to rollback.
    76. completeTransactionAfterThrowing(txInfo, tx, ex);
    77. throw ex;
    78. }
    79. // 4. everything is fine, commit.
    80. commitTransaction(tx);
    81. return rs;
    82. } finally {
    83. //5. clear
    84. resumeGlobalLockConfig(previousConfig);
    85. triggerAfterCompletion();
    86. cleanUp();
    87. }
    88. } finally {
    89. // If the transaction is suspended, resume it.
    90. if (suspendedResourcesHolder != null) {
    91. tx.resume(suspendedResourcesHolder);
    92. }
    93. }
    94. }

    这个方法很长,大致可以分为一下几个步骤:

    1. 获取事务信息:
      TransactionInfo txInfo = business.getTransactionInfo();
    2. 获取已存在的全局事务(如果存在的话,那么当前事务角色则为参与者):
      GlobalTransaction tx = GlobalTransactionContext.getCurrent();
    3. 处理事务传播行为:
      Propagation propagation = txInfo.getPropagation();...
    4. 如果不存在事务,则创建一个新的全局事务:
      tx = GlobalTransactionContext.createNew();
    5. 开始全局事务,像TC发起开始事务的请求:
      beginTransaction(txInfo, tx);
    6. 执行业务逻辑(目标方法):
      rs = business.execute();
    7. 回滚(需要的话):
      completeTransactionAfterThrowing(txInfo, tx, ex);
    8. 提交事务(需要的话):
      commitTransaction(tx);

    未完待续。。。 

  • 相关阅读:
    Java 数组
    开发一款流行的国内App可能用到的SDK功能分析
    【Numpy】深入剖析Numpy.arange()与range()的区别
    [网鼎杯 2020 朱雀组]Nmap 通过nmap写入木马 argcmd过滤实现逃逸
    关于useState、useEffect的一些误区和心得
    Dubbo的集群容错方案
    域控制器的深度详解!
    作业来了~~~
    跟着Datawhale重学数据结构与算法(3)---排序算法
    Serverless架构演进与实践
  • 原文地址:https://blog.csdn.net/w7sss/article/details/125485689