• Java8 Stream源码精讲(四):一文说透四种终止操作


    本章我们将分析终止操作相关源码,深入了解内部原理。

    终止操作

    在Stream API中有一个接口TerminalOp代表终止操作。

    1. interface TerminalOp<E_IN, R> {
    2. default StreamShape inputShape() { return StreamShape.REFERENCE; }
    3. default int getOpFlags() { return 0; }
    4. default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper,
    5. Spliterator<P_IN> spliterator) {
    6. if (Tripwire.ENABLED)
    7. Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
    8. return evaluateSequential(helper, spliterator);
    9. }
    10. <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper,
    11. Spliterator<P_IN> spliterator);
    12. }

    接口上定义了两个泛型类型:

    • E_IN:输入的元素类型
    • R:结果类型

    来看下方法定义:

    • inputShape():获取操作的输入元素类型,返回枚举StreamShape,译为Stream的形状。可以看到定义了四种枚举,正好对应Stream、IntStream、LongStream、DoubleStream。
    1. enum StreamShape {
    2. REFERENCE,
    3. INT_VALUE,
    4. LONG_VALUE,
    5. DOUBLE_VALUE
    6. }
    • evaluateParallel():Stream是并行时,终止操作最终调用这个方法处理数据,我们不关心。
    • evaluateSequential():串行流时最终调用这个方法处理数据。

    通过继承关系可以看到,只有四个类直接实现TerminalOp接口,Stream有这么多终止方法,而实际上可以归类为4个,单独说明一下:toArray()方法也是终止操作,但是与其它不同,没有使用TerminalOp。

    现在来看下终止操作划分:

    操作类型 方法
    非短路操作 forEach() forEachOrdered() toArray() reduce() collect() min() max() count()
    短路操作 anyMatch() allMatch() noneMatch() findFirst() findAny()

    非短路操作

    非短路操作主要有ForEachOp和ReduceOp两个TerminalOp实现,其实toArray()也属于非短路操作,但是它没有依靠实现TerminalOp来完成相应的功能,所以不作讲解,感兴趣的小伙伴可以自己看一下相关源码。

    ForEachOp

    用于遍历Stream元素的forEach()和forEachOrdered()方法都是通过ForEachOp的子类完成相应工作的。

    ForEachOp的继承结构比较复杂,除了实现上面的TerminalOp外,还实现了TerminalSink。TerminalSink聚合了Sink和Supplier接口,这两个大家应该都不陌生,Sink在前面的文章有详细讲解,Supplier是一个函数式接口,用于提供一个结果给调用者。

    通过前面的文章和继承结构,我们可以大胆猜测:ForEachOp除了具备终止操作的能力,在数据处理之前,自己还会作为一个sink,与中间操作中的sink实例组成sink链表,通过责任链模式依次处理Stream中的元素。到底是不是这样呢?我们进入源码求证。

    1. //省略了并行处理相关的代码
    2. //泛型声明:T表示输入元素类型;Void表示返回结果类型,forEach()没有返回值
    3. static abstract class ForEachOp<T>
    4. implements TerminalOp<T, Void>, TerminalSink<T, Void> {
    5. //odered表示是否按元素顺序遍历,因为并行处理可能不是按Stream中元素顺序遍历,不用过多关注
    6. private final boolean ordered;
    7. protected ForEachOp(boolean ordered) {
    8. this.ordered = ordered;
    9. }
    10. // TerminalOp
    11. @Override
    12. public int getOpFlags() {
    13. return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
    14. }
    15. @Override
    16. public <S> Void evaluateSequential(PipelineHelper<T> helper,
    17. Spliterator<S> spliterator) {
    18. return helper.wrapAndCopyInto(this, spliterator).get();
    19. }
    20. // TerminalSink
    21. @Override
    22. public Void get() {
    23. return null;
    24. }
    25. }

    重点关注evaluateSequential(),内部会调用PipelineHelper#wrapAndCopyInto()方法,看到这个方法是不是很熟悉,没错就是前面文章中多次提到的处理数据的方法,它的职责是将传入sink与中间操作产生的sink组合成链表,然后调用源Spliterator的方法,发送Stream元素给sink链处理。

    我们进入这个方法看一下,它是在AbstractPipeline中实现的:

    1. final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator spliterator) {
    2. copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    3. return sink;
    4. }

    可以看到首先调用wrapSink()方法,将封装有终止操作逻辑的sink再次包装:

    1. final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    2. Objects.requireNonNull(sink);
    3. for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
    4. sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    5. }
    6. return (Sink<P_IN>) sink;
    7. }

    wrapSink()方法前面也有多次讲到,Stream经过一系列中间操作调用,返回的实际上是一个Stream链表,结构如下:

    这里的逻辑就是从后向前遍历这个链表,调用每一个节点上的AbstractPipeline#opWrapSink()方法,将除开Head节点之外,每一个中间操作当中的sink,与后一个中间操作的sink节点连接,最后一个节点就是代表终止操作的sink。

    拿到sink链表之后,再来看看copyInto()方法是如何执行数据处理逻辑的:

    1. final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    2. Objects.requireNonNull(wrappedSink);
    3. //非短路操作
    4. if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
    5. //1.begin()方法调用
    6. wrappedSink.begin(spliterator.getExactSizeIfKnown());
    7. //2.forEachRemaining()方法调用,发送元素
    8. spliterator.forEachRemaining(wrappedSink
  • 相关阅读:
    XSKY 文件存储首次进入 IDC 榜单
    【计算机网络笔记】路由算法之距离向量路由算法
    docker安装单机版redis、集群版redis
    一种ESDF地图实现方法:FIESTA
    linux下文件存储系统(inode/目录项/硬链接)
    Linux文件权限
    简单易用的地图可视化
    Verilog语言实现设计交通灯控制器
    Java面试题全集(上)
    python学习 6.15
  • 原文地址:https://blog.csdn.net/xhbzl/article/details/126686831