seata作为Spring-Cloud-Alibaba的一员,目前正逐渐成为分布式事务的一个事实标准。它是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。seata支持多种分布式事务实现,如TCC,SAGA,XA,以及独有的AT模式,AT模式是阿里官方推荐的模式,提供无侵入自动补偿的事务模式,目前已支持MySQL、Oracle、PostgreSQL、TiDB 和 MariaDB。下面结合Spring-Cloud-Alibaba介绍SEATA AT模式源码。
前置知识:SEATA的基本使用,熟悉SEATA客户端和服务端配置和注册中心配置。
seata的使用非常简单,几乎无侵入,只要在事务方法上添加@GlobalTransactional注解,使用方式和spring声明式事务很像,不仅如此,seata的AT模式在执行流程上也和spring声明式事务很像,只不过是把事务范围扩大到了跨库/跨应用/跨数据库连接。
像这样在一个类或方法上添加@GlobalTransactional注解:
- @GlobalTransactional
- public void testSeata(){
- resourceMapper.insert(new Resource("资源1","001"));
- String res=feignService.addResource(new Resource("资源2","002"));
- System.out.println(res);
- }
源码入口:SeataAutoConfiguration
- @Bean(BEAN_NAME_FAILURE_HANDLER)
- @ConditionalOnMissingBean(FailureHandler.class)
- public FailureHandler failureHandler() {
- return new DefaultFailureHandlerImpl();
- }
-
- @Bean
- @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
- @ConditionalOnMissingBean(GlobalTransactionScanner.class)
- public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Automatically configure Seata");
- }
- return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
- }
主要看GlobalTransactionScanner这个类,一个扫描器,它能够扫描@GlobalTransactional注解的方法,并为之创建aop切面,和声明式事务一个套路。不同的是GlobalTransactionScanner这个类一个人干了全部是的事儿,看一下它的继承情况:
public class GlobalTransactionScanner extends AbstractAutoProxyCreator implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean
AbstractAutoProxyCreator在spring aop中是一个非常重要的类,它是一个BeanPostProcessor,在bean的初始化后置调用中为bean创建动态代理。
首先看一下GlobalTransactionScanner的初始化方法:
- public void afterPropertiesSet() {
- if (disableGlobalTransaction) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Global transaction is disabled.");
- }
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)this);
- return;
- }
- if (initialized.compareAndSet(false, true)) {
- //初始化TM,RM客户端
- initClient();
- }
- }
-
- private void initClient() {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Initializing Global Transaction Clients ... ");
- }
- if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
- throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
- }
- //init TM
- TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
- //init RM
- RMClient.init(applicationId, txServiceGroup);
- registerSpringShutdownHook();
-
- }
TC (Transaction Coordinator) - 事务协调者
维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器
定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
seata服务端就是TC,TM和RM在客户端,它们底层的通信使用netty实现。
熟悉spring aop的小伙伴应该知道,在bean的初始化的后置处理中,spring会为bean创建动态代理:
- public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
- if (bean != null) {
- Object cacheKey = getCacheKey(bean.getClass(), beanName);
- if (this.earlyProxyReferences.remove(cacheKey) != bean) {
- return wrapIfNecessary(bean, beanName, cacheKey);
- }
- }
- return bean;
- }
重点是wrapIfNecessary方法,GlobalTransactionScanner重写了这个方法:
- protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
- try {
- synchronized (PROXYED_SET) {
- //PROXYED_SET为已经处理过的bean集合,处理过了就可以跳过
- if (PROXYED_SET.contains(beanName)) {
- return bean;
- }
- interceptor = null;
- //如果是TCC模式会进入下面的if,这里只看AT模式,跳过
- if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
- //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
- interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
- ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)interceptor);
- } else {
- //获取bean的类型及其接口类型(如果有的话)
- Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
- Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
- //判断类上或是方法上是否有GlobalTransactional注解,没有则直接返回
- if (!existsAnnotation(new Class[]{serviceInterface})
- && !existsAnnotation(interfacesIfJdk)) {
- return bean;
- }
-
- //初始化globalTransactionalInterceptor,spring aop的拦截器
- //在执行目标方法时,globalTransactionalInterceptor会对其拦截
- if (globalTransactionalInterceptor == null) {
- globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
- ConfigurationCache.addConfigListener(
- ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
- (ConfigurationChangeListener)globalTransactionalInterceptor);
- }
- interceptor = globalTransactionalInterceptor;
- }
-
- LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
- if (!AopUtils.isAopProxy(bean)) {
- //如果不是代理类,则调用父类方法创建动态代理,正常aop逻辑
- bean = super.wrapIfNecessary(bean, beanName, cacheKey);
- } else {
- //如果已经是代理类了,则只需
- //拿到该类的拦截器链,然后构建Advisor数组,加入拦截器链
- AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
- Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
- for (Advisor avr : advisor) {
- advised.addAdvisor(0, avr);
- }
- }
- //放入已处理集合
- PROXYED_SET.add(beanName);
- return bean;
- }
- } catch (Exception exx) {
- throw new RuntimeException(exx);
- }
- }
创建动态代理的过程就不进去分析了,属于spring aop的基础知识。
在真正执行方法时,被globalTransactionalInterceptor拦截,看下拦截器的invoke方法:
- public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
- Class<?> targetClass =
- methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
- //获取目标方法
- Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
- if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
- //找到桥接方法(如果有的话)
- final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
- //获取GlobalTransactional或GlobalLock注解
- final GlobalTransactional globalTransactionalAnnotation =
- getAnnotation(method, targetClass, GlobalTransactional.class);
- final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
- boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
- if (!localDisable) {
- if (globalTransactionalAnnotation != null) {
- //进入globalTransactional拦截逻辑
- return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
- } else if (globalLockAnnotation != null) {
- //进入globalLock拦截逻辑,如果不需要分布式事务,只需要获取全局锁,可以用globalLock注解
- return handleGlobalLock(methodInvocation, globalLockAnnotation);
- }
- }
- }
- return methodInvocation.proceed();
- }
进入handleGlobalTransaction处理全局事务:
- Object handleGlobalTransaction(final MethodInvocation methodInvocation,
- final GlobalTransactional globalTrxAnno) throws Throwable {
- boolean succeed = true;
- try {
- //使用事务模板执行全局事务
- return transactionalTemplate.execute(new TransactionalExecutor() {
- @Override
- public Object execute() throws Throwable {
- return methodInvocation.proceed();
- }
-
- public String name() {
- String name = globalTrxAnno.name();
- if (!StringUtils.isNullOrEmpty(name)) {
- return name;
- }
- return formatMethod(methodInvocation.getMethod());
- }
-
- @Override
- public TransactionInfo getTransactionInfo() {
- // reset the value of timeout
- int timeout = globalTrxAnno.timeoutMills();
- if (timeout <= 0 || timeout == DEFAULT_GLOBAL_TRANSACTION_TIMEOUT) {
- timeout = defaultGlobalTransactionTimeout;
- }
-
- TransactionInfo transactionInfo = new TransactionInfo();
- transactionInfo.setTimeOut(timeout);
- transactionInfo.setName(name());
- transactionInfo.setPropagation(globalTrxAnno.propagation());
- transactionInfo.setLockRetryInternal(globalTrxAnno.lockRetryInternal());
- transactionInfo.setLockRetryTimes(globalTrxAnno.lockRetryTimes());
- Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
- for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
- rollbackRules.add(new RollbackRule(rbRule));
- }
- for (String rbRule : globalTrxAnno.rollbackForClassName()) {
- rollbackRules.add(new RollbackRule(rbRule));
- }
- for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
- rollbackRules.add(new NoRollbackRule(rbRule));
- }
- for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
- rollbackRules.add(new NoRollbackRule(rbRule));
- }
- transactionInfo.setRollbackRules(rollbackRules);
- return transactionInfo;
- }
- });
- } catch (TransactionalExecutor.ExecutionException e) {
- TransactionalExecutor.Code code = e.getCode();
- //如果全局事务异常,会根据异常代码进入相应的失败策略
- //对于提交或回滚失败,seata会不断重试,到达一定次数还是失败则退出并打印错误日志
- //不断重试基于netty的HashedWheelTimer
- switch (code) {
- case RollbackDone:
- throw e.getOriginalException();
- case BeginFailure:
- succeed = false;
- failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
- throw e.getCause();
- case CommitFailure:
- succeed = false;
- failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
- throw e.getCause();
- case RollbackFailure:
- failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
- throw e.getOriginalException();
- case RollbackRetrying:
- failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
- throw e.getOriginalException();
- default:
- throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
- }
- } finally {
- if (degradeCheck) {
- EVENT_BUS.post(new DegradeCheckEvent(succeed));
- }
- }
- }
下面进入事务模板方法:
- public Object execute(TransactionalExecutor business) throws Throwable {
- // 1. Get transactionInfo
- TransactionInfo txInfo = business.getTransactionInfo();
- if (txInfo == null) {
- throw new ShouldNeverHappenException("transactionInfo does not exist");
- }
- // 1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
- GlobalTransaction tx = GlobalTransactionContext.getCurrent();
-
- // 1.2 Handle the transaction propagation.
- Propagation propagation = txInfo.getPropagation();
- SuspendedResourcesHolder suspendedResourcesHolder = null;
- try {
- switch (propagation) {
- case NOT_SUPPORTED:
- // If transaction is existing, suspend it.
- if (existingTransaction(tx)) {
- suspendedResourcesHolder = tx.suspend();
- }
- // Execute without transaction and return.
- return business.execute();
- case REQUIRES_NEW:
- // If transaction is existing, suspend it, and then begin new transaction.
- if (existingTransaction(tx)) {
- suspendedResourcesHolder = tx.suspend();
- tx = GlobalTransactionContext.createNew();
- }
- // Continue and execute with new transaction
- break;
- case SUPPORTS:
- // If transaction is not existing, execute without transaction.
- if (notExistingTransaction(tx)) {
- return business.execute();
- }
- // Continue and execute with new transaction
- break;
- case REQUIRED:
- // If current transaction is existing, execute with current transaction,
- // else continue and execute with new transaction.
- break;
- case NEVER:
- // If transaction is existing, throw exception.
- if (existingTransaction(tx)) {
- throw new TransactionException(
- String.format("Existing transaction found for transaction marked with propagation 'never', xid = %s"
- , tx.getXid()));
- } else {
- // Execute without transaction and return.
- return business.execute();
- }
- case MANDATORY:
- // If transaction is not existing, throw exception.
- if (notExistingTransaction(tx)) {
- throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
- }
- // Continue and execute with current transaction.
- break;
- default:
- throw new TransactionException("Not Supported Propagation:" + propagation);
- }
-
- // 1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
- if (tx == null) {
- tx = GlobalTransactionContext.createNew();
- }
-
- // set current tx config to holder
- GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
-
- try {
- // 2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
- // else do nothing. Of course, the hooks will still be triggered.
- beginTransaction(txInfo, tx);
-
- Object rs;
- try {
- // Do Your Business
- rs = business.execute();
- } catch (Throwable ex) {
- // 3. The needed business exception to rollback.
- completeTransactionAfterThrowing(txInfo, tx, ex);
- throw ex;
- }
-
- // 4. everything is fine, commit.
- commitTransaction(tx);
-
- return rs;
- } finally {
- //5. clear
- resumeGlobalLockConfig(previousConfig);
- triggerAfterCompletion();
- cleanUp();
- }
- } finally {
- // If the transaction is suspended, resume it.
- if (suspendedResourcesHolder != null) {
- tx.resume(suspendedResourcesHolder);
- }
- }
- }
这个方法很长,大致可以分为一下几个步骤:
TransactionInfo txInfo = business.getTransactionInfo();
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
Propagation propagation = txInfo.getPropagation();...
tx = GlobalTransactionContext.createNew();
beginTransaction(txInfo, tx);
rs = business.execute();
completeTransactionAfterThrowing(txInfo, tx, ex);
commitTransaction(tx);
未完待续。。。