📑需求:多线程插入,(保证原子性,要么都成功,要么都失败)其中一个线程报错,所有线程回滚

首先事务的四大特性(ACID)特性都知道吧,面试中张口就来!!!
有了这四个特性,在回头看多线程事务的概念,隔离性(划重点,要考)

还是没看懂?没关系给你举个荔枝🍒,线程A、B、C三个线程做操作,B出现问题,希望A、C一起回滚,这就是想要的多线程,但是A、B、C三个线程是各跑各的,B出错要带着A、C一起回滚,这就造成了各个事务之前的互相干扰,破坏了隔离性吗!事务的特性里面卡的死死的,所以多线程理论上是行不通的。

说到隔离性。那么,Spring 的源码里面,对于事务的隔离性是如何保证的呢? 答案就是 ThreadLocal。
在事务开启的时候,把当前的链接保存在了 ThreadLocal 里面,从而保证了多线程之间的隔离性,就是每个线程里面都自自玩自己的,我们不可能打破 ThreadLocal 的使用规则,让各个线程共享同一个 ThreadLocal 吧?所以,无论从理论上,还是代码实现上,我都认为这个需求是不能实现的。
但是我在冲浪的时候看了很多博客是用编程式事务,例如100w的数据,开10个线程有一个线程出现异常,需要全部回滚,这边随便找了一个贴出来看看()
- @Component
- public class SelfTransactionManager {
- private TransactionStatus transactionStatus;
- //获取事务源
- @Autowired
- private PlatformTransactionManager platformTransactionManager;
-
- @Autowired
- private TransactionDefinition transactionDefinition;
- /**
- * 手动开启事务
- */
- public TransactionStatus begin() {
- transactionStatus = platformTransactionManager.getTransaction(transactionDefinition);
- return transactionStatus;
- }
-
- /**
- * 提交事务
- */
- public void commit(TransactionStatus transactionStatus) {
- platformTransactionManager.commit(transactionStatus);
- }
-
- /**
- * 回滚事务
- */
- public void rollBack() {
- platformTransactionManager.rollback(transactionStatus);
- }
- }
- @Service
- @AllArgsConstructor
- public class BookingInfoServiceImpl implements IBookingInfoService {
-
- private BookingInfoMapper bookingInfoMapper;
- private SelfTransactionManager selfTransactionManager;
- private TaskExecutor taskExecutor;//自定义的线程池
-
- @Override
- @Transactional(rollbackFor=Exception.class)
- public boolean test() {
- // final InheritableThreadLocal
IS_OK = new InheritableThreadLocal<>(); - // IS_OK.set(Boolean.TRUE);
- AtomicBoolean IS_OK = new AtomicBoolean(true);
-
- //子线程计数器
- CountDownLatch childMonitor = new CountDownLatch(1);
- //子线程结果集
- CopyOnWriteArrayList
childResponse = new CopyOnWriteArrayList<>(); - //主线程计数器
- CountDownLatch mainMonitor = new CountDownLatch(1);
-
- /**
- * ================================子线程1=======================================
- */
- CompletableFuture.runAsync(() -> {
- TransactionStatus transactionStatus = selfTransactionManager.begin();
- boolean result = false;
- try {
- //当前子线程开始插入数据
- //---------业务代码----------
- BookingInfo bookingInfo = new BookingInfo();
- bookingInfo.setCreateTime(DateUtil.date());
- bookingInfo.setPhoneNumber("13215592666");
- bookingInfo.setNickName("ceshi0");
- result = bookingInfoMapper.insertBookingInfo(bookingInfo) < 1 ? false : true;
- //---------业务代码----------
-
- //插入结果 true or false
- childResponse.add(result);
- //执行到这里,等待主线程执行完,因为当前子线程要拿到 IS_OK 的结果,IS_OK的结果由主线程决策
- mainMonitor.await();
- //根据主线程决策的IS_OK,判断是回滚还是提交
- if (IS_OK.get()) {
- //提交
- selfTransactionManager.commit(transactionStatus);
- }else {
- //回滚
- selfTransactionManager.rollBack();
- }
- } catch (Exception e) {
- e.printStackTrace();
- //因为会发生异常 而主线程一直在 while (true) 循环 childResponse的size,这里一定要保证进行add,否则可能会出现死循环
- childResponse.add(result);
- //异常回滚
- selfTransactionManager.rollBack();
- } finally {
- //子线程执行完计数器-1
- childMonitor.countDown();
- }
- }, taskExecutor);
-
-
- /**
- * ================================主线程========================================
- */
- try {
- //---------业务代码----------
- BookingInfo bookingInfo2 = new BookingInfo();
- bookingInfo2.setPhoneNumber("13215592000");
- bookingInfo2.setNickName("ceshi2");
- bookingInfo2.setCreateTime(new Date());
- boolean result = bookingInfoMapper.insertBookingInfo(bookingInfo2) < 1 ? false : true;
- //---------业务代码----------
-
- //主线程插入不成功,不成功让 IS_OK = false;
- if (!result) {
- throw new RuntimeException();
- }
-
- //循环获取子线程结果
- while (true) {
- if (childResponse.size() == 1) {
- break;
- }
- }
- System.out.println(childResponse.size());
-
- //子线程中存在不成功让 IS_OK = false;
- if (childResponse.contains(Boolean.FALSE)) {
- throw new RuntimeException();
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- //IS_OK = false; 等待主线程走完子线程就能得知 IS_OK 为false
- IS_OK.set(Boolean.FALSE);
- //这里抛出异常 是为了当前主线程事务自动进行回滚
- throw e;
- } finally {
- //到这个方法执行完就意味着mainMonitor中主线程走完了,接着子线程会开始执行
- mainMonitor.countDown();
- }
-
- return true;
- }
- }
实现思路:
首先设置一个全局变量Boolean,默认是可以提交的true,在子线程,通过编程式事务开启事务,然后插入数据,一条线程负责10w,但是不提交,同时通知主线程,我准备好了,进入等待状态。
如果子线程出现异常,那就通知主线程,我这边发生异常,然后自己回滚,最后主线程收集10个子线程的状态,如果有一个线程出现问题,那么全局变量就设置为不可提交false,然后唤醒所有子线程,进行回滚。
所有子线程都正常的情况:
线程1 执行插入业务...
线程2 执行插入业务...
线程3 执行插入业务...
线程n 执行插入业务...
线程1 准备就绪,判断其它线程状态可否提交...
线程2 准备就绪,判断其它线程状态可否提交...
线程3 准备就绪,判断其它线程状态可否提交...
线程n 准备就绪,判断其它线程状态可否提交...
线程1 事务提交..
线程2 事务提交...
线程3 事务提交...
线程n 事务提交...
有子线程出现异常的情况:
线程1 执行插入业务...
线程2 执行插入业务...
线程3 执行插入业务...
线程n 执行插入业务...
线程1 准备就绪,判断其它线程状态可否提交...
线程2 出现异常,开始事务回滚...
线程3 准备就绪,判断其它线程状态可否提交...
线程n 准备就绪,判断其它线程状态可否提交...
main线程:有线程执行失败,全局变量设为false
线程1 事务回滚..
线程2 事务回滚...
线程3 事务回滚...
线程n 事务回滚..
看结果符合了预期!!!

但是...

如果在回头细品代码的时候发现,只是成功了一半出现了一个类似于两阶段提交(2PC)的一致性协议,这个实现方式实际上就是编程式事务配合二阶段提交(2PC)使用。破绽就出在 2PC 上。
二阶段提交的问题:
协调者说可以提交了,但是参与者挂了...
举例:10个线程提交肯定有先后顺序,前面6个提交了,已经写入DB,后面4个还没提交,那不就丢了4个
虽然,从某种角度上,绕开了事务的隔离性,但是有一定概率出现数据一致性问题,虽然概率比较小,所以称这种方案叫:基于运气编程,用运气换时间。
这是小编在开发学习使用和总结的小Demo, 这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!