• spring 使用多线程,保证事务一致性


    1、背景

    最近接受到接口优化的任务,查看代码逻辑后发现在批量处理数据耗时长,想到使用多线程处理批量数据,又要保持原来的事务一致性。

    2、实现方法

    (1)、创建多线程事务管理

    @Component
    @Slf4j
    public class MultiThreadingTransactionManager {
        /**
         * 数据源事务管理器
         */
        @Autowired
        private DataSourceTransactionManager dataSourceTransactionManager;
        @Autowired
        private ThreadPoolTaskExecutor executorService;
        private long timeout = 120;
    
        /**
         * 用于判断子线程业务是否处理完成
         * 处理完成时threadCountDownLatch的值为0
         */
        private CountDownLatch threadCountDownLatch;
    
        /**
         * 用于等待子线程全部完成后,子线程统一进行提交和回滚
         * 进行提交和回滚时mainCountDownLatch的值为0
         */
        private final CountDownLatch mainCountDownLatch = new CountDownLatch(1);
    
        /**
         * 是否提交事务,默认是true,当子线程有异常发生时,设置为false,回滚事务
         */
        private final AtomicBoolean isSubmit = new AtomicBoolean(true);
    
        public boolean execute(List<Runnable> runnableList,String factorySchema) {
            isSubmit.set(true);
            setThreadCountDownLatch(runnableList.size());
            runnableList.forEach(runnable -> executorService.execute(() -> executeThread(factorySchema,runnable, threadCountDownLatch, mainCountDownLatch, isSubmit)));
            // 等待子线程全部执行完毕
            try {
                // 若计数器变为零了,则返回 true
                boolean isFinish = threadCountDownLatch.await(timeout, TimeUnit.SECONDS);
                if (!isFinish) {
                    // 如果还有为执行完成的就回滚
                    isSubmit.set(false);
                    log.info("存在子线程在预期时间内未执行完毕,任务将全部回滚");
                }
            } catch (Exception exception) {
                log.info("主线程发生异常,异常为: " + exception.getMessage());
            } finally {
                // 计数器减1,代表该主线程执行完毕
                mainCountDownLatch.countDown();
            }
            // 返回结果,是否执行成功,事务提交即为执行成功,事务回滚即为执行失败
            return isSubmit.get();
        }
    
        private void executeThread(String factorySchema,Runnable runnable, CountDownLatch threadCountDownLatch, CountDownLatch mainCountDownLatch, AtomicBoolean isSubmit) {
            log.info("子线程: [" + Thread.currentThread().getName() + "]");
            // 判断别的子线程是否已经出现错误,错误别的线程已经出现错误,那么所有的都要回滚,这个子线程就没有必要执行了
            if (!isSubmit.get()) {
                log.info("整个事务中有子线程执行失败需要回滚, 子线程: [" + Thread.currentThread().getName() + "] 终止执行");
                // 计数器减1,代表该子线程执行完毕
                threadCountDownLatch.countDown();
                return;
            }
            //动态数据源切换
            SchemaContextHolder.setSchema(factorySchema);
            // 开启事务
            DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
            TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(defaultTransactionDefinition);
            try {
                // 执行业务逻辑
                runnable.run();
            } catch (Exception exception) {
                // 发生异常需要进行回滚,设置isSubmit为false
                isSubmit.set(false);
                log.info("子线程: [" + Thread.currentThread().getName() + "]执行业务发生异常,异常为: " + exception.getMessage());
            } finally {
                // 计数器减1,代表该子线程执行完毕
                threadCountDownLatch.countDown();
            }
            try {
                // 等待主线程执行
                mainCountDownLatch.await();
            } catch (Exception exception) {
                log.info("子线程: [" + Thread.currentThread().getName() + "]等待提交或回滚异常,异常为: " + exception.getMessage());
            }
            try {
                // 提交
                if (isSubmit.get()) {
                    dataSourceTransactionManager.commit(transactionStatus);
                    log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交");
                } else {
                    dataSourceTransactionManager.rollback(transactionStatus);
                    log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务回滚");
                }
            } catch (Exception exception) {
                log.info("子线程: [" + Thread.currentThread().getName() + "]进行事务提交或回滚出现异常,异常为:" + exception.getMessage());
            }
        }
    
        private void setThreadCountDownLatch(int num) {
            this.threadCountDownLatch = new CountDownLatch(num);
        }
    }
    

    (2)、测试类

    @RestController
    @RequestMapping("test")
    public class TestController {
        @Autowired
        TestService testService;
        @Autowired
        MultiThreadingTransactionManager multiThreadingTransactionManager;
        @RequestMapping("test")
        public String test(){
            List<TestBean> list = new ArrayList<>();
            list.add(new TestBean("2",1));
            list.add(new TestBean("3",2));
            List<Runnable> runnableList = new ArrayList<>();
            list.forEach(testBean -> runnableList.add(() -> {
                    testService.insert(testBean);
            }));
            boolean isSuccess = multiThreadingTransactionManager.execute(runnableList,"db9771");
            System.out.println(isSuccess);
    
            return "ok";
        };
    }
    

    3、总结

    大体思路,就是所有子线程在各自线程内开启事务,执行业务逻辑后,判断是否抛错,一旦抛错,会把全局AtomicBoolean置为false,因为其具有原子性所以不会有线程不安全问题。所有子线程完业务代码会等待主线程,全部子线程执行业务结束后,主线程等待结束,判断AtomicBoolean是什么状态,一旦false,所有子线程回滚,否则提交。

  • 相关阅读:
    MySQL事务
    POJ3687Labeling Balls题解
    最远点采样(Farthest Point Sampling,FPS)算法详解
    PTA 7-169 斐波那契数列
    每日汇评:积极的数据可能会推动澳元/美元的上涨
    web前端框架JS学习之JavaScript类型转换
    7.MMD 法线贴图的设置与调教
    java3、异常
    K8S安装过程七:Kubernetes 节点配置调整
    Flink学习之旅:(三)Flink源算子(数据源)
  • 原文地址:https://blog.csdn.net/qq_21190847/article/details/139331363