• TransactionSynchronizationManager的使用场景以及原理分析


    目录

    一、是什么

    二、如何使用

    三、原理分析


    前段时间,在开发一个功能时,遇到这么一个场景:有一个同步操作A需要在另一个aop的功能B(涉及事务)完成之后,才去调用。简而言之就是A操作需要在一个B事务成功提交之后才去执行,A和B,这时候用到了TransactionSynchronizationManager(这里当然同样可以使用AOP去处理,事务的本质就是aop),这里我使用到了TransactionSynchronizationManager!

     阅读本文前提需要读者了解spring事物的基本操作和事务的传播机制:

    Spring注解开发(十四)——事务基本原理分析_advicemode_cj_eryue的博客-CSDN博客

    Spring事务的隔离级别以及传播_spring事务隔离级别_cj_eryue的博客-CSDN博客

    一、是什么

    翻译过来就是事务同步管理器,我们可以自定义实现TransactionSynchronization类,来监听Spring的事务操作。可以在事务提交之后,回调TransactionSynchronization类的方法。
     

    二、如何使用

    我们通过一个简单的demo来看下如何使用:

    依赖如下:

    1. <dependency>
    2. <groupId>org.springframeworkgroupId>
    3. <artifactId>spring-contextartifactId>
    4. <version>5.2.9.RELEASEversion>
    5. dependency>
    6. <dependency>
    7. <groupId>commons-logginggroupId>
    8. <artifactId>commons-loggingartifactId>
    9. <version>1.2version>
    10. dependency>
    11. <dependency>
    12. <groupId>org.springframeworkgroupId>
    13. <artifactId>spring-aspectsartifactId>
    14. <version>5.2.9.RELEASEversion>
    15. dependency>
    16. <dependency>
    17. <groupId>org.springframeworkgroupId>
    18. <artifactId>spring-jdbcartifactId>
    19. <version>4.2.2.RELEASEversion>
    20. dependency>
    21. <dependency>
    22. <groupId>mysqlgroupId>
    23. <artifactId>mysql-connector-javaartifactId>
    24. <version>8.0.28version>
    25. dependency>
    26. <dependency>
    27. <groupId>c3p0groupId>
    28. <artifactId>c3p0artifactId>
    29. <version>0.9.1.2version>
    30. dependency>
    31. dependencies>

    配置类:

    1. package com.cjian.config;
    2. import com.mchange.v2.c3p0.ComboPooledDataSource;
    3. import org.springframework.context.annotation.Bean;
    4. import org.springframework.context.annotation.ComponentScan;
    5. import org.springframework.context.annotation.Configuration;
    6. import org.springframework.jdbc.core.JdbcTemplate;
    7. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    8. import org.springframework.transaction.PlatformTransactionManager;
    9. import org.springframework.transaction.annotation.EnableTransactionManagement;
    10. import javax.sql.DataSource;
    11. import java.beans.PropertyVetoException;
    12. /**
    13. * @Author: cjian
    14. * @Date: 2023/9/25 10:58
    15. * @Des:
    16. */
    17. @Configuration
    18. @ComponentScan("com.cjian.tx")
    19. @EnableTransactionManagement
    20. public class TxConfig {
    21. @Bean
    22. public DataSource dataSource() throws PropertyVetoException {
    23. ComboPooledDataSource dataSource = new ComboPooledDataSource();
    24. dataSource.setUser("root");
    25. dataSource.setPassword("111111");
    26. dataSource.setDriverClass("com.mysql.cj.jdbc.Driver");
    27. dataSource.setJdbcUrl("jdbc:mysql://localhost:3306/test");
    28. return dataSource;
    29. }
    30. @Bean
    31. public JdbcTemplate jdbcTemplate() throws PropertyVetoException {
    32. return new JdbcTemplate(dataSource());
    33. }
    34. @Bean
    35. public PlatformTransactionManager platformTransactionManager() throws PropertyVetoException {
    36. return new DataSourceTransactionManager(dataSource());
    37. }
    38. }

    Dao

    1. package com.cjian.tx;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.jdbc.core.JdbcTemplate;
    4. import org.springframework.stereotype.Repository;
    5. import org.springframework.transaction.annotation.Propagation;
    6. import org.springframework.transaction.annotation.Transactional;
    7. import java.util.List;
    8. import java.util.Map;
    9. import java.util.UUID;
    10. /**
    11. * @Author: cjian
    12. * @Date: 2023/9/25 11:05
    13. * @Des:
    14. */
    15. @Repository
    16. public class UserDao {
    17. @Autowired
    18. private JdbcTemplate jdbcTemplate;
    19. public void insertUser() {
    20. String sql = "insert into person(name,age) values(?,?)";
    21. String name = UUID.randomUUID().toString().substring(0, 5);
    22. jdbcTemplate.update(sql, name, 30);
    23. }
    24. // 这里我通过开启一个新的事务,来模拟两个不同事务
    25. @Transactional(propagation = Propagation.REQUIRES_NEW)
    26. public void findUser() {
    27. String sql = "select * from person where age = 30";
    28. List> maps = jdbcTemplate.queryForList(sql);
    29. System.out.println(maps.size());
    30. }
    31. }

    Service

    1. package com.cjian.tx;
    2. import org.springframework.beans.factory.annotation.Autowired;
    3. import org.springframework.stereotype.Service;
    4. import org.springframework.transaction.annotation.Transactional;
    5. import org.springframework.transaction.support.TransactionSynchronizationAdapter;
    6. import org.springframework.transaction.support.TransactionSynchronizationManager;
    7. /**
    8. * @Author: cjian
    9. * @Date: 2023/9/25 11:58
    10. * @Des:
    11. */
    12. @Service("userService")
    13. public class UserService {
    14. @Autowired
    15. private UserDao userDao;
    16. @Transactional
    17. public void inertUser() {
    18. userDao.insertUser();
    19. System.out.println("插入成功!");
    20. if (TransactionSynchronizationManager.isSynchronizationActive()) {
    21. TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
    22. @Override
    23. public void beforeCommit(boolean readOnly) {
    24. userDao.findUser();
    25. }
    26. @Override
    27. public void afterCommit() {
    28. userDao.findUser();
    29. }
    30. });
    31. }
    32. }
    33. }

    测试类 

    1. private static AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(
    2. TxConfig.class);
    3. public static void main(String[] args) {
    4. UserService userService = applicationContext.getBean(UserService.class);
    5. userService.inertUser();
    6. applicationContext.close();
    7. }

    输入结果为:

    1. 插入成功!
    2. 0
    3. 1

    事务提交前输出0:是因为findUser开启了一个新的事务(如果不加上新的事务,由spring的事务传播机制可知insertUser和findUser处在同一个事务里,同一个事务里的新增对后续的查是可见的)。

    事务提交后输出1:在insertUser的事务提交后才去查询

    三、原理分析

    这里顺带分析下jdbcTemplate的事务原理,以及TransactionSynchronizationManager在源码中的使用。

    先看下TransactionSynchronizationManager 的一些属性:

    1. public abstract class TransactionSynchronizationManager {
    2. //线程上下文中保存着【线程池对象:ConnectionHolder】的Map对象。线程可以通过该属性获取到同一个Connection对象。
    3. private static final ThreadLocal> resources = new NamedThreadLocal<>("Transactional resources");
    4. //事务同步器,是Spring交由程序员进行扩展的代码,每个线程可以注册N个事务同步器。
    5. private static final ThreadLocal> synchronizations = new NamedThreadLocal<>("Transaction synchronizations");
    6. // 事务的名称
    7. private static final ThreadLocal currentTransactionName = new NamedThreadLocal<>("Current transaction name");
    8. // 事务是否是只读
    9. private static final ThreadLocal currentTransactionReadOnly = new NamedThreadLocal<>("Current transaction read-only status");
    10. // 事务的隔离级别
    11. private static final ThreadLocal currentTransactionIsolationLevel = new NamedThreadLocal<>("Current transaction isolation level");
    12. // 事务是否开启 actual:真实的
    13. private static final ThreadLocal actualTransactionActive = new NamedThreadLocal<>("Actual transaction active");
    14. public static void bindResource(Object key, Object value) throws IllegalStateException {
    15. Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    16. Assert.notNull(value, "Value must not be null");
    17. Map map = (Map)resources.get();
    18. if (map == null) {
    19. map = new HashMap();
    20. resources.set(map);
    21. }
    22. //将Connection对象绑定到resources 上。
    23. Object oldValue = ((Map)map).put(actualKey, value);
    24. if (oldValue instanceof ResourceHolder && ((ResourceHolder)oldValue).isVoid()) {
    25. oldValue = null;
    26. }
    27. if (oldValue != null) {
    28. throw new IllegalStateException("Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    29. } else {
    30. if (logger.isTraceEnabled()) {
    31. logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" + Thread.currentThread().getName() + "]");
    32. }
    33. }
    34. }
    35. }

    resources 用来存储数据库连接,真的吗?来跟下DataSourceTransactionManager链路

    1. @Bean
    2. public PlatformTransactionManager platformTransactionManager() throws
    3. PropertyVetoException {
    4. return new DataSourceTransactionManager(dataSource());
    5. }
    1. public class DataSourceTransactionManager extends AbstractPlatformTransactionManager implements ResourceTransactionManager, InitializingBean {
    2. //...
    3. protected void doBegin(Object transaction, TransactionDefinition definition) {
    4. DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
    5. Connection con = null;
    6. try {
    7. if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
    8. Connection newCon = this.obtainDataSource().getConnection();
    9. if (this.logger.isDebugEnabled()) {
    10. this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
    11. }
    12. txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
    13. }
    14. txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
    15. con = txObject.getConnectionHolder().getConnection();
    16. Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
    17. txObject.setPreviousIsolationLevel(previousIsolationLevel);
    18. if (con.getAutoCommit()) {
    19. txObject.setMustRestoreAutoCommit(true);
    20. if (this.logger.isDebugEnabled()) {
    21. this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
    22. }
    23. //关闭自动提交,转交由事务管理器控制,也就是交给spring的aop来控制
    24. con.setAutoCommit(false);
    25. }
    26. this.prepareTransactionalConnection(con, definition);
    27. txObject.getConnectionHolder().setTransactionActive(true);
    28. int timeout = this.determineTimeout(definition);
    29. if (timeout != -1) {
    30. txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
    31. }
    32. //将Connection对象绑定到事务管理器中的Thread中
    33. if (txObject.isNewConnectionHolder()) {
    34. TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
    35. }
    36. } catch (Throwable var7) {
    37. if (txObject.isNewConnectionHolder()) {
    38. DataSourceUtils.releaseConnection(con, this.obtainDataSource());
    39. txObject.setConnectionHolder((ConnectionHolder)null, false);
    40. }
    41. throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
    42. }
    43. }
    44. //...
    45. }
    46. public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    47. //...
    48. public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    49. Object transaction = this.doGetTransaction();
    50. boolean debugEnabled = this.logger.isDebugEnabled();
    51. if (definition == null) {
    52. definition = new DefaultTransactionDefinition();
    53. }
    54. if (this.isExistingTransaction(transaction)) {
    55. return this.handleExistingTransaction((TransactionDefinition)definition, transaction, debugEnabled);
    56. } else if (((TransactionDefinition)definition).getTimeout() < -1) {
    57. throw new InvalidTimeoutException("Invalid transaction timeout", ((TransactionDefinition)definition).getTimeout());
    58. } else if (((TransactionDefinition)definition).getPropagationBehavior() == 2) {
    59. throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");
    60. } else if (((TransactionDefinition)definition).getPropagationBehavior() != 0 && ((TransactionDefinition)definition).getPropagationBehavior() != 3 && ((TransactionDefinition)definition).getPropagationBehavior() != 6) {
    61. if (((TransactionDefinition)definition).getIsolationLevel() != -1 && this.logger.isWarnEnabled()) {
    62. this.logger.warn("Custom isolation level specified but no actual transaction initiated; isolation level will effectively be ignored: " + definition);
    63. }
    64. boolean newSynchronization = this.getTransactionSynchronization() == 0;
    65. return this.prepareTransactionStatus((TransactionDefinition)definition, (Object)null, true, newSynchronization, debugEnabled, (Object)null);
    66. } else {
    67. AbstractPlatformTransactionManager.SuspendedResourcesHolder suspendedResources = this.suspend((Object)null);
    68. if (debugEnabled) {
    69. this.logger.debug("Creating new transaction with name [" + ((TransactionDefinition)definition).getName() + "]: " + definition);
    70. }
    71. try {
    72. boolean newSynchronization = this.getTransactionSynchronization() != 2;
    73. DefaultTransactionStatus status = this.newTransactionStatus((TransactionDefinition)definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    74. //重点方法
    75. this.doBegin(transaction, (TransactionDefinition)definition);
    76. //重点方法
    77. this.prepareSynchronization(status, (TransactionDefinition)definition);
    78. return status;
    79. } catch (Error | RuntimeException var7) {
    80. this.resume((Object)null, suspendedResources);
    81. throw var7;
    82. }
    83. }
    84. }
    85. protected abstract void doBegin(Object var1, TransactionDefinition var2) throws TransactionException;
    86. protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    87. if (status.isNewSynchronization()) {
    88. TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
    89. TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != -1 ? definition.getIsolationLevel() : null);
    90. TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
    91. TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
    92. TransactionSynchronizationManager.initSynchronization();
    93. }
    94. }
    95. //...
    96. }
    97. //PlatformTransactionManager是事务的核心接口
    98. public interface PlatformTransactionManager {
    99. TransactionStatus getTransaction(@Nullable TransactionDefinition var1) throws TransactionException;
    100. void commit(TransactionStatus var1) throws TransactionException;
    101. void rollback(TransactionStatus var1) throws TransactionException;
    102. }
    可以看到,数据连接都会绑定到TransactionSynchronizationManager的resources属性上。
    

    我们通过调用TransactionSynchronizationManager的如下方法,将自定义的事务同步方法添加到synchronizations

    1. public static void registerSynchronization(TransactionSynchronization synchronization) throws IllegalStateException {
    2. Assert.notNull(synchronization, "TransactionSynchronization must not be null");
    3. if (!isSynchronizationActive()) {
    4. throw new IllegalStateException("Transaction synchronization is not active");
    5. } else {
    6. ((Set)synchronizations.get()).add(synchronization);
    7. }
    8. }

    至于synchronizations的执行时机,可将断点打在getSynchronizations()方法上,一看便知,是在事务提交前/后,遍历执行synchronizations中对应的事务同步方法。

    类似于bean的后置处理器的涉及思想。

    1. public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    2. // ...
    3. private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    4. try {
    5. try {
    6. this.prepareForCommit(status);
    7. this.triggerBeforeCommit(status);
    8. this.triggerBeforeCompletion(status);
    9. //...
    10. this.doCommit(status);
    11. // ...
    12. try {
    13. this.triggerAfterCommit(status);
    14. } finally {
    15. this.triggerAfterCompletion(status, 0);
    16. }
    17. } finally {
    18. this.cleanupAfterCompletion(status);
    19. }
    20. }
    21. // ...
    22. }

    总结:

    TransactionSynchronizationManager.registerSynchronization()的原理就是通过threadLocal获取到当前线程所持有的的dataResource,注册一个乃至多个事务同步方法,在事务提交前后分别执行对应的事务同步方法。

  • 相关阅读:
    [暑假]Js对象部分的学习
    1-31 正则表达式 String Buffer String Builder
    从0开始用C写贪吃蛇(基于链表)
    python中的if ... is None else ...含义与用法
    mysql字符串函数大全(更新完成)
    【Spring IOC容器加载过程】
    《ElementUI 基础知识》el-tree 之“我的电脑”目录结构效果
    antd4 icon使用svg
    滑动窗口练习(一)— 固定窗口最大值问题
    【ORACLE】Oracle里有“time”数据类型吗?--关于对Oracle数据类型的一点研究
  • 原文地址:https://blog.csdn.net/cj_eryue/article/details/133271991