本章我们将分析终止操作相关源码,深入了解内部原理。
在Stream API中有一个接口TerminalOp代表终止操作。
- interface TerminalOp<E_IN, R> {
-
- default StreamShape inputShape() { return StreamShape.REFERENCE; }
-
- default int getOpFlags() { return 0; }
-
- default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
- Spliterator<P_IN> spliterator) {
- if (Tripwire.ENABLED)
- Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
- return evaluateSequential(helper, spliterator);
- }
-
- <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
- Spliterator<P_IN> spliterator);
- }
接口上定义了两个泛型类型:
来看下方法定义:
- enum StreamShape {
-
- REFERENCE,
- INT_VALUE,
- LONG_VALUE,
-
- DOUBLE_VALUE
- }
通过继承关系可以看到,只有四个类直接实现TerminalOp接口,Stream有这么多终止方法,而实际上可以归类为4个,单独说明一下:toArray()方法也是终止操作,但是与其它不同,没有使用TerminalOp。
现在来看下终止操作划分:
操作类型 | 方法 |
---|---|
非短路操作 | forEach() forEachOrdered() toArray() reduce() collect() min() max() count() |
短路操作 | anyMatch() allMatch() noneMatch() findFirst() findAny() |
非短路操作主要有ForEachOp和ReduceOp两个TerminalOp实现,其实toArray()也属于非短路操作,但是它没有依靠实现TerminalOp来完成相应的功能,所以不作讲解,感兴趣的小伙伴可以自己看一下相关源码。
用于遍历Stream元素的forEach()和forEachOrdered()方法都是通过ForEachOp的子类完成相应工作的。
ForEachOp的继承结构比较复杂,除了实现上面的TerminalOp外,还实现了TerminalSink。TerminalSink聚合了Sink和Supplier接口,这两个大家应该都不陌生,Sink在前面的文章有详细讲解,Supplier是一个函数式接口,用于提供一个结果给调用者。
通过前面的文章和继承结构,我们可以大胆猜测:ForEachOp除了具备终止操作的能力,在数据处理之前,自己还会作为一个sink,与中间操作中的sink实例组成sink链表,通过责任链模式依次处理Stream中的元素。到底是不是这样呢?我们进入源码求证。
- //省略了并行处理相关的代码
- //泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值
- static abstract class ForEachOp<T>
- implements TerminalOp<T, Void>, TerminalSink<T, Void> {
- //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注
- private final boolean ordered;
-
- protected ForEachOp(boolean ordered) {
- this.ordered = ordered;
- }
-
- // TerminalOp
-
- @Override
- public int getOpFlags() {
- return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
- }
-
- @Override
- public <S> Void evaluateSequential(PipelineHelper<T> helper,
- Spliterator<S> spliterator) {
- return helper.wrapAndCopyInto(this, spliterator).get();
- }
-
- // TerminalSink
-
- @Override
- public Void get() {
- return null;
- }
- }
重点关注evaluateSequential(),内部会调用PipelineHelper#wrapAndCopyInto()方法,看到这个方法是不是很熟悉,没错就是前面文章中多次提到的处理数据的方法,它的职责是将传入sink与中间操作产生的sink组合成链表,然后调用源Spliterator的方法,发送Stream元素给sink链处理。
我们进入这个方法看一下,它是在AbstractPipeline中实现的:
- final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator
spliterator) { - copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
- return sink;
- }
可以看到首先调用wrapSink()方法,将封装有终止操作逻辑的sink再次包装:
- final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
- Objects.requireNonNull(sink);
-
- for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
- sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
- }
- return (Sink<P_IN>) sink;
- }
wrapSink()方法前面也有多次讲到,Stream经过一系列中间操作调用,返回的实际上是一个Stream链表,结构如下:
这里的逻辑就是从后向前遍历这个链表,调用每一个节点上的AbstractPipeline#opWrapSink()方法,将除开Head节点之外,每一个中间操作当中的sink,与后一个中间操作的sink节点连接,最后一个节点就是代表终止操作的sink。
拿到sink链表之后,再来看看copyInto()方法是如何执行数据处理逻辑的:
- final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
- Objects.requireNonNull(wrappedSink);
-
- //非短路操作
- if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
- //1.begin()方法调用
- wrappedSink.begin(spliterator.getExactSizeIfKnown());
- //2.forEachRemaining()方法调用,发送元素
- spliterator.forEachRemaining(wrappedSink