parallelStream
是一种并行流, 意为处理任务时并行处理。
parallelStream
底层使用的是ForkJoinPool
。ForkJoinPool
是一种工作窃取算法线程池,和分治法的概念一致,可以充分利用多 CPU 的优势,把一个任务拆分成多个"小任务", 把多个"小任务"放到多个处理器核心上并行执行; 当多个"小任务"执行完成之后, 再将这些执行结果合并起来
前提是硬件支持, 如果单核 CPU, 只会存在并发处理, 而不会并行
调用并行流的API
虽然 API 的调用方式不同, 但是底层都是将AbstractPipeline
中的parallel
标识设置为true
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}
并行流parallerStream的底层都是使用同一个ForkJoinPool,而ForkJoinPool线程数默认为cpu的核心数-1
// 查看内核的可用核数
Runtime.getRuntime().availableProcessors()
// ForkJoinPool线程数
ForkJoinPool.commonPool().getParallelism()
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
不建议直接修改commonPool线程数,自行创建一个ForkJoinPool更优
ConcurrentHashMap
)/集合采用collect()/reduce()
操作)ForkJoinPool
的每个ForkJoinWorkerThread
下都维护着一个工作队列(WorkQueue
),这是一个双端队列,里面存放的对象是任务ForkJoinTask
ForkJoinWorkerThread
在运行中产生新的任务(通常是因为调用了fork()
)时,会放入工作队列的队尾,并且会在队尾取出任务(LIFO
)steal
一个任务(来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首(FIFO
)join()
时,如果需要join
的任务尚未完成,则会先处理其他任务,直到目标的任务方法被告知已经结束(isDone()
),所有的任务都是无阻塞的完成public void mission() {
try {
List<Long> idList = Lists.newArrayList(1L, 2L, 3L);
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> {
idList.parallelStream().forEach(uid -> {
//do something
});
});
// 等待任务执行完毕
pool.awaitTermination(2, TimeUnit.SECONDS);
log.info("mission done");
} catch (InterruptedException e) {
}
若需要等待所有任务执行完毕,可使用下述方式
public void mission() {
try {
List<Long> idList = Lists.newArrayList(1L, 2L, 3L);
CountDownLatch countDownLatch = new CountDownLatch(1);
ForkJoinPool pool = new ForkJoinPool(20);
pool.submit(() -> {
idList.parallelStream().forEach(uid -> {
//do something
});
countDownLatch.countDown();
});
// 等待任务执行完毕
countDownLatch.await();
log.info("mission done");
} catch (InterruptedException e) {
}
参考资料: