• 性能优化实战使用CountDownLatch


    1.分析问题

    原程序是分页查询EventAffinityScoreDO表的数据,每次获取2000条在一个个遍历去更新EventAffinityScoreDO表的数据。但是这样耗时比较慢,测试过30万的数据需要2小时

      private void eventSubjectHandle(String tenantId, String eventSubject) {
        // 查询eventAffinityScoreDO表,更新时间小于今天的(今天更新过的不更新)
        final Integer pageSize = 2000;
        PageResult<EventAffinityScoreDO> groupPag =
            eventAffinityScoreDbService.findByTenantIdAndTimePage(tenantId, eventSubject, 1, pageSize);
        Integer pages = groupPag.getPages();
        Integer pageNum = groupPag.getPageNum();
        while (pages >= pageNum) {
          if (pageNum > 1) {
            groupPag =
                eventAffinityScoreDbService.findByTenantIdAndTimePage(
                    tenantId, eventSubject, 1, pageSize);
          }
          List<EventAffinityScoreDO> list = groupPag.getList();
          forEventAffinityScore(tenantId, eventSubject, list);
          if (list.size() < pageSize) {
            break;
          }
          pageNum++;
        }
      }
    
    
      private void forEventAffinityScore(
          String tenantId, String eventSubject, List<EventAffinityScoreDO> eventAffinityScoreDOS) {
        eventAffinityScoreDOS.forEach(
            (eventAffinityScoreDO) -> {
             //更新EventAffinityScoreDO表数据
              updateOrAddAffinity(
                  tenantId,
                  eventAffinityScoreDO.getChatLabsId(),
                  eventAffinityScoreDO.getEconomyId(),
                  eventAffinityScoreDO.getAttributeValue(),
                  eventSubject,
                  eventAffinityScoreDO.getAttributeName());
            });
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    单个线程一个个遍历去更新表数据太慢了,我想把2000的数据分成多份,每份200条,可以分成10份。每份用一个线程去跑。这样跑2000的时间就大大缩短。大概等于跑200个数据的时间。
    这里想到使用CountDownLatch

    2.知识点CountDownLatch

    CountDownLatch 是 Java 中的一个并发工具类,用于在多线程环境中控制线程的执行顺序。它允许一个或多个线程等待其他线程完成操作后再继续执行。
    CountDownLatch 的构造方法接受一个整数作为参数,表示需要等待的线程数量。当一个线程完成了自己的任务后,可以调用 countDown() 方法来将计数器减1。当计数器的值变为0时,所有等待的线程都会被释放,可以继续执行。

    3.解决问题

    我们使用Lists.partition,把2000的集合拆分成每份200的小份,共10分。
    CountDownLatch countDownLatch = new CountDownLatch(partition.size())设置CountDownLatch需要等待的线程数为拆分后的份数partition.size(),也就是10份
    countDownLatch.countDown(); 每跑完一份计数器减一
    countDownLatch.await();计数器减完主程序开始执行,继续循环后面的2000份

     private void eventSubjectHandle(String tenantId, String eventSubject)
            throws InterruptedException {
          // 查询eventAffinityScoreDO表,更新时间小于今天的(今天更新过的不更新)
          final Integer pageSize = 2000;
          PageResult<EventAffinityScoreDO> groupPag =
              eventAffinityScoreDbService.findByTenantIdAndTimePage(tenantId, eventSubject, 1,
       pageSize);
          Integer pages = groupPag.getPages();
          Integer pageNum = groupPag.getPageNum();
          while (pages >= pageNum) {
            if (pageNum > 1) {
              groupPag =
                  eventAffinityScoreDbService.findByTenantIdAndTimePage(
                      tenantId, eventSubject, 1, pageSize);
            }
            List<EventAffinityScoreDO> list = groupPag.getList();
            //Lists.partition把list进行拆分,没份200个
            List<List<EventAffinityScoreDO>> partition = Lists.partition(list, 200);
            //设置需要等待的线程数量,就是我们的集合大小
            CountDownLatch countDownLatch = new CountDownLatch(partition.size());
            for (List<EventAffinityScoreDO> eventAffinityScoreDOS : partition) {
              eventSubjectExecutorPool.execute(
                  () -> {
                    try {
                      forEventAffinityScore(tenantId, eventSubject, eventAffinityScoreDOS);
                    } catch (Exception e) {
                      log.info(
                          "AutoAffinityJob updateAffinityByEventSubject error tenantId:{},eventSubject:{}",
                          tenantId,
                          eventSubject,
                          e);
                    }
                    //每处理完200份计数器减一
                    countDownLatch.countDown();
                  });
            }
            //计数器减完主程序开始执行,继续循环后面的2000份
            countDownLatch.await();
            if (list.size() < pageSize) {
              break;
            }
            pageNum++;
          }
        }
    
    
      private void forEventAffinityScore(
          String tenantId, String eventSubject, List<EventAffinityScoreDO> eventAffinityScoreDOS) {
        eventAffinityScoreDOS.forEach(
            (eventAffinityScoreDO) -> {
              // 根据生态中事件属性属性值更新or新增影响到的内容亲和力
              updateOrAddAffinity(
                  tenantId,
                  eventAffinityScoreDO.getChatLabsId(),
                  eventAffinityScoreDO.getEconomyId(),
                  eventAffinityScoreDO.getAttributeValue(),
                  eventSubject,
                  eventAffinityScoreDO.getAttributeName());
            });
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    这里需要注意的是如果线程池设置的太小,会导致触发拒绝策略。如果触发了拒绝策略countDownLatch.countDown()就不会执行了。就会导致countDownLatch.await()一直等待。所以这里我把线程池的队列设置的很大Integer.MAX_VALUE,这样不会触发拒绝策略。因为我们最多就10个线程,也不会导致出现OOM

    @Configuration
    @Slf4j
    public class CalculateAffinityThreadPool {
    
      @Bean(name = "eventSubjectExecutorPool")
      public ExecutorService eventSubjectExecutorPool() {
        int poolSize = ThreadExecutorUtils.getNormalCoreSize();
        return ThreadExecutorUtils.createNormalThreadPool(
            poolSize,
            poolSize,
            0L,
            TimeUnit.MILLISECONDS,
            Integer.MAX_VALUE,
            "eventSubject-pool",
            false);
      }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    经过测试跑30万的数据只需要20分钟了。

  • 相关阅读:
    html之表格标签和列表标签
    在visual studio里安装Python并创建python工程
    物无定味适口者珍,Python3并发场景(CPU密集/IO密集)任务的并发方式的场景抉择(多线程threading/多进程multiprocessing/协程asyncio)
    计算机网络——b站王道考研笔记
    Exch:修复丢失的系统邮箱
    重上吹麻滩——段芝堂创始人翟立冬游记
    Element UI表格的序号递增(点击分页后)
    不容错过!什么是领域驱动设计?为什么落地这么难?
    阿里面试:我们为什么要分库分表
    Django入门教程
  • 原文地址:https://blog.csdn.net/qq_25292419/article/details/133418477