• JDK1.8 新特性(二)【Stream 流】


    前言

            上节我们学了 lambda 表达式,很快我就在 Flink 的学习中用到了,我学的是 Java 版本的 Flink,一开始会以为代码会很复杂,但事实上 Flink 中很多地方都用到了 函数接口,这也让我们在编写 Flink 程序的时候可以使用 lambda 表达式非常地简洁地实现匿名函数。

            今天再来学习一个新的特性,Stream 流,光是看名字就觉得和大数据能扯上关系,我们的 Spark、Flink 当中不就都是这种流的概念嘛。

    1、什么是 Strem 流

            Stream 是 JDK1.8 中处理集合的关键抽象概念, Lambda 表达式 和 Stream 是JDK1.8 新增的函数式编程中最有亮点的特性了,它可以指定你希望对集合进行操作,可以执行非常复杂的查询过滤和映射等操作。使用 Stream API 对集合数据进行操作,就类似于使用 SQL 来执行对 Java 集合运算和表达的高阶抽象。

            Stream API 可以极大地提高 Java 程序员的生产力,让程序员写出更加高效、干净、简洁的代码。那对我在大数据开发中更是如此。

            这种风格将要处理的元素集合看做一种流,流在管道中传输,并且可以在管道的节点上进行处理,比如过滤、排序、聚合等。

    2、Stream 创建方式

    1、创建串行 Stream

    Stream userStream = list.stream();

    2、创建并行 Stream

    Stream userStream = list.parallelStream();

    3、关闭

    在Java中,Stream只能被操作一次,一旦你对其进行了一次操作(比如forEach, collect等),它就会被关闭,再次操作就会报错:stream has already been operated upon or closed。

    3、Stream 将 List 转换为 Set

    1、创建 List 集合

    Stream 是通过集合创建出来的,所以我们先创建一个集合,而集合内我们需要存放实体,所以先创建一个实体类 User:

    1. public class User {
    2. public String name;
    3. public int age;
    4. public User(){}
    5. public User(String name, int age) {
    6. this.name = name;
    7. this.age = age;
    8. }
    9. @Override
    10. public String toString() {
    11. return "User{" +
    12. "name='" + name + '\'' +
    13. ", age=" + age +
    14. '}';
    15. }
    16. @Override
    17. public boolean equals(Object o) {
    18. if (this == o) return true;
    19. if (o == null || getClass() != o.getClass()) return false;
    20. User user = (User) o;
    21. return age == user.age && Objects.equals(name, user.name);
    22. }
    23. @Override
    24. public int hashCode() {
    25. return Objects.hash(name, age);
    26. }
    27. public String getName() {
    28. return name;
    29. }
    30. public void setName(String name) {
    31. this.name = name;
    32. }
    33. public int getAge() {
    34. return age;
    35. }
    36. public void setAge(int age) {
    37. this.age = age;
    38. }
    39. }

    创建集合

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));
    5. list.add(new User("李元芳", 30));

    重写 equals

    注意:这里我们对实体类的 equals 和 hashcode 方法进行了重写,这在之前我是不会去重写的。重写和不重写的区别就是:
            重写后,当两个实体对象的属性相同时,equals 方法返回 true,如果没有重写,则 equals 返回 false。

    == 和 equals

            == 用于比较基本数据类型的值是否相等或者对象的引用地址是否相同。

    1. int a = 10;
    2. int b = 10;
    3. System.out.println(a==b); //true
    4. String str1 = "hello";
    5. String str2 = "hello";
    6. System.out.println(str1==str2); //false

    equals 用于比较两个对象的内容是否相等。在Object类中,默认的“equals()”实现使用“==”操作符比较对象的引用。但是,许多类(如String、Integer等)重写了“equals()”方法,以便根据类的特定属性比较对象的内容。

    set 去重底层原理

    set 去重底层依赖于 map 集合实现放重复的 key,map 集合底层基于 equals ,它先比较 key 的hashcode 是否相同,相同情况下再调用 equals 方法判断是否真的相等。

    所以一个实体类是否重写 equals 方法区别很大。

    1. User u1 = new User("s",1);
    2. User u2 = new User("s",1);
    3. System.out.println(u1.equals(u2));

    上面的代码,如果我们以 User 对象作为 key,如果我们的 User 没有重写 equals 方法,那么返回的就是 false,因为默认使用 == ,引用地址不同;如果重写了 equals 方法,那么返回的就是 true,因为使用重写后的 equals ,两个对象属性相同返回 true。

    注意:对象的比较不会去比较 hashcode。 

    1. HashMap map = new HashMap<>();
    2. map.put(u1,"a");
    3. map.put(u2,"b");
    4. System.out.println(map.get(u1).equals(map.get(u2)));

    上面的代码,如果我们没有重写 hashcode 的情况下,那么返回的就是 true,因为 map 的底层是通过 hashcode 来比较两个 key 是否相同;如果重写了 hashcode ,那么返回的就是 true。

    2、List 转为 Set

    1. public static void main(String[] args) {
    2. List list = new ArrayList<>();
    3. list.add(new User("燕双鹰",28));
    4. list.add(new User("李大喜",20));
    5. // 下面是两个属性相同的两个对象(我们已经重写了 equals 和 hashcode 方法)
    6. list.add(new User("李元芳", 30));
    7. list.add(new User("李元芳", 30));
    8. // todo 创建 Stream 的两种方式
    9. // 1. 串行流 stream() 单线程
    10. Stream stream = list.stream();
    11. Set set = stream.collect(Collectors.toSet());
    12. set.forEach(user->{
    13. System.out.println(user.toString());
    14. });
    15. }

    运行结果:

    1. User{name='李元芳', age=30}
    2. User{name='燕双鹰', age=28}
    3. User{name='李大喜', age=20}

    可以看到,重写 equals 和 hashcode 方法后,虽然相同属性的两个对象的内存地址不同,但也被去除重复了。

    4、Stream 将 List 转为 Map

    1、创建 List

    注意:List 转为 Map 的时候,由于 Map 集合不允许存在重复的 key,所以我们必须保证 list 集合中作为 key 字段的属性值唯一。

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));

     2、List 转为 Map

    1. Stream stream = list.stream();
    2. // list 集合是没有 key 的,所以不能直接转为 map 集合,需要指定 key(指定对象的某个字段作为key)
    3. Map collect = stream.collect(Collectors.toMap(new Function() { // 第一个参数 list中的类型,第二个参数是key类型: String
    4. @Override
    5. public String apply(User user) {
    6. return user.getName();
    7. }
    8. }, new Function() { // 第一个参数 list中的类型,第二个参数是value类型: User
    9. @Override
    10. public User apply(User user) {
    11. return user;
    12. }
    13. }));
    14. collect.forEach(new BiConsumer() {
    15. @Override
    16. public void accept(String key, User user) {
    17. System.out.println(key+","+user.toString());
    18. }
    19. });

    使用 lambda 表达式简化一下代码:

    1. // 用lambda表达式
    2. Map collect = stream.collect(Collectors.toMap(User::getName, user -> user));
    3. collect.forEach((key,user)-> System.out.println(key+","+user.toString()));

    运行结果:

    1. 李元芳,User{name='李元芳', age=30}
    2. 李大喜,User{name='李大喜', age=20}
    3. 燕双鹰,User{name='燕双鹰', age=28}

    5、Strem 通过 reduce 方法求和

    1、简单求和

    这里我们通过 Stream.of() 方法来进行数据的构造(这让我想到了最近 Flink)。

    1. Stream stream = Stream.of(10, 50, 30, 10);
    2. Optional res = stream.reduce(new BinaryOperator() {
    3. @Override
    4. public Integer apply(Integer integer, Integer integer2) {
    5. return integer + integer2;
    6. }
    7. });

    使用 lamda 表达式 

     Optional res = stream.reduce(Integer::sum);

    关于结果的打印,我们后面讲到 Optional 类的时候再详细说,一般直接:

    System.out.println(res.get());

    2、对象属性和

    我们构造一个 List 集合,然后转为 Stream 调用 reduce 方法进行求和。

    注意:reduce 方法的返回结果类型必须和 Stream 的类型一致(就像我们 Hadoop 中的 WordCount)。

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));
    5. Stream stream = list.stream();
    6. Optional sum = stream.reduce(new BinaryOperator() {
    7. @Override
    8. public User apply(User user1, User user2) {
    9. return new User("sum",user1.getAge()+ user2.getAge());
    10. }
    11. });
    12. System.out.println(sum.get());

    lambda 表达式简化:

    1. Stream stream = list.stream();
    2. Optional sum = stream.reduce(((user1, user2) -> new User("sum", user1.getAge() + user2.getAge())));
    3. System.out.println(sum.get());//78

    6、Strem 查找集合最大值和最小值

    1、创建集合

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));

    2、查找最大 age 属性对象

    1. Optional max = stream.max(new Comparator() {
    2. @Override
    3. public int compare(User o1, User o2) {
    4. return o1.getAge() - o2.getAge();
    5. }
    6. });
    7. System.out.println(max.get());

    lambda表达式简化:

    1. Optional max = stream.max((user1, user2) -> user1.getAge() - user2.getAge());
    2. System.out.println(max.get()); //30

    3、查找最小 age 属性对象

    1. Optional min = stream.min((user1, user2) -> user1.getAge() - user2.getAge());
    2. System.out.println(min.get()); //20

    7、Stream 中 Match 用法

    anyMatch 表示,任意一个元素满足条件返回 true。

    allMatch 表示,所有元素满足条件才会返回 true。

    noMatch 表示,所有条件都不满足这个条件才会返回 true。

    1、创建集合

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));
    5. Stream stream = list.stream();

    2、anyMatch

    判断集合中是否存在 age 属性大于 25 的对象。

    1. boolean res = stream.anyMatch(new Predicate() {
    2. @Override
    3. public boolean test(User user) {
    4. return user.getAge() > 25;
    5. }
    6. });
    7. System.out.println(res);

     lambda 表达式:

    1. boolean res = stream.anyMatch(user -> user.getAge() > 25);
    2. System.out.println(res); //true

    3、allMatch

    判断是否所有对象的 age属性都大于 30

    1. boolean res = stream.allMatch(user -> user.getAge() > 30);
    2. System.out.println(res); //false

    4、noMatch

    判断是否用户都不满足 name 为 “光头强 ”

    1. boolean res = stream.noneMatch(user -> user.getName().equals("光头强"));
    2. System.out.println(res); //true

    8、Stream 过滤器

    和我们 Flink 的 DataStream API 中的转换算子 filter 很像,它们都是把 判断条件结果为 true 的数据留下,false 则丢掉。

    1、创建集合

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));
    5. Stream stream = list.stream();

    2、过滤

    留下 年龄>25 岁的 User 对象。
    1. Stream filterStream = stream.filter(new Predicate() {
    2. @Override
    3. public boolean test(User user) { //为 true 则留下
    4. return user.getAge()>25;
    5. }
    6. });
    7. filterStream.forEach(new Consumer() {
    8. @Override
    9. public void accept(User user) {
    10. System.out.println(user);
    11. }
    12. });

    运行结果: 

    1. User{name='燕双鹰', age=28}
    2. User{name='李元芳', age=30}

    lambda表达式:

    1. Stream filterStream = stream.filter(user -> user.getAge() > 25);
    2. filterStream.forEach(System.out::println);

    9、Stream Limit 和 Skip

    同样,Stream 需要通过集合来创建。

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));
    5. list.add(new User("熊大",15));
    6. list.add(new User("熊二",14));
    7. list.add(new User("光头强",20));
    8. Stream stream = list.stream();

    1、取出前2条数据

    1. // 在mysql中limit(start,end)需要传两个参数,但在这里只允许传入一个long类型的 maxSize
    2. // 取前2条数据
    3. stream.limit(2).forEach(System.out::println);

    运行结果:

    1. User{name='燕双鹰', age=28}
    2. User{name='李大喜', age=20}

     

    2、取出第 [3,6) 条数据

    注意,这里的索引是从 0 开始的。

    1. // 取 [3,6)条数据 想要分页从先 skip 再 limit
    2. stream.skip(2).limit(3).forEach(System.out::println);

    运行结果:
     

    1. User{name='李元芳', age=30}
    2. User{name='熊大', age=15}
    3. User{name='熊二', age=14}

    10、Stream 排序 sorted

    下面用到的数据。

    1. List list = new ArrayList<>();
    2. list.add(new User("燕双鹰",28));
    3. list.add(new User("李大喜",20));
    4. list.add(new User("李元芳", 30));
    5. list.add(new User("熊大",15));
    6. list.add(new User("熊二",14));
    7. list.add(new User("光头强",20));
    8. Stream stream = list.stream();

    1、直接排序

    对于数值型的数据可以直接进行排序

    1. Stream integerStream = Stream.of(1, 5, 8, 3, 7);
    2. integerStream.sorted().forEach(System.out::println); //1 3 5 7 8

    2、根据对象字段进行升序

    1. stream.sorted(new Comparator() {
    2. @Override
    3. public int compare(User o1, User o2) {
    4. return o1.getAge()-o2.getAge();
    5. }
    6. }).forEach(System.out::println);

    lambda 表达式:

    stream.sorted((o1, o2) -> o1.getAge()-o2.getAge()).forEach(System.out::println);

    运行结果: 

    1. User{name='熊二', age=14}
    2. User{name='熊大', age=15}
    3. User{name='李大喜', age=20}
    4. User{name='光头强', age=20}
    5. User{name='燕双鹰', age=28}
    6. User{name='李元芳', age=30}

    JDK1.8 提供的函数接口

    都在包 java.util.function 包下。

    并行流

    案例1 - 500亿次求和

    1、使用单线程

    1. Instant start = Instant.now();
    2. long sum = 0;
    3. for (long i = 0; i <= 50000000000L; i++) {
    4. sum+=i;
    5. }
    6. Instant end = Instant.now();
    7. System.out.println(sum);
    8. System.out.println("500亿次求和花费时间: "+ Duration.between(start,end).toMillis()+"ms"); // 单线程 11s左右 多线程 6s左右

    2、使用并行流

    1. Instant start = Instant.now();
    2. LongStream longStream = LongStream.rangeClosed(0,50000000000L);
    3. OptionalLong result = longStream.parallel().reduce(new LongBinaryOperator() {
    4. @Override
    5. public long applyAsLong(long left, long right) {
    6. return left + right;
    7. }
    8. });
    9. Instant end = Instant.now();
    10. System.out.println(result.getAsLong());
    11. System.out.println("500亿次求和花费时间: "+ Duration.between(start,end).toMillis()+"ms"); // 单线程 11s左右 多线程 6s左右

    可以发现,多线程明显要快很多。

    总结

            本次学习收获非常大,函数接口的思想在 Flink 中随处可见,的确,这样一种能够使得代码简洁高效的技术在大数据开发中是非常重要的。

  • 相关阅读:
    集成学习方法(随机森林和AdaBoost)
    【Linux网络】1分钟使用shell脚本完成DNS主从解析服务器部署(适用于centos主机)
    ubuntu系统安装与环境配置
    爬虫: response.text 爬取链接下的三国演义小说第一回,并将结果保存到桌面《三国演义》.txt文件中。
    性能测试-基础理论知识
    MySQL高可用方案之MHA
    第20章 Netty
    软件测试/测试开发丨结对编程助手 GitHubCopilot
    【Python】代码风格约定_摘自开源项目FreeCAD
    基于JavaWEB SpringBoot婚纱影楼摄影预约网站设计和实现
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/134460082