

public interface PlatformTransactionManager {
// 获取事务状态
TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
// 事务提交
void commit(TransactionStatus status) throws TransactionException;
// 事务回滚
void rollback(TransactionStatus status) throws TransactionException;
}

AbstractPlatformTransactionManager.getTransaction()
@Override
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks.
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given.
definition = new DefaultTransactionDefinition();
}
// 如果当前已经存在事务
if (isExistingTransaction(transaction)) {
// 根据不同传播机制不同处理
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// 超时不能小于默认值
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// 当前不存在事务,传播机制=MANDATORY(支持当前事务,没事务报错),报错
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}// 当前不存在事务,传播机制=REQUIRED/REQUIRED_NEW/NESTED,这三种情况,需要新开启事务,且加上事务同步
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {// 是否需要新开启同步// 开启// 开启
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);// 开启新事务
prepareSynchronization(status, definition);//预备同步
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
else {
// 当前不存在事务当前不存在事务,且传播机制=PROPAGATION_SUPPORTS/PROPAGATION_NOT_SUPPORTED/PROPAGATION_NEVER,这三种情况,创建“空”事务:没有实际事务,但可能是同步。警告:定义了隔离级别,但并没有真实的事务初始化,隔离级别被忽略有隔离级别但是并没有定义实际的事务初始化,有隔离级别但是并没有定义实际的事务初始化,
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
handleExistingTransaction()
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 1.NERVER(不支持当前事务;如果当前事务存在,抛出异常)报错
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// 2.NOT_SUPPORTED(不支持当前事务,现有同步将被挂起)挂起当前事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 3.REQUIRES_NEW挂起当前事务,创建新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}// 挂起当前事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {// 创建新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
catch (Error beginErr) {
resumeAfterBeginException(transaction, suspendedResources, beginErr);
throw beginErr;
}
}
// 4.NESTED嵌套事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}// 是否支持保存点:非JTA事务走这个分支。AbstractPlatformTransactionManager默认是true,JtaTransactionManager复写了该方法false,DataSourceTransactionmanager没有复写,还是true,
if (useSavepointForNestedTransaction()) {
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();// 创建保存点
return status;
}
else {
// JTA事务走这个分支,创建新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}// 到这里PROPAGATION_SUPPORTS 或 PROPAGATION_REQUIRED或PROPAGATION_MANDATORY,存在事务加入事务即可,prepareTransactionStatus第三个参数就是是否需要新事务。false代表不需要新事物
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
suspend() 挂起当前事务
protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {// 1.当前存在同步,
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {// 事务不为空,挂起事务
suspendedResources = doSuspend(transaction);
}// 解除绑定当前事务各种属性:名称、只读、隔离级别、是否是真实的事务.
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException ex) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
catch (Error err) {
// doSuspend failed - original transaction is still active...
doResumeSynchronization(suspendedSynchronizations);
throw err;
}
}// 2.没有同步但,事务不为空,挂起事务
else if (transaction != null) {
// Transaction active but no synchronization active.
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}// 2.没有同步但,事务为空,什么都不用做
else {
// Neither transaction nor synchronization active.
return null;
}
}
DataSourceTransactionManager.doSuspend():
protected Object doSuspend(Object transaction) {
DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
txObject.setConnectionHolder((ConnectionHolder)null);
return TransactionSynchronizationManager.unbindResource(this.obtainDataSource());
}
TransactionSynchronizationManager 为事务同步管理器, 维护了多个ThreadLocal

DataSourceTransactionManager.doBegin(): 开启新事务的准备工作

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {// 如果事务还没有connection或者connection在事务同步状态,重置新的connectionHolder
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = this.dataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}// 重置新的connectionHolder
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
//设置新的连接为事务同步中
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
//conn设置事务隔离级别,只读
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);//DataSourceTransactionObject设置事务隔离级别
// 如果是自动提交切换到手动提交
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 如果只读,执行sql设置事务只读
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);// 设置connection持有者的事务开启状态
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);// 设置超时秒数
}
// 绑定connection持有者到当前线程
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, this.dataSource);
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
SqlSessionSynchronization是SqlSessionUtil 的一个内部类
继承自TransactionSynchronizationAdapter抽象类,实现了事务同步接口TransactionSynchronization

