• 多线程优化导入支持事务二


    Java多线程优化导入支持事务二


    可以在项目中使用的多线程导入,支持事务,异常立即回滚
    使用到线程池+CountDownLatch+transactionManager+AtomicBoolean

    创建线程池

    @Bean("taskExecutor")
        public Executor taskExecutro(){
            int i = Runtime.getRuntime().availableProcessors();
            System.out.println("系统最大线程数  : "+i);
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(i);
            taskExecutor.setMaxPoolSize(i);
            taskExecutor.setQueueCapacity(99999);
            taskExecutor.setKeepAliveSeconds(60);
            taskExecutor.setThreadNamePrefix("taskExecutor--");
            taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
            taskExecutor.setAwaitTerminationSeconds(60);
            return taskExecutor;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    切分数据,启动线程

    public void importExcel2() {
            List<User> list = getUserList();
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            AtomicBoolean isCommit = new AtomicBoolean(true);
            //每个线程处理200条数据
            int sliceLength = 200;
            //线程数
            int threadCount = (list.size() -1)/sliceLength + 1;
    
            //通过子线程进入主线程
            CountDownLatch threadToMainCdl = new CountDownLatch(threadCount);
            //通过主线程进入子线程
            CountDownLatch mainToThreadCdl = new CountDownLatch(1);
            //线程开始角标
            int startIndex = 0;
            //线程结束角标
            int endIndex = 0;
            for (int i = 0; i < threadCount; i++) {
                startIndex = i * sliceLength;
                //处理最后一条线程
                if(i == threadCount - 1){
                    endIndex = list.size();
                }else{
                    endIndex = (i + 1) * sliceLength;
                }
                List<User> users = list.subList(startIndex, endIndex);
                taskExecutor.execute(() -> {
                    insertUser(users, isCommit,threadToMainCdl,mainToThreadCdl);
                });
            }
            try {
                boolean await = threadToMainCdl.await(30, TimeUnit.SECONDS);
                if (!await){
                    isCommit.set(false);
                }
            } catch (InterruptedException e) {
                isCommit.set(false);
            }
            mainToThreadCdl.countDown();
            stopWatch.stop();
            System.out.println("导入耗时: "+stopWatch.getTotalTimeSeconds());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    插入数据

    private void insertUser(List<User> users, AtomicBoolean isCommit, CountDownLatch threadToMainCdl, CountDownLatch mainToThreadCdl) {
            TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
            Random random = new Random();
            long l = random.nextLong(10000);
            try {
                for (User user : users) {
                    System.out.println("插入数据:" + user.toString());
                    if(isCommit.get()){
                        userMapper.insert(user);
                    TimeUnit.MILLISECONDS.sleep(100);
    //            if(l > 9000){
    //                int s = 10 / 0;
    //            }
                    }else{
                        break;
                    }
                }
            } catch (Exception e) {
                isCommit.set(false);
            }finally {
                threadToMainCdl.countDown();
            }
            try {
                mainToThreadCdl.await();
            } catch (InterruptedException e) {
                isCommit.set(false);
            }
            if(isCommit.get()){
                transactionManager.commit(transaction);
            }else{
                transactionManager.rollback(transaction);
            }
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    注意点

    需要注意,线程池最大线程数要大于等于切片数,如果小于分片数会导致线程等待

  • 相关阅读:
    基于SpringMVC+html5+BootStrap的图书销售智能推荐系统
    k8s中常用命令总结
    ahooks解决React闭包问题方法示例
    分布式天花板?阿里百万架构师的ZK+Dubbo笔记,颠覆认知
    LeetCode简单题之公因子的数目
    擎创技术流 | ClickHouse实用工具—ckman教程(1)部署安装
    镜舟科技荣获第十三届中国智能制造高峰论坛两项大奖
    苹果14手机怎么投屏到mac电脑上面?
    都说了能不动就别动,非要去调整,出生产事故了吧 → 补充
    基于RabbitMQ的模拟消息队列之五——虚拟主机设计
  • 原文地址:https://blog.csdn.net/weixin_42202992/article/details/133011464