• Stream的简单学习


    抛砖引玉

    假如现在你有这么个任务给你一堆原材料让你加工成产品,你需要怎么做?
    比如:有一个数据集D{1,2,3,4,5,6,7,8,9},现在要求先取平方,再加1,最后得到一个无重的数据集,请写出代码。
    代码可能如下:

    int[] data={1,2,3,4,5,6,7,8,9};
    Set<Integer> set1=new HashSet<>();
    for (int i = 0; i < data.length; i++) {
        int x=data[i]*data[i];
        int y=x+1;
        set1.add(y);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    simply unbelievable! It’s so easy. 确实这么写没毛病,但有另一套。

    Stream.of(1, 2, 3).map(x -> x *=x).map(x->x+=1).collect(Collectors.toSet())
    
    • 1

    The world has been different. 这中间产生了什么变化呢?
    通过观察发现,原始代码可以分为三部分:源数据、行为和数据收集。具体来说:

    //第一部分
    int[] data={1,2,3,4,5,6,7,8,9} //用of代替
    //第二部分
    Set<Integer> set=new HashSet<>();
    set.add(??);//用collect代替(这里与实际有偏差,现在先这么理解)
    //第三部分
    for(int i...){...}//用function代替
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    源数据

    对于源数据也就是int数组,是用of方法来生成一个Stream。Stream是一个接口,它应有三个方法,of,map,collect。

    public interface Stream<T> {
        static <T> Stream<T> of(T... values){
            return new HeadStream<>(values);
        }
        <R> Stream<R> map(Function<? super T,? extends R> function);
        <A,R> R collect(Collector<? super T,A,R> collector);
    }    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    题外话:这里涉及了接口的静态方法,函数式编程,泛型等知识,不会的需要先去补充这些。

    of生成一个叫Head的Stream,此Head就持有源数组,但是源数组并不直接存放在Head这个源码里面而在分割器(Spliterator)中。

    • 可迭代分割器
      Spliterator是可分割迭代器(splitable iterator),两功能分割和迭代。(以下简称分割器)
      分割器是在Stream高级功能中有用到,这里不用,知道有这个就可以了,就在HeadStream中直接持有原数组。

    另外用静态of方法生成对象是一条高效Java的规则,具体可以参考《Effective Java》。

    在了解Head之前需要先了解流处理的整体的全貌。流的整个操作分为3步,of生成流->map映射->collect收集。of、map都是生成流的,collect是收集成结果的。

    Stream1
    Stream2
    StreamN
    最终结果

    可以看到整个过程是链式的,说明Stream是链表,但它不是单向链表,而是双向的。双向的目的一是源数组只需要存一份Head里,故后面需要向前找Head,二是构造处理行为时那其实也是个链表,不过是个单向链表,需要用到。故Stream其实是这样的:

    Stream1
    Stream2
    StreamN

    Stream就分为两类HeadStream和MiddleStream。

    AbstractStream

    无论是HeadStream还是MiddleStream都是双向链表的节点,它们都有前一个节点、后一个节点的引用,故用AbstractStream来存有这些共同的信息。

    public abstract class AbstractStream<P_IN,P_OUT> {
        public AbstractStream<?,P_OUT> previousStream;
        public AbstractStream<P_OUT,?> nextStream;
        public AbstractStream<?,?> sourceNode;
        public Supplier<Object[]> supplier;
        public AbstractStream(AbstractStream previousStream) {
            if (previousStream == null) {
                this.previousStream=null;
            }else{
                this.previousStream = previousStream;
                previousStream.nextStream = this;
                this.sourceNode=previousStream.sourceNode;
            }
        }
        public AbstractStream(Supplier<Object[]> supplier) {
            this.supplier = supplier;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    HeadStream

    public class HeadStream<E_IN,E_OUT>
            extends AbstractStream<E_IN,E_OUT>
            implements Stream<E_IN> {//此处Stream是不正确的
        private E_IN[] data;
        public HeadStream(E_IN[] data) {
            super(()->data);
            this.data = data;
        }
        @Override
        public <R> Stream<R> map(Function<? super E_IN, ? extends R> function) {
            Stream<R> middleStream = new MiddleStream<E_OUT,R>(this,function);
            return middleStream;
        }
        @Override
        public <A, R> R collect(Collector<? super E_IN, A, R> collector) {
            A container = collector.supplier().get();
            BiConsumer<A, ? super E_IN> accumulator = collector.accumulator();
            for (int i = 0; i < data.length; i++) {
                accumulator.accept(container,data[i]);
            }
            return (R)container;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    静态方法of会生成一个HeadStream,然后调用Head的map方法生成下一个Stream,也就是MiddleStream,最后在某一个Stream中进行收集操作。Head的收集操作最简单,不需要处理直接收集返回就欧拉。

    MiddleStream

    public class MiddleStream<E_IN, E_OUT>
            extends AbstractStream<E_IN, E_OUT>
            implements Stream<E_OUT> {
        public Function<? super E_IN, ? extends E_OUT> function;
        public MiddleStream(AbstractStream<?, E_IN> previousStream,
                              Function<? super E_IN, ? extends E_OUT> function) {
            super(previousStream);
            this.function = function;
        }
        @Override
        public <R> Stream<R> map(Function<? super E_OUT, ? extends R> function) {
            Stream<R> stream = new MiddleStream<>(this, function);
            return stream;
        }
        @Override
        public <A, R> R collect(Collector<? super E_OUT, A, R> collector) {
            A container = collector.supplier().get();//错误的写法,只为演示
            BiConsumer<A, ? super E_OUT> accumulator = collector.accumulator();
            //第一个对function构造处理链
            //f1->f2->f3->...->fn
            Handle<?> chain1 = buildFunctionChain1();
            Object[] data = getDataSource();
            for (int i = 0; i < data.length; i++) {
                chain1.handle(data[i]);
            }
            return (R) container;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    MiddleStream有很多难点,首先一个中间流必然有一个Function,它的map方法必然生成下一个MiddleStream,然后子子孙孙无穷尽下去。难点在collect方法,它是一个终止操作,代表流运行到这个方法的时候要结束了,该返回结果了。其中涉及到另一个重要的内容——数据处理行为。

    行为

    无论是x的平方还是y加1都抽象成一个个的Function,整个处理的过程就是:

    Function1
    Function2
    FunctionN

    或许有人可能觉得应该这么做:从当前的MiddleStream开始,一步一步向前寻找直到Head节点前停下,然后一个Function一个Function的构造单向链表。可能的代码如下:
    节点代码:

    public class FunctionNode<IN,OUT>{
    	Function<? super IN, ? extends OUT> f;
    	FUnctionNode<OUT,?> next;
    	//...剩余代码省略
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    构造处理链条代码:

        private <T, R> FunctionChainNode<T, R> buildFunctionChain1() {
            AbstractStream<?, E_OUT> stream = this;
            FunctionChainNode<?, ?> node = new FunctionChainNode<>(this.function, null);
            while (stream.previousStream != null) {
                stream = stream.previousStream;
                if (stream instanceof MiddleStream) {
                    MiddleStream ms = (MiddleStream) stream;
                    FunctionChainNode<?, ?> chain = new FunctionChainNode<>(ms.function, node);
                    node = chain;
                }
            }
            return (FunctionChainNode<T, R>) node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    很遗憾这样的代码是没办法进行处理的,它有几处不适配。

    • FunctionNode的泛型不适配,希望构造出一个FunctionNode,而最终只能构造出FunctionNode,中间的MiddleStream里面的IN,OUT是什么完全不知
    • 理论上输入T,要构造后的Chain输入R,但是在当前MiddleStream中输入不是T而是前一个Stream的输出。最终的Chain而源数组则是Head中的E_IN,无法匹配

    这里需要进行一个调整,这里不能妄想用双泛型来做,T转换为R步骤复杂,不容易实现。需要用一个Consumer这样消费数据。
    这里声明一个接口来处理。

    public interface Handle<T> extends Consumer<T> {
    }
    
    • 1
    • 2

    有一个问题,为啥不用Function,Function这样不就欧拉吗?问题就在于这个R是不知道的,当前节点不知道下一个节点的R,故最后的R无法构造出来,只能用Consumer来消费。
    一个Function处理节点就有了:

    public class FunctionHandle<E_IN,E_OUT> implements Handle<E_IN>{
       private Function<? super E_IN,? extends E_OUT> f;
       private Handle<E_OUT> next;
    
       public FunctionHandle(Function<? super E_IN, ? extends E_OUT> f, 
       Handle<E_OUT> next) {
           this.f = f;
           this.next = next;
       }
       @Override
       public void accept(E_IN in) {
           E_OUT e_out = f.apply(in);
           next.accept(e_out);
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    有了处理Node后就需要构造处理链条。

        private <T> Handle<T> buildChain() {
           Handle<?> node = new FunctionHandle<>(this.function, null);
           MyAbstractStream<?, ?> stream = this;
           while (stream.previousStream != null) {
               stream = stream.previousStream;
               if (stream instanceof MyMiddleStream) {
                   MyMiddleStream ms = (MyMiddleStream) stream;
                   node = new FunctionHandle<>(ms.function, node);
               }
           }
           return (Handle<T>) node;
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    一个简单的构造处理行为链条的代码完成了。这里有点不好看,向前找找到Head前就可以了,这里写的比较麻烦,源码中用了一个depth深度来表示,对深度循环就可以了,比较简单。
    有了接下来该处理collect方法了。

        @Override
        public <A, R> R collect(Collector<? super E_OUT, A, R> collector) {
            Handle<T> chain = buildChain();
            Object[] data = getDataSource();
            for (int i = 0; i < data.length; i++) {
                chain.handle((T)data[i]);
            }
            return (R) container;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在这里遇到问题,处理完数据,这个R怎么收集成Set?因为是无返回值的,所以需要在处理结束后进行set.add操作,故在构造处理链条的时候需要多构造一个TerminalOpHandle节点。

    public class TerminalOpHandle<T,A,R> implements Handle<T> {
        Collector<T,A,R> collector;
        public TerminalOpHandle(Collector<T, A, R> collector) {
            this.collector = collector;
        }
        @Override
        public void accept(T t) {
            A a = collector.supplier().get();//这么写是错误的
            BiConsumer<A, T> accumulator = collector.accumulator();
            accumulator.accept(a,t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    通过这个终止操作就完成了set.add操作。
    这样调整一下构造链条的处理过程:

        private <T,A,R> Handle<T> buildChain(Collector<E_OUT, A, R> collector) {
        	//多了一个尾节点
            Handle<E_OUT> terminalNode = new TerminalOpHandle<>(collector);
            Handle<? super E_IN> node = new FunctionHandle<>(this.function, terminalNode);
            AbstractStream<?, ?> stream = this;
            while (stream.previousStream != null) {
                stream = stream.previousStream;
                if (stream instanceof MiddleStream) {
                    MiddleStream ms = (MiddleStream) stream;
                    node = new FunctionHandle<>(ms.function, node);
                }
            }
            return (Handle<T>) node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    对于Stream添加一个depth的属性就可以优化以下上面的代码:

        private <T, A, R> Handle<T> buildChain(Handle<E_OUT> terminalNode) {
            Handle<? super E_IN> node = new FunctionHandle<>(this.function, terminalNode);
            MyAbstractStream<?, ?> stream = this;
            while (this.depth > 0) {
                stream = stream.previousStream;
                MyMiddleStream ms = (MyMiddleStream) stream;
                node = new FunctionHandle<>(ms.function, node);
            }
            return (Handle<T>) node;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    数据收集

    收集数据这一步其实在构造行为链条的最后节点已经说过了,不再赘述。
    数据已经处理进collector中,但是收集的时候咱们遗留了一个问题,就是A container的问题。

    A container=collector.supplier.get();
    
    • 1

    每次运行这行代码就会生成一个新对象,故需要对TerminalOpHandle改正一下。
    新添加一个抽象类Box用于持有这个Set对象。

    public abstract class Box<U> {
        U state;
    
        public Box() {}
    
        public U get() {
            return state;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    相应的TerminalOpHandle也需要改造:

    public class TerminalOpHandle<T,A,R> extends Box<A> implements Handle<T> {
        MyCollector<? super T,A,R> collector;
    
        public TerminalOpHandle(MyCollector<? super T, A, R> collector) {
            this.collector = collector;
        }
    
        public  void before(){
            state=collector.supplier().get();
        }
        
        @Override
        public void accept(T t) {
    //        A a = collector.supplier().get();
            BiConsumer<A, ? super T> accumulator = collector.accumulator();
    //        accumulator.accept(a,t);
            accumulator.accept(state,t);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这里添加了一个before方法用于在执行accept前去执行Set的获取任务。
    有人想到在accept中改造,伪代码:
    if(state!=null) before();
    这么做不是不可以,比较死板,if满天。
    对handle进行改造

    public interface Handle<T> extends Consumer<T> {
        void before();
        void end();
    }
    
    • 1
    • 2
    • 3
    • 4

    before、end前置、后置方法,perfect!
    相应的FunctionHandle实现方法:

    	//省略前面的代码
        @Override
        public void before() {
            next.before();
        }
    
        @Override
        public void end() {
            next.end();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    这样一步一步下去最后到TerminalOpHandle,执行一次before方法,妙啊!
    end方法不做什么处理。
    紧接着就是改造collect方法

        public <A, R> R collect(MyCollector<? super E_OUT, A, R> collector) {
         A container;
         //改1
    //        Handle handle = buildChain(collector);
         TerminalOpHandle<E_OUT,A,R> terminalNode = new TerminalOpHandle<>(collector);
         Handle<Object> handle = buildChain(terminalNode);
         Object[] data = getDataSource();
         handle.before();
         for (int i = 0; i < data.length; i++) {
             handle.accept(data[i]);
         }
         handle.end();
         //改2
         container=terminalNode.get();
         return (R) container;
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    collect的改造主要有以下几处:
    1.container不再从supplier.get获取,从TerminalOp节点获取
    2.构造链条时拿到TerminalOp节点
    3.数据处理变成before->foreach->end,然后再获取container
    对于before->foreach->end这段过程可以抽象出来。

        public void copyInto(Handle handle,Object[] source){
            handle.before();
    //        Object[] data = source;
    //        for (int i = 0; i < data.length; i++) {
    //            handle.accept(data[i]);
    //        }
            forEach(source,handle);
            handle.end();
        }
        public void forEach(Object[] source,Consumer<Object> consumer){
            Object[] data = source;
            for (int i = 0; i < data.length; i++) {
                consumer.accept(data[i]);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    而这段代码一段在spliterator中执行,一段在sink中执行,不再赘述。
    至此,运行下面的测试代码:

            Collector<Integer, ?, Set<Integer>> collector = Collectors.toSet();
            Set<Integer> collect = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
                    .map(f1)
                    .map(f2)
                    .collect(collector);
            System.out.println("最终结果"+collect);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    可以得到下面的结果:

    最终结果[17, 65, 2, 50, 82, 5, 37, 10, 26]
    
    • 1

    head和middle的map、collect可以合到父类中去,不再演示。
    具体的代码,请参考:代码样例

    最后补充

    1.注意Head中实现Stream的泛型应该时E_OUT而不是E_IN,E_IN在Head中处理后变成E_OUT故E_OUT才是Stream的泛型。
    2.泛型,这里面泛型是比较多的,该实现哪个泛型,该super还是该extends都是需要认真考虑的。

  • 相关阅读:
    TS和JS的区别
    AI工程化—— 如何让AI在企业多快好省的落地?
    三维模型3DTile格式轻量化压缩的遇到常见问题与处理方法分析
    c++--ubuntu-libevent1-概述与安装-IO事件+信号事件
    列表元素
    react项目实战 5 嵌套路由实现TabBar
    Transformer模型 | iTransformer时序预测
    logrotate日志轮转
    【Java 基础篇】Java网络编程基础知识详解
    jQuery
  • 原文地址:https://blog.csdn.net/weixin_43671840/article/details/126366241