• spring4.0事务超详细源码解析


    事务

    github:https://github.com/Notelzg/spring-reader
    目前事务都是基于mysql的事务,其特性是 atomicity(原子性), consistency(一致性), isolation(隔离性), durability(持久性)
    ACID四大特性。spring的事务就是在spring中对事务进行适配,通过模板的设计模式来避免 重复的代码,把重复性的开始事务、关闭事务、回滚等重复事情做了,而业务侧值只需要写业务代码
    通过注解的方式开启事务就好了。这种方式提高了开发的效率,对事务进行代理对开发透明。 对后续的扩展比较方便。而且如果模板方法有问题,可以统一升级,而且避免了个人写事务导致的错误。
    看到这里大家肯定会想到了spring的AOP,像这种重复的,有固定入口的,特别适合切面来做。spring 事务 也是实现了事务的切面增强,基于Spring AOP技术。

    开启配置

    在配置文件中,通过下面的配置开启对事务的支持。 这个如果你没有开启AOP的配置,这里会默认注册AOP,因为事务是基于AOP做的。

     //proxy-target-class 这里是tru,由于基于AOP所以这里指的是AOP的属性,是适应CgLib,还是JDK动态代理
     // 为true标识,代理类使用CgLib生成子类进行代理。如果Spring AOP还没有配置,这里会先注册AOP的 beanPost
     
      // 这里的false不会生效, 下面不会覆盖上面的(因为false不进行处理,true的时候属性设置为true,上面
      //这里是存在冲突的,请注意,可能导致事务失效,
     
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    配置激活的beanPost

    1, xml命名空间和java类之间的映射 TxNamespaceHandler这个命名空间都是通过配置文件配置的, spring会加载所有的命名空间

    spring-tx-4.3.30.RELEASE.jar/META-INF/spring.handlers
    文件内容:
    http\://www.springframework.org/schema/tx=org.springframework.transaction.config.TxNamespaceHandler
    
    
    • 1
    • 2
    • 3
    • 4

    2, java事务命名空间解析

    public class TxNamespaceHandler extends NamespaceHandlerSupport {
      static final String TRANSACTION_MANAGER_ATTRIBUTE = "transaction-manager";
      static final String DEFAULT_TRANSACTION_MANAGER_BEAN_NAME = "transactionManager";
    
      public void init() {
        this.registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
         // 上面配置的属性: annotation-driven  这个激活了这个配置
        this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
        this.registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3, 注册事务需要用到的切面 AnnotationDrivenBeanDefinitionParser.java
    xml文件解析, annotation-driven

    public BeanDefinition parse(Element element, ParserContext parserContext) {
        this.registerTransactionalEventListenerFactory(parserContext);
        String mode = element.getAttribute("mode");
        // 这个应该是使用aspectJ的支持,目前我们都没有使用
        if ("aspectj".equals(mode)) {
        this.registerTransactionAspect(element, parserContext);
        } else {
        // 正常情况下都走的这里
        AnnotationDrivenBeanDefinitionParser.AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    注册 advice, pointCut, 织入 需要的类

    // 专门定义了一个内部类来专门处理AOP相关的,也体现了设计模式的开闭原则
    private static class AopAutoProxyConfigurer {
        private AopAutoProxyConfigurer() {
        }
    
        public static void configureAutoProxyCreator(Element element, ParserContext parserContext) {
          AopNamespaceUtils.registerAutoProxyCreatorIfNecessary(parserContext, element);
          String txAdvisorBeanName = "org.springframework.transaction.config.internalTransactionAdvisor";
          if (!parserContext.getRegistry().containsBeanDefinition(txAdvisorBeanName)) {
            // 对spring 事务注解进行解析, 获取方法/类上面的事务属性,在TransactionAttributeSourcePointcut的时候使用
            Object eleSource = parserContext.extractSource(element);
            RootBeanDefinition sourceDef = new RootBeanDefinition("org.springframework.transaction.annotation.AnnotationTransactionAttributeSource");
            sourceDef.setSource(eleSource);
            sourceDef.setRole(2);
            String sourceName = parserContext.getReaderContext().registerWithGeneratedName(sourceDef);
            
            // 事务增强器,实现了advice 
            RootBeanDefinition interceptorDef = new RootBeanDefinition(TransactionInterceptor.class);
            interceptorDef.setSource(eleSource);
            interceptorDef.setRole(2);
            // 这里设置 transctionManager 类名称, 这里新增属性,key:transactionManagerBeanName, 
            //value: transactionManager(配置文件里面配置的,transaction-manager:)
            AnnotationDrivenBeanDefinitionParser.registerTransactionManager(element, interceptorDef);
            // 设置解析事务注解类,这里就可以看出把pointCut 和 事务解析 分离的好处了 
            interceptorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
            String interceptorName = parserContext.getReaderContext().registerWithGeneratedName(interceptorDef);
            
            // advisor,里面包含了 advice 和 pointCut
            RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
            advisorDef.setSource(eleSource);
            advisorDef.setRole(2);
            // 把事务解析器注入, 这里是RuntimeBeanReference类型, 初始化bean的时候就会去生成该bean的对象
            // 这里是因为该类在pointCut静态检查的时候就需要使用,所以先进行初始化
            advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
            // 把事务增强器注入, 这里是字符串类型,为什么呢,初始化bean的时候不会获取增强器的bean的对象?
            // 这里设置成string,是为了延迟初始化增强器,因为增强器是在生成具体的代理的时候才需要生成,并且使用,
            // 所以可以延迟加载生成对象,这里也符合spring的哲学,能延迟加载的就延迟加载
            // 这里设置拦截器的名称 ,然后通过 父类的getAdvice()方法去获取具体对象
            advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
            if (element.hasAttribute("order")) {
              advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
            }
            parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
            CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
            compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
            compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
            compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
            parserContext.registerComponent(compositeDef);
          }
    
        }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    advisor

    org.springframework.transaction.interceptor.BeanFactoryTransactionAttributeSourceAdvisor
    该类聚合了切点,和增强器,所以从这里开始分析,由于其实现了Advisor接口
    所以BeanFactoryAdvisorRetrievalHelper.findAdvisorBeans
    所以BeanFactoryAdvisorRetrievalHelper是在AOP里面自动开启的,具体细节可以看AOP解析。
    AbstractAdvisorAutoProxyCreator.getAdvicesAndAdvisorsForBean 检查该类实现的所有接口,包含该类
    检查所有方法,是否需要进行增强。
    可以获取到事务的切面。

    // 继承了 AbstractBeanFactoryPointcutAdvisor 有默认实现 getAdvice,只要设置adviceBeanName即可
    // 在注册bena的时候,已经设置了adviceBeanName, 其类是:TransactionInterceptor.java
    public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
    
    	private TransactionAttributeSource transactionAttributeSource;
    
        //切点,这个类是一个abstract, getTransactionAttributeSource 是子类实现,提供具体的解析,
        // TransactionAttributeSourcePointcut 提供整体的流程处理, 可以看做是一个模板类,处理统一的流程
    	private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
    		@Override
    		protected TransactionAttributeSource getTransactionAttributeSource() {
    			return transactionAttributeSource;
    		}
    	};
    
    
    	/**
    	 * Set the transaction attribute source which is used to find transaction
    	 * attributes. This should usually be identical to the source reference
    	 * set on the transaction interceptor itself.
    	 * @see TransactionInterceptor#setTransactionAttributeSource
    	 */
    	public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
    		this.transactionAttributeSource = transactionAttributeSource;
    	}
    
    	/**
    	 * Set the {@link ClassFilter} to use for this pointcut.
    	 * Default is {@link ClassFilter#TRUE}.
    	 */
    	public void setClassFilter(ClassFilter classFilter) {
    		this.pointcut.setClassFilter(classFilter);
    	}
    
        // 获取切点, 对类,方法
    	@Override
    	public Pointcut getPointcut() {
    		return this.pointcut;
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    切点

    org.springframework.transaction.interceptor.TransactionAttributeSourcePointcut

    abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {
        // 个人任务这里包装一层,把pointCut和 TransactionAttributeSource解析组合起来,方便后续的扩展, 同时也是
       // 降低了耦合度,复杂度,方便维护和扩展
    	@Override
    	public boolean matches(Method method, Class<?> targetClass) {
    		if (targetClass != null && TransactionalProxy.class.isAssignableFrom(targetClass)) {
    			return false;
    		}
    		// 子类实现该方法, 通过 AopAutoProxyConfigurer类,我们知道这里的source是
            // org.springframework.transaction.annotation.AnnotationTransactionAttributeSource
    		TransactionAttributeSource tas = getTransactionAttributeSource();
    		// 改属性正常情况下都不能为空,如果为空有问题了,事务属性解析不为空,则标识是事务,需要增强
    		return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    	}
        // 子类实现这个方法,提供具体的注解解析实现,
    	protected abstract TransactionAttributeSource getTransactionAttributeSource();
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    事务解析的整体流程

    抽象父类,处理整理流程
    Fallback 为什么有FallBack是因为解析属性的时候,会有很多倒退的操作
    来保证能处理大部分的情况。

    public abstract class AbstractFallbackTransactionAttributeSource implements TransactionAttributeSource {
        // 获取事务的属性
    	@Override
    	public TransactionAttribute getTransactionAttribute(Method method, Class targetClass) {
    	    // object类肯定不是,直接结束
    		if (method.getDeclaringClass() == Object.class) {
    			return null;
    		}
    
    		// First, see if we have a cached value.
    		Object cacheKey = getCacheKey(method, targetClass);
    		TransactionAttribute cached = this.attributeCache.get(cacheKey);
    		if (cached != null) {
    			// Value will either be canonical value indicating there is no transaction attribute,
    			// or an actual transaction attribute.
    			if (cached == NULL_TRANSACTION_ATTRIBUTE) {
    				return null;
    			}
    			else {
    				return cached;
    			}
    		}
    		else {
    			// 解析细节
    			TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
    			// Put it in the cache.
    			if (txAttr == null) {
    				this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
    			}
    			else {
    				String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
    				if (txAttr instanceof DefaultTransactionAttribute) {
    					((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
    				}
    				if (logger.isDebugEnabled()) {
    					logger.debug("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
    				}
    				this.attributeCache.put(cacheKey, txAttr);
    			}
    			return txAttr;
    		}
    	}
    
    	
    	protected TransactionAttribute computeTransactionAttribute(Method method, Class targetClass) {
    		// 公共方法检查,非公共方法直接结束
    		if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
    			return null;
    		}
    
            // 如果是cglib生成的代理,则获取其父类
    		Class userClass = ClassUtils.getUserClass(targetClass);
    		// 如果method 是来自接口的,则返回 userClass中的method,返回实现类中的method
    		Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
    		// If we are dealing with method with generic parameters, find the original method.
    		specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
    
    		// First try is the method in the target class.
    		// 调用子类的具体实现,这里是子类进行扩展, 比如对TranactionL 注解的解析,如果方法上解析成功,直接返回
    		TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
    		if (txAttr != null) {
    			return txAttr;
    		}
    
    		// Second try is the transaction attribute on the target class.
    		// 方法上面没有,则进行类上面检查,同样是子类实现,比如注解解析
    		txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
    		if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
    			return txAttr;
    		}
    
            // 倒退操作,尽可能的把事务解析出来
    		if (specificMethod != method) {
    			// Fallback is to look at the original method.
    			txAttr = findTransactionAttribute(method);
    			if (txAttr != null) {
    				return txAttr;
    			}
    			// Last fallback is the class of the original method.
    			txAttr = findTransactionAttribute(method.getDeclaringClass());
    			if (txAttr != null && ClassUtils.isUserLevelMethod(method)) {
    				return txAttr;
    			}
    		}
    
    		return null;
    	}
    
    	protected abstract TransactionAttribute findTransactionAttribute(Class clazz);
    	protected abstract TransactionAttribute findTransactionAttribute(Method method);
    	protected boolean allowPublicMethodsOnly() { return false; }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92

    事务注解解析入口

    org.springframework.transaction.annotation.AnnotationTransactionAttributeSource
    这里的具体解析是通过父类(AbstractFallbackTransactionAttributeSource)进行的,该类实现了父类的方法,进行扩展

    // 注解解析类,继承一个抽象模板类,这个也是属于sprig的特色了,把统一处理的流程放在抽象类中,把经常变动/扩展的部分作为
    // 抽象方法进行(封装变化), 达到开闭原则
    public class AnnotationTransactionAttributeSource extends AbstractFallbackTransactionAttributeSource implements Serializable {
      // 只支持 public方法上面的 事务注解
      private final boolean publicMethodsOnly;
      // 注解解析类
      private final Set annotationParsers;
    
      public AnnotationTransactionAttributeSource() {
        this(true);
      }
    
      public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
        this.publicMethodsOnly = publicMethodsOnly;
        this.annotationParsers = new LinkedHashSet(4);
        // 默认支持 spring 事务解析, 目前事务注解都是走的这里
        this.annotationParsers.add(new SpringTransactionAnnotationParser());
      }
    
      protected TransactionAttribute findTransactionAttribute(Class clazz) {
        return this.determineTransactionAttribute(clazz);
      }
    
      // 这两个方法 返回值、方法名称一样,入参不一样,属于静态的多态
      protected TransactionAttribute findTransactionAttribute(Method method) {
        return this.determineTransactionAttribute(method);
      }
      
      // 解析类、方法上面的注解
      protected TransactionAttribute determineTransactionAttribute(AnnotatedElement element) {
        if (element.getAnnotations().length > 0) {
          // 解析的类可能存在多个,所以是循环
          Iterator var2 = this.annotationParsers.iterator();
    
          while(var2.hasNext()) {
            TransactionAnnotationParser annotationParser = (TransactionAnnotationParser)var2.next();
            // 开始机械
            TransactionAttribute attr = annotationParser.parseTransactionAnnotation(element);
            // 解析就快速结束
            if (attr != null) {
              return attr;
            }
          }
        }
    
        return null;
      }
    
      protected boolean allowPublicMethodsOnly() {
        return this.publicMethodsOnly;
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    事务注解解析器

    一个接口,一个实现类,面向接口编程
    比较符合先设计,再实现。spring这个模式非常的好。但是工作中有的时候很难做到
    这种方法确实是和扩展。同时发现spring中类名起的都很好,形容词 名称 动词
    每次自己起名字是真的难

    public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {
    
    	@Override
    	public TransactionAttribute parseTransactionAnnotation(AnnotatedElement element) {
    	   // 获取事务注解的所有属性
    		AnnotationAttributes attributes = AnnotatedElementUtils.getMergedAnnotationAttributes(
    				element, Transactional.class);
    		// 解析		
    		if (attributes != null) {
    			return parseTransactionAnnotation(attributes);
    		}
    		else {
    			return null;
    		}
    	}
    
        // 从这里就可以知道,spring 事务注解支持哪些配置
    	protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
    		RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
    
            // 传播范围
    		Propagation propagation = attributes.getEnum("propagation");
    		rbta.setPropagationBehavior(propagation.value());
    		// 隔离级别
    		Isolation isolation = attributes.getEnum("isolation");
    		rbta.setIsolationLevel(isolation.value());
    		// 超时时间
    		rbta.setTimeout(attributes.getNumber("timeout").intValue());
    		// 只读
    		rbta.setReadOnly(attributes.getBoolean("readOnly"));
    		rbta.setQualifier(attributes.getString("value"));
    
    		List rollbackRules = new ArrayList();
    		// 什么异常进行回滚
    		for (Class rbRule : attributes.getClassArray("rollbackFor")) {
    			rollbackRules.add(new RollbackRuleAttribute(rbRule));
    		}
    		// 什么类回滚
    		for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
    			rollbackRules.add(new RollbackRuleAttribute(rbRule));
    		}
    		// 哪些类不需要回滚
    		for (Class rbRule : attributes.getClassArray("noRollbackFor")) {
    			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    		}
    		// 哪些类名不需要回滚
    		for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
    			rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
    		}
    		rbta.setRollbackRules(rollbackRules);
    
    		return rbta;
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    增强器

    package org.springframework.transaction.interceptor;
    TransactionInterceptor.java
    就是具体的事务增强器

    增强器对象的获取

    
    package org.springframework.aop.support;
    public abstract class AbstractBeanFactoryPointcutAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {
    
    	private String adviceBeanName;
    
    	// transient 序列化的时候,不进行序列化
        // volatile 关键字是可见性,不加可以吗?
        // 其实这里可以不加这关键字,因为单例本身是做了检查的,可以保证对象只创建一次, 加上就可以不用每次都走单例的流程,
        // 在这里就可以直接解决了,加快是了速度,所以这里加这个关键字主要是为了单例对象使用(大部分都是单例)
        // 而原始对象由于初始化的是有 synchronized关键字,所以可以保证其有序性、原子性
    	private transient volatile Advice advice;
        // transient 序列化的时候,不进行序列化
        // 没有明白这里使用 volatile关键字的作用,由于这个对象会暴露出去,同时可以被重置,所以
        // 需要加上 volatile 关键字,保证其可见性
        private transient volatile Object adviceMonitor = new Object();
    
      // org.springframework.transaction.interceptor.BeanFactoryTransactionAttributeSourceAdvisor
        // 继承了该类, 同时设置了adviceBeanName, TransactionInterceptor
    	public void setAdviceBeanName(String adviceBeanName) {
    		this.adviceBeanName = adviceBeanName;
    	}
    
    	@Override
    	public Advice getAdvice() {
    	    // 单例,已经实例化的直接返回
    		Advice advice = this.advice;
    		if (advice != null || this.adviceBeanName == null) {
    			return advice;
    		}
    
    		Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve 'adviceBeanName'");
    		if (this.beanFactory.isSingleton(this.adviceBeanName)) {
    			// 直接通过容器初始化增强器,由于增强器beanDefinition 在xml解析阶段已经注册,所以这里通过beanName可以完成bean的对象的
                // 初始化
    			advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
    			this.advice = advice;
    			return advice;
    		}
    		
    		else {
    			// No singleton guarantees from the factory -> let's lock locally but
    			// reuse the factory's singleton lock, just in case a lazy dependency
    			// of our advice bean happens to trigger the singleton lock implicitly...
              // 对于非单利的,也要进行单利处理,springAOP只能使用单利,因为如果使用非单例,将会生成n个对象
              // 对内存造成非常大的浪费,如果有1000个类,则要生成1000个对象,完成没有必要,所以切面本身要线程
              // 安全,因为切面通过锁巧妙的实现了单例
              synchronized (this.adviceMonitor) {
    				if (this.advice == null) {
    					this.advice = this.beanFactory.getBean(this.adviceBeanName, Advice.class);
    				}
    				return this.advice;
    			}
    		}
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    增强器对象的具体实现细节

    这里是

    public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    
    	public TransactionInterceptor() {
    	}
    
    	@Override
    	public Object invoke(final MethodInvocation invocation) throws Throwable {
    		// Work out the target class: may be {@code null}.
    		// The TransactionAttributeSource should be passed the target class
    		// as well as the method, which may be from an interface.
    		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
    		// 调用父类的方法, 通过回调方法,进行扩展
    		return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
    			@Override
    			public Object proceedWithInvocation() throws Throwable {
    				return invocation.proceed();
    			}
    		});
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    事务操作整体流程

    public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    	/**
    	 * Key to use to store the default transaction manager.
    	 */
    	private static final Object DEFAULT_TRANSACTION_MANAGER_KEY = new Object();
    
    	// 事务信息,放到这里,进行方法调用的时候进行传播
    	private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
    			new NamedThreadLocal<TransactionInfo>("Current aspect-driven transaction");
    
    	// bean的两个属性,bean初始化的时候必须设置,否则会报错,后面会检查
    	private PlatformTransactionManager transactionManager;
    	private TransactionAttributeSource transactionAttributeSource;
    
    	// spring 事务处理的主流程
    	protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
    			throws Throwable {
    
            // 解析事务属性
    		final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    		// 获取事务管理器,根据配置文件中配置的事务管理beanId, 从spring容器中生成事务管理器bean(bean也在配置文件中配置了) 
    		final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    		// 增强方法标识, com.lzg.note.aop.TestA.insert(包名+类名+方法名)
    		final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    		// true 走这里 
    		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    			// Standard transaction demarcation with getTransaction and commit/rollback calls.
                // 生成事务如果需要的,根据事务的传播行为
    			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    			Object retVal;
    			try {
                    // 回调处理,继续走其他的切面增强,和目标方法
    				retVal = invocation.proceedWithInvocation();
    			}
    			catch (Throwable ex) {
    				// target invocation exception
    				completeTransactionAfterThrowing(txInfo, ex);
    				throw ex;
    			}
    			finally {
    			    // 清除事务相关的数据, 主要是thradLocal中的,同时会恢复之前被挂起的事务(如果之前有挂起的事务的话)
    				cleanupTransactionInfo(txInfo);
    			}
    			// 目标方法处理完成,处理完成,事务提交,或者回滚, 最后会进行 resume, 如果之前进行过suspend操作的话
    			commitTransactionAfterReturning(txInfo);
    			return retVal;
    		} else {// 非核心代码删除 }
    	}
    
    	/**
    	 * 获取事务管理器,根据配置
    	 */
    	protected PlatformTransactionManager determineTransactionManager(TransactionAttribute txAttr){
            // Do not attempt to lookup tx manager if no tx attributes are set
            if (txAttr == null || this.beanFactory == null) {
              return getTransactionManager();
            }
    
            String qualifier = txAttr.getQualifier();
            if (StringUtils.hasText(qualifier)) {
              return determineQualifiedTransactionManager(qualifier);
            }
            // 解析配置文件的时候已经设置了 
            else if (StringUtils.hasText(this.transactionManagerBeanName)) {
              return determineQualifiedTransactionManager(this.transactionManagerBeanName);
            } else { //删除非核心代码 }
            }
            private PlatformTransactionManager determineQualifiedTransactionManager (String qualifier){
              PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier);
              if (txManager == null) {
                // 从容器中获取
                txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
                        this.beanFactory, PlatformTransactionManager.class, qualifier);
                this.transactionManagerCache.putIfAbsent(qualifier, txManager);
              }
              return txManager;
            }
    
            // 根据传播机制,进行生成/取出/挂起已有的事务	
            protected TransactionInfo createTransactionIfNecessary (
                    PlatformTransactionManager tm, TransactionAttribute txAttr,
            final String joinpointIdentification){
    
              TransactionStatus status = null;
              if (txAttr != null) {
                if (tm != null) {
                  // 根据传播机制,进行生成/取出/挂起已有的事务
                  status = tm.getTransaction(txAttr);
                }
              }
              // 包装事务状态到事务信息中, txInfo
              return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
            }
    
            protected TransactionInfo prepareTransactionInfo (PlatformTransactionManager tm,
                    TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus
            status){
    
              TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
              // The transaction manager will flag an error if an incompatible tx already exists.
              txInfo.newTransactionStatus(status);
              // 设置到threadLocal, 同时如果已经有txInfo存在,则把上一个保存到oldTxInfo字段中,后续resumer的时候会用到
              txInfo.bindToThread();
              return txInfo;
            }
    
            /**
             * Execute after successful completion of call, but not after an exception was handled.
             * Do nothing if we didn't create a transaction.
             * @param txInfo information about the current transaction
             */
            protected void commitTransactionAfterReturning (TransactionInfo txInfo){
              if (txInfo != null && txInfo.hasTransaction()) {
                txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
              }
            }
    
            /**
             * Handle a throwable, completing the transaction.
             * We may commit or roll back, depending on the configuration.
             * @param txInfo information about the current transaction
             * @param ex throwable encountered
             */
            protected void completeTransactionAfterThrowing (TransactionInfo txInfo, Throwable ex){
              if (txInfo != null && txInfo.hasTransaction()) {
                if (logger.isTraceEnabled()) {
                  logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                          "] after exception: " + ex);
                }
                if (txInfo.transactionAttribute.rollbackOn(ex)) {
                  try {
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                  } catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                  } catch (RuntimeException ex2) {
                    logger.error("Application exception overridden by rollback exception", ex);
                    throw ex2;
                  } catch (Error err) {
                    logger.error("Application exception overridden by rollback error", ex);
                    throw err;
                  }
                } else {
                  // We don't roll back on this exception.
                  // Will still roll back if TransactionStatus.isRollbackOnly() is true.
                  try {
                    txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
                  } catch (TransactionSystemException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    ex2.initApplicationException(ex);
                    throw ex2;
                  } catch (RuntimeException ex2) {
                    logger.error("Application exception overridden by commit exception", ex);
                    throw ex2;
                  } catch (Error err) {
                    logger.error("Application exception overridden by commit error", ex);
                    throw err;
                  }
                }
              }
            }
    
            protected void cleanupTransactionInfo (TransactionInfo txInfo){
              if (txInfo != null) {
                txInfo.restoreThreadLocalStatus();
              }
            }
    
            protected final class TransactionInfo {
              private void bindToThread() {
                this.oldTransactionInfo = transactionInfoHolder.get();
                transactionInfoHolder.set(this);
              }
    
              private void restoreThreadLocalStatus() {
                transactionInfoHolder.set(this.oldTransactionInfo);
              }
            }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180

    事务管理器抽象类处理主流程

    org.springframework.transaction.support.AbstractPlatformTransactionManager

    public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    
      // 获取事务
      public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
        // 具体获取事务,由子类实现,DataSourceTransactionManager, 该类是spring-jdbc中去做,是另外一个jar包,定义和实现分离
        // 如果当前线程的ThradLcoal中已经存在一个,则这里拿到的是已有的连接
        Object transaction = doGetTransaction();
    
        // 如果当前事务,是之前创建的,就根据传播机制进行重新生成、挂起,等操作
        // 不管是什么传播机制,除了需要抛出异常的情况,其他情况都会重新生成一个新的TransactionStatus
        // 但是 transaction可能一样,如果是都 PROPAGATION_REQUIRE
        // 这个是因为每个@Transactionl 注解的属性可能不一样,但是其数据库连接是复用的(如果不需要生成新的事务的情况下)
        // 我们都知道数据库连接资源是有限的,所以在一个事务中,应该尽可能使用同一个事务连接,非不要不要创建新的
        if (isExistingTransaction(transaction)) {
          return handleExistingTransaction(definition, transaction, debugEnabled);
        }
        // 当前没有事务存在, 抛出异常,如果有事务在上一步返回
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
          throw new IllegalTransactionStateException(
                  "No existing transaction found for transaction marked with propagation 'mandatory'");
        } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
          SuspendedResourcesHolder suspendedResources = suspend(null);
          try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 开始事务, 获取数据库连接的连接,设置超时时间,隔离级别等事务的准备工作
            doBegin(transaction, definition);
            // 把当前事务设置threadLocal中,后续使用,主要是为了线程内的同步(一个事务跨越了多个线程帧栈)
            prepareSynchronization(status, definition);
            return status;
          } catch (RuntimeException ex) {
            resume(null, suspendedResources);
            throw ex;
          } catch (Error err) {
            resume(null, suspendedResources);
            throw err;
          }
        }
      }
    
      // 把当前事务信息,设置到threLocal里面,为了后续同步控制使用
      protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
        if (status.isNewSynchronization()) {
          TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
          TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                  definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                          definition.getIsolationLevel() : null);
          TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
          TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
          TransactionSynchronizationManager.initSynchronization();
        }
      }
    
      // 传播机制的设置
      private TransactionStatus handleExistingTransaction(
              TransactionDefinition definition, Object transaction, boolean debugEnabled)
              throws TransactionException {
    
        // 不需要事务,则抛出异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
          throw new IllegalTransactionStateException(
                  "Existing transaction found for transaction marked with propagation 'never'");
        }
    
        // 不支持事务,则挂起当前事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
          // 挂起事务,把旧事务信息保存到  suspendedResources, 每个事务状态中会保存上一个挂起的事务,resume 的时候进行恢复
          // 通过保存前一个事务的方式,把多个事务链接起来,形成一个链表
          // 把当前的dataSrouce 和 数据库连接,从threadLocal中去掉,解除绑定
          Object suspendedResources = suspend(transaction);
          boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
          // 事务设置为空
          return prepareTransactionStatus(
                  definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        // 需要重新生成事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
          SuspendedResourcesHolder suspendedResources = suspend(transaction);
          try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 重新开启事务, 创建新的数据库连接,数据库连接资源有限,所以尽可能少的创建新的连接
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
          } catch (RuntimeException beginEx) {
            // 遇到异常,会进行回滚
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
          } catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
          }
        }
    
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
          // 使用savePoint的话,保存一个savePoint,复用已有的事务,
          // 这里这样做的目的是为了减少资源的消费,因为数据库连接资源是有限的,宝贵的
          // DataSourceTransactionManager 默认是打开的,所以走的是这里 
          if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
          } else {
            // 生成一个新的事务
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
          }
        }
        // 除了需要特别处理的(生成新的事务,抛出异常,savePoint), 其他情况都走这里,使用已有的事务
        // 使用已有事务就意味着,复用旧事务的超时时间,隔离级别, 回滚配置,注意可能导致事务注解失效
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
      }
    
      // 生成一个新的status,但是事务还是使用之前的
      protected final DefaultTransactionStatus prepareTransactionStatus(
              TransactionDefinition definition, Object transaction, boolean newTransaction,
              boolean newSynchronization, boolean debug, Object suspendedResources) {
    
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
        prepareSynchronization(status, definition);
        return status;
      }
      
      // 挂起当前事务, 返回上一个事务的数据,恢复事务的时候使用
      protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
          // 对于已经执行的事务语句,进行挂起操作,主要是释放资源 sqlSessionFactory 和sql语句相关的资源
          // 比如使用 mybatis 生成的mapper接口,就是使用的是 sqlSession
          List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
          try {
            Object suspendedResources = null;
            if (transaction != null) {
              // 解除资源绑定,数据库连接 从threadLocal 中, 子类,DataSourceTransactionManager  实现
              suspendedResources = doSuspend(transaction);
            }
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            // 需要保存上一个数据库连接,用来恢复事务
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
          }
          catch (RuntimeException ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
          }
          catch (Error err) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw err;
          }
        }
        else if (transaction != null) {
          // Transaction active but no synchronization active.
          Object suspendedResources = doSuspend(transaction);
          return new SuspendedResourcesHolder(suspendedResources);
        }
        else {
          // Neither transaction nor synchronization active.
          return null;
        }
      }
    
      protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
              throws TransactionException {
    
        if (resourcesHolder != null) {
          Object suspendedResources = resourcesHolder.suspendedResources;
          if (suspendedResources != null) {
            // 子类实现,把数据库连接重新设置到threadLocal中
            doResume(transaction, suspendedResources);
          }
          List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
          if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            // 恢复sqlSessoionFactory 相关
            doResumeSynchronization(suspendedSynchronizations);
          }
        }
      }
    
      @Override
      public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
          throw new IllegalTransactionStateException(
                  "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
    
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        if (defStatus.isLocalRollbackOnly()) {
          processRollback(defStatus);
          return;
        }
        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
          processRollback(defStatus);
          if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
          }
          return;
        }
    
        processCommit(defStatus);
      }
    
      private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
          boolean beforeCompletionInvoked = false;
          try {
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;
            if (status.hasSavepoint()) {
              status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
              // 如果不是一个新的事务,则不进行提交, 对于 @Transactionl 注解同时嵌套调用的情况,
              // 开始 methdA->methodB 结束 的两个方法都有Transactionl注解,
              // methodB处理完之后,是不会调用这里的,只有methodA处理完走到这里的时候才进行调用
              // 只会线程栈帧 从methodB, 回到methodA,并且走到这里的时候才行
              // 想一下,如果 methodB 如果执行了commit的话,事务就结束了
              doCommit(status);
            }
          }
          catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
          }
          catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
              doRollbackOnCommitException(status, ex);
            }
            else {
              triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
          }
          catch (RuntimeException ex) {
            if (!beforeCompletionInvoked) {
              triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
          }
          catch (Error err) {
            if (!beforeCompletionInvoked) {
              triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, err);
            throw err;
          }
    
          // Trigger afterCommit callbacks, with an exception thrown there
          // propagated to callers but the transaction still considered as committed.
          try {
            triggerAfterCommit(status);
          }
          finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
          }
    
        }
        finally {
          // 清除当前数据,同时如果还有父事务,则进行事务的恢复 ,resumer
          cleanupAfterCompletion(status);
        }
      }
    
      private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
          // 清理事务管理器
          TransactionSynchronizationManager.clear();
        }
        if (status.isNewTransaction()) {
          //只有最初生成数据库连接的事务结束, 才能清理数据库连接,释放数据库资源, 
          doCleanupAfterCompletion(status.getTransaction());
        }
        if (status.getSuspendedResources() != null) {
          // 恢复之前挂起的事务,让上一个事务继续运行
          resume(status.getTransaction(), (SuspendedResourcesHolder) status.getSuspendedResources());
        }
      }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289
    • 290
    • 291
    • 292
    • 293
    • 294
    • 295
    • 296
    • 297
    • 298
    • 299
    • 300
    • 301
    • 302
    • 303
    • 304
    • 305
    • 306
    • 307
    • 308
    • 309
    • 310
    • 311
    • 312
    • 313
    • 314
    • 315
    • 316
    • 317
    • 318

    spring-jdbc 事务管理器具体细节实现

    处理数据库连接的创建,释放,开始,提交,回滚,恢复等细节
    org.springframework.jdbc.datasource.DataSourceTransactionManager
    spring-jdbc jar包

    /**
     * 
     * 
     * 
     *  
     * 配置了数据原
     */
    public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
            implements ResourceTransactionManager, InitializingBean {
    
      private DataSource dataSource;
      
      // 获取一个事务
      @Override
      protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        // 第一次这里返回null, 从当前threadLocal里面取出,一个连接,key 是dataSource
        // 事务是每个线程只能有一个连接,如果有多个连接就无法知道当前事务用的是哪个连接,就乱掉了
        // 如果当前存在连接,则直接返回
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
      }
      
      // 开始事务
      @Override
      protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
    
        try {
          // 第一次走这里,因为是空的
          if (!txObject.hasConnectionHolder() ||
                  txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 通过dataSource 数据源生成一个连接,数据源一般都是一个连接池
            Connection newCon = this.dataSource.getConnection();
            // 第一次生成连接,所以设置为true,标识最开始
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
          }
    
          txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
          con = txObject.getConnectionHolder().getConnection();
    
          // 设置 readOnly ,和 事务的隔离级别,如果当前配置的不是默认值的话
          Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
          txObject.setPreviousIsolationLevel(previousIsolationLevel);
    
          // 关闭事务自动提交,改为手动提交, 因为默认获取数据库连接的时候,事务已经开启了,所以这里关闭
          // 事务提交,就标识接下里的数据库操作都是同一个事务中, 在提交之前,所以只有新的连接处理完成之后
          // 才会调用commit接口,如果每次都调用commit就有问题了
          if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            con.setAutoCommit(false);
          }
    
          // 准备工作
          prepareTransactionalConnection(con, definition);
          // 激活事务,事务已经准备好了
          txObject.getConnectionHolder().setTransactionActive(true);
    
          int timeout = determineTimeout(definition);
          if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
          }
    
          // 刚创建的新连接,需要把数据源和连接绑定一下
          if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
          }
        }
    
        catch (Throwable ex) {
          if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, this.dataSource);
            txObject.setConnectionHolder(null, false);
          }
          throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
      }
    
      @Override
      protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        txObject.setConnectionHolder(null);
        return TransactionSynchronizationManager.unbindResource(this.dataSource);
      }
    
      @Override
      protected void doResume(Object transaction, Object suspendedResources) {
        TransactionSynchronizationManager.bindResource(this.dataSource, suspendedResources);
      }
    
      @Override
      protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
          logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
          con.commit();
        }
        catch (SQLException ex) {
          throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
      }
    
      @Override
      protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
          logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
          con.rollback();
        }
        catch (SQLException ex) {
          throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125

    事务同步机制管理

    org.springframework.transaction.support.TransactionSynchronizationManager
    把数据库连接放入到threadLocal中,进行保存,方便在方法调用中进行使用
    mybatis等都需要使用
    spring-tx jar包

    // 事务同步机制管理器,主要管理数据库连接,以及mybatis这种sqlSession的管理
    public class TransactionSynchronizationManager {
      public static Object getResource(Object key) {
        // 针对代理,回去目标对象
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doGetResource(actualKey);
        return value;
      }
    
      /**
       * Actually check the value of the resource that is bound for the given key.
       */
      private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        // 第一次获取的是空的
        if (map == null) {
          return null;
        }
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
          map.remove(actualKey);
          // Remove entire ThreadLocal if empty...
          if (map.isEmpty()) {
            resources.remove();
          }
          value = null;
        }
        return value;
      }
    
      // 把数据连接 和 数据dataSrouce 进行绑定,开始事务的时候需要做,方便后续事务获取,复用已有的事务
      public static void bindResource(Object key, Object value) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Assert.notNull(value, "Value must not be null");
        Map<Object, Object> map = resources.get();
        // set ThreadLocal Map if none found
        if (map == null) {
          map = new HashMap<Object, Object>();
          resources.set(map);
        }
        Object oldValue = map.put(actualKey, value);
        // Transparently suppress a ResourceHolder that was marked as void...
        if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
          oldValue = null;
        }
        // 正常情况下,只有新生成的事务才需要绑定,复用的不需要绑定
        if (oldValue != null) {
          throw new IllegalStateException("Already value [" + oldValue + "] for key [" +
                  actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
      }
    
      // 解除资源帮定,当事务结束,或者需要挂起事务的时候 
      public static Object unbindResource(Object key) throws IllegalStateException {
        Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
        Object value = doUnbindResource(actualKey);
        if (value == null) {
          throw new IllegalStateException(
                  "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        return value;
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    织入

    通过springAOP技术实现,事务增强器作为springAOP增强器链中的一个元素,进行增强。
    这里就不再详细讲了,主要就是把利用aop机制,根据advisor,匹配之后把
    TransactionInterceptor 作为其增强链中的一环,具体可以看
    spring-aop这篇文章。这里只记录 transactionAdvisor如何注册
    advice TransactionInterceptor 如何进行增强的细节,aop看另外的文章。

    事务传播机制

    getTransaction()方法里面写的比较清楚,可以再看下代码
    事务传播机制在这个类里面有详细的介绍,想看原理,可以看
    上面该类的源码解析
    AbstractPlatformTransactionManager.getTransaction()

    public interface TransactionDefinition {
    
    	/**
    	 * Support a current transaction; create a new one if none exists.
    	 *  如果当前没有事务,则生成一个事务。如果有则复用已有的事务
    	 *   要注意的就是,既然是复用已有的事务,那肯定是要复用已有事务的配置,比如超时时间,回滚,不回滚
    	 *   这些事务配置,如果你的事务有特殊配置,就应该使用 PROPAGATION_REQUIRED_NEW
    	 *    重新生成一个事务, 如果没有特殊配置则不需要
    	 ** /
    	int PROPAGATION_REQUIRED = 0;
    
    	/**
    	 * Support a current transaction; execute non-transactionally if none exists.
    	 * 当前已经存在事务,则使用已有的,否则也不会新建事务
    	 */
    	int PROPAGATION_SUPPORTS = 1;
    
    	/**
    	 * Support a current transaction; throw an exception if no current transaction
    	 * exists. 
    	 * 当前必须要有事务,否则会抛出异常, 有一些事务要求自己处于另外一个事务当中,不能单独存在,使用这种
    	 * 比如参加活动,新增活动记录、增加积分。假如拆成了两个本地事务,则后面的一定要和前面的事务处于同一个
    	 * 事务当中,否则可能会导致数据的不一致。对于同时操作多个本地数据库的,但是处于不同的方法中的时候,就
    	 * 需要这个类型
    	 * AbstractPlatformTransactionManager.getTransaction() 有详细的说明,可以下上面
    	int PROPAGATION_MANDATORY = 2;
    
    	/**
    	 * Create a new transaction, suspending the current transaction if one exists.
    	 * 如果当前没有事务则创建一个新的事务,如果当前已经有事务了,则挂起当前的事务。
    	 * 如果两个事务的配置不一样,有特殊配置的事务,这种就需要
    	 */
    	int PROPAGATION_REQUIRES_NEW = 3;
    
    	/**
    	 * Do not support a current transaction; rather always execute non-transactionally.
    	 * 如果当前已经存在事务则挂起当前事务,如果没有也不会新建
    	 * JtaTransactionManage 特有的
    	 */
    	int PROPAGATION_NOT_SUPPORTED = 4;
    
    	/**
    	 * Do not support a current transaction; throw an exception if a current transaction
         * 不支持事务,如果有事务则抛出异常	
    	 */
    	int PROPAGATION_NEVER = 5;
    
    	/**
    	 * Execute within a nested transaction if a current transaction exists,
         * 执行一个嵌套的事务,目前已有的 DataSourceTransactionMangeer	
         * 默认使用 savePoint 进行支持,除非手动关闭这个配置,
         * 使用 savePoint 达成一样的效果,还可以节省宝贵的数据库连接资源
         * 当一些事务失败,可以忽略的时候,可以使用这个注解
         * 把一些不重要的处理,从大的事务中拆出来,放到单独的事务中,进行处理
         * 也是为了保证事务的柔性吧,即使一些失败了,也可以接受,只要核心事务没问题就行
    	 */
    	int PROPAGATION_NESTED = 6;
    	}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    事务失效的原因

    1, aop失效的时候,事务肯定是失效的,
    比如非公共方法,内部调用, 这个是由于aop代理机制的问题导致的
    非公共方法不会被代理
    内部调用的对象使用的非代理对象,如果内部调用需要使用代理对象,需要特殊处理才行
    2, 回滚类特殊设置和最初开始事务的时候设置的不一样,导致回滚失效了
    或者配置不生效
    3, aop的配置
    proxy-target-class=“true”
    属性的配置为true,使用cglib代码,生成是子类,继承当前类,这种模式
    可以使用当前类的类型,从spring容器中获取生成的代理对象,或者进行注入。
    但是对于 false的,使用 JdkDynamicAopProxy
    生成的代理对象,和原来的类没有关系,只能使用类实现的接口进行获取,并且
    只能使用接口方法。
    所以如果是非接口的事务方法,必须使用true,比较稳妥

  • 相关阅读:
    《低代码指南》——如何通过维格表实现生产采购管理
    三、java基础语法
    JedisRedis6客户端工具——Jedis
    深聊测试开发之:从订单支付流程来聊一聊,如何预防重复支付,建议收藏。
    剖析 Vue.js 内部运行机制
    uniapp之ios开发及支付整体流程爬坑记录
    Spring-@Value用法介绍
    【第62篇】DepGraph:适用任何结构的剪枝
    应用场景丨迭代市政综合管廊监测系统建设
    LetCode刷题[简单题](5)按摩师,迭代出最优解(卡尔曼滤波也是类似迭代)
  • 原文地址:https://blog.csdn.net/li740207611/article/details/126184323