• Spring事务在多线程下保证原子性


    背景

    Springboot项目,有个需求,需要提供接口,接口调用方每一次调用时,都会上报大量的数据,接口需要满足以下要求:

    • 数据保存要保证数据原子性:要么全部保存成功,要么全部不保存。
    • 保证接口性能。

    实践发现,即使使用批量保存,接口耗时也高达一秒多,所以需要开启多线程来保存。现在的问题是,在开启多线程保存的情况下,如何保证数据的原子性

    思路

    • 开启多线程,每个线程都是使用独立的DB连接。否则由于数据库是串行阻塞操作,最终还是会变成排队操作数据库。
    • 依赖spring事务异常回滚机制。
    • 有个统一的标识来标识“是否有线程操作失败”。
    • 线程如果出现异常:先捕获异常,将标识设置为失败,然后继续抛出异常。
    • 线程如果没有异常,在执行的最后,判断标识是失败,也就是“有其他线程有执行失败”,就自定义抛出异常来回滚。
    • 通过锁来保证:所有的线程都操作完之后,一起判断标识是否成功;确保不会出现“还有线程的业务未执行完成,其他线程就已经结束工作”。

    流程图

    失败流程如下:
    在这里插入图片描述

    缺点

    多个线程都操作成功,同时提交事务时,如果某个线程在完成commit之前自身发生了奔溃,而其他线程已经commit完成,会导致数据不一致。

    代码

    @Slf4j
    @Component
    public class AtomicConcurrentTransactionalExecutor {
        @Autowired
        private TransactionalWorker transactionalWorker;
    
        /**
         * @param threadWwaitTerminationTimeout
         * @param runnables
         */
        public boolean execute(int threadWwaitTerminationTimeout, Runnable... runnables) {
            int threadSize = runnables.length;
            CyclicBarrier workerCyclicBarrier = new CyclicBarrier(threadSize);
            AtomicInteger successCounter = new AtomicInteger(threadSize);
            ExecutorService executorService = Executors.newFixedThreadPool(threadSize);
            for (Runnable runnable : runnables) {
                executorService.submit(() -> {
                    try {
                        transactionalWorker.run(workerCyclicBarrier, successCounter, runnable);
                    } catch (Exception e) {
                        log.error("TransactionalWorker current thread execute error before runnable.run!", e);
                    }
                });
            }
            ThreadUtils.shutdown(executorService, threadWwaitTerminationTimeout, TimeUnit.SECONDS);
            return successCounter.get() == 0;
        }
    
        /**
         * @param threadWwaitTerminationTimeout
         * @param threadPollSize
         * @param runnable
         * @return boolean
         * @author minchin
         * @date 2020/2/12 12:33 下午
         */
        public boolean execute(int threadWwaitTerminationTimeout, int threadPollSize, Runnable runnable) {
            Runnable[] runnables = IntStream.range(0, threadPollSize)
                    .mapToObj(i -> runnable)
                    .toArray(Runnable[]::new);
            return execute(threadWwaitTerminationTimeout, runnables);
        }
    }
    
    
    @Component
    @Slf4j
    public class TransactionalWorker {
    
        /**
         * @param workerCyclicBarrier
         * @param successCounter
         * @param runnable
         */
        @Transactional(rollbackFor = Exception.class)
        public void run(CyclicBarrier workerCyclicBarrier, AtomicInteger successCounter, Runnable runnable) {
            boolean isSuccess = false;
            try {
                runnable.run();
                successCounter.decrementAndGet();
                isSuccess = true;
            } catch (Exception e) {
                log.error("TransactionalWorker current thread execute error!", e);
                isSuccess = false;
                throw e;
            } finally {
                try {
                    // 如果是数据库操作慢导致长时间阻塞,并不会被线程池中断(Interrupt),也就是会等到数据库操作完成之后,进入到这一步,然后直接报超时异常
                    workerCyclicBarrier.await();
                } catch (Exception e) {
                    // 等待其他线程时超时
                    log.error("TransactionalWorker current thread execute CyclicBarrier.await error!", e);
                    if (isSuccess) {
                        // 要回滚计数,否则:假设全部线程都操作成功,但刚好超时,主线程shutdown线程池后,计数为0,会返回成功
                        successCounter.incrementAndGet();
                    }
                }
            }
            if (successCounter.get() != 0) {
                log.error("TransactionalWorker other thread execute error, create SystemException to rollback!");
                throw new SystemException("TransactionalWorker other thread execute error, create SystemException to rollback!");
            }
        }
    }
    
    
    @Slf4j
    public class ThreadUtils {
    
        private ThreadUtils() {
        }
    
        /**
         * @param pool
         * @param awaitTerminationTimeout
         * @param timeUnit
         * @return 如果出现异常,则返回false
         */
        public static boolean shutdown(ExecutorService pool, int awaitTerminationTimeout, TimeUnit timeUnit) {
            try {
                pool.shutdown();
                boolean done = false;
                try {
                    done = awaitTerminationTimeout > 0 && pool.awaitTermination(awaitTerminationTimeout, timeUnit);
                } catch (InterruptedException e) {
                    log.error("thread pool awaitTermination error!", e);
                }
                if (!done) {
                    pool.shutdownNow();
                    if(awaitTerminationTimeout > 0) {
                        pool.awaitTermination(awaitTerminationTimeout, timeUnit);
                    }
                }
            } catch (Exception e) {
                log.error("thread pool shutdown error!", e);
                return false;
            }
            return true;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120

    使用例子

    同样的业务,拆分多个线程来

    return atomicConcurrentTransactionalExecutor.execute(10, 2,
      // 业务
      () -> testService.test1()
    }
    
    • 1
    • 2
    • 3
    • 4

    不同的业务,每个线程操作不同的业务

    return atomicConcurrentTransactionalExecutor.execute(10,
      // 业务1
      () -> testService.test1(),
      // 业务2
      () -> testService.test2(),
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    注意在使用时,同一个类内,调用内部方法,Spring事务不生效的问题。
    任务超时的边界测试还需要严格再测试!

  • 相关阅读:
    P8662 [蓝桥杯 2018 省 AB] 全球变暖--2024蓝桥杯冲刺省一
    RBF神经网络python实践学习(BP算法)
    阶乘数码洛谷
    Android Kotlin知识汇总(一)编程语言
    工程与建设杂志工程与建设杂志社工程与建设编辑部2022年第3期目录
    Linux多线程编程- 条件变量(Conditional variable)
    【【萌新的FPGA学习之实战流水灯】】
    多 bit 数据流传输&FIFO-跨时钟域处理(2)
    【SLAM】3三维刚体运动
    C++性能优化笔记-6-C++元素的效率差异-7-类型转换
  • 原文地址:https://blog.csdn.net/luoyang_java/article/details/126728773