目录
前段时间,在开发一个功能时,遇到这么一个场景:有一个同步操作A需要在另一个aop的功能B(涉及事务)完成之后,才去调用。简而言之就是A操作需要在一个B事务成功提交之后才去执行,A和B,这时候用到了TransactionSynchronizationManager(这里当然同样可以使用AOP去处理,事务的本质就是aop),这里我使用到了TransactionSynchronizationManager!
阅读本文前提需要读者了解spring事物的基本操作和事务的传播机制:
Spring注解开发(十四)——事务基本原理分析_advicemode_cj_eryue的博客-CSDN博客
Spring事务的隔离级别以及传播_spring事务隔离级别_cj_eryue的博客-CSDN博客
翻译过来就是事务同步管理器,我们可以自定义实现TransactionSynchronization类,来监听Spring的事务操作。可以在事务提交之后,回调TransactionSynchronization类的方法。
我们通过一个简单的demo来看下如何使用:
依赖如下:
- <dependency>
- <groupId>org.springframeworkgroupId>
- <artifactId>spring-contextartifactId>
- <version>5.2.9.RELEASEversion>
- dependency>
- <dependency>
- <groupId>commons-logginggroupId>
- <artifactId>commons-loggingartifactId>
- <version>1.2version>
- dependency>
-
- <dependency>
- <groupId>org.springframeworkgroupId>
- <artifactId>spring-aspectsartifactId>
- <version>5.2.9.RELEASEversion>
- dependency>
- <dependency>
- <groupId>org.springframeworkgroupId>
- <artifactId>spring-jdbcartifactId>
- <version>4.2.2.RELEASEversion>
- dependency>
- <dependency>
- <groupId>mysqlgroupId>
- <artifactId>mysql-connector-javaartifactId>
- <version>8.0.28version>
- dependency>
-
- <dependency>
- <groupId>c3p0groupId>
- <artifactId>c3p0artifactId>
- <version>0.9.1.2version>
- dependency>
- dependencies>
配置类:
- package com.cjian.config;
-
- import com.mchange.v2.c3p0.ComboPooledDataSource;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.ComponentScan;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.jdbc.datasource.DataSourceTransactionManager;
- import org.springframework.transaction.PlatformTransactionManager;
- import org.springframework.transaction.annotation.EnableTransactionManagement;
-
- import javax.sql.DataSource;
- import java.beans.PropertyVetoException;
-
- /**
- * @Author: cjian
- * @Date: 2023/9/25 10:58
- * @Des:
- */
- @Configuration
- @ComponentScan("com.cjian.tx")
- @EnableTransactionManagement
- public class TxConfig {
- @Bean
- public DataSource dataSource() throws PropertyVetoException {
- ComboPooledDataSource dataSource = new ComboPooledDataSource();
- dataSource.setUser("root");
- dataSource.setPassword("111111");
- dataSource.setDriverClass("com.mysql.cj.jdbc.Driver");
- dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/test");
- return dataSource;
- }
-
- @Bean
- public JdbcTemplate jdbcTemplate() throws PropertyVetoException {
- return new JdbcTemplate(dataSource());
- }
-
- @Bean
- public PlatformTransactionManager platformTransactionManager() throws PropertyVetoException {
- return new DataSourceTransactionManager(dataSource());
- }
- }
Dao
- package com.cjian.tx;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.stereotype.Repository;
- import org.springframework.transaction.annotation.Propagation;
- import org.springframework.transaction.annotation.Transactional;
-
- import java.util.List;
- import java.util.Map;
- import java.util.UUID;
-
- /**
- * @Author: cjian
- * @Date: 2023/9/25 11:05
- * @Des:
- */
- @Repository
- public class UserDao {
- @Autowired
- private JdbcTemplate jdbcTemplate;
-
- public void insertUser() {
- String sql = "insert into person(name,age) values(?,?)";
- String name = UUID.randomUUID().toString().substring(0, 5);
- jdbcTemplate.update(sql, name, 30);
- }
-
- // 这里我通过开启一个新的事务,来模拟两个不同事务
- @Transactional(propagation = Propagation.REQUIRES_NEW)
- public void findUser() {
- String sql = "select * from person where age = 30";
- List
- System.out.println(maps.size());
- }
-
- }
Service
- package com.cjian.tx;
-
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
- import org.springframework.transaction.support.TransactionSynchronizationAdapter;
- import org.springframework.transaction.support.TransactionSynchronizationManager;
-
- /**
- * @Author: cjian
- * @Date: 2023/9/25 11:58
- * @Des:
- */
- @Service("userService")
- public class UserService {
- @Autowired
- private UserDao userDao;
-
- @Transactional
- public void inertUser() {
- userDao.insertUser();
- System.out.println("插入成功!");
- if (TransactionSynchronizationManager.isSynchronizationActive()) {
- TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
- @Override
- public void beforeCommit(boolean readOnly) {
- userDao.findUser();
- }
-
- @Override
- public void afterCommit() {
- userDao.findUser();
- }
- });
- }
- }
- }
测试类
- private static AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(
- TxConfig.class);
-
- public static void main(String[] args) {
- UserService userService = applicationContext.getBean(UserService.class);
- userService.inertUser();
- applicationContext.close();
-
- }
输入结果为:
- 插入成功!
- 0
- 1
事务提交前输出0:是因为findUser开启了一个新的事务(如果不加上新的事务,由spring的事务传播机制可知insertUser和findUser处在同一个事务里,同一个事务里的新增对后续的查是可见的)。
事务提交后输出1:在insertUser的事务提交后才去查询
这里顺带分析下jdbcTemplate的事务原理,以及TransactionSynchronizationManager在源码中的使用。
先看下TransactionSynchronizationManager 的一些属性:
- public abstract class TransactionSynchronizationManager {
-
- //线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。
- private static final ThreadLocal
-
- //事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。
- private static final ThreadLocal
> synchronizations = new NamedThreadLocal<>("Transaction synchronizations"); -
- // 事务的名称
- private static final ThreadLocal
currentTransactionName = new NamedThreadLocal<>("Current transaction name"); - // 事务是否是只读
- private static final ThreadLocal
currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status"); - // 事务的隔离级别
- private static final ThreadLocal
currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level"); - // 事务是否开启 actual:真实的
- private static final ThreadLocal
actualTransactionActive = new NamedThreadLocal<>("Actual transaction active"); -
- public static void bindResource(Object key, Object value) throws IllegalStateException {
- Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
- Assert.notNull(value, "Value must not be null");
- Map
- if (map == null) {
- map = new HashMap();
- resources.set(map);
- }
-
- //将Connection对象绑定到resources 上。
- Object oldValue = ((Map)map).put(actualKey, value);
- if (oldValue instanceof ResourceHolder && ((ResourceHolder)oldValue).isVoid()) {
- oldValue = null;
- }
-
- if (oldValue != null) {
- throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
- } else {
- if (logger.isTraceEnabled()) {
- logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]");
- }
-
- }
- }
- }
resources 用来存储数据库连接,真的吗?来跟下DataSourceTransactionManager链路
- @Bean
- public PlatformTransactionManager platformTransactionManager() throws
- PropertyVetoException {
- return new DataSourceTransactionManager(dataSource());
- }
- public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
- //...
- protected void doBegin(Object transaction, TransactionDefinition definition) {
- DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
- Connection con = null;
-
- try {
- if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
- Connection newCon = this.obtainDataSource().getConnection();
- if (this.logger.isDebugEnabled()) {
- this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
- }
-
- txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
- }
-
- txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
- con = txObject.getConnectionHolder().getConnection();
- Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
- txObject.setPreviousIsolationLevel(previousIsolationLevel);
- if (con.getAutoCommit()) {
- txObject.setMustRestoreAutoCommit(true);
- if (this.logger.isDebugEnabled()) {
- this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
- }
- //关闭自动提交,转交由事务管理器控制,也就是交给spring的aop来控制
- con.setAutoCommit(false);
- }
-
- this.prepareTransactionalConnection(con, definition);
- txObject.getConnectionHolder().setTransactionActive(true);
- int timeout = this.determineTimeout(definition);
- if (timeout != -1) {
- txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
- }
-
- //将Connection对象绑定到事务管理器中的Thread中
- if (txObject.isNewConnectionHolder()) {
- TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
- }
-
- } catch (Throwable var7) {
- if (txObject.isNewConnectionHolder()) {
- DataSourceUtils.releaseConnection(con, this.obtainDataSource());
- txObject.setConnectionHolder((ConnectionHolder)null, false);
- }
-
- throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
- }
- }
- //...
- }
-
- public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
- //...
- public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
- Object transaction = this.doGetTransaction();
- boolean debugEnabled = this.logger.isDebugEnabled();
- if (definition == null) {
- definition = new DefaultTransactionDefinition();
- }
-
- if (this.isExistingTransaction(transaction)) {
- return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
- } else if (((TransactionDefinition)definition).getTimeout() < -1) {
- throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
- } else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {
- throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
- } else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
- if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
- this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
- }
-
- boolean newSynchronization = this.getTransactionSynchronization() == 0;
- return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
- } else {
- AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
- if (debugEnabled) {
- this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
- }
-
- try {
- boolean newSynchronization = this.getTransactionSynchronization() != 2;
- DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
- //重点方法
- this.doBegin(transaction, (TransactionDefinition)definition);
- //重点方法
- this.prepareSynchronization(status, (TransactionDefinition)definition);
- return status;
- } catch (Error | RuntimeException var7) {
- this.resume((Object)null, suspendedResources);
- throw var7;
- }
- }
- }
-
- protected abstract void doBegin(Object var1, TransactionDefinition var2) throws TransactionException;
-
- protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
- if (status.isNewSynchronization()) {
- TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null);
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
- TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
- TransactionSynchronizationManager.initSynchronization();
- }
-
- }
- //...
- }
-
- //PlatformTransactionManager是事务的核心接口
- public interface PlatformTransactionManager {
- TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException;
-
- void commit(TransactionStatus var1) throws TransactionException;
-
- void rollback(TransactionStatus var1) throws TransactionException;
- }
可以看到,数据连接都会绑定到TransactionSynchronizationManager的resources属性上。
我们通过调用TransactionSynchronizationManager的如下方法,将自定义的事务同步方法添加到synchronizations
- public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException {
- Assert.notNull(synchronization, "TransactionSynchronization must not be null");
- if (!isSynchronizationActive()) {
- throw new IllegalStateException("Transaction synchronization is not active");
- } else {
- ((Set)synchronizations.get()).add(synchronization);
- }
- }
至于synchronizations的执行时机,可将断点打在getSynchronizations()方法上,一看便知,是在事务提交前/后,遍历执行synchronizations中对应的事务同步方法。
类似于bean的后置处理器的涉及思想。
- public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
- // ...
-
- private void processCommit(DefaultTransactionStatus status) throws TransactionException {
- try {
-
- try {
-
- this.prepareForCommit(status);
- this.triggerBeforeCommit(status);
- this.triggerBeforeCompletion(status);
- //...
- this.doCommit(status);
- // ...
-
- try {
- this.triggerAfterCommit(status);
- } finally {
- this.triggerAfterCompletion(status, 0);
- }
- } finally {
- this.cleanupAfterCompletion(status);
- }
-
- }
- // ...
- }
总结:
TransactionSynchronizationManager.registerSynchronization()的原理就是通过threadLocal获取到当前线程所持有的的dataResource,注册一个乃至多个事务同步方法,在事务提交前后分别执行对应的事务同步方法。