- public void much(){
- //业务操作1
- doBusiness1();
- //业务操作2
- doBusiness2();
- //业务操作3
- doBusiness3();
- //业务操作4
- doBusiness4();
- }
- private void doBusiness1() {
- //执行sql1
- //执行sql2
- //执行sql3
- //执行sql4
- }
事务的开始是由@EnableTransactionManagement 注解产生,这个注解在运行时会导入TransactionManagementConfigurationSelector这个类,这个类本质上是一个ImportSelector,他根据adviceMode将特定的配置类导入进去,分别为AutoProxyRegistrar 后置处理器和ProxyTransactionManagementConfiguration Advisor。
AutoProxyRegistrar 实现了ImportBeanDefinitionRegistrar 重写了registerBeanDefinitions 方法
- public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
- boolean candidateFound = false;
- Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
- for (String annType : annTypes) {
- // ...
- AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
- }
- // ...
- }
- @Nullable
- public static BeanDefinition registerAutoProxyCreatorIfNecessary(
- BeanDefinitionRegistry registry, @Nullable Object source) {
- return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
- }
- public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
- if (bean != null) {
- Object cacheKey = getCacheKey(bean.getClass(), beanName);
- if (this.earlyProxyReferences.remove(cacheKey) != bean) {
- return wrapIfNecessary(bean, beanName, cacheKey);
- }
- }
- return bean;
- }
- protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
- // ...
- // 拿当前bean去匹配容器中的 Advisors,如果找到符合的就生成代理对象
- // Create proxy if we have advice.
- Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
- if (specificInterceptors != DO_NOT_PROXY) {
- this.advisedBeans.put(cacheKey, Boolean.TRUE);
- Object proxy = createProxy(
- bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
- this.proxyTypes.put(cacheKey, proxy.getClass());
- return proxy;
- }
- this.advisedBeans.put(cacheKey, Boolean.FALSE);
- return bean;
- }
- protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
- final InvocationCallback invocation) throws Throwable {
- //TransactionAttributeSource内部保存着当前类某个方法对应的TransactionAttribute---事务属性源
- //可以看做是一个存放TransactionAttribute与method方法映射的池子
- TransactionAttributeSource tas = getTransactionAttributeSource();
- //获取当前事务方法对应的TransactionAttribute
- final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
- //定位TransactionManager
- final TransactionManager tm = determineTransactionManager(txAttr);
- .....
- //类型转换为局部事务管理器
- PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
- final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
- if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
- //TransactionManager根据TransactionAttribute创建事务后返回
- //TransactionInfo封装了当前事务的信息--包括TransactionStatus
- TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
- Object retVal;
- try {
- //继续执行过滤器链---过滤链最终会调用目标方法
- //因此可以理解为这里是调用目标方法
- retVal = invocation.proceedWithInvocation();
- }
- catch (Throwable ex) {
- //目标方法抛出异常则进行判断是否需要回滚
- completeTransactionAfterThrowing(txInfo, ex);
- throw ex;
- }
- finally {
- //清除当前事务信息
- cleanupTransactionInfo(txInfo);
- }
- ...
- //正常返回,那么就正常提交事务呗(当然还是需要判断TransactionStatus状态先)
- commitTransactionAfterReturning(txInfo);
- return retVal;
- }
- ...
- @Configuration(proxyBeanMethods = false)
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
- @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
- TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
- BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
- advisor.setTransactionAttributeSource(transactionAttributeSource);
- advisor.setAdvice(transactionInterceptor);
- if (this.enableTx != null) {
- advisor.setOrder(this.enableTx.
getNumber("order")); - }
- return advisor;
- }
- @Bean
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public TransactionAttributeSource transactionAttributeSource() {
- // TransactionAttributeSource 是一个接口,具体注入的是 Annotationxxxx
- return new AnnotationTransactionAttributeSource();
- }
- @Bean
- @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
- public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
- TransactionInterceptor interceptor = new TransactionInterceptor();
- interceptor.setTransactionAttributeSource(transactionAttributeSource);
- if (this.txManager != null) {
- interceptor.setTransactionManager(this.txManager);
- }
- return interceptor;
- }
- }
- @Nullable
- private TransactionAttributeSource transactionAttributeSource;
- private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
- @Override
- @Nullable
- protected TransactionAttributeSource getTransactionAttributeSource() {
- return transactionAttributeSource;
- }
- };
- /**
- * Set the transaction attribute source which is used to find transaction
- * attributes. This should usually be identical to the source reference
- * set on the transaction interceptor itself.
- * @see TransactionInterceptor#setTransactionAttributeSource
- */
- public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
- this.transactionAttributeSource = transactionAttributeSource;
- }
- /**
- * Set the {@link ClassFilter} to use for this pointcut.
- * Default is {@link ClassFilter#TRUE}.
- */
- public void setClassFilter(ClassFilter classFilter) {
- this.pointcut.setClassFilter(classFilter);
- }
- @Override
- public Pointcut getPointcut() {
- return this.pointcut;
- }
- public class TransactionMain {
- public static void main(String[] args) throws ClassNotFoundException, SQLException {
- test();
- }
- private static void test() {
- DataSource dataSource = getDS();
- JdbcTransactionManager jtm = new JdbcTransactionManager(dataSource);
- //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
- //包括隔离级别和传播行为等
- DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
- //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
- TransactionStatus ts = jtm.getTransaction(transactionDef);
- //进行业务逻辑操作
- try {
- update(dataSource);
- jtm.commit(ts);
- }catch (Exception e){
- jtm.rollback(ts);
- System.out.println("发生异常,我已回滚");
- }
- }
- private static void update(DataSource dataSource) throws Exception {
- JdbcTemplate jt = new JdbcTemplate();
- jt.setDataSource(dataSource);
- jt.update("UPDATE Department SET Dname=\"大忽悠\" WHERE id=6");
- throw new Exception("我是来捣乱的");
- }
- }
- @Transactional
- public void testDirect() {
- new Thread(()->{
- Per per = new Per();
- per.setName("t1");
- perService.save(per);
- }).start();
- new Thread(()->{
- Per per1 = new Per();
- per1.setName("t2");
- perService.save(per1);
- throw new RuntimeException("Exception test");
- }).start();
- }
- package com.user.util;
- import lombok.RequiredArgsConstructor;
- import org.springframework.jdbc.datasource.DataSourceTransactionManager;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.TransactionStatus;
- import org.springframework.transaction.support.DefaultTransactionDefinition;
- import javax.sql.DataSource;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Executor;
- import java.util.concurrent.atomic.AtomicBoolean;
- @Component
- @RequiredArgsConstructor
- public class MultiplyThreadTransactionManager {
- /**
- * 如果是多数据源的情况下,需要指定具体是哪一个数据源
- */
- private final DataSource dataSource;
- public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
- if(executor==null){
- throw new IllegalArgumentException("线程池不能为空");
- }
- DataSourceTransactionManager transactionManager = getTransactionManager();
- //是否发生了异常
- AtomicBoolean ex=new AtomicBoolean();
- List<CompletableFuture> taskFutureList=new ArrayList<>(tasks.size());
- List<TransactionStatus> transactionStatusList=new ArrayList<>(tasks.size());
- tasks.forEach(task->{
- taskFutureList.add(CompletableFuture.runAsync(
- () -> {
- try{
- //1.开启新事务
- transactionStatusList.add(openNewTransaction(transactionManager));
- //2.异步任务执行
- task.run();
- }catch (Throwable throwable){
- //打印异常
- throwable.printStackTrace();
- //其中某个异步任务执行出现了异常,进行标记
- ex.set(Boolean.TRUE);
- //其他任务还没执行的不需要执行了
- taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
- }
- }
- , executor)
- );
- });
- try {
- //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
- CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- }
- //发生了异常则进行回滚操作,否则提交
- if(ex.get()){
- System.out.println("发生异常,全部事务回滚");
- transactionStatusList.forEach(transactionManager::rollback);
- }else {
- System.out.println("全部事务正常提交");
- transactionStatusList.forEach(transactionManager::commit);
- }
- }
- private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
- //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
- //包括隔离级别和传播行为等
- DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
- //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
- return transactionManager.getTransaction(transactionDef);
- }
- private DataSourceTransactionManager getTransactionManager() {
- return new DataSourceTransactionManager(dataSource);
- }
- }
- public void test(){
- List<Runnable> tasks=new ArrayList<>();
- tasks.add(()->{
- Per per = new Per();
- per.setName("t1");
- perService.save(per);
- });
- tasks.add(()->{
- Per per = new Per();
- per.setName("t2");
- perService.save(per);
- });
- multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(tasks, Executors.newCachedThreadPool());
- }
- java.lang.IllegalStateException: No value for key [HikariDataSource (HikariPool-1)] bound to thread
- at org.springframework.transaction.support.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:198) ~[spring-tx-5.3.10.jar:5.3.10]
- at org.springframework.jdbc.datasource.DataSourceTransactionManager.doCleanupAfterCompletion(DataSourceTransactionManager.java:371) ~[spring-jdbc-5.3.10.jar:5.3.10]
- at org.springframework.transaction.support.AbstractPlatformTransactionManager.cleanupAfterCompletion(AbstractPlatformTransactionManager.java:992) ~[spring-tx-5.3.10.jar:5.3.10]
- at org.springframework.transaction.suppoAbstractPlatformTransactionrt.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager
- /**
- * Bind the given resource for the given key to the current thread.
- * @param key the key to bind the value to (usually the resource factory)
- * @param value the value to bind (usually the active resource object)
- * @throws IllegalStateException if there is already a value bound to the thread
- * @see ResourceTransactionManager#getResourceFactory()
- */
- public static void bindResource(Object key, Object value) throws IllegalStateException {
- Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
- Assert.notNull(value, "Value must not be null");
- Map<Object, Object> map = resources.get();
- // set ThreadLocal Map if none found
- if (map == null) {
- map = new HashMap<>();
- resources.set(map);
- }
- Object oldValue = map.put(actualKey, value);
- // Transparently suppress a ResourceHolder that was marked as void...
- if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {
- oldValue = null;
- }
- if (oldValue != null) {
- throw new IllegalStateException(
- "Already value [" + oldValue + "] for key [" + actualKey + "] bound to thread");
- }
- }
- @Override
- @Nullable
- public Object invoke(MethodInvocation invocation) throws Throwable {
- // Work out the target class: may be {@code null}.
- // The TransactionAttributeSource should be passed the target class
- // as well as the method, which may be from an interface.
- Class> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
- // Adapt to TransactionAspectSupport's invokeWithinTransaction...
- //重点,这里注册了一个回调,最后会调回下面
- //父类实现
- return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
- @Override
- @Nullable
- public Object proceedWithInvocation() throws Throwable {
- //原始方法
- return invocation.proceed();
- }
- @Override
- public Object getTarget() {
- return invocation.getThis();
- }
- @Override
- public Object[] getArguments() {
- return invocation.getArguments();
- }
- });
- }
- /**
- * General delegate for around-advice-based subclasses, delegating to several other template
- * methods on this class. Able to handle {@link CallbackPreferringPlatformTransactionManager}
- * as well as regular {@link PlatformTransactionManager} implementations and
- * {@link ReactiveTransactionManager} implementations for reactive return types.
- * @param method the Method being invoked
- * @param targetClass the target class that we're invoking the method on
- * @param invocation the callback to use for proceeding with the target invocation
- * @return the return value of the method, if any
- * @throws Throwable propagated from the target invocation
- */
- @Nullable
- protected Object invokeWithinTransaction(Method method, @Nullable Class> targetClass,
- final InvocationCallback invocation) throws Throwable {
- // If the transaction attribute is null, the method is non-transactional.
- //说人话就是获取事务资源,装配事务管理器
- TransactionAttributeSource tas = getTransactionAttributeSource();
- final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
- final TransactionManager tm = determineTransactionManager(txAttr);
- PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
- final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
- if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
- // Standard transaction demarcation with getTransaction and commit/rollback calls.
- //事务相关信息,包括传播级别,什么异常下回滚等
- TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
- Object retVal;
- try {
- // This is an around advice: Invoke the next interceptor in the chain.
- // This will normally result in a target object being invoked.
- //就是他的子类注册的回调,真正的业务逻辑
- retVal = invocation.proceedWithInvocation();
- }
- catch (Throwable ex) {
- // target invocation exception
- //回滚
- completeTransactionAfterThrowing(txInfo, ex);
- throw ex;
- }
- finally {
- //清理事务信息
- cleanupTransactionInfo(txInfo);
- }
- if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
- // Set rollback-only in case of Vavr failure matching our rollback rules...
- TransactionStatus status = txInfo.getTransactionStatus();
- if (status != null && txAttr != null) {
- retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
- }
- }
- //提交
- commitTransactionAfterReturning(txInfo);
- return retVal;
- }
- ...
- }
- protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
- if (txInfo != null && txInfo.getTransactionStatus() != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
- }
- txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
- }
- }
- @Override
- public final void commit(TransactionStatus status) throws TransactionException {
- if (status.isCompleted()) {
- throw new IllegalTransactionStateException(
- "Transaction is already completed - do not call commit or rollback more than once per transaction");
- }
- DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
- if (defStatus.isLocalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Transactional code has requested rollback");
- }
- processRollback(defStatus, false);
- return;
- }
- if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
- if (defStatus.isDebug()) {
- logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
- }
- processRollback(defStatus, true);
- return;
- }
- processCommit(defStatus);
- }
- package com.controller;
- import lombok.Builder;
- import lombok.RequiredArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.slf4j.MDC;
- import org.springframework.jdbc.datasource.DataSourceTransactionManager;
- import org.springframework.stereotype.Component;
- import org.springframework.transaction.TransactionStatus;
- import org.springframework.transaction.support.DefaultTransactionDefinition;
- import org.springframework.transaction.support.TransactionSynchronization;
- import org.springframework.transaction.support.TransactionSynchronizationManager;
- import org.springframework.util.CollectionUtils;
- import javax.sql.DataSource;
- import java.util.*;
- import java.util.concurrent.*;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicInteger;
- /**
- * 多线程事务管理
- */
- @Component
- @Slf4j
- @RequiredArgsConstructor
- public class MultiplyThreadTransactionManager {
- /**
- * 如果是多数据源的情况下,需要指定具体是哪一个数据源
- */
- private final DataSource dataSource;
- private final static ThreadLocal<Boolean> immediatelyCommitFlag = new ThreadLocal<>();
- private final static ThreadLocal<List<TransactionStatus>> transactionStatusListThreadLocal = new ThreadLocal<>();
- private final static ThreadLocal<List<TransactionResource>> transactionResourcesthreadLocal = new ThreadLocal<>();
- private final static ThreadLocal<Map<Object, Object>> mainNativeResourceThreadLocal = new ThreadLocal<>();
- /**
- * 多线程下事务执行
- *
- * @param tasks 任务列表
- * @param immediatelyCommit 是否需要立即提交
- */
- public List<CompletableFuture> runAsyncButWaitUntilAllDown(List<Runnable> tasks, Boolean immediatelyCommit) {
- Executor executor = Executors.newCachedThreadPool();
- DataSourceTransactionManager transactionManager = getTransactionManager();
- //是否发生了异常
- AtomicBoolean ex = new AtomicBoolean();
- List<CompletableFuture> taskFutureList = new CopyOnWriteArrayList<>();
- List<TransactionStatus> transactionStatusList = new CopyOnWriteArrayList<>();
- List<TransactionResource> transactionResources = new CopyOnWriteArrayList<>();
- //记录原生主事务资源
- //这一步可能在原生sql执行前,也可能在原生sql执行后,所以这个资源可能不够充分,需要在下面继续处理
- //如果返回的是原资源集合的引用,下面一步可以不用
- Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
- if (!CollectionUtils.isEmpty(resourceMap)) {
- mainNativeResourceThreadLocal.set(new HashMap<>(resourceMap));
- }
- Map<String, String> copyOfContextMap = MDC.getCopyOfContextMap();
- Executor finalExecutor = executor;
- AtomicInteger atomicInteger = new AtomicInteger(0);
- tasks.forEach(task -> {
- taskFutureList.add(CompletableFuture.runAsync(
- () -> {
- log.info("任务开始");
- try {
- //1.开启新事务
- TransactionStatus transactionStatus = openNewTransaction(transactionManager);
- log.info("开启新事务 successfully");
- transactionStatusList.add(transactionStatus);
- atomicInteger.incrementAndGet();
- System.out.println("atomicInteger.get()"+atomicInteger.incrementAndGet());
- System.out.println(transactionStatus);
- //2.异步任务执行
- task.run();
- log.info("异步任务执行 successfully");
- //3.继续事务资源复制,因为在sql执行是会产生新的资源对象
- transactionResources.add(TransactionResource.copyTransactionResource());
- } catch (Throwable throwable) {
- log.info("任务执行异常"+throwable.getMessage());
- log.error("任务执行异常",throwable);
- //其中某个异步任务执行出现了异常,进行标记
- ex.set(Boolean.TRUE);
- //其他任务还没执行的不需要执行了
- taskFutureList.forEach(completableFuture -> completableFuture.cancel(true));
- }
- }
- , finalExecutor)
- );
- });
- try {
- //阻塞直到所有任务全部执行结束---如果有任务被取消,这里会抛出异常滴,需要捕获
- CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[]{})).get();
- } catch (InterruptedException | ExecutionException e) {
- log.info("任务被取消");
- log.error("任务被取消",e);
- }
- //发生了异常则进行回滚操作,否则提交
- if (ex.get()) {
- log.info("发生异常,全部事务回滚");
- for (int i = 0; i < transactionStatusList.size(); i++) {
- transactionResources.get(i).autoWiredTransactionResource();
- Map<Object, Object> rollBackResourceMap = TransactionSynchronizationManager.getResourceMap();
- log.info("回滚前事务资源size{},本身{}",rollBackResourceMap.size(),rollBackResourceMap);
- transactionManager.rollback(transactionStatusList.get(i));
- transactionResources.get(i).removeTransactionResource();
- }
- } else {
- if (immediatelyCommit) {
- log.info("全部事务正常提交");
- for (int i = 0; i < transactionStatusList.size(); i++) {
- //transactionResources.get(i).autoWiredTransactionResource();
- Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap();
- log.info("提交前事务资源size{},本身{}",commitResourceMap.size(),commitResourceMap);
- transactionManager.commit(transactionStatusList.get(i));
- transactionResources.get(i).removeTransactionResource();
- }
- } else {
- //缓存全部待提交数据
- immediatelyCommitFlag.set(immediatelyCommit);
- transactionResourcesthreadLocal.set(transactionResources);
- transactionStatusListThreadLocal.set(transactionStatusList);
- }
- }
- //交还给主事务
- if (immediatelyCommit) {
- mainTransactionResourceBack(!ex.get());
- }
- return taskFutureList;
- }
- public void multiplyThreadTransactionCommit() {
- try {
- Boolean immediatelyCommit = immediatelyCommitFlag.get();
- if (immediatelyCommit) {
- throw new IllegalStateException("immediatelyCommit cant call multiplyThreadTransactionCommit");
- }
- //提交
- //获取存储的事务资源和状态
- List<TransactionResource> transactionResources = transactionResourcesthreadLocal.get();
- List<TransactionStatus> transactionStatusList = transactionStatusListThreadLocal.get();
- if (CollectionUtils.isEmpty(transactionResources) || CollectionUtils.isEmpty(transactionStatusList)) {
- throw new IllegalStateException("transactionResources or transactionStatusList is empty");
- }
- //重新提交
- DataSourceTransactionManager transactionManager = getTransactionManager();
- log.info("全部事务正常提交");
- for (int i = 0; i < transactionStatusList.size(); i++) {
- transactionResources.get(i).autoWiredTransactionResource();
- Map<Object, Object> commitResourceMap = TransactionSynchronizationManager.getResourceMap();
- log.info("提交前事务资源size{},本身{}",commitResourceMap.size(),commitResourceMap);
- transactionManager.commit(transactionStatusList.get(i));
- transactionResources.get(i).removeTransactionResource();
- }
- } catch (Exception e) {
- mainTransactionResourceBack(false);
- log.error("multiplyThreadTransactionCommit fail", e);
- } finally {
- transactionResourcesthreadLocal.remove();
- transactionStatusListThreadLocal.remove();
- immediatelyCommitFlag.remove();
- }
- //交还给主事务
- mainTransactionResourceBack(true);
- }
- //主线程事务资源返还
- public void mainTransactionResourceBack(Boolean subTransactionSuccess) {
- if (CollectionUtils.isEmpty(mainNativeResourceThreadLocal.get())) {
- //清除数据
- mainNativeResourceThreadLocal.remove();
- return;
- }
- Map<Object, Object> nativeResource = new HashMap<>(mainNativeResourceThreadLocal.get());
- Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
- log.info("当前线程资事务源size{}--------------------------------{}",resourceMap.size(), resourceMap);
- log.info("原生线程事务资源size{}--------------------------------{}",nativeResource.size(), nativeResource);
- //已经被绑定的资源不能重复绑定
- if (!CollectionUtils.isEmpty(resourceMap)) {
- for (Object o : resourceMap.keySet()) {
- if (nativeResource.containsKey(o)) {
- nativeResource.remove(o);
- }
- }
- }
- nativeResource.forEach((k,v)->{
- if (!(k instanceof DataSource)){
- log.info("nativeResource 没有 DataSource");
- }
- });
- //交还不能绑定factory
- nativeResource.forEach((k,v)->{
- if (k instanceof DataSource){
- TransactionSynchronizationManager.bindResource(k,v);
- }
- });
- Map<Object, Object> finResource = TransactionSynchronizationManager.getResourceMap();
- log.info("主线程最终事务源size{}--------------------------------{}",finResource.size(), finResource);
- //防止未激活事务
- if (!TransactionSynchronizationManager.isSynchronizationActive()) {
- TransactionSynchronizationManager.initSynchronization();
- }
- //清除数据
- mainNativeResourceThreadLocal.remove();
- if (!subTransactionSuccess) {
- throw new RuntimeException("子事务失败,需要回滚");
- }
- }
- private TransactionStatus openNewTransaction(DataSourceTransactionManager transactionManager) {
- //JdbcTransactionManager根据TransactionDefinition信息来进行一些连接属性的设置
- //包括隔离级别和传播行为等
- DefaultTransactionDefinition transactionDef = new DefaultTransactionDefinition();
- //开启一个新事务---此时autocommit已经被设置为了false,并且当前没有事务,这里创建的是一个新事务
- return transactionManager.getTransaction(transactionDef);
- }
- private DataSourceTransactionManager getTransactionManager() {
- return new DataSourceTransactionManager(dataSource);
- }
- /**
- * 保存当前事务资源,用于线程间的事务资源COPY操作
- */
- @Builder
- private static class TransactionResource {
- //事务结束后默认会移除集合中的DataSource作为key关联的资源记录
- private Map<Object, Object> resources = new HashMap<>();
- //下面五个属性会在事务结束后被自动清理,无需我们手动清理
- private Set<TransactionSynchronization> synchronizations = new HashSet<>();
- private String currentTransactionName;
- private Boolean currentTransactionReadOnly;
- private Integer currentTransactionIsolationLevel;
- private Boolean actualTransactionActive;
- public static TransactionResource copyTransactionResource() {
- return TransactionResource.builder()
- //返回的是不可变集合,这里为了更加灵活,copy出一个集合过来
- .resources(new HashMap<>(TransactionSynchronizationManager.getResourceMap()))
- //如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
- .synchronizations(new LinkedHashSet<>())
- .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
- .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
- .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
- .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
- .build();
- }
- //装配事务资源,为提交/回滚做储备
- public void autoWiredTransactionResource() {
- //获取当前线程事务资源
- Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
- for (Object o : resourceMap.keySet()) {
- if (resourceMap.containsKey(o)) {
- //移除重复事务资源key,避免绑定报错
- resources.remove(o);
- }
- }
- boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
- //绑定事务资源,注意 绑定是绑定到当前主线程上,记得最后释放交换主线程,再由主线程收回原有事务自选
- resources.forEach(TransactionSynchronizationManager::bindResource);
- //如果需要注册事务监听者,这里记得修改--我们这里不需要,就采用默认负责--spring事务内部默认也是这个值
- //避免重复激活或者事务未激活
- if (!synchronizationActive) {
- TransactionSynchronizationManager.initSynchronization();
- }
- TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
- TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
- TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
- TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
- }
- public void removeTransactionResource() {
- Map<Object, Object> resourceMap = new HashMap<>(TransactionSynchronizationManager.getResourceMap());
- //事务结束后默认会移除集合中的DataSource作为key关联的资源记录
- //DataSource如果重复移除,unbindResource时会因为不存在此key关联的事务资源而报错
- resources.keySet().forEach(key -> {
- if (resourceMap.containsKey(key)) {
- TransactionSynchronizationManager.unbindResource(key);
- }
- });
- }
- }
- }
- @Transactional
- public String test(Integer par) {
- log.info("get(" + par + ")");
- if (par == 3 || par == 5 || par == 6) {
- Per per2 = new Per();
- per2.setName("t3");
- per2.setGrou(Thread.currentThread().getName());
- perService.save(per2);
- }
- List<Runnable> list = new ArrayList<>();
- list.add(() -> {
- Per per = new Per();
- per.setName("t1");
- per.setGrou(Thread.currentThread().getName());
- log.info("任务开始save");
- perService.save(per);
- log.info("任务完成save");
- if (par == 1) {
- throw new RuntimeException();
- }
- });
- list.add(() -> {
- Per per1 = new Per();
- per1.setName("t2");
- per1.setGrou(Thread.currentThread().getName());
- log.info("任务开始save");
- perService.save(per1);
- log.info("任务完成save");
- if (par == 2) {
- throw new RuntimeException();
- }
- });
- log.info("runAsyncButWaitUntilAllDown start");
- multiplyThreadTransactionManager.runAsyncButWaitUntilAllDown(list, false);
- if (par == 4 || par == 5 || par == 6) {
- Per per3 = new Per();
- per3.setName("t4");
- per3.setGrou(Thread.currentThread().getName());
- perService.save(per3);
- if (par == 6) {
- throw new RuntimeException();
- }
- }
- log.info("multiplyThreadTransactionCommit start");
- multiplyThreadTransactionManager.multiplyThreadTransactionCommit();
- return "ss";
- }