• A look at REQUIRES_NEW in Spring transactions


    This article looks at how Spring implements the REQUIRES_NEW transaction propagation behavior.

    TransactionDefinition

    org/springframework/transaction/TransactionDefinition.java

    	/**
    	 * Create a new transaction, suspending the current transaction if one exists.
    	 * Analogous to the EJB transaction attribute of the same name.
    	 * <p><b>NOTE:</b> Actual transaction suspension will not work out-of-the-box
    	 * on all transaction managers. This in particular applies to
    	 * {@link org.springframework.transaction.jta.JtaTransactionManager},
    	 * which requires the {@code javax.transaction.TransactionManager} to be
    	 * made available it to it (which is server-specific in standard Java EE).
    	 * <p>A {@code PROPAGATION_REQUIRES_NEW} scope always defines its own
    	 * transaction synchronizations. Existing synchronizations will be suspended
    	 * and resumed appropriately.
    	 * @see org.springframework.transaction.jta.JtaTransactionManager#setTransactionManager
    	 */
    	int PROPAGATION_REQUIRES_NEW = 3;


    When a transaction already exists, PROPAGATION_REQUIRES_NEW suspends the current transaction and then creates a new one.

    AbstractPlatformTransactionManager

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

    		if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
    			if (debugEnabled) {
    				logger.debug("Suspending current transaction, creating new transaction with name [" +
    						definition.getName() + "]");
    			}
    			SuspendedResourcesHolder suspendedResources = suspend(transaction);
    			try {
    				boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    				DefaultTransactionStatus status = newTransactionStatus(
    						definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    				doBegin(transaction, definition);
    				prepareSynchronization(status, definition);
    				return status;
    			}
    			catch (RuntimeException | Error beginEx) {
    				resumeAfterBeginException(transaction, suspendedResources, beginEx);
    				throw beginEx;
    			}
    		}
    

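    When handleExistingTransaction sees PROPAGATION_REQUIRES_NEW, it calls suspend on the current transaction, creates a status object via newTransactionStatus, and then runs doBegin and prepareSynchronization. If beginning the new transaction fails, resumeAfterBeginException restores the suspended transaction before the exception is rethrown.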
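
    The control flow of this branch can be sketched without Spring. The following is a toy model under stated assumptions, not Spring's implementation: the class RequiresNewFlowSketch, its CURRENT slot and its EVENTS log are hypothetical names invented for illustration, and a single ThreadLocal string stands in for everything a real transaction manager binds to the thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the REQUIRES_NEW branch; none of these names are Spring classes. */
class RequiresNewFlowSketch {

    /** Hypothetical stand-in for the thread-bound "current transaction". */
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    /** Records the order of events so the flow can be inspected. */
    static final List<String> EVENTS = new ArrayList<>();

    /** Unbinds the current transaction from the thread and hands it back. */
    static String suspend() {
        String suspended = CURRENT.get();
        CURRENT.remove();
        EVENTS.add("suspend " + suspended);
        return suspended;
    }

    /** Rebinds a previously suspended transaction to the thread. */
    static void resume(String suspended) {
        CURRENT.set(suspended);
        EVENTS.add("resume " + suspended);
    }

    /** Runs the work in a new transaction while the current one is suspended. */
    static void requiresNew(String name, Runnable work) {
        String suspended = suspend();
        CURRENT.set(name);                      // plays the role of doBegin
        EVENTS.add("begin " + name);
        try {
            work.run();
            EVENTS.add("commit " + name);
        } catch (RuntimeException ex) {
            EVENTS.add("rollback " + name);
            throw ex;
        } finally {
            CURRENT.remove();
            resume(suspended);                  // plays the role of cleanupAfterCompletion
        }
    }

    /** Starts an "outer" transaction, runs an "inner" one, and returns the event log. */
    static List<String> demo() {
        EVENTS.clear();
        CURRENT.set("outer");
        requiresNew("inner", () -> EVENTS.add("work in " + CURRENT.get()));
        EVENTS.add("back in " + CURRENT.get());
        CURRENT.remove();
        return new ArrayList<>(EVENTS);
    }
}
```

    Calling RequiresNewFlowSketch.demo() yields the events in this order: suspend outer, begin inner, work in inner, commit inner, resume outer, back in outer. The finally block mirrors the point that the outer transaction is resumed whether the inner one commits or rolls back.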

    suspend

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

    	/**
    	 * Suspend the given transaction. Suspends transaction synchronization first,
    	 * then delegates to the {@code doSuspend} template method.
    	 * @param transaction the current transaction object
    	 * (or {@code null} to just suspend active synchronizations, if any)
    	 * @return an object that holds suspended resources
    	 * (or {@code null} if neither transaction nor synchronization active)
    	 * @see #doSuspend
    	 * @see #resume
    	 */
    	@Nullable
    	protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    		if (TransactionSynchronizationManager.isSynchronizationActive()) {
    			List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
    			try {
    				Object suspendedResources = null;
    				if (transaction != null) {
    					suspendedResources = doSuspend(transaction);
    				}
    				String name = TransactionSynchronizationManager.getCurrentTransactionName();
    				TransactionSynchronizationManager.setCurrentTransactionName(null);
    				boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
    				TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
    				Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
    				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
    				boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
    				TransactionSynchronizationManager.setActualTransactionActive(false);
    				return new SuspendedResourcesHolder(
    						suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
    			}
    			catch (RuntimeException | Error ex) {
    				// doSuspend failed - original transaction is still active...
    				doResumeSynchronization(suspendedSynchronizations);
    				throw ex;
    			}
    		}
    		else if (transaction != null) {
    			// Transaction active but no synchronization active.
    			Object suspendedResources = doSuspend(transaction);
    			return new SuspendedResourcesHolder(suspendedResources);
    		}
    		else {
    			// Neither transaction nor synchronization active.
    			return null;
    		}
    	}
    

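    The suspend method calls doSuspendSynchronization to obtain suspendedSynchronizations and doSuspend to obtain suspendedResources. It also records the current transaction name, read-only flag, isolation level and active flag, resets them on TransactionSynchronizationManager, and finally packs everything into a SuspendedResourcesHolder.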
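
    The snapshot-and-reset pattern used by suspend can be illustrated in isolation. This is a minimal sketch, assuming three hypothetical thread-bound attributes (NAME, READ_ONLY, ACTIVE) in place of the state that TransactionSynchronizationManager keeps; the Holder class is an invented, simplified analogue of SuspendedResourcesHolder and leaves out the resources, the synchronizations and the isolation level.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of capturing and restoring thread-bound transaction attributes. */
class SuspendedStateSketch {

    // Hypothetical thread-bound attributes; the defaults model "no transaction".
    static final ThreadLocal<String> NAME = new ThreadLocal<>();
    static final ThreadLocal<Boolean> READ_ONLY = ThreadLocal.withInitial(() -> false);
    static final ThreadLocal<Boolean> ACTIVE = ThreadLocal.withInitial(() -> false);

    /** Immutable snapshot of the attributes at the moment of suspension. */
    static final class Holder {
        final String name;
        final boolean readOnly;
        final boolean wasActive;

        Holder(String name, boolean readOnly, boolean wasActive) {
            this.name = name;
            this.readOnly = readOnly;
            this.wasActive = wasActive;
        }
    }

    /** Captures the current attributes, then resets them to their defaults. */
    static Holder suspend() {
        Holder holder = new Holder(NAME.get(), READ_ONLY.get(), ACTIVE.get());
        NAME.remove();
        READ_ONLY.remove();
        ACTIVE.remove();
        return holder;
    }

    /** Writes the captured attributes back to the thread. */
    static void resume(Holder holder) {
        NAME.set(holder.name);
        READ_ONLY.set(holder.readOnly);
        ACTIVE.set(holder.wasActive);
    }

    static String describe() {
        return NAME.get() + "/readOnly=" + READ_ONLY.get() + "/active=" + ACTIVE.get();
    }

    /** Returns the visible state while suspended and after resuming. */
    static List<String> demo() {
        NAME.set("outer");
        READ_ONLY.set(true);
        ACTIVE.set(true);
        List<String> seen = new ArrayList<>();
        Holder holder = suspend();
        seen.add(describe());
        resume(holder);
        seen.add(describe());
        NAME.remove();
        READ_ONLY.remove();
        ACTIVE.remove();
        return seen;
    }
}
```

    In this sketch demo() returns "null/readOnly=false/active=false" for the suspended phase and "outer/readOnly=true/active=true" after resuming, which is the reason the holder has to carry the old values: the inner transaction starts from a clean slate.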

    doSuspendSynchronization

    	/**
    	 * Suspend all current synchronizations and deactivate transaction
    	 * synchronization for the current thread.
    	 * @return the List of suspended TransactionSynchronization objects
    	 */
    	private List<TransactionSynchronization> doSuspendSynchronization() {
    		List<TransactionSynchronization> suspendedSynchronizations =
    				TransactionSynchronizationManager.getSynchronizations();
    		for (TransactionSynchronization synchronization : suspendedSynchronizations) {
    			synchronization.suspend();
    		}
    		TransactionSynchronizationManager.clearSynchronization();
    		return suspendedSynchronizations;
    	}
    

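    doSuspendSynchronization iterates over the registered synchronizations, calls suspend on each one, and then calls clearSynchronization to deactivate synchronization for the current thread.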
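
    The same loop can be modeled with a plain callback list. The sketch below is an assumption-laden illustration, not Spring code: Sync is a hypothetical two-method interface loosely modeled on TransactionSynchronization, and SYNCS is an invented ThreadLocal whose null value means "synchronization not active". The resumeAll counterpart (re-activate, call resume, re-register) is included to show the symmetry.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of suspending and resuming registered synchronization callbacks. */
class SynchronizationSuspendSketch {

    /** Hypothetical callback interface, loosely modeled on TransactionSynchronization. */
    interface Sync {
        void suspend();
        void resume();
    }

    /** Thread-bound callbacks; a null value means synchronization is not active. */
    static final ThreadLocal<List<Sync>> SYNCS = new ThreadLocal<>();

    /** Notifies every callback, then deactivates synchronization for this thread. */
    static List<Sync> suspendAll() {
        List<Sync> current = SYNCS.get();
        if (current == null) {
            throw new IllegalStateException("Synchronization is not active");
        }
        List<Sync> suspended = new ArrayList<>(current);
        for (Sync sync : suspended) {
            sync.suspend();
        }
        SYNCS.remove();
        return suspended;
    }

    /** Re-activates synchronization and re-registers the suspended callbacks. */
    static void resumeAll(List<Sync> suspended) {
        List<Sync> active = new ArrayList<>();
        SYNCS.set(active);
        for (Sync sync : suspended) {
            sync.resume();
            active.add(sync);
        }
    }

    /** Registers one logging callback, suspends it, resumes it, and returns the log. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Sync sync = new Sync() {
            @Override public void suspend() { log.add("suspended"); }
            @Override public void resume() { log.add("resumed"); }
        };
        List<Sync> initial = new ArrayList<>();
        initial.add(sync);
        SYNCS.set(initial);

        List<Sync> suspended = suspendAll();
        log.add("active=" + (SYNCS.get() != null));
        resumeAll(suspended);
        log.add("active=" + (SYNCS.get() != null));
        SYNCS.remove();
        return log;
    }
}
```

    Here demo() returns suspended, active=false, resumed, active=true. Copying the list before iterating keeps the suspended callbacks available for the later resume even though the thread-bound list is cleared.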

    ResourceHolderSynchronization

    org/springframework/transaction/support/ResourceHolderSynchronization.java

    	public void suspend() {
    		if (this.holderActive) {
    			TransactionSynchronizationManager.unbindResource(this.resourceKey);
    		}
    	}
    

    When its holder is active, ResourceHolderSynchronization's suspend calls TransactionSynchronizationManager.unbindResource.

    unbindResource

    org/springframework/transaction/support/TransactionSynchronizationManager.java

    	/**
    	 * Unbind a resource for the given key from the current thread.
    	 * @param key the key to unbind (usually the resource factory)
    	 * @return the previously bound value (usually the active resource object)
    	 * @throws IllegalStateException if there is no value bound to the thread
    	 * @see ResourceTransactionManager#getResourceFactory()
    	 */
    	public static Object unbindResource(Object key) throws IllegalStateException {
    		Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    		Object value = doUnbindResource(actualKey);
    		if (value == null) {
    			throw new IllegalStateException(
    					"No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    		}
    		return value;
    	}
    
    	/**
    	 * Actually remove the value of the resource that is bound for the given key.
    	 */
    	@Nullable
    	private static Object doUnbindResource(Object actualKey) {
    		Map<Object, Object> map = resources.get();
    		if (map == null) {
    			return null;
    		}
    		Object value = map.remove(actualKey);
    		// Remove entire ThreadLocal if empty...
    		if (map.isEmpty()) {
    			resources.remove();
    		}
    		// Transparently suppress a ResourceHolder that was marked as void...
    		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
    			value = null;
    		}
    		if (value != null && logger.isTraceEnabled()) {
    			logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
    					Thread.currentThread().getName() + "]");
    		}
    		return value;
    	}
    

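    unbindResource mainly delegates to doUnbindResource, which removes the entry for the given key from the thread-bound resources map (and removes the ThreadLocal itself once the map is empty).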
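
    To make the bind/unbind mechanics concrete, here is a small stand-alone model of a thread-bound resource map. It is a sketch under assumptions: ThreadBoundResourcesSketch and its bind, unbind and get methods are hypothetical simplifications, the key is an arbitrary object standing in for a resource factory such as a DataSource, and plain strings stand in for connection holders.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a per-thread resource registry keyed by resource factory. */
class ThreadBoundResourcesSketch {

    private static final ThreadLocal<Map<Object, Object>> RESOURCES = new ThreadLocal<>();

    /** Binds a value for the key; fails if the key is already bound on this thread. */
    static void bind(Object key, Object value) {
        Map<Object, Object> map = RESOURCES.get();
        if (map == null) {
            map = new HashMap<>();
            RESOURCES.set(map);
        }
        if (map.putIfAbsent(key, value) != null) {
            throw new IllegalStateException("Already a value bound for key [" + key + "]");
        }
    }

    /** Removes and returns the value for the key; fails if nothing was bound. */
    static Object unbind(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        Object value = (map != null ? map.remove(key) : null);
        if (map != null && map.isEmpty()) {
            RESOURCES.remove();   // drop the ThreadLocal entry once the map is empty
        }
        if (value == null) {
            throw new IllegalStateException("No value bound for key [" + key + "]");
        }
        return value;
    }

    /** Looks up the value currently bound for the key, or null. */
    static Object get(Object key) {
        Map<Object, Object> map = RESOURCES.get();
        return (map != null ? map.get(key) : null);
    }

    /** Suspends an outer holder, binds an inner one, then restores the outer one. */
    static String demo() {
        Object dataSource = new Object();               // hypothetical resource key
        bind(dataSource, "outer-connection");

        Object suspended = unbind(dataSource);          // what suspend boils down to
        bind(dataSource, "inner-connection");           // the new transaction's holder
        Object during = get(dataSource);
        unbind(dataSource);                             // inner transaction completes

        bind(dataSource, suspended);                    // what resume boils down to
        Object after = get(dataSource);
        unbind(dataSource);
        return during + " -> " + after;
    }
}
```

    In this model demo() returns "inner-connection -> outer-connection": while the outer holder is unbound, any code looking up the key on the thread sees only the inner holder, and binding the saved holder back restores the original view.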

    cleanupAfterCompletion

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

    	/**
    	 * Clean up after completion, clearing synchronization if necessary,
    	 * and invoking doCleanupAfterCompletion.
    	 * @param status object representing the transaction
    	 * @see #doCleanupAfterCompletion
    	 */
    	private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    		status.setCompleted();
    		if (status.isNewSynchronization()) {
    			TransactionSynchronizationManager.clear();
    		}
    		if (status.isNewTransaction()) {
    			doCleanupAfterCompletion(status.getTransaction());
    		}
    		if (status.getSuspendedResources() != null) {
    			if (status.isDebug()) {
    				logger.debug("Resuming suspended transaction after completion of inner transaction");
    			}
    			Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
    			resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
    		}
    	}
    

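    After the inner transaction completes, cleanupAfterCompletion checks whether the status carries suspendedResources; if it does, it calls resume to restore the previously suspended transaction.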

    resume

    org/springframework/transaction/support/AbstractPlatformTransactionManager.java

    	/**
    	 * Resume the given transaction. Delegates to the {@code doResume}
    	 * template method first, then resuming transaction synchronization.
    	 * @param transaction the current transaction object
    	 * @param resourcesHolder the object that holds suspended resources,
    	 * as returned by {@code suspend} (or {@code null} to just
    	 * resume synchronizations, if any)
    	 * @see #doResume
    	 * @see #suspend
    	 */
    	protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
    			throws TransactionException {
    
    		if (resourcesHolder != null) {
    			Object suspendedResources = resourcesHolder.suspendedResources;
    			if (suspendedResources != null) {
    				doResume(transaction, suspendedResources);
    			}
    			List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
    			if (suspendedSynchronizations != null) {
    				TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
    				TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
    				TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
    				TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
    				doResumeSynchronization(suspendedSynchronizations);
    			}
    		}
    	}
    

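    The resume method calls doResume and then restores the TransactionSynchronizationManager state saved at suspension: the active flag, isolation level, read-only flag and transaction name, followed by the synchronizations themselves.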

    doResume

    org/springframework/orm/jpa/JpaTransactionManager.java

    	@Override
    	protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    		SuspendedResourcesHolder resourcesHolder = (SuspendedResourcesHolder) suspendedResources;
    		TransactionSynchronizationManager.bindResource(
    				obtainEntityManagerFactory(), resourcesHolder.getEntityManagerHolder());
    		if (getDataSource() != null && resourcesHolder.getConnectionHolder() != null) {
    			TransactionSynchronizationManager.bindResource(getDataSource(), resourcesHolder.getConnectionHolder());
    		}
    	}
    

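    doResume simply binds the suspended resources back: the EntityManagerHolder for the EntityManagerFactory and, if present, the ConnectionHolder for the DataSource.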

    Summary

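    Spring implements the REQUIRES_NEW propagation level by suspending the current transaction (an unbind under the hood), creating a new transaction, and, once that transaction completes, checking for a suspended transaction and resuming it if there is one (a bind under the hood). MySQL itself is unaware of any nesting: it first receives the SQL of the inner new transaction followed by its commit, and only then the statements of the resumed outer transaction.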
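
    One practical consequence of the database not seeing any nesting is that the two transactions succeed or fail independently. The sketch below illustrates this with an in-memory stand-in for the database. It assumes, as is typical for resource-local transaction managers, that the inner transaction runs on its own connection; the Connection class, the COMMITTED list and the row names are all hypothetical and have nothing to do with JDBC.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model showing that inner and outer transactions commit independently. */
class IndependentCommitSketch {

    /** Hypothetical in-memory "database": rows become visible only on commit. */
    static final List<String> COMMITTED = new ArrayList<>();

    /** Hypothetical connection that buffers its own uncommitted rows. */
    static final class Connection {
        private final List<String> pending = new ArrayList<>();

        void insert(String row) {
            pending.add(row);
        }

        void commit() {
            COMMITTED.addAll(pending);
            pending.clear();
        }

        void rollback() {
            pending.clear();
        }
    }

    /** Outer work is rolled back after the inner transaction has already committed. */
    static List<String> demo() {
        COMMITTED.clear();

        Connection outer = new Connection();
        outer.insert("order-1");            // outer transaction, not yet committed

        Connection inner = new Connection(); // assumption: REQUIRES_NEW uses a separate connection
        inner.insert("audit-log-1");
        inner.commit();                      // the database sees this commit first

        outer.rollback();                    // outer failure does not undo the inner commit
        return new ArrayList<>(COMMITTED);
    }
}
```

    In this model demo() returns a list containing only audit-log-1: the inner row survives even though the outer transaction is rolled back afterwards.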

  • Original article: https://blog.csdn.net/hello_ejb3/article/details/132890549