码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • 【Java8 Stream】:探秘Stream实现的核心:Collector,模拟Stream的实现


    目录

      • 前言
      • Collector的基础知识
      • Collector源码
      • 一个简单的Collector实现类
      • 模拟Stream,使用Collector实现一个简单的年龄计算



    Stream的用法可以参考下文:

    • Java8 Stream使用方法:筛选、排序、最大值、最小值、计数求和平均数、分组、合并、映射、去重等
    • Collectors.groupingBy的四种用法 解决分组统计(计数、求和、平均数等)、范围统计、分组合并、分组结果自定义映射等问题




    前言

    本篇还处于待完善阶段,目前仅仅是使用了自己的方法来实现Stream对流的处理。因此暂时先写一篇文章做记录。




    Collector的基础知识

    Collector范型的含义:

    • :规约操作(reduction operation)的输入元素类型
    • :是规约操作的输出结果类型,该类型是可变可累计的,可以是各种集合容器,或者具有累计操作(如add)的自定义对象。
    • :规约操作结果经过转换操作后返回的最终结果类型

    Collector中方法定义,下面的方法的返回值都可以看作函数(function):

    • Supplier supplier():该函数创建并返回新容器对象。
    • BiConsumer accumulator():该函数将把元素值放入容器对象,并返回容器。
    • BinaryOperator combiner():该函数会把两个容器(此时每个容器都是处理流元素的部分结果)合并,该函数可以返回这两个容器中的一个,也可以返回一个新的容器。
    • Function finisher():该函数将执行最终的转换,它会将combiner的最终合并结果A转变为R。
    • Set characteristics():提供集合列表,该列表将提供当前Collector的一些特征值。这些特征将会影响上述函数的表现。

    上述函数的语法:

    • Supplier#T get():调用一个无参方法,返回一个结果。一般来说是构造方法的方法引用。
    • BiConsumer#void accept(T t, U u):根据给定的两个参数,执行相应的操作。
    • BinaryOperator extends BiFunction#T apply(T t, T u):合并t和u,返回其中之一,或创建一个新对象放回。
    • Function#R apply(T t):处理给定的参数,并返回一个新的值。




    Collector源码

    public interface Collector<T, A, R> {
    
        Supplier<A> supplier();
        
        BiConsumer<A, T> accumulator();
    
        BinaryOperator<A> combiner();
    
        Function<A, R> finisher();
    
        Set<Characteristics> characteristics();
       }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    // Collector#Characteristics
    enum Characteristics {
    	// 表明Collector是否用于并发
        CONCURRENT,
        // 表明Collector是否会保留原容器的顺序
        UNORDERED,
        // 表明accumulator函数结果类型是否等于finisher函数,默认为空,当设置该特征时,那么finisher函数将执行A到R的未经检查的强制转换。
        IDENTITY_FINISH
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9




    一个简单的Collector实现类

    Collector的实现类很简单,它将用于存储用户输出的各项函数。

    public class SimpleCollector<T, A, R> implements Collector<T, A, R> {
    
        private final Supplier<A> supplier;
    
        private final BiConsumer<A, T> accumulator;
    
        private final BinaryOperator<A> combiner;
    
        private final Function<A, R> finisher;
    
        private final Set<Characteristics> characteristics;
    
        public SimpleCollector(Supplier<A> supplier,
                               BiConsumer<A, T> accumulator,
                               BinaryOperator<A> combiner,
                               Function<A, R> finisher,
                               Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }
    
        @Override
        public Supplier<A> supplier() {
            return supplier;
        }
    
        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }
    
        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }
    
        @Override
        public Function<A, R> finisher() {
            return finisher;
        }
    
        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49




    模拟Stream,使用Collector实现一个简单的年龄计算

    基于对Collector原理的粗浅了解和StreamBuilderImpl、ReferencePipeline、ForEachTask等源码的解析,模仿Stream的思路自己写了一个使用Collector的流程,方便理解。不过由于ReferencePipeline源码较为复杂,对一些实现的理解还不够深刻,有错难免。


    1 执行入口,根据特征判断是否使用多线程,并对每个线程的结果进行合并,最后将合并的结果转为最终返回值

    public static <T, R, A> A execute(ExecutorService threadPool, Collection<T> data, Collector<T, R, A> collector) throws ExecutionException, InterruptedException {
        Objects.requireNonNull(threadPool, "threadPool");
        Objects.requireNonNull(data, "data");
        Objects.requireNonNull(collector, "collector");
        // 查询特征,判断是否要进行分段处理
        Set<Collector.Characteristics> characteristics = collector.characteristics();
        int segment = 1;
        if (characteristics.contains(Collector.Characteristics.CONCURRENT)) {
            segment = data.size() / Runtime.getRuntime().availableProcessors() + 1;
        }
        // 集合分段用于多线程,以便不会对同一数据多次计算
        Collection<Collection<T>> segmentList = ListUtil.segmentList(data, segment);
        List<CompletableFuture<R>> completableFutureList = new ArrayList<>(segmentList.size());
        for (Collection<T> collection : segmentList) {
            // 并发情况下就不能保证累积函数执行的顺序,也就无法保证最终结果的顺序性(源码中分别使用了ForEachOrderedTask | ForEachTask)
            CompletableFuture<R> async = CompletableFuture.supplyAsync(() -> {
                return CollectorUsageDemo.dealWithElement(collection, collector);
            });
            completableFutureList.add(async);
        }
        CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0]));
        CompletableFuture<R> result = allOf.thenApply(v -> {
            // 初始化容器 起初初始容器也将作为函数计算的一部分, 这里将容器合并,并返回新的容器
            R r = collector.supplier().get();
            for (CompletableFuture<R> f1 : completableFutureList) {
                R r2 = f1.join();
                r = collector.combiner().apply(r, r2);
            }
            return r;
        });
        // 合并容器后的最终结果
        R last = result.get();
        if (characteristics.contains(Collector.Characteristics.IDENTITY_FINISH)) {
            return (A) last;
        }
        // 将R转为最终的结果类型A
        return collector.finisher().apply(last);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    2 执行容器对每个元素的处理

    public static <T, R, A> R dealWithElement(Collection<T> data, Collector<T, R, A> collector) {
        // 初始化一个容器
        R container = collector.supplier().get();
        // 遍历data集合,将每个元素通过accumulator函数进行规约
        for (T t : data) {
            collector.accumulator().accept(container, t);
        }
        return container;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3 测试

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Student> student = Student.getStudent();
        // 比如我们想实现一个类似Collectors.joining()的功能
        Set<Collector.Characteristics> characteristics = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                Collector.Characteristics.UNORDERED));
        Collector<Student, AtomicInteger, Integer> collector = new SimpleCollector<>(AtomicInteger::new, (AtomicInteger i, Student s) -> i.addAndGet(s.getAge()),
                (i, i1) -> {
                    i.addAndGet(i1.get());
                    return i;
                }, AtomicInteger::get, characteristics);
        Integer execute = execute(executorService, student, collector);
        System.out.println(execute);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    输出结果:

    121

    源码:CollectorUsageDemo

  • 相关阅读:
    [ICS] Inferno(地狱) ETH/IP未授权访问,远程控制工控设备利用工具
    AOP(JDK动态代理实现)
    Redis与jedis的区别
    猿创征文|MYSQL主从复制
    Redis--List、Set、Zset、Hash、Bitmaps、HyperLogLog、Geospatial
    关于隔离电源断电瞬间MOSFET损坏问题分析
    【Hack The Box】windows练习-- Conceal
    模拟算法刷题笔记【蓝桥杯】
    STM32MP157F-DK2 使用体验
    Mellanox IB卡驱动的安装和小记录
  • 原文地址:https://blog.csdn.net/HO1_K/article/details/127615662
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号