• parallelStream/ForkJoinPool 详解


    parallelStream

    parallelStream是一种并行流, 意为处理任务时并行处理。

    parallelStream底层使用的是ForkJoinPoolForkJoinPool是一种工作窃取算法线程池,和分治法的概念一致,可以充分利用多 CPU 的优势,把一个任务拆分成多个"小任务", 把多个"小任务"放到多个处理器核心上并行执行; 当多个"小任务"执行完成之后, 再将这些执行结果合并起来

    工作窃取算法线程池

    前提是硬件支持, 如果单核 CPU, 只会存在并发处理, 而不会并行

    核心概念

    调用并行流的API

    1. stringList.parallelStream()
    2. stringList.stream().parallel()
    3. Stream.of(stringList).parallel()

    虽然 API 的调用方式不同, 但是底层都是将AbstractPipeline中的parallel标识设置为true

    public final S parallel() {
       sourceStage.parallel = true;
       return (S) this;
    }
    
    • 1
    • 2
    • 3
    • 4

    并行流parallerStream的底层都是使用同一个ForkJoinPool,而ForkJoinPool线程数默认为cpu的核心数-1

    // 查看内核的可用核数
    Runtime.getRuntime().availableProcessors()
    // ForkJoinPool线程数
    ForkJoinPool.commonPool().getParallelism()
    
    • 1
    • 2
    • 3
    • 4

    修改默认的线程数量

    System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
    
    • 1

    不建议直接修改commonPool线程数,自行创建一个ForkJoinPool更优

    注意点

    1. parallelStream线程不安全问题(加锁/使用线程安全的集合(如ConcurrentHashMap)/集合采用collect()/reduce()操作)
    2. parallelStream 适用的场景是计算密集型的,假如服务器CPU的负载很大,那并不能起到并行计算的作用(尽量不要在paralelSreram操作中使用IO流)
    3. 不要在多线程中使用parallelStream,原因同上,当线程都在争抢CPU时不但没有提升效果,反而还会加大线程切换的开销

    ForkJoinPool

    工作窃取算法线程池

    1. ForkJoinPool的每个ForkJoinWorkerThread下都维护着一个工作队列(WorkQueue),这是一个双端队列,里面存放的对象是任务ForkJoinTask
    2. ForkJoinWorkerThread在运行中产生新的任务(通常是因为调用了fork())时,会放入工作队列的队尾,并且会在队尾取出任务(LIFO)
    3. ForkJoinWorkerThread在处理自己的工作队列同时,会尝试窃取steal一个任务(来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首(FIFO)
    4. 在遇到join()时,如果需要join的任务尚未完成,则会先处理其他任务,直到目标的任务方法被告知已经结束(isDone()),所有的任务都是无阻塞的完成

    自定义ForkJoinPool并行流

    public void mission() {
        try {
            List<Long> idList = Lists.newArrayList(1L, 2L, 3L);
    
            ForkJoinPool pool = new ForkJoinPool(20);
            pool.submit(() -> {
                idList.parallelStream().forEach(uid -> {
                    //do something
                });
            });
            // 等待任务执行完毕
            pool.awaitTermination(2, TimeUnit.SECONDS);
            log.info("mission done");
        } catch (InterruptedException e) {
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    若需要等待所有任务执行完毕,可使用下述方式

    1. ForkJoinPool#awaitTermination()
    2. 显式submit任务,使用ForkJoinTask#get()阻塞
    3. CountDownLatch
    public void mission() {
        try {
            List<Long> idList = Lists.newArrayList(1L, 2L, 3L);
    
            CountDownLatch countDownLatch = new CountDownLatch(1);
            ForkJoinPool pool = new ForkJoinPool(20);
            pool.submit(() -> {
                idList.parallelStream().forEach(uid -> {
                    //do something
                });
                countDownLatch.countDown();
            });
            // 等待任务执行完毕
            countDownLatch.await();
    
            log.info("mission done");
        } catch (InterruptedException e) {
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    参考资料:

    1. 谨慎使用 Java8 新特性 parallelStream
    2. ParallelStream的陷阱
    3. ForkJoinPool线程池—独门专访
  • 相关阅读:
    理解ASP.NET Core - 全球化&本地化&多语言(Globalization and Localization)
    入职美团定级P7,总结2022年最新最全180道高级岗面试题及答案
    POJ 3185 The Water Bowls 反转+点灯游戏
    跨域问题以及经过网关二次转发重复跨域
    【笔者感悟】笔者的工作感悟【四】
    pandas教程:Introduction to pandas Data Structures pandas的数据结构
    Spring 深入——IoC 容器 01
    Leetcode581. 最短无序连续子数组
    基于JAVA疫情下的居民管理系统计算机毕业设计源码+系统+数据库+lw文档+部署
    【暑期集训第一周:搜索】【DFS&&BFS】
  • 原文地址:https://blog.csdn.net/why_still_confused/article/details/128027279