• Spring 事务原理总结五


    很抱歉,Spring事务本来应该在上一篇就结束的,但因为梳理过程中发现了更多的未知知识,所以便再啰嗦几篇。本篇主要针对前一篇文章——《Spring 事务原理总结四》——末尾提到的几个问题进行梳理,这里再回顾一下这几个问题:

    1. 本篇文章中我们梳理了完整的流程,但是还有一个地方梳理的不够完整,即调用PlatformTransactionManager对象的getTransaction()方法(该方法需要一个TransactionAttribute对象,具体逻辑可以参考JdbcTransactionManager类的父类AbstractPlatformTransactionManager中的源码)获取TransactionStatus对象这个地方
    2. 为什么这里没看到conn.setAutoCommit(false)?
    3. Spring事务异常回滚的执行流程是什么?
    4. Spring事务失效的场景有那些?

    通过前一篇文章,我们可以很确定Spring事务是通过代理方式实现的,其最终的处理类为TransactionInterceptor,而这个类中的invoke(MethodInvocation)方法是执行事务的起点,这个方法又继续调用了本类(TransactionInterceptor)的父类——TransactionAspectSupport——中的invokeWithinTransaction(Method, Class, InvocationCallback)方法,这个方法是实现事务控制逻辑的核心,继续跟踪会看到上节提到的创建TransactionInfo对象的代码,即:

    TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);

    之后我们继续进入该方法(其位于TransactionAspectSupport类中),这个方法内部有一段创建TransactionStatus对象的逻辑:

    1. TransactionStatus status = null;
    2. if (txAttr != null) {
    3. if (tm != null) {
    4. status = tm.getTransaction(txAttr);
    5. }
    6. else {
    7. if (logger.isDebugEnabled()) {
    8. logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
    9. "] because no transaction manager has been configured");
    10. }
    11. }
    12. }

    从此处,我们进入到AbstractPlatformTransactionManager#getTransaction(TransactionDefinition)方法内部(注意:AbstractPlatformTransactionManager抽象类实现了PlatformTransactionManager接口,如果想了解其继承结构,可以看一下《Spring 事务原理总结三》这篇文章;还有上篇博客《Spring 事务原理总结四》中说这个方法接收的参数的类型为TransactionAttribute,这里想纠正一下,实际类型为TransactionDefinition),这里我们再贴一下该方法的源码:

    1. @Override
    2. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
    3. throws TransactionException {
    4. // Use defaults if no transaction definition given.
    5. TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    6. Object transaction = doGetTransaction();
    7. boolean debugEnabled = logger.isDebugEnabled();
    8. if (isExistingTransaction(transaction)) {
    9. // Existing transaction found -> check propagation behavior to find out how to behave.
    10. return handleExistingTransaction(def, transaction, debugEnabled);
    11. }
    12. // Check definition settings for new transaction.
    13. if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    14. throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    15. }
    16. // No existing transaction found -> check propagation behavior to find out how to proceed.
    17. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    18. throw new IllegalTransactionStateException(
    19. "No existing transaction found for transaction marked with propagation 'mandatory'");
    20. }
    21. else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    22. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    23. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    24. SuspendedResourcesHolder suspendedResources = suspend(null);
    25. if (debugEnabled) {
    26. logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    27. }
    28. try {
    29. return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
    30. }
    31. catch (RuntimeException | Error ex) {
    32. resume(null, suspendedResources);
    33. throw ex;
    34. }
    35. }
    36. else {
    37. // Create "empty" transaction: no actual transaction, but potentially synchronization.
    38. if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    39. logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    40. "isolation level will effectively be ignored: " + def);
    41. }
    42. boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    43. return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    44. }
    45. }

    下面让我们一起来梳理一下这个方法的具体处理逻辑吧!首先让我们一起来看一下下面这幅图:

    从图中不难发现,这个方法会首先获得一个TransactionDefinition对象(如果传递进来的对象为空则会返回一个StaticTransactionDefinition类型的TransactionDefinition对象),接下来会调用DataSourceTransactionManager#doGetTransaction(),创建一个DataSourceTransactionObject类型的对象(该类是DataSourceTransactionManager中定义的一个静态内部类),关于该类的继承结构如下图所示:

    下面看一下doGetTransaction()方法的源码,具体如下所示:

    1. protected Object doGetTransaction() {
    2. DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    3. txObject.setSavepointAllowed(isNestedTransactionAllowed());
    4. ConnectionHolder conHolder =
    5. (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    6. txObject.setConnectionHolder(conHolder, false);
    7. return txObject;
    8. }

    下面再来看一下ConnectionHolder这个类,它位于org.springframework.jdbc.datasource中,其继承结构如下所示:

    总之,结合doGetTransaction()方法源码,我们可以清楚看到,其主要作用就是创建一个DataSourceTransactionObject对象。回到AbstractPlatformTransactionManager中的getTransaction()方法中,继续向下走,见下图:

    下面让我们一起看一下DataSourceTransactionManager类中的isExistingTransaction()方法的源码,具体如下所示:

    1. @Override
    2. protected boolean isExistingTransaction(Object transaction) {
    3. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    4. return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
    5. }

    这个方法的主要作用就是判断当前是否存在存活的事务,如果存在事务,则处理之,否则继续。接着就是判断事务是否超时,如果超时,直接抛出超时异常,否则继续。之后就是对事务传播行为的判断,首先就是判断当前的事务传播行为是否为PROPAGATION_MANDATORY,根据《Spring 事务原理总结一》这篇文章的介绍,这个事务传播属性的作用是如果当前存在事务,则加入该事务;如果当前没有事务,则抛出异常。所以这里有这样一段代码:

    1. if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    2. throw new IllegalTransactionStateException(
    3. "No existing transaction found for transaction marked with propagation 'mandatory'");
    4. }

    接着判断当前的事务传播行为是否是PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED,根据《Spring 事务原理总结一》这篇文章的介绍,它们三个的作用分别为:如果当前存在事务,则加入该事务;如果当前没有事务,则创建一个新的事务创建一个新的事务,如果当前存在事务,则把当前事务挂起如果当前存在事务,则创建一个事务作为当前事务的嵌套事务来运行;如果当前没有事务,则该取值等价于REQUIRED。这个判断逻辑的详细代码如下所示:

    1. else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    2. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    3. def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    4. SuspendedResourcesHolder suspendedResources = suspend(null);
    5. if (debugEnabled) {
    6. logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
    7. }
    8. try {
    9. return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
    10. }
    11. catch (RuntimeException | Error ex) {
    12. resume(null, suspendedResources);
    13. throw ex;
    14. }
    15. }

    由于我们使用的是默认配置,即PROPAGATION_REQUIRED,所以我们跟踪的案例会走到这段分支。这个分支中的suspend(null)表示暂停某某某(关于这部分暂时不做解释)。先来看下面这幅图:

    接着看一下startTransaction(TransactionDefinition definition, Object transaction, boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources)这个方法(注意这个方法位于AbstractPlatformTransactionManager类中)的源码(根据该方法的方法名可知该方法的主要作用是开始一个新事务,这是个人理解,后续会根据理解的深入而进行变更),具体如下所示:

    1. private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
    2. boolean nested, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
    3. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    4. DefaultTransactionStatus status = newTransactionStatus(
    5. definition, transaction, true, newSynchronization, nested, debugEnabled, suspendedResources);
    6. this.transactionExecutionListeners.forEach(listener -> listener.beforeBegin(status));
    7. try {
    8. doBegin(transaction, definition);
    9. }
    10. catch (RuntimeException | Error ex) {
    11. this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, ex));
    12. throw ex;
    13. }
    14. prepareSynchronization(status, definition);
    15. this.transactionExecutionListeners.forEach(listener -> listener.afterBegin(status, null));
    16. return status;
    17. }

    这段逻辑中我们主要看一下DataSourceTransactionManager#doBegin(Object transaction, TransactionDefinition definition)方法,其源码如下所示:

    1. protected void doBegin(Object transaction, TransactionDefinition definition) {
    2. DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    3. Connection con = null;
    4. try {
    5. if (!txObject.hasConnectionHolder() ||
    6. txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    7. Connection newCon = obtainDataSource().getConnection();
    8. if (logger.isDebugEnabled()) {
    9. logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    10. }
    11. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    12. }
    13. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    14. con = txObject.getConnectionHolder().getConnection();
    15. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    16. txObject.setPreviousIsolationLevel(previousIsolationLevel);
    17. txObject.setReadOnly(definition.isReadOnly());
    18. // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    19. // so we don't want to do it unnecessarily (for example if we've explicitly
    20. // configured the connection pool to set it already).
    21. if (con.getAutoCommit()) {
    22. txObject.setMustRestoreAutoCommit(true);
    23. if (logger.isDebugEnabled()) {
    24. logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    25. }
    26. con.setAutoCommit(false);
    27. }
    28. prepareTransactionalConnection(con, definition);
    29. txObject.getConnectionHolder().setTransactionActive(true);
    30. int timeout = determineTimeout(definition);
    31. if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    32. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    33. }
    34. // Bind the connection holder to the thread.
    35. if (txObject.isNewConnectionHolder()) {
    36. TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
    37. }
    38. }
    39. catch (Throwable ex) {
    40. if (txObject.isNewConnectionHolder()) {
    41. DataSourceUtils.releaseConnection(con, obtainDataSource());
    42. txObject.setConnectionHolder(null, false);
    43. }
    44. throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    45. }
    46. }

    注意这段逻辑中有这样一句代码:con.setAutoCommit(false)。这就解释了上篇文章末尾和本篇文章开头提到的问题:为什么这里没看到conn.setAutoCommit(false)。这个逻辑写在了DataSourceTransactionManager类的doBegin()方法中。关于这个方法中的其他细节,这里不过多解释,后期如果理解透彻了,我会继续添加。最后让我们一起回到AbstractPlatformTransactionManager的getTransaction(@Nullable TransactionDefinition definition)方法中,然后继续返回一直返回到最初的调用者,即TransactionAspectSupport的createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification)方法中,然后继续向上返回,直到TransactionAspectSupport类的invokeWithinTransaction(Method method, @Nullable Class targetClass, final InvocationCallback invocation)方法中。这样我们就创建了一个TransactionInfo对象,然后就可以按照前面一章——《Spring 事务原理总结四》——描述的流程继续向下走了。

    至此我们解决了昨天遗留问题中的两个,即本篇篇首所列的问题一和问题二。那其他两个问题呢?首先第三个问题个人觉得是流程梳理性质的问题,所以本篇不再过分着墨,我会在新篇章中对这个问题进行详尽的梳理。接着第四个问题属于面试性质的考题,可以在本篇文章中进行梳理。因此接下来我将对第四个问题进行梳理,如果有不对的地方,还请大家多多指教,谢谢!常见的造成事务失效的场景有以下几种

    1. 服务未托管给Spring:如果带有@Transactional注解的方法所在的类没有被Spring容器管理,即该类实例不是通过Spring IOC容器创建的,那么Spring将无法对这个方法应用事务管理。
    2. 受检异常处理不当Spring默认只回滚运行时异常(继承自RuntimeException的异常)和声明在@Transactional(rollbackFor=...)中的异常类型。若抛出了检查型异常(非运行时异常),而没有在注解中明确配置为回滚,则事务可能不会回滚。
    3. 异常捕获但未传播:当业务逻辑中捕获了异常并自行处理,而没有再次抛出或重新抛出到Spring事务代理可以感知的地方,事务将无法正确执行回滚操作。
    4. 切面顺序问题由于AOP代理的特性,如果在同一个类内部方法之间进行相互调用,并且调用的是未经过代理的对象上的事务方法,事务将不会生效。解决办法是通过注入服务的方式间接调用,确保经过代理。
    5. 非public方法Spring AOP代理默认仅作用于public方法上,如果将带有@Transactional注解的方法设置为protected、private或default访问权限,事务将不会生效
    6. 自调用事务方法在一个类内部,一个@Transactional方法直接调用本类内的另一个@Transactional方法,由于绕过了Spring AOP代理,会导致事务失效
    7. 异步调用场景:在Spring事务环境下,如果在事务方法中启动新的线程或者使用消息队列等异步机制调用事务性方法,由于线程切换,原事务上下文将不会被传递,导致新线程中的事务失效。
    8. 注解配置错误:如@Transactional注解没有正确配置在类或方法上,或者传播行为设置不当,可能导致事务失效。
    9. 未满足开启事务的条件:Spring事务需要由动态代理对象来调用带有@Transactional注解的方法,如果不是由Spring生成的代理对象调用,事务管理将不起作用。

    好了,今天就这样吧,虽然没能兑现自己的诺言,但总算弄懂了几个问题(剩余的那个问题我将在下一篇博文中重点介绍)。回头看看那遍布荆棘的来路,不知道自己是怎么一步一步走来的,抬头望着前方那泥泞不堪的陡峭绝路,我的内心毫无波澜,不是什么都不会导致的麻木不仁,更不是破罐子破摔的自弃心理作祟,只是我知道急躁会使事情更加糟糕。所以与其手忙脚乱,不如一步一个脚印的慢慢前行。

  • 相关阅读:
    Python运算符、函数与模块和程序控制结构
    G120变频器控制方式(宏18)模拟量控制的具体方法示例
    深度解析当贝盒子B3、腾讯极光5S、小米盒子4S之间的区别
    Redis Cluster集群方案
    [激光原理与应用-39]:《光电检测技术-6》- 光干涉的原理与基础
    Spring Cloud Loadbalancer
    西门子二次开发3GL环境搭建
    读高性能MySQL(第4版)笔记19_云端和合规性
    百战RHCE(第四十七战:运维工程师必会技-Ansible学习2-Ansible安装配置练习环境)
    排序算法之-选择
  • 原文地址:https://blog.csdn.net/java_lover20111106/article/details/136116316