写在前面:这篇内容是一篇学习笔记,知识点和代码主要摘自《Java高并发编程详解:深入理解并发核心库》一书,主要用于学习交流。
Java 8中的Stream是支持顺序或者并行操作的元素序列,它不是一个容器。它并不是用来存储数据的,而是对JDK中Collections的一个增强,它专注于对集合对象既便利又高效的聚合操作。它不仅支持串行的操作功能,而且还借助于JDK 1.7中的Fork-Join机制支持并行模式,开发者无须编写任何一行并行相关的代码,就能高效方便地写出高并发的程序。
在JDK 1.8版本中,Stream为容器的使用提供了新的方式,它允许我们通过陈述式的编码风格对容器中的数据进行分组、过滤、计算、排序、聚合、循环等操作。
JDK1.8提供了很多种创建Stream的方法,这里通过示例,我们一起来学习一下:
静态方法of()创建Stream,通过接收可变长数组的方式获取了一个T类型的Stream。语法如下:
<T> Stream<T> of(T...values)
示例:
//静态方法of()创建Stream,输出1,2,3,4
System.out.println("++++++++++++++++++++静态方法of()创建Stream+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
stream1.forEach(i -> System.out.println(i));
这应该是用的比较多的方式,现有了集合或数组,然后我们再创建stream流,用于处理数据。
示例:
System.out.println("++++++++++++++++++++过集合容器或数组创建Stream+++++++++++++++++++++");
//通过数组创建
//Stream<Integer> stream7 = Arrays.stream(new Integer[]{1,2,3});
//通过集合创建
Collection<Integer> list = Arrays.asList(new Integer[]{1,2,3});
Stream<Integer> stream7 = list.stream();
stream7.forEach(i -> System.out.println(i));
//通过Stream.Builder来创建Stream,其中Builder继承自函数式接口Consumer<T>
System.out.println("++++++++++++++++++++通过Stream.Builder来创建Stream+++++++++++++++++++++");
Stream<Integer> stream2 = Stream.<Integer>builder()
.add(1)
.add(2)
.add(3)
.add(4).build();
stream2.forEach(i -> System.out.println(i));
创建空Stream,基本类型还可以使用IntStream、LongStream、DoubleStream直接创建
System.out.println("++++++++++++++++++++创建空Stream+++++++++++++++++++++");
Stream<Integer> stream3 = Stream.<Integer>empty();
//IntStream stream3 = IntStream.empty();
//LongStream stream3 = LongStream.empty();
//DoubleStream stream3 = DoubleStream.empty();
stream3.forEach(i -> System.out.println(i));
通过generate()方法创建无限元素的Stream,参数是一个Supplier函数式接口实现对象,基本类型还可以使用IntStream、LongStream、DoubleStream直接创建。
System.out.println("++++++++++++++++++++通过generate()方法创建无限元素的Stream+++++++++++++++++++++");
//根据ThreadLocalRandom.current().nextInt(10)不断(无限)产生数据
Stream<Integer> stream4 = Stream.generate(()-> ThreadLocalRandom.current().nextInt(10));
stream4.limit(10).forEach(i -> System.out.println(i));
通过iterate()方法创建无限元素的Stream,第一个参数是初始元素,第二个参数是UnaryOperator函数接口实现对象,前一个元素会作为后一个元素的输入值。数字类型的Stream还可以使用IntStream、LongStream、DoubleStream直接创建。
System.out.println("++++++++++++++++++++通过iterate()方法创建无限元素的Stream+++++++++++++++++++++");
Stream<Integer> stream5 = Stream.iterate(100,seed -> seed+1);
stream5.limit(10).forEach(i -> System.out.println(i));
静态方法range()创建有限元素,适用对象:IntStream、LongStream、DoubleStream等。注意:range()方法用于创建半开半闭区间,rangeClosed()方法用于创建闭区间。
ystem.out.println("++++++++++++++++++++静态方法range()创建有限元素+++++++++++++++++++++");
//IntStream stream6 = IntStream.range(1,5);//输出[1,5)值,半开半闭区间
IntStream stream6 = IntStream.rangeClosed(1,5);//输出[1,5]值,闭区间
stream6.forEach(i -> System.out.println(i));
其实Map并未提供创建Stream的方法,但是我们可以通过entry set的方式间接创建一个类型为Entry键值对的元素序列,提供对Map的Stream支持。
System.out.println("++++++++++++++++++++通过Map创建Stream+++++++++++++++++++++");
Map<String, String> map = new HashMap<>();
map.put("key1","val1");
map.put("key2","val2");
//Stream<String> stream8 = map.keySet().stream();
Stream<String> stream8 = map.values().stream();
stream8.forEach(i -> System.out.println(i));
通过Files创建Stream,java.io和java.nio.file包支持通过Streams对I/O进行操作。
System.out.println("++++++++++++++++++++通过Files创建Stream+++++++++++++++++++++");
try {
Stream<String> stream9 = Files.lines(Paths.get("D:\\test\\test.txt"), Charset.forName("UTF-8"));
stream9.forEach(i -> System.out.println(i));
} catch (IOException e) {
e.printStackTrace();
}
其他创建方式,一些第三方框架或者平台都提供了对Stream操作的支持,比如Spark、Flink、Storm的Trident、JOOQ等,后续用到再记录。
在Stream中,操作可以分为两类:Intermediate操作和Terminal操作。
其中,Intermediate操作,比如filter、sorted、map之类,这类操作的结果都是一个全新的Stream类型。多个Intermediate操作构成了一个流水线(pipeline)。Intermediate操作的执行都是通过lazy的方式,直到遇到最后的Terminal操作时,才开始执行。注意:一旦某个Stream执行了Intermediate操作,或者已经被关闭(执行了Terminal操作),将无法再次被操作。
Stream的Terminal操作会终结Stream的流水线(pipeline)的继续执行,最终返回一个非Stream类型的结果(foreach操作可以理解为返回的是void类型的结果)。因此在一个Stream的流水线中执行了Terminal方法之后,Stream将被关闭。
下面,我们先学习一下Intermediate操作都有哪些,是如何使用的。
distinct去重操作,注意:去重操作是基于equals()和hashcode()方法实现的。
/**
* distinct去重操作,注意:去重操作是基于equals()和hashcode()方法实现的
*/
public static void distinct(){
System.out.println("++++++++++++++++++++distinct去重操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4,1,2);
stream1.distinct().forEach(i -> System.out.println(i));
}
filter过滤操作,使用实现了Predicate函数式接口作为入参。
/**
* filter过滤操作,使用实现了Predicate函数式接口作为入参
*/
public static void filter(){
System.out.println("++++++++++++++++++++filter操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
//返回大于2的元素
stream1.filter(e -> e>2).forEach(i -> System.out.println(i));
}
limit截断操作,类似SQL的limit关键字。注意:如果要截取的size大于实际数量,则不会起到截取效果,仍返回一个全新的Stream。
/**
* limit截断操作,类似SQL的limit关键字。注意:如果要截取的size大于实际数量,则不会起到截取效果,仍返回一个全新的Stream
*/
public static void limit(){
System.out.println("++++++++++++++++++++limit操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
stream1.limit(3).forEach(i -> System.out.println(i));
}
map映射操作,可以借助map操作对元素进行增强运算、投影运算,甚至类型转换等操作。
/**
* map映射操作,可以借助map操作对元素进行增强运算、投影运算,甚至类型转换等操作。
*/
public static void map(){
System.out.println("++++++++++++++++++++map操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
stream1.map(i -> i*2).forEach(i -> System.out.println(i));
}
skip丢弃前n个元素,skip操作与limit类似,但是其作用却是相反的,skip操作会跳过(丢弃)n(指定数量)个元素,并且返回一个全新的Stream。注意:如果n大于当前Stream元素的个数,那么该操作就相当于是对Stream元素执行了一次清空操作。
/**
* skip丢弃前n个元素,skip操作与limit类似,但是其作用却是相反的,skip操作会跳过(丢弃)n(指定数量)个元素,并且返回一个全新的Stream。
* 注意:如果n大于当前Stream元素的个数,那么该操作就相当于是对Stream元素执行了一次清空操作。
*/
public static void skip(){
System.out.println("++++++++++++++++++++skip操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
stream1.skip(3).forEach(i -> System.out.println(i));
}
peek操作,对当前Stream中所有元素执行consume操作,并返回一个和原来Stream一样的全新Stream,好像一个debug操作。
/**
* peek操作,对当前Stream中所有元素执行consume操作,并返回一个和原来Stream一样的全新Stream,好像一个debug操作。
*/
public static void peek(){
System.out.println("++++++++++++++++++++peek操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
stream1.peek(System.out::println).forEach(i -> System.out.println(i));
}
sorted排序操作,会返回一个经过自然排序的全新Stream。注意:我们无法针对一个非Comparable子类进行排序,如果对一个非Comparable子类进行排序则会引起错误。
/**
* sorted排序操作,会返回一个经过自然排序的全新Stream,
* 注意:我们无法针对一个非Comparable子类进行排序,如果对一个非Comparable子类进行排序则会引起错误。
*/
public static void sorted(){
System.out.println("++++++++++++++++++++sorted操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(3,4,1,2,6,5);
stream1.sorted().forEach(i -> System.out.println(i));
}
flatMap操作,用于处理Stream数据元素类型是Stream的序列,实现数据的扁平化,即它会将类型Stream的Stream<Stream>扁平化为Stream,并且产生一个全新的Stream。
/**
* flatMap操作,用于处理Stream数据元素类型是Stream的序列,实现数据的扁平化,即它会将类型Stream<R>的Stream<Stream<R>>扁平化为Stream<R>,并且产生一个全新的Stream。
*/
public static void flatMap(){
System.out.println("++++++++++++++++++++flatMap操作--不使用flatMap时+++++++++++++++++++++");
Stream<Stream<Integer>> stream1 = Stream.of(1,2,3,4).map(i -> Stream.of(i,i*2,i*3));
stream1.forEach(i -> i.forEach(k -> System.out.println(k)));
System.out.println("++++++++++++++++++++flatMap操作--使用flatMap时+++++++++++++++++++++");
//flatMap方法,扁平化了元素序列
Stream<Integer> stream2 = Stream.of(1,2,3,4).flatMap(i -> Stream.of(i,i*2,i*3));
stream2.forEach(i -> System.out.println(i));
}
Stream的Terminal操作会终结Stream的流水线(pipeline)的继续执行,最终返回一个非Stream类型的结果(foreach操作可以理解为返回的是void类型的结果)。因此在一个Stream的流水线中执行了Terminal方法之后,Stream将被关闭。
match操作,返回布尔类型,主要用于判断是否存在匹配条件的元素。
/**
* match操作,返回布尔类型,主要用于判断是否存在匹配条件的元素
* allMatch():若所有的元素都匹配条件,则结果为true,否则为false。
* anyMatch():只要有一个元素匹配条件,则结果为true,否则为false。
* noneMatch():若所有的元素都不匹配条件,则结果为true,否则为false。
*/
public static void match(){
System.out.println("++++++++++++++++++++match操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
System.out.println("result1:" + stream1.allMatch(i -> i >0));
Stream<Integer> stream2 = Stream.of(1,2,3,4);
System.out.println("result2:" + stream2.anyMatch(i -> i >3));
Stream<Integer> stream3 = Stream.of(1,2,3,4);
System.out.println("result3:" + stream3.noneMatch(i -> i >3));
}
find操作,会返回Stream中的某个元素Optional。
/**
* find操作,会返回Stream中的某个元素Optional
*
* Optional<T> findFirst():返回Stream中的第一个元素。
* Optional<T> findAny():返回Stream中的任意一个元素。
*/
public static void find(){
System.out.println("++++++++++++++++++++find操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
Stream<Integer> stream2 = Stream.of(1,2,3,4);
//获取第一个值
Optional<Integer> first = stream1.findFirst();
System.out.println("first:" + first.get());
//获取任意一个值,但是正常情况下一般会取第一个元素,在并行流的情况下会随机取一个元素
Optional<Integer> any = stream2.parallel().findAny();
System.out.println("any:" + any.get());
}
foreach操作用于对Stream中的每一个元素执行consume函数。
public static void foreach(){
System.out.println("++++++++++++++++++++foreach操作 可以对比在并发情况下,输出顺序是不一样的+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4).parallel();
stream1.forEach(i -> System.out.print(i + " "));
System.out.println("");
Stream<Integer> stream2 = Stream.of(1,2,3,4).parallel();
stream2.forEachOrdered(i -> System.out.print(i + " "));
}
count操作,用于返回Stream中元素的个数。
/**
* count操作,用于返回Stream中元素的个数
*/
public static void count(){
System.out.println("++++++++++++++++++++count操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
System.out.println("count:" + stream1.count());
}
max/min操作,返回Optional类型。
public static void maxAndMin(){
System.out.println("++++++++++++++++++++max/min操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
System.out.println("max:" + stream1.max(Comparator.comparingInt(o -> o)).get());
Stream<Integer> stream2 = Stream.of(1,2,3,4);
System.out.println("min:" + stream2.min(Comparator.comparingInt(o -> o)).get());
}
reduce操作,通过BinaryOperator函数式接口对Stream中的所有元素逐次进行计算,得到一个最终值并且返回。
public static void reduce(){
System.out.println("++++++++++++++++++++reduce操作 +++++++++++++++++++++");
//求和
Stream<Integer> stream1 = Stream.of(1,2,3,4);
System.out.println("sum:" + stream1.reduce(0,Integer::sum));
//求最大值
Stream<Integer> stream2 = Stream.of(1,2,3,4);
System.out.println("max:" + stream2.reduce(0,(a,b) -> (a >= b ? a : b)));
}
collect操作可以将Stream中的元素聚合到一个新的集合中,比如map、set、list等,涉及到了Collector的使用。
public static void collect(){
System.out.println("++++++++++++++++++++collect操作+++++++++++++++++++++");
Stream<Integer> stream1 = Stream.of(1,2,3,4);
List<Integer> list = stream1.collect(Collectors.toList());
System.out.println("list:" + list);
}
NumericStream是一个总称,代表着具体数据类型的Stream,比如IntStream、LongStream等。
/**
* Stream<Integer> 和 IntStream性能对比示例
*/
public static void testNumericStream(){
Stream<Integer> stream1 = Stream.iterate(1,seed -> seed+1).limit(1000);
IntStream stream2 = IntStream.rangeClosed(1,1000);
System.out.println("++++++++++++++++++++Stream<Integer> 操作时长+++++++++++++++++++++");
long start1 = Calendar.getInstance().getTimeInMillis();
stream1.reduce(0,Integer::sum);
long end1 = Calendar.getInstance().getTimeInMillis();
System.out.println("Stream<Integer>运行时长:" + (end1 - start1));
System.out.println("++++++++++++++++++++IntStream 操作时长+++++++++++++++++++++");
long start2 = Calendar.getInstance().getTimeInMillis();
stream2.reduce(0,Integer::sum);
long end2 = Calendar.getInstance().getTimeInMillis();
System.out.println("IntStream运行时长:" + (end2 - start2));
}
这里主要以IntStream 为例,DoubleStream 和LongStream 类似,这里就不再重复。
public static void test(){
//mapToInt()方法,把Stream<String>转换成了IntStream
Stream<String> stream1 = Stream.of("1","2","3");
IntStream stream2 = stream1.mapToInt(i -> Integer.valueOf(i));
stream2.forEach(i -> System.out.println(i));
//boxed()方法,封箱操作,IntStream转换成Stream<String>
IntStream stream3 = IntStream.rangeClosed(1,5);
Stream<Integer> stream4 = stream3.boxed();
//mapToObj()方法,把IntStream转化成了Stream<Boolean>,与mapToLong和mapToDouble类似
IntStream stream5 = IntStream.rangeClosed(1,5);
Stream<Boolean> stream6 = stream5.mapToObj(i -> (i%2==0 ? true : false));
//asDoubleStream()方法,把IntStream转成DoubleStream,和asLongStream()方法一样。
IntStream stream7 = IntStream.rangeClosed(1,5);
//DoubleStream stream8 = stream7.asDoubleStream();
LongStream stream8 = stream7.asLongStream();
}
这篇内容主要记录了Java8 Streams用法中的常见操作,后需要我们还会专门介绍collect()方法和Collector的使用、Stream并行等内容。