流在处理数据进行一些迭代操作的时候确认很方便,但是在执行一些耗时或是占用资源很高的任务时候,串行化的流无法带来速度/性能上的提升,并不能满足我们的需要。
通常我们会使用多线程来并行或是分片分解执行任务,而在Stream中也提供了这样的并行方法,下面将会一一介绍这些方法。
使用parallelStream()方法或者是使用stream().parallel()来转化为并行流。
但是只是可能会返回一个并行的流,流是否能并行执行还受到其他一些条件的约束(如是否有序,是否支持并行)。
对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。如果对这个方法调用了多次,将以最后一次执行为准。
package com.morris.java8.parallel;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class ParallerDemo {
public static void main(String[] args) {
IntStream list = IntStream.range(0, 6);
//开始并行执行
list.parallel().forEach(i -> {
Thread thread = Thread.currentThread();
System.err.println("integer:" + i + "," + "currentThread:" + thread.getName());
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
运行结果如下:
integer:3,currentThread:main
integer:4,currentThread:ForkJoinPool.commonPool-worker-3
integer:5,currentThread:ForkJoinPool.commonPool-worker-2
integer:1,currentThread:ForkJoinPool.commonPool-worker-1
integer:2,currentThread:ForkJoinPool.commonPool-worker-1
integer:0,currentThread:ForkJoinPool.commonPool-worker-3
从运行结果里面我们可以很清楚的看到parallelStream同时使用了主线程和ForkJoinPool.commonPool创建的线程。 值得说明的是这个运行结果并不是唯一的,实际运行的时候可能会得到多个结果。
看看流的parallel方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
但是你可以通过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。
// 设置全局并行流并发线程数
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 12
为什么两次的运行结果是一样的呢?上面刚刚说过了这是一个全局设置,java.util.concurrent.ForkJoinPool.common.parallelism是final类型的,整个JVM中只允许设置一次。既然默认的并发线程数不能反复修改,那怎么进行不同线程数量的并发测试呢?答案是:引入ForkJoinPool。
IntStream range = IntStream.range(1, 100000);
// 传入parallelism
new ForkJoinPool(parallelism).submit(() -> range.parallel().forEach(System.out::println)).get();
因此,使用parallelStream时需要注意的一点是,多个parallelStream之间默认使用的是同一个线程池,所以IO操作尽量不要放进parallelStream中,否则会阻塞其他parallelStream。
// 获取当前机器CPU处理器的数量
System.out.println(Runtime.getRuntime().availableProcessors());// 输出 4
// parallelStream默认的并发线程数
System.out.println(ForkJoinPool.getCommonPoolParallelism());// 输出 3
为什么parallelStream默认的并发线程数要比CPU处理器的数量少1个?因为最优的策略是每个CPU处理器分配一个线程,然而主线程也算一个线程,所以要占一个名额。 这一点可以从源码中看出来:
static final int MAX_CAP = 0x7fff; // max #workers - 1
// 无参构造函数
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
下面通过几种方式计算数据的和来测试流的性能。
package com.morris.java8.parallel;
import java.util.function.Function;
import java.util.stream.LongStream;
import java.util.stream.Stream;
public class ParallerStreamExample {
public static void main(String[] args) {
long n = 100_000_000;
System.out.println("normal:" + recordTime(ParallerStreamExample::normal, n) + " MS");
System.out.println("iterator:" + recordTime(ParallerStreamExample::iterator, n) + " MS");
// 太耗时,暂时注释
// System.out.println("iteratorParallel:" + recordTime(ParallerStreamExample::iteratorParallel, n) + " MS");
System.out.println("longStream:" + recordTime(ParallerStreamExample::longStream, n) + " MS");
System.out.println("longStreamParallel:" + recordTime(ParallerStreamExample::longStreamParallel, n) + " MS");
}
public static long recordTime(Function<Long, Long> function, long n) {
long lowestCostTime = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long startTime = System.currentTimeMillis();
function.apply(n);
long costTime = System.currentTimeMillis() - startTime;
if(costTime < lowestCostTime) {
lowestCostTime = costTime;
}
}
return lowestCostTime;
}
/**
* 正常for循环
* @param n
* @return
*/
public static long normal(long n) {
long result = 0;
for(long i = 1; i <= n; i++) {
result += i;
}
return result;
}
/**
* iterate顺序流
* @param n
* @return
*/
public static long iterator(long n) {
return Stream.iterate(1L, t -> t + 1).limit(n).reduce(0L, Long::sum);
}
/**
* iterate并行流
* @param n
* @return
*/
public static long iteratorParallel(long n) {
return Stream.iterate(1L, t -> t + 1).parallel().limit(n).reduce(0L, Long::sum);
}
/**
* rangeClosed顺序流
* @param n
* @return
*/
public static long longStream(long n) {
return LongStream.rangeClosed(1, n).sum();
}
/**
* rangeClosed并行流
* @param n
* @return
*/
public static long longStreamParallel(long n) {
return LongStream.rangeClosed(1, n).parallel().sum();
}
}
运行结果如下:
normal:33 MS
iterator:990 MS
longStream:44 MS
longStreamParallel:16 MS
结论:
Stream串行性能明显差于for循环迭代,因为Stream串行还有流水线成本在里面。
并行的Stream API能够发挥多核特性,但是有时候不如串行流(比如后面的计算依赖前面的计算结果就不适宜用并行流)
下面是一些使用并行流需要思考的方面:
留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream、DoubleStream)来避免这种操作,但凡有可能都应该用这些流。
有些操作本身在并行流上的性能就比顺序流差,比如后面的计算依赖前面的计算结果。
还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。
流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。