• spring-transaction源码分析(5)TransactionInterceptor事务拦截逻辑


    spring-tx的事务拦截逻辑在TransactionInterceptor类,本文将详细分析其实现方式。

    事务拦截器TransactionInterceptor

    spring-tx的事务拦截逻辑在TransactionInterceptor类,它实现了MethodInterceptor接口。

    MethodInterceptor接口

    MethodInterceptor接口的实现类封装aop切面拦截逻辑:

    public interface MethodInterceptor extends Interceptor {
    
    	/**
    	 * Implement this method to perform extra treatments before and after the invocation.
    	 */
    	Object invoke(MethodInvocation invocation) throws Throwable;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    TransactionInterceptor类

    TransactionInterceptor类封装了事务拦截逻辑:

    public class TransactionInterceptor 
        extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    
    	// ...
    
    	@Override
    	public Object invoke(MethodInvocation invocation) throws Throwable {
    		// Work out the target class: may be {@code null}.
    		// The TransactionAttributeSource should be passed the target class
    		// as well as the method, which may be from an interface.
    		Class<?> targetClass = 
                (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
    
    		// Adapt to TransactionAspectSupport's invokeWithinTransaction...
    		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    	}
    
    	// ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    事务逻辑在父类TransactionAspectSupport的invokeWithinTransaction方法中。

    invokeWithinTransaction方法

    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
    		final InvocationCallback invocation) throws Throwable {
    
    	// 如果方法没有被Transactional注解标注,则返回null
    	// 返回的是AnnotationTransactionAttributeSource对象
    	// 用于获取Transactional注解相关属性,
    	// 比如rollbackOn, propagationBehavior, isolationLevel等
    	TransactionAttributeSource tas = getTransactionAttributeSource();
    	TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    
    	// 获取事务管理器
    	// 后面会转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
        TransactionManager tm = determineTransactionManager(txAttr);
    
    	// Reactive事务,略
    
    	// 转型成PlatformTransactionManager对象,可以开启事务、提交、回滚
    	PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
    	// 获取事务方法的唯一标识
    	// 格式为"类名.方法名"
    	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    
    	if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
    		// 创建事务
    		TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
    
    		Object retVal;
    		try {
    			// This is an around advice: Invoke the next interceptor in the chain.
    			// This will normally result in a target object being invoked.
    			retVal = invocation.proceedWithInvocation();
    		} catch (Throwable ex) {
    			// Handle a throwable, completing the transaction.
    			// We may commit or roll back, depending on the configuration.
    			completeTransactionAfterThrowing(txInfo, ex);
    			throw ex;
    		} finally {
    			// Reset the TransactionInfo ThreadLocal.
    			// Call this in all cases: exception or normal return!
    			cleanupTransactionInfo(txInfo);
    		}
    
    		if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
    			// Set rollback-only in case of Vavr failure matching our rollback rules...
    			TransactionStatus status = txInfo.getTransactionStatus();
    			if (status != null && txAttr != null) {
    				retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
    			}
    		}
    
    		// 提交事务
    		commitTransactionAfterReturning(txInfo);
    
    		return retVal;
    	} else {
    		// CallbackPreferringPlatformTransactionManager事务管理器逻辑,略
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    创建事务

    Create a transaction if necessary based on the given TransactionAttribute. Allows callers to perform custom TransactionAttribute lookups through the TransactionAttributeSource.

    protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
    		TransactionAttribute txAttr, final String joinpointIdentification) {
    
    	// If no name specified, apply method identification as transaction name.
    	if (txAttr != null && txAttr.getName() == null) {
    		txAttr = new DelegatingTransactionAttribute(txAttr) {
    			@Override
    			public String getName() {
    				return joinpointIdentification;
    			}
    		};
    	}
    
    	TransactionStatus status = null;
    	if (txAttr != null) {
    		if (tm != null) {
    			// 创建新事务或者返回已存在事务, 这取决于传播级别。
    			// 隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。
    			status = tm.getTransaction(txAttr);
    		}
    	}
    
    	// 使用指定的事务属性和TransactionStatus创建TransactionInfo
    	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    getTransaction(txAttr)

    这个方法创建新事务或者返回已存在事务,这取决于传播级别。隔离级别或超时等参数只在新事务时生效,已存在事务会忽略这些参数。

    public final TransactionStatus getTransaction(TransactionDefinition definition)
    		throws TransactionException {
    
    	// Use defaults if no transaction definition given.
    	TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
    
    	// DataSourceTransactionManager实现类返回DataSourceTransactionObject对象
    	// DataSourceTransactionObject对象封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
    	Object transaction = doGetTransaction();
    	boolean debugEnabled = logger.isDebugEnabled();
    
    	if (isExistingTransaction(transaction)) {
    		// Existing transaction found -> check propagation behavior to find out how to behave.
    		return handleExistingTransaction(def, transaction, debugEnabled);
    	}
    
    	// No existing transaction found -> check propagation behavior to find out how to proceed.
    	if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    		// 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
    		throw new IllegalTransactionStateException(
    				"No existing transaction found for transaction marked with propagation 'mandatory'");
    	} else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    			def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    		SuspendedResourcesHolder suspendedResources = suspend(null);
    		// Creating new transaction
    		try {
    			return startTransaction(def, transaction, debugEnabled, suspendedResources);
    		} catch (RuntimeException | Error ex) {
    			resume(null, suspendedResources);
    			throw ex;
    		}
    	} else {
    		// Create "empty" transaction: no actual transaction, but potentially synchronization.
    		if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    			logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    					"isolation level will effectively be ignored: " + def);
    		}
    		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    		return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    doGetTransaction()

    protected Object doGetTransaction() {
    	// DataSourceTransactionObject封装着数据库连接、previousIsolationLevel、readOnly、savepointAllowed等
    	DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    	// 是否允许设置保存点,NESTED传播级别时使用,DataSourceTransactionManager类型该属性为true
    	txObject.setSavepointAllowed(isNestedTransactionAllowed());
    	// 从ThreadLocal获取当前线程上绑定的ConnectionHolder
    	// ConnectionHolder对象保存着数据库连接
    	// 业务方法第一次执行时为null
    	ConnectionHolder conHolder =
    			(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    	txObject.setConnectionHolder(conHolder, false);
    	return txObject;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    当前有事务

    handleExistingTransaction方法

    if (isExistingTransaction(transaction)) {
    	// Existing transaction found -> check propagation behavior to find out how to behave.
    	return handleExistingTransaction(def, transaction, debugEnabled);
    }
    
    • 1
    • 2
    • 3
    • 4

    isExistingTransaction方法判断当前是否存在事务:

    protected boolean isExistingTransaction(Object transaction) {
    	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    	// 判断存在数据库连接且开启了事务
    	return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    handleExistingTransaction方法:

    private TransactionStatus handleExistingTransaction(
    		TransactionDefinition definition, Object transaction, boolean debugEnabled)
    		throws TransactionException {
    
    	// 传播级别为NEVER
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    		// 传播级别设置为NEVER时,如果当前有事务,则抛出异常
    		throw new IllegalTransactionStateException(
    				"Existing transaction found for transaction marked with propagation 'never'");
    	}
    
    	// 传播级别为NOT_SUPPORTED
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    		// 挂起当前事务
    		Object suspendedResources = suspend(transaction);
    		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    		return prepareTransactionStatus(
    				definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    	}
    
    	// 传播级别为REQUIRES_NEW
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    		// 挂起当前事务,然后创建新事务
    		SuspendedResourcesHolder suspendedResources = suspend(transaction);
    		try {
    			return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    		} catch (RuntimeException | Error beginEx) {
    			resumeAfterBeginException(transaction, suspendedResources, beginEx);
    			throw beginEx;
    		}
    	}
    
    	// 传播级别为NESTED
    	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    		if (!isNestedTransactionAllowed()) {
    			throw new NestedTransactionNotSupportedException(
    					"Transaction manager does not allow nested transactions by default - " +
    					"specify 'nestedTransactionAllowed' property with value 'true'");
    		}
    		// Creating nested transaction
    		if (useSavepointForNestedTransaction()) {
    			// Create savepoint within existing Spring-managed transaction,
    			// through the SavepointManager API implemented by TransactionStatus.
    			// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    			DefaultTransactionStatus status =
    					prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    			status.createAndHoldSavepoint();
    			return status;
    		}
    		else {
    			// Nested transaction through nested begin and commit/rollback calls.
    			// Usually only for JTA: Spring synchronization might get activated here
    			// in case of a pre-existing JTA transaction.
    			return startTransaction(definition, transaction, debugEnabled, null);
    		}
    	}
    
    	// 传播级别为SUPPORTS/REQUIRED
    	if (isValidateExistingTransaction()) {
    		if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    			Integer currentIsolationLevel = 
                    TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    			if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
    				Constants isoConstants = DefaultTransactionDefinition.constants;
    				throw new IllegalTransactionStateException(
                        "Participating transaction with definition [" + definition + 
                        "] specifies isolation level which is incompatible with existing transaction: " +
    						(currentIsolationLevel != null ?
    								isoConstants.toCode(currentIsolationLevel,
                                                        DefaultTransactionDefinition.PREFIX_ISOLATION) :
    								"(unknown)"));
    			}
    		}
    		if (!definition.isReadOnly()) {
    			if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    				throw new IllegalTransactionStateException("Participating transaction with definition [" +
    						definition + "] is not marked as read-only but existing transaction is");
    			}
    		}
    	}
    
    	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    	return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    传播级别为NEVER

    传播级别设置为NEVER时,如果当前有事务,抛出异常:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
    	throw new IllegalTransactionStateException(
    			"Existing transaction found for transaction marked with propagation 'never'");
    }
    
    • 1
    • 2
    • 3
    • 4

    传播级别为NOT_SUPPORTED

    挂起当前事务,业务方法以无事务方式执行:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
    	// 挂起当前事务
    	Object suspendedResources = suspend(transaction);
    	boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    	return prepareTransactionStatus(
    			definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    prepareTransactionStatus方法:

    1. 创建DefaultTransactionStatus对象,把SuspendedResources封装进去,以便后续恢复旧事务
    2. 使用TransactionSynchronizationManager将事务属性绑定到当前线程
    3. 初始化当前线程TransactionSynchronization集

    由于传播级别为NOT_SUPPORTED所以此处不会开启事务。

    传播级别为REQUIRES_NEW

    挂起当前事务,创建新事务:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    	// 挂起当前事务
    	SuspendedResourcesHolder suspendedResources = suspend(transaction);
    	try {
    		// 开启新事务
    		return startTransaction(definition, transaction, debugEnabled, suspendedResources);
    	} catch (RuntimeException | Error beginEx) {
    		resumeAfterBeginException(transaction, suspendedResources, beginEx);
    		throw beginEx;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    开启新事务:

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
    		boolean debugEnabled, SuspendedResourcesHolder suspendedResources) {
    
    	// 值为true
    	boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    	DefaultTransactionStatus status = newTransactionStatus(
    			definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    	// 开启新事务
    	doBegin(transaction, definition);
    	// 初始化当前线程TransactionSynchronization集
    	prepareSynchronization(status, definition);
    	return status;
    }
    
    protected void doBegin(Object transaction, TransactionDefinition definition) {
    	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    	Connection con = null;
    
    	try {
    		if (!txObject.hasConnectionHolder() ||
    				txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    			// 打开一个新连接
    			Connection newCon = obtainDataSource().getConnection();
    			txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    		}
    
    		txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    		con = txObject.getConnectionHolder().getConnection();
    
    		Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    		txObject.setPreviousIsolationLevel(previousIsolationLevel);
    		txObject.setReadOnly(definition.isReadOnly());
    
    		// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
    		// so we don't want to do it unnecessarily (for example if we've explicitly
    		// configured the connection pool to set it already).
    		if (con.getAutoCommit()) {
    			txObject.setMustRestoreAutoCommit(true);
    			// 设置手动提交
    			con.setAutoCommit(false);
    		}
    
    		// The default implementation executes a "SET TRANSACTION READ ONLY" statement 
    		// if the "enforceReadOnly" flag is set to true and the transaction definition 
    		// indicates a read-only transaction.
    		prepareTransactionalConnection(con, definition);
    		txObject.getConnectionHolder().setTransactionActive(true);
    
    		int timeout = determineTimeout(definition);
    		if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
    			txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    		}
    
    		// Bind the connection holder to the thread.
    		if (txObject.isNewConnectionHolder()) {
    			TransactionSynchronizationManager
                    .bindResource(obtainDataSource(), txObject.getConnectionHolder());
    		}
    	} catch (Throwable ex) {
    		if (txObject.isNewConnectionHolder()) {
    			DataSourceUtils.releaseConnection(con, obtainDataSource());
    			txObject.setConnectionHolder(null, false);
    		}
    		throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    传播级别为NESTED

    为当前连接设置保存点,如果业务方法出现异常,会回滚到该保存点位置:

    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    	if (!isNestedTransactionAllowed()) {
    		throw new NestedTransactionNotSupportedException(
    				"Transaction manager does not allow nested transactions by default - " +
    				"specify 'nestedTransactionAllowed' property with value 'true'");
    	}
    	// Creating nested transaction
    	// 默认就是true
    	if (useSavepointForNestedTransaction()) {
    		// Create savepoint within existing Spring-managed transaction,
    		// through the SavepointManager API implemented by TransactionStatus.
    		// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
    		DefaultTransactionStatus status =
    				prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
    		// 设置保存点
    		status.createAndHoldSavepoint();
    		return status;
    	} else {
    		// Nested transaction through nested begin and commit/rollback calls.
    		// Usually only for JTA: Spring synchronization might get activated here
    		// in case of a pre-existing JTA transaction.
    		return startTransaction(definition, transaction, debugEnabled, null);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    设置保存点:

    public void createAndHoldSavepoint() throws TransactionException {
    	setSavepoint(getSavepointManager().createSavepoint());
    }
    
    // JdbcTransactionObjectSupport#createSavepoint
    public Object createSavepoint() throws TransactionException {
    	ConnectionHolder conHolder = getConnectionHolderForSavepoint();
    	try {
    		if (!conHolder.supportsSavepoints()) {
    			throw new NestedTransactionNotSupportedException("不支持");
    		}
    		if (conHolder.isRollbackOnly()) {
    			throw new CannotCreateTransactionException("只读");
    		}
    		// 使用jdbc设置保存点
    		return conHolder.createSavepoint();
    	} catch (SQLException ex) {
    		throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    传播级别为SUPPORTS/REQUIRED/MANDATORY

    // 默认false
    if (isValidateExistingTransaction()) {
    	if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
    		Integer currentIsolationLevel =
                TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    		if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
    			Constants isoConstants = DefaultTransactionDefinition.constants;
    			throw new IllegalTransactionStateException(
                    "Participating transaction with definition [" + definition + 
                    "] specifies isolation level which is incompatible with existing transaction: " +
    					(currentIsolationLevel != null ?
    							isoConstants.toCode(currentIsolationLevel,
                                                    DefaultTransactionDefinition.PREFIX_ISOLATION) :
    							"(unknown)"));
    		}
    	}
    	if (!definition.isReadOnly()) {
    		if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
    			throw new IllegalTransactionStateException("Participating transaction with definition [" +
    					definition + "] is not marked as read-only but existing transaction is");
    		}
    	}
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    事务挂起

    把当前线程上绑定的资源、事务配置信息移除封装到SuspendedResourcesHolder对象,传递给新创建的TransactionStatus对象,以便在业务方法执行结束后恢复旧事务:

    protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
    	// 判断当前线程的Set已经存在
    	// TransactionSynchronization: 事务回调同步器,定义了事务挂起、恢复等方法
    	// 例如mybatis-spring中有SqlSessionSynchronization实现类
    	if (TransactionSynchronizationManager.isSynchronizationActive()) {
    		List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
    		try {
    			Object suspendedResources = null;
    			if (transaction != null) {
    				// 挂起事务
    				suspendedResources = doSuspend(transaction);
    			}
    
    			// 清除当前线程事务配置参数:事务名、只读属性、隔离级别等
    			String name = TransactionSynchronizationManager.getCurrentTransactionName();
    			TransactionSynchronizationManager.setCurrentTransactionName(null);
    			boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
    			TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
    			Integer isolationLevel = 
                    TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    			TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
    			boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
    			TransactionSynchronizationManager.setActualTransactionActive(false);
    
    			// 把当前线程的事务相关信息封装起来以便后续恢复
    			return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations,
                                                    name, readOnly, isolationLevel, wasActive);
    		} catch (RuntimeException | Error ex) {
    			// doSuspend failed - original transaction is still active...
    			doResumeSynchronization(suspendedSynchronizations);
    			throw ex;
    		}
    	} else if (transaction != null) {
    		// Transaction active but no synchronization active.
    		Object suspendedResources = doSuspend(transaction);
    		return new SuspendedResourcesHolder(suspendedResources);
    	} else {
    		// Neither transaction nor synchronization active.
    		return null;
    	}
    }
    
    private List<TransactionSynchronization> doSuspendSynchronization() {
    	List<TransactionSynchronization> suspendedSynchronizations =
    			TransactionSynchronizationManager.getSynchronizations();
    	// 挂起所有的TransactionSynchronization
    	// 比如SqlSessionSynchronization实现类会清除当前线程的SessionFactory
    	for (TransactionSynchronization synchronization : suspendedSynchronizations) {
    		synchronization.suspend();
    	}
    	// 清除线程上的TransactionSynchronization集
    	TransactionSynchronizationManager.clearSynchronization();
    	return suspendedSynchronizations;
    }
    
    protected Object doSuspend(Object transaction) {
    	DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    	txObject.setConnectionHolder(null);
    	// ConnectionHolder对象
    	return TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    事务恢复

    protected final void resume(Object transaction, SuspendedResourcesHolder resourcesHolder)
    		throws TransactionException {
    
    	if (resourcesHolder != null) {
    		Object suspendedResources = resourcesHolder.suspendedResources;
    		if (suspendedResources != null) {
    			// 恢复之前挂起的是ConnectionHolder对象
    			doResume(transaction, suspendedResources);
    		}
    		List<TransactionSynchronization> suspendedSynchronizations =
                resourcesHolder.suspendedSynchronizations;
    		if (suspendedSynchronizations != null) {
    			TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
    			TransactionSynchronizationManager
                    .setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
    			TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
    			TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
    			doResumeSynchronization(suspendedSynchronizations);
    		}
    	}
    }
    
    protected void doResume(Object transaction, Object suspendedResources) {
    	// 恢复之前挂起的是ConnectionHolder对象
    	TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    当前无事务

    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    	// 传播级别设置为PROPAGATION_MANDATORY时,如果当前没有事务,则抛出异常
    	throw new IllegalTransactionStateException(
    			"No existing transaction found for transaction marked with propagation 'mandatory'");
    } else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
    		def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
    		def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    	SuspendedResourcesHolder suspendedResources = suspend(null);
    	// Creating new transaction
    	try {
    		return startTransaction(def, transaction, debugEnabled, suspendedResources);
    	} catch (RuntimeException | Error ex) {
    		resume(null, suspendedResources);
    		throw ex;
    	}
    } else {
    	// Create "empty" transaction: no actual transaction, but potentially synchronization.
    	if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
    		logger.warn("Custom isolation level specified but no actual transaction initiated; " +
    				"isolation level will effectively be ignored: " + def);
    	}
    	boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
    	return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    创建TransactionInfo

    protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm,
    		TransactionAttribute txAttr, String joinpointIdentification,
    		TransactionStatus status) {
    
    	TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    	if (txAttr != null) {
    		// The transaction manager will flag an error if an incompatible tx already exists.
    		txInfo.newTransactionStatus(status);
    	} else {
    		// The TransactionInfo.hasTransaction() method will return false. We created it only
    		// to preserve the integrity of the ThreadLocal stack maintained in this class.
    	}
    
    	// We always bind the TransactionInfo to the thread, even if we didn't create
    	// a new transaction here. This guarantees that the TransactionInfo stack
    	// will be managed correctly even if no transaction was created by this aspect.
    	txInfo.bindToThread();
    	return txInfo;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    异常处理

    protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
    	if (txInfo != null && txInfo.getTransactionStatus() != null) {
    		if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
    			try {
    				txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    			} catch (TransactionSystemException ex2) {
    				logger.error("Application exception overridden by rollback exception", ex);
    				ex2.initApplicationException(ex);
    				throw ex2;
    			} catch (RuntimeException | Error ex2) {
    				logger.error("Application exception overridden by rollback exception", ex);
    				throw ex2;
    			}
    		} else {
    			// We don't roll back on this exception.
    			// Will still roll back if TransactionStatus.isRollbackOnly() is true.
    			try {
    				txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    			} catch (TransactionSystemException ex2) {
    				logger.error("Application exception overridden by commit exception", ex);
    				ex2.initApplicationException(ex);
    				throw ex2;
    			} catch (RuntimeException | Error ex2) {
    				logger.error("Application exception overridden by commit exception", ex);
    				throw ex2;
    			}
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    回滚:

    public final void rollback(TransactionStatus status) throws TransactionException {
    	if (status.isCompleted()) {
    		throw new IllegalTransactionStateException("Transaction is already completed");
    	}
    
    	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    	processRollback(defStatus, false);
    }
    
    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    	try {
    		boolean unexpectedRollback = unexpected;
    
    		try {
    			triggerBeforeCompletion(status);
    
    			if (status.hasSavepoint()) {
    				// 回滚到指定保存点
    				status.rollbackToHeldSavepoint();
    			} else if (status.isNewTransaction()) {
    				// 事务回滚
    				doRollback(status);
    			} else {
    				// Participating in larger transaction
    				if (status.hasTransaction()) {
    					if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
    						// 设置rollback-only
    						doSetRollbackOnly(status);
    					}
    				}
    				// Unexpected rollback only matters here if we're asked to fail early
    				if (!isFailEarlyOnGlobalRollbackOnly()) {
    					unexpectedRollback = false;
    				}
    			}
    		} catch (RuntimeException | Error ex) {
    			triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    			throw ex;
    		}
    
    		triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    
    		// Raise UnexpectedRollbackException if we had a global rollback-only marker
    		if (unexpectedRollback) {
    			throw new UnexpectedRollbackException(
    					"Transaction rolled back because it has been marked as rollback-only");
    		}
    	} finally {
    		// 这里面有恢复挂起事务的逻辑
    		cleanupAfterCompletion(status);
    	}
    }
    
    // 回滚到指定保存点
    public void rollbackToSavepoint(Object savepoint) throws TransactionException {
    	ConnectionHolder conHolder = getConnectionHolderForSavepoint();
    	try {
    		conHolder.getConnection().rollback((Savepoint) savepoint);
    		conHolder.resetRollbackOnly();
    	} catch (Throwable ex) {
    		throw new TransactionSystemException("Could not roll back to JDBC savepoint", ex);
    	}
    }
    // 释放保存点
    public void releaseSavepoint(Object savepoint) throws TransactionException {
    	ConnectionHolder conHolder = getConnectionHolderForSavepoint();
    	try {
    		conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
    	} catch (Throwable ex) {
    		logger.debug("Could not explicitly release JDBC savepoint", ex);
    	}
    }
    
    // 事务回滚
    protected void doRollback(DefaultTransactionStatus status) {
    	DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    	Connection con = txObject.getConnectionHolder().getConnection();
    	try {
    		con.rollback();
    	} catch (SQLException ex) {
    		throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    	}
    }
    
    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    	status.setCompleted();
    	if (status.isNewSynchronization()) {
    		TransactionSynchronizationManager.clear();
    	}
    	if (status.isNewTransaction()) {
    		// 恢复连接的事务属性,比如自动提交方式、隔离级别、只读属性等
    		// 将连接归还给数据源,清除ConnectionHolder的conn
    		doCleanupAfterCompletion(status.getTransaction());
    	}
    	if (status.getSuspendedResources() != null) {
    		// 恢复之前挂起的事务
    		Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
    		resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    事务清理

    private void restoreThreadLocalStatus() {
    	// Use stack to restore old transaction TransactionInfo.
    	// Will be null if none was set.
    	transactionInfoHolder.set(this.oldTransactionInfo);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    事务提交

    protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    	if (txInfo != null && txInfo.getTransactionStatus() != null) {
    		txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    提交:

    public final void commit(TransactionStatus status) throws TransactionException {
    	if (status.isCompleted()) {
    		throw new IllegalTransactionStateException("Transaction is already completed");
    	}
    
    	DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    	if (defStatus.isLocalRollbackOnly()) {
    		// Transactional code has requested rollback
    		// 回滚
    		processRollback(defStatus, false);
    		return;
    	}
    
    	if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
    		// Global transaction is marked as rollback-only but transactional code requested commit
    		// 回滚
    		processRollback(defStatus, true);
    		return;
    	}
    
    	// 提交事务
    	processCommit(defStatus);
    }
    
    // 提交事务
    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    	try {
    		boolean beforeCompletionInvoked = false;
    
    		try {
    			boolean unexpectedRollback = false;
    			prepareForCommit(status);
    			triggerBeforeCommit(status);
    			triggerBeforeCompletion(status);
    			beforeCompletionInvoked = true;
    
    			if (status.hasSavepoint()) {
    				unexpectedRollback = status.isGlobalRollbackOnly();
    				// 释放保存点
    				status.releaseHeldSavepoint();
    			} else if (status.isNewTransaction()) {
    				unexpectedRollback = status.isGlobalRollbackOnly();
    				// 提交事务
    				doCommit(status);
    			} else if (isFailEarlyOnGlobalRollbackOnly()) {
    				unexpectedRollback = status.isGlobalRollbackOnly();
    			}
    
    			// Throw UnexpectedRollbackException if we have a global rollback-only
    			// marker but still didn't get a corresponding exception from commit.
    			if (unexpectedRollback) {
    				throw new UnexpectedRollbackException(
    						"Transaction silently rolled back because it has been marked as rollback-only");
    			}
    		} catch (UnexpectedRollbackException ex) {
    			// can only be caused by doCommit
    			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
    			throw ex;
    		} catch (TransactionException ex) {
    			// can only be caused by doCommit
    			if (isRollbackOnCommitFailure()) {
    				doRollbackOnCommitException(status, ex);
    			} else {
    				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
    			}
    			throw ex;
    		} catch (RuntimeException | Error ex) {
    			if (!beforeCompletionInvoked) {
    				triggerBeforeCompletion(status);
    			}
    			doRollbackOnCommitException(status, ex);
    			throw ex;
    		}
    
    		// Trigger afterCommit callbacks, with an exception thrown there
    		// propagated to callers but the transaction still considered as committed.
    		try {
    			triggerAfterCommit(status);
    		} finally {
    			triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
    		}
    
    	} finally {
    		cleanupAfterCompletion(status);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
  • 相关阅读:
    学会这些python知识,就可以去面试了
    java批量生成海量测试数据及用ChatGPT提示语一键生成的方法
    AOP相关概念总结
    支持CAN FD的Kvaser PCIEcan 4xCAN v2编码: 73-30130-01414-5如何应用?
    Intel RealSense D435i与IMU标定用于vins-fusion
    NIO‘s Sword(思维,取模,推公式)
    QDir类【官翻】
    体育场馆预约小程序,体育馆预约小程序,体育馆预约系统小程序
    flex_dashboard
    用AI打造一个属于自己的歌手,让她C位霸气出道
  • 原文地址:https://blog.csdn.net/xuguofeng2016/article/details/130859243