TransactionSynchronization 定义了事务操作时对应的资源的管理方法


AbstractPlatformTransactionManager.commit()
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 如果事务已完结,报错无法再次提交
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果事务明确标记为回滚
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 执行回滚
processRollback(defStatus, false);
return;
}
// 如果不需要全局回滚时提交 且 全局回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 执行回滚
processRollback(defStatus, true);
return;
}
// 执行提交事务
processCommit(defStatus);
}
processCommit()
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 3个前置操作
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// 1.有保持点, 为嵌套事务
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 释放保存点
status.releaseHeldSavepoint();
}
// 2.新事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务
doCommit(status);
}
// 非新事务, 且全局回滚失败的话, 但是提交时候没有异常, 抛出异常
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 触发完成后事务同步,状态为回滚
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
// 事务异常
catch (TransactionException ex) {
// 提交失败回滚
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
// 触发完成后回调,事务同步状态为未知
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
// 运行时异常
catch (RuntimeException | Error ex) {
// 如果3个前置步骤未完成,调用前置的最后一步操作
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 提交异常回滚
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
prepareForCommit

triggerBeforeCommit


triggerBeforeCompletion


TransactionSynchronizationManager定义了多个ThreadLocal, 线程本地变量
遍历事务同步器,把每个事务同步器都执行“提交前”操作。
triggerAfterCommit



TransactionSynchronizationAdapter中复写过, SqlSessionSynchronization 继承TransactionSynchronizationAdapter

triggerAfterCompletion



afterCompletion:
cleanupAfterCompletion

设置事务状态为已完成。
如果是新的事务同步,解绑当前线程绑定的数据库资源,重置数据库连接
如果存在挂起的事务(嵌套事务),唤醒挂起的老事务的各种资源:数据库资源、同步器。

DataSourceTransactionManager.doCleanupAfterCompletion()
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 如果是最新的连接持有者,解绑当前线程绑定的<数据库资源,ConnectionHolder>
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(this.dataSource);
}
// 重置数据库连接(隔离级别、只读)
Connection con = txObject.getConnectionHolder().getConnection();
try {
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
DataSourceUtils.resetConnectionAfterTransaction(con, txObject.getPreviousIsolationLevel());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}// 资源引用计数-1,关闭数据库连接
DataSourceUtils.releaseConnection(con, this.dataSource);
}
// 重置连接持有者的全部属性
txObject.getConnectionHolder().clear();
}

AbstractPlatformTransactionManager.rollback()
public final void rollback(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
processRollback(defStatus);
}
processRollback()
private void processRollback(DefaultTransactionStatus status) {
try {
try {
// 解绑当前线程绑定的会话工厂,并关闭会话
triggerBeforeCompletion(status);
if (status.hasSavepoint()) {
// 1.如果有保存点,即嵌套式事务
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
//回滚到保存点
status.rollbackToHeldSavepoint();
}
//2.如果就是一个简单事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
//回滚核心方法
doRollback(status);
}
//3.当前存在事务且没有保存点,即加入当前事务的
else if (status.hasTransaction()) {
//如果已经标记为回滚 或 当加入事务失败时全局回滚(默认true)
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
//debug时会打印:加入事务失败-标记已存在事务为回滚
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 设置当前connectionHolder:当加入一个已存在事务时回滚
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
}
catch (RuntimeException ex)
{
// 关闭会话,重置SqlSessionHolder属性
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
catch (Error err) {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw err;
}
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
finally {
// 解绑当前线程
cleanupAfterCompletion(status);
}
}
DataSourceTransactionManager.doRollback()
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}