• 【二十六】springboot实现多线程事务处理


     springboot篇章整体栏目: 


    【一】springboot整合swagger(超详细

    【二】springboot整合swagger(自定义)(超详细)

    【三】springboot整合token(超详细)

    【四】springboot整合mybatis-plus(超详细)(上)

    【五】springboot整合mybatis-plus(超详细)(下)

    【六】springboot整合自定义全局异常处理

    【七】springboot整合redis(超详细)

    【八】springboot整合AOP实现日志操作(超详细)

    【九】springboot整合定时任务(超详细)

    【十】springboot整合redis实现启动服务即将热点数据保存在全局以及redis(超详细)

    【十一】springboot整合quartz实现定时任务优化(超详细)

    【十二】springboot整合线程池解决高并发(超详细,保你理解)

    【十三】springboot整合异步调用并获取返回值(超详细)

    【十四】springboot整合WebService(超详细)

    【十五】springboot整合WebService(关于传参数)(超详细)

    【十六】springboot整合WebSocket(超详细)

    【十七】springboot整合WebSocket实现聊天室(超详细)

    【十八】springboot实现自定义全局异常处理

    【十九】springboot整合ElasticSearch实战(万字篇)

    【二十】springboot整合过滤器实战

    【二十一】springboot整合拦截器实战并对比过滤器

    【二十二】springboot整合activiti7(1) 实战演示篇

    【二十三】springboot整合spring事务详解以及实战

    【二十四】springboot使用EasyExcel和线程池实现多线程导入Excel数据

    【二十五】springboot整合jedis和redisson布隆过滤器处理缓存穿透


            在前面二十四章做了一个springboot使用EasyExcel和线程池实现多线程导入Excel数据的demo,在写时忘了做事务处理,评论区有个大佬提出来了,这章就对二十四章的代码做一个改造,完善多线程的事务处理。

            对于springboot的事务处理,前面在二十三章也做过springboot整合spring事务详解以及实战的学习,但是在多线程时,这个东西并不适用,本章就通过手写事务处理(编程式事务处理)。

            由于本章是针对二十四章的批量导入功能的扩展,所有不会再写事务处理不相关的(二十四章的内容)介绍了。


    qq交流群导航——>231378628

    目录

    一、阐述目的与实现方式

    二、手动让子线程报错

    三、改造主线程

    四、改造子线程

    五、测试


    一、阐述目的与实现方式

            前面章节实现的多线程处理excel导入功能,如果一个子线程出现错误,结果会是那个子线程的数据处理不了,而其他子线程的数据仍然正常处理保存,并不会存在事务处理的情况,本章改造代码实现事务处理,所有线程正常执行才会保存数据,否则就回滚。大致如下:

    二、手动让子线程报错

            为了后面测试事务回滚,手动让某个子线程报错,比如名为线程3的子线程,如下:


     三、改造主线程

            根据上面图的思路,首先改造主线程的代码,整体代码如下:

    1. package com.swagger.demo.service;
    2. import com.alibaba.excel.context.AnalysisContext;
    3. import com.alibaba.excel.event.AnalysisEventListener;
    4. import com.swagger.demo.config.SpringJobBeanFactory;
    5. import com.swagger.demo.mapper.DeadManMapper;
    6. import com.swagger.demo.model.entity.DeadManExcelData;
    7. import com.swagger.demo.thread.DeadManThread;
    8. import lombok.extern.slf4j.Slf4j;
    9. import org.springframework.stereotype.Component;
    10. import org.springframework.stereotype.Service;
    11. import java.util.ArrayList;
    12. import java.util.Collections;
    13. import java.util.List;
    14. import java.util.concurrent.*;
    15. import java.util.concurrent.atomic.AtomicBoolean;
    16. /**
    17. * @author zrc
    18. * @version 1.0
    19. * @description: TODO 最新入狱名单导入监听器
    20. *
    21. * @date 2022/5/30 15:56
    22. */
    23. @Service
    24. @Slf4j
    25. @Component
    26. public class DeadManExcelListener extends AnalysisEventListener {
    27. /**
    28. * 多线程保存集合,使用线程安全集合
    29. */
    30. private List list = Collections.synchronizedList(new ArrayList<>());
    31. /**
    32. * 创建线程池必要参数
    33. */
    34. private static final int CORE_POOL_SIZE = 10; // 核心线程数
    35. private static final int MAX_POOL_SIZE = 100; // 最大线程数
    36. private static final int QUEUE_CAPACITY = 100; // 队列大小
    37. private static final Long KEEP_ALIVE_TIME = 1L; // 存活时间
    38. public List getData(){
    39. return list;
    40. }
    41. public DeadManExcelListener(){
    42. }
    43. public void setData(List deadManExcelDataList){
    44. this.list = deadManExcelDataList;
    45. }
    46. @Override
    47. public void invoke(DeadManExcelData deadManExcelData, AnalysisContext analysisContext) {
    48. if(deadManExcelData!=null){
    49. list.add(deadManExcelData);
    50. }
    51. }
    52. /**
    53. * 多线程方式保存
    54. * @param analysisContext
    55. */
    56. @Override
    57. public void doAfterAllAnalysed(AnalysisContext analysisContext) {
    58. log.info("解析结束,开始插入数据");
    59. // 创建线程池
    60. ExecutorService executor = new ThreadPoolExecutor(CORE_POOL_SIZE,
    61. MAX_POOL_SIZE,
    62. KEEP_ALIVE_TIME,
    63. TimeUnit.SECONDS,
    64. new ArrayBlockingQueue<>(QUEUE_CAPACITY),
    65. new ThreadPoolExecutor.CallerRunsPolicy());
    66. // 指定每个线程需要处理的导入数量,假设每个线程处理15000条,注意配合上面线程池的大小
    67. int singleThreadDealCount = 15000;
    68. // 根据假设每个线程需要处理的数量以及总数,计算需要提交到线程池的线程数量
    69. int threadSize=(list.size()/singleThreadDealCount)+1;
    70. // 计算需要导入的数据总数,用于拆分时线程需要处理数据时使用
    71. int rowSize = list.size() + 1;
    72. // 测试开始时间
    73. long startTime = System.currentTimeMillis();
    74. // 申明该线程需要处理数据的开始位置
    75. int startPosition = 0;
    76. // 申明该线程需要处理数据的结束位置
    77. int endPosition = 0;
    78. // 为了让每个线程执行完后回到当前线程,使用CountDownLatch,值为线程数,每次线程执行完就会执行countDown方法减1,为0后回到主线程,也就是当前线程执行后续的代码
    79. CountDownLatch count = new CountDownLatch(threadSize);
    80. // 用来控制主线程回到子线程
    81. CountDownLatch mainCount = new CountDownLatch(1);
    82. // 用来控制最终回到主线程
    83. CountDownLatch endCount = new CountDownLatch(threadSize);
    84. // 用来存放子线程的处理结果,若出错就保存一个false
    85. CopyOnWriteArrayList sonResult = new CopyOnWriteArrayList();
    86. // 使用线程安全的对象存储,保存主线程最后总的判断结果,是提交还是回滚
    87. AtomicBoolean ifSubmit = new AtomicBoolean(true);
    88. // 计算每个线程要处理的数据
    89. for(int i=0;i
    90. // 如果是最后一个线程,为保证程序不发生空指针异常,特殊判断结束位置
    91. if((i+1)==threadSize){
    92. // 计算开始位置
    93. startPosition = (i * singleThreadDealCount);
    94. // 当前线程为划分的最后一个线程,则取总数据的最后为此线程的结束位置
    95. endPosition = rowSize-1;
    96. }else{
    97. // 计算开始位置
    98. startPosition = (i * singleThreadDealCount);
    99. // 计算结束位置
    100. endPosition = (i + 1) * singleThreadDealCount;
    101. }
    102. DeadManMapper deadManMapper = SpringJobBeanFactory.getBean(DeadManMapper.class);
    103. DeadManThread thread = new DeadManThread(count,deadManMapper,list,startPosition,endPosition
    104. ,sonResult,mainCount,ifSubmit,endCount);
    105. executor.execute(thread);
    106. }
    107. try {
    108. count.await();
    109. for (Boolean resp : sonResult) {
    110. if (!resp) {
    111. // 只要有一个子线程出异常,就设置最终结果为回滚
    112. log.info("主线程:有线程执行失败,所有线程需要回滚");
    113. ifSubmit.set(false);
    114. break;
    115. }
    116. }
    117. } catch (InterruptedException e) {
    118. e.printStackTrace();
    119. }finally {
    120. // 回到子线程处理回滚或者提交事务
    121. mainCount.countDown();
    122. }
    123. try {
    124. endCount.await();
    125. // 逻辑处理完,关闭线程池
    126. executor.shutdown();
    127. long endTime = System.currentTimeMillis();
    128. log.info("总耗时:"+(endTime-startTime));
    129. } catch (InterruptedException e) {
    130. e.printStackTrace();
    131. }
    132. }
    133. }

            新增如下4个参数(count是前面章节的已有的)。

     

    PS:CountDownLatch类前面有讲过,通过await和countDown方法能够方便的实现多个线程之间的来回切换。

    CopyOnWriteArrayList和AtomicBoolean是为了能够线程安全的保存多个线程共同使用的数据。 

            接着,重写DeadManThread线程类的构造方法,将上面新增的四个参数通过构造方法传给子线程。然后调用记录子线程第一次的cout的await方法,等待子线程第一次执行完毕,回到主线程继续执行。回到主线程后,主线程判断子线程第一次执行完后保存的返回集,判断是否存在false(子线程若报错,保存false,否则保存true)。若存在false就将idsubmit设置为false,意思是需要回滚数据,然后调用记录主线程执行的mainCount的countDown方法,让主线程执行完毕,回到子线程调用mainCount.countDown的位置继续子线程执行。

     

            当子线程根据ifSubmit判断完进行回滚还是提交事务操作后,回到主线程,主线程关闭线程池。

    四、改造子线程

            接着改造子线程,整体代码如下:

    1. package com.swagger.demo.thread;
    2. import com.swagger.demo.config.SpringJobBeanFactory;
    3. import com.swagger.demo.mapper.DeadManMapper;
    4. import com.swagger.demo.model.entity.DeadMan;
    5. import com.swagger.demo.model.entity.DeadManExcelData;
    6. import lombok.extern.slf4j.Slf4j;
    7. import org.springframework.beans.BeanUtils;
    8. import org.springframework.jdbc.datasource.DataSourceTransactionManager;
    9. import org.springframework.stereotype.Component;
    10. import org.springframework.transaction.TransactionDefinition;
    11. import org.springframework.transaction.TransactionStatus;
    12. import org.springframework.transaction.support.DefaultTransactionDefinition;
    13. import java.util.ArrayList;
    14. import java.util.Collections;
    15. import java.util.List;
    16. import java.util.concurrent.CopyOnWriteArrayList;
    17. import java.util.concurrent.CountDownLatch;
    18. import java.util.concurrent.atomic.AtomicBoolean;
    19. /**
    20. * @author zrc
    21. * @version 1.0
    22. * @description TODO
    23. * @date 2022/7/22 15:40
    24. */
    25. @Component
    26. @Slf4j
    27. public class DeadManThread implements Runnable{
    28. /**
    29. * 当前线程需要处理的总数据中的开始位置
    30. */
    31. private int startPosition;
    32. /**
    33. * 当前线程需要处理的总数据中的结束位置
    34. */
    35. private int endPosition;
    36. /**
    37. * 需要处理的未拆分之前的全部数据
    38. */
    39. private List list = Collections.synchronizedList(new ArrayList<>());
    40. /**
    41. * 记录子线程第一次执行是否完成
    42. */
    43. private CountDownLatch count;
    44. private DeadManMapper deadManMapper;
    45. /**
    46. * 保存每个线程的执行结果
    47. */
    48. private CopyOnWriteArrayList sonResult;
    49. /**
    50. * 记录主线程是否执行过判断每个线程的执行结果这个操作
    51. */
    52. private CountDownLatch mainCount;
    53. /**
    54. * 记录主线程对每个线程的执行结果的判断
    55. */
    56. private AtomicBoolean ifSubmit;
    57. /**
    58. * 声明该子线程的事务管理器
    59. */
    60. private DataSourceTransactionManager dataSourceTransactionManager;
    61. /**
    62. * 声明该线程事务的状态
    63. */
    64. private TransactionStatus status;
    65. /**
    66. * 记录子线程第二次执行是否完成
    67. */
    68. private CountDownLatch endCount;
    69. public DeadManThread() {
    70. }
    71. public DeadManThread(CountDownLatch count, DeadManMapper deadManMapper, List list
    72. , int startPosition, int endPosition, CopyOnWriteArrayList sonResult,CountDownLatch mainCount
    73. ,AtomicBoolean ifSubmit,CountDownLatch endCount) {
    74. this.startPosition = startPosition;
    75. this.endPosition = endPosition;
    76. this.deadManMapper = deadManMapper;
    77. this.list = list;
    78. this.count = count;
    79. this.sonResult = sonResult;
    80. this.mainCount = mainCount;
    81. this.ifSubmit = ifSubmit;
    82. this.endCount = endCount;
    83. }
    84. @Override
    85. public void run() {
    86. try{
    87. dataSourceTransactionManager = SpringJobBeanFactory.getBean(DataSourceTransactionManager.class);
    88. DefaultTransactionDefinition def = new DefaultTransactionDefinition();
    89. def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
    90. status = dataSourceTransactionManager.getTransaction(def);
    91. if(Thread.currentThread().getName().contains("3")){
    92. throw new RuntimeException("线程3出问题了");
    93. }
    94. List deadManList = new ArrayList<>();
    95. List newList = list.subList(startPosition, endPosition);
    96. // 将EasyExcel对象和实体类对象进行一个转换
    97. for (DeadManExcelData deadManExcelData : newList) {
    98. DeadMan deadMan = new DeadMan();
    99. BeanUtils.copyProperties(deadManExcelData, deadMan);
    100. deadManList.add(deadMan);
    101. }
    102. // 批量新增
    103. deadManMapper.insertBatchSomeColumn(deadManList);
    104. sonResult.add(true);
    105. } catch (Exception e) {
    106. e.printStackTrace();
    107. sonResult.add(false);
    108. } finally {
    109. // 当一个线程执行完了计数要减一不然这个线程会被一直挂起
    110. count.countDown();
    111. try {
    112. log.info(Thread.currentThread().getName() + ":准备就绪,等待其他线程结果,判断是否事务提交");
    113. mainCount.await();
    114. } catch (InterruptedException e) {
    115. e.printStackTrace();
    116. }
    117. if (ifSubmit.get()) {
    118. dataSourceTransactionManager.commit(status);
    119. log.info(Thread.currentThread().getName() + ":事务提交");
    120. } else {
    121. dataSourceTransactionManager.rollback(status);
    122. log.info(Thread.currentThread().getName() + ":事务回滚");
    123. }
    124. // 执行完所有逻辑,等待主线程执行
    125. endCount.countDown();
    126. }
    127. }
    128. }

            先改造构造方法,用于接受主线程传过来的参数,并设置到自己的内部私有。

            然后改造run方法。

            先开启自己的事务,并保存事务状态,用于后面执行提交或者回滚操作。数据处理完后,若正常结束将线程安全的返回值集合变量保存一个true,否则保存false,并执行记录第一线程执行的count的countDown方法,等待所有子线程执行完后,返回主线程执行刚才上面讲的判断sonResult的代码,等主线程执行完判断并设置ifSubmit的值后,回到子线程执行main.await之后的代码。

            如果ifSubmit是false就回滚,否则就提交,执行完后执行记录子线程第二次执行的endCount的countDown方法,等待子线程全部执行完后,回到主线程,主线程执行关闭线程池的逻辑,结束。 

    五、测试

            代码写完,测试一下。测试之前数据:

            现在是线程3会报错,调用接口测试。

            事务成功回滚,数据没有提交。若删除手动抛异常的代码,让程序正常执行,如下:

     

             数据提交成功,事务正常处理。

  • 相关阅读:
    【SA8295P 源码分析 (二)】37 - OpenWFD Server 启动流程 之 openwfd_server.c main 函数源码分析
    GDAL+Java实现获取对应栅格影像经纬度对应的像素值
    Java如何从字符串中提取数字
    相似度系列-6:单维度方法:Evaluating Coherence in Dialogue Systems using Entailment
    异步函数(async/await)
    RT-DETR手把手教程,注意力机制如何添加在网络的不同位置进行创新优化,EMA注意力为案列
    【Python大数据笔记_day07_hive中的分区表、分桶表以及一些特殊类型】
    ALTERA FPGA IPCORE核之单口RAM详细教程
    【微服务治理之监控APM】系统监控架构概述
    Roson的Qt之旅 #117 QTcpSocket和QUdpSocket详细介绍
  • 原文地址:https://blog.csdn.net/weixin_56995925/article/details/126331000