Most data-processing jobs follow the same three-step flow: 1) load data from a source, 2) transform it, 3) emit the result.
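A minimal sketch of that shape, using fromElements as a stand-in source so it runs without the socket server used in the examples below (the class name PipelineSkeleton is made up for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSkeleton {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")   // 1) load from a source
           .map(String::toUpperCase)      // 2) transform
           .print();                      // 3) emit the result
        env.execute("pipeline skeleton"); // nothing runs until execute() is called
    }
}

Every example below fills in this skeleton with a different transformation operator.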
map(new MapFunction)
MapFunction: (x) -> y [one record in, one record out]
public class Base_API_MapFunction {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8888);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);

        // load a socket text stream
        DataStreamSource<String> ds = see.socketTextStream("linux01", 9999);

        // the map operator processes the stream one record at a time

        /**
         * Example 1
         */
        SingleOutputStreamOperator<String> sos = ds.map(new MapFunction<String, String>() {
            // invoked once per record
            @Override
            public String map(String line) throws Exception {
                // convert each record to upper case
                return line.toUpperCase();
            }
        });

        /**
         * Example 2
         */
        SingleOutputStreamOperator<Tuple2<String, String>> sos2 = ds.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String line) throws Exception {
                // wrap each incoming record into a two-field tuple
                String[] split = line.split("\\s+");
                return Tuple2.of(split[0], split[1]);
            }
        });

        /**
         * Example 3
         * MapFunction has a single abstract method, so a lambda expression can be used instead:
         * public interface MapFunction<T, O> extends Function, Serializable {
         *     O map(T value) throws Exception;
         * }
         */
        SingleOutputStreamOperator<Tuple2<String, String>> sos3 = ds.map(line -> {
            String[] arr = line.split("\\s+");
            return Tuple2.of(arr[0], arr[1]);
        }).returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})); // declare the output type
        // or: .returns(new TypeHint<Tuple2<String, String>>() {});               // declare the output type

        sos3.print("after map: ");
        see.execute();
    }
}
If you pass a lambda expression to map, you must call returns(...) on the result to declare the output type. Because of Java type erasure, Flink cannot infer the output type of a generic lambda on its own, and the job fails with an exception.
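A compact sketch of the failure and the equivalent fixes, reusing the ds stream from the example above (the variable name typed is made up; Types.TUPLE is the factory used again in the flatMap section below):

// Without the returns(...) call, the job typically fails with an
// InvalidTypesException, because the lambda's generic output type is erased.
SingleOutputStreamOperator<Tuple2<String, String>> typed = ds
        .map(line -> Tuple2.of(line, line))
        .returns(Types.TUPLE(Types.STRING, Types.STRING));
// equivalent: .returns(new TypeHint<Tuple2<String, String>>() {})
// equivalent: .returns(TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}))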
flatMap(new FlatMapFunction)
FlatMapFunction: x -> x1, x2, x3, x4 [one record in, many records out, flattened]
public class Base_API_FlatMapFunction {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8888);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        // load a socket text stream
        DataStreamSource<String> ds = see.socketTextStream("linux01", 9999);

        /**
         * Example 1
         * Flatten each incoming line
         * into (word, 1) tuples
         */
        SingleOutputStreamOperator<Tuple2<String, Integer>> soo = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = line.split("\\s+");
                for (String word : arr) {
                    Tuple2<String, Integer> tp = Tuple2.of(word, 1);
                    // one input record becomes several output records; emit each one via the Collector
                    out.collect(tp);
                }
            }
        });

        /**
         * Example 2
         * Lambda version: the output type cannot be inferred automatically,
         * so declare it with returns
         */
        SingleOutputStreamOperator<Tuple2<String, Integer>> soo2 = ds.flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
            String[] arr = line.split("\\s+");
            for (String s : arr) {
                out.collect(Tuple2.of(s, 1));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        soo2.print("after flatMap: ");
        see.execute("flatMap example");
    }
}
project (DataStream → DataStream)
This operator works only on Tuple-typed streams; project is the equivalent of "select column, column" in SQL.
It exists only in the Java API; the Scala API has no such method.
public class _3Base_API_Project {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8888);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);

        /**
         * project (DataStream → DataStream)
         * Works only on Tuple-typed streams; behaves like "select column" in SQL.
         * Java API only; the Scala API has no such method.
         */
        DataStreamSource<Tuple4<Integer, String, String, Integer>> ds = see.fromElements(
                Tuple4.of(1, "YY", "F", 100),
                Tuple4.of(2, "DY", "F", 99)
        );
        // for each record, keep only the fields at the given positions:
        // here just id and name
        SingleOutputStreamOperator<Tuple2<Integer, String>> res = ds.project(0, 1);
        res.print();
        see.execute("project example");
    }
}
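Since project is limited to tuples (and to the Java API), the general-purpose alternative is a plain map. A sketch, assuming it is pasted inside main after the ds stream above (the variable name res2 is made up):

// the same "select id and name", written as a map; this works for any element type,
// but as a lambda it needs returns(...) to declare the output type
SingleOutputStreamOperator<Tuple2<Integer, String>> res2 = ds
        .map(t -> Tuple2.of(t.f0, t.f1))
        .returns(Types.TUPLE(Types.INT, Types.STRING));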
filter (DataStream → DataStream)
filter(new FilterFunction)
FilterFunction: x -> true/false
public class _4Base_API_FilterFunction {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8888);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);

        // create a source
        DataStreamSource<Integer> ds = see.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
        /**
         * keep only the even numbers
         */
        SingleOutputStreamOperator<Integer> res = ds.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value % 2 == 0;
            }
        });
        // lambda version: keep only the odd numbers
        SingleOutputStreamOperator<Integer> res2 = ds.filter(e -> e % 2 == 1);

        /**
         * Example -----
         * keep only the people whose score is greater than 60
         */
        DataStreamSource<YY> yyds = see.fromElements(
                new YY(1, "DY", 100),
                new YY(2, "XY", 100),
                new YY(3, "HH", 10),
                new YY(4, "XH", 12)
        );
        SingleOutputStreamOperator<YY> res3 = yyds.filter(new FilterFunction<YY>() {
            @Override
            public boolean filter(YY yy) throws Exception {
                return yy.getScore() > 60;
            }
        });

        res3.print();
        see.execute("filter function");
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
class YY {
    private int id;
    private String name;
    private double score;
}
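One convenience worth noting: a filter lambda never needs returns(...), because filter does not change the element type. A sketch reusing yyds from the example above (the variable name res4 is made up):

// same predicate as res3, written as a lambda; the output type is already known
SingleOutputStreamOperator<YY> res4 = yyds.filter(yy -> yy.getScore() > 60);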

keyBy: group by key (DataStream → KeyedStream)
public class _5Base_API_KeyBy {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8888);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        // load a socket text stream
        DataStreamSource<String> ds = see.socketTextStream("linux01", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = line.split("\\s+");
                for (String word : arr) {
                    Tuple2<String, Integer> tp = Tuple2.of(word, 1);
                    // one input record becomes several output records; emit each one via the Collector
                    out.collect(tp);
                }
            }
        });

        /**
         * Group the stream
         * -- by word
         */

        // group by tuple index (legacy style, deprecated in newer Flink versions)
        wordAndOne.keyBy(0);
        // group by word with a KeySelector -> KeyedStream
        KeyedStream<Tuple2<String, Integer>, String> res = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // group a stream of beans by one of their properties
        DataStreamSource<YY2> ds2 = see.fromElements(
                new YY2(1, "DY", "NM_BT", 100),
                new YY2(2, "XY", "NM_BT", 100),
                new YY2(3, "HH", "SD_HZ", 10),
                new YY2(4, "XH", "SD_HZ", 12)
        );

        /**
         * Group the stream
         * -- by a bean property
         */
        ds2.keyBy(new KeySelector<YY2, String>() {
            @Override
            public String getKey(YY2 value) throws Exception {
                return value.getCity();
            }
        });
        // the same grouping, written as a method reference
        ds2.keyBy(YY2::getCity);

        res.print();
        see.execute();
    }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
class YY2 {
    private int id;
    private String name;
    private String city;
    private double score;
}

sum: rolling aggregation (KeyedStream → DataStream)

package com.blok;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _6Base_API_Sum {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8898);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        // load a socket text stream
        DataStreamSource<String> ds = see.socketTextStream("linux01", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = ds.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] arr = line.split("\\s+");
                for (String word : arr) {
                    Tuple2<String, Integer> tp = Tuple2.of(word, 1);
                    // one input record becomes several output records; emit each one via the Collector
                    out.collect(tp);
                }
            }
        });

        /**
         * Group the stream
         * -- by word
         */

        // group by tuple index (legacy style, deprecated in newer Flink versions)
        wordAndOne.keyBy(0);
        // group by word with a KeySelector -> KeyedStream
        KeyedStream<Tuple2<String, Integer>, String> res = wordAndOne.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });
        //------------------------------------------------------------------------------
        // rolling aggregation: the result keeps accumulating and is re-emitted as records arrive
        // count how many times each word occurs
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = res.sum("f1"); // tuple field by name
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum2 = res.sum(1);   // tuple field by index

        //------------------------------------------------------------------------------

        // group a stream of beans by one of their properties
        DataStreamSource<YY2> ds2 = see.fromElements(
                new YY2(1, "DY", "NM_BT", 100),
                new YY2(2, "XY", "NM_BT", 100),
                new YY2(3, "HH", "SD_HZ", 10),
                new YY2(4, "XH", "SD_HZ", 12)
        );

        /**
         * Group the stream
         * -- by a bean property
         */
        KeyedStream<YY2, String> keyed = ds2.keyBy(new KeySelector<YY2, String>() {
            @Override
            public String getKey(YY2 value) throws Exception {
                return value.getCity();
            }
        });

        //------------------------------------------------------------------------------
        // rolling aggregation: total score per group, addressed by the bean property name
        SingleOutputStreamOperator<YY2> score = keyed.sum("score");
        score.print();
        //------------------------------------------------------------------------------
        see.execute();
    }
}
min and minBy (and likewise max and maxBy) both compute a running minimum (maximum) over the keyed stream. The difference is in what gets updated on each new record: min/max update only the aggregated field and keep the remaining fields of the first record seen for that key, while minBy/maxBy replace the entire record with the one holding the extreme value.
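Before the full socket-based example below, here is a self-contained contrast on a fixed element list (the class name MinVsMinBy and the (city, id, score) tuples are made up for illustration):

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinVsMinBy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        see.setParallelism(1);

        // (city, id, score)
        KeyedStream<Tuple3<String, Integer, Integer>, String> keyed = see
                .fromElements(
                        Tuple3.of("NM_BT", 1, 99),
                        Tuple3.of("NM_BT", 2, 100),
                        Tuple3.of("NM_BT", 3, 98))
                .keyBy(t -> t.f0);

        // min: only field 2 is updated; the other fields stay those of the first record
        // final output: (NM_BT,1,98) -- id 1 paired with a score it never had
        keyed.min(2).print("min  ");

        // minBy: the whole record holding the minimum is emitted
        // final output: (NM_BT,3,98)
        keyed.minBy(2).print("minBy");

        see.execute();
    }
}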
package com.blok;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class _7Base_API_MaxMin {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8898);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        // load a socket text stream
        DataStreamSource<String> ds = see.socketTextStream("linux01", 9999);

        /**
         * Parse each "id,name,city,score" line into:
         * public class YY2 {
         *     private int id;
         *     private String name;
         *     private String city;
         *     private double score;
         * }
         */
        SingleOutputStreamOperator<YY2> beans = ds.map(new MapFunction<String, YY2>() {
            @Override
            public YY2 map(String value) throws Exception {
                String[] arr = value.split(",");
                return new YY2(Integer.parseInt(arr[0]), arr[1], arr[2], Double.parseDouble(arr[3]));
            }
        });
        // group the records by city
        KeyedStream<YY2, String> keyed = beans.keyBy(YY2::getCity);
        //---------------------------------------------------------------
        /**
         * min returns the first record of the key, with only the specified field updated
         * max behaves the same way
         *
         * record with the lowest score:  1,yy1,NM_BT,98
         * record with the highest score: 1,yy1,NM_BT,100
         */
        // record with the lowest score per group
        SingleOutputStreamOperator<YY2> minScoreInfo = keyed.min("score");
        SingleOutputStreamOperator<YY2> maxScoreInfo = keyed.max("score");
        /**
         * test data
         * 1,yy1,NM_BT,99
         * 2,yy2,NM_BT,100
         * 3,yy3,NM_BT,98
         * 4,yy4,NM_BT,98.5
         * 1,hh1,SD_HZ,99
         * 2,hh2,SD_HZ,100
         * 3,hh3,SD_HZ,98
         * 4,hh4,SD_HZ,98.5
         * 5,hh5,SD_HZ,101
         */
        //---------------------------------------------------------------
        /**
         * maxBy/minBy return the whole record that holds the maximum (minimum)
         *
         * record with the highest score: > YY2(id=2, name=yy2, city=NM_BT, score=100.0)
         * record with the lowest score:  > YY2(id=3, name=yy3, city=NM_BT, score=98.0)
         */
        SingleOutputStreamOperator<YY2> minScoreInfoBy = keyed.minBy("score");
        SingleOutputStreamOperator<YY2> maxScoreInfoBy = keyed.maxBy("score");
        minScoreInfoBy.print("record with the lowest score: ");
        maxScoreInfoBy.print("record with the highest score: ");

        see.execute();
    }
}
reduce (KeyedStream → DataStream)
reduce generalizes the rolling aggregations above: the aggregation logic is not hard-coded but supplied by the user through a ReduceFunction.
public class _8Base_API_Reduce {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 8898);
        StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        // load a socket text stream
        DataStreamSource<String> ds = see.socketTextStream("linux01", 9999);

        /**
         * Parse each "id,name,city,score" line into:
         * public class YY2 {
         *     private int id;
         *     private String name;
         *     private String city;
         *     private double score;
         * }
         */
        SingleOutputStreamOperator<YY2> beans = ds.map(new MapFunction<String, YY2>() {
            @Override
            public YY2 map(String value) throws Exception {
                String[] arr = value.split(",");
                return new YY2(Integer.parseInt(arr[0]), arr[1], arr[2], Double.parseDouble(arr[3]));
            }
        });

        // group the beans by city
        KeyedStream<YY2, String> keyed = beans.keyBy(YY2::getCity);
        // value1 is the previous aggregate, value2 the newly arrived record
        SingleOutputStreamOperator<YY2> reduced = keyed.reduce(new ReduceFunction<YY2>() {
            @Override
            public YY2 reduce(YY2 value1, YY2 value2) throws Exception {
                YY2 yy2 = new YY2();
                yy2.setScore(value1.getScore() + value2.getScore());
                yy2.setCity(value1.getCity());
                return yy2;
            }
        });
        reduced.print("reduced result");
        see.execute();
    }
}
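Since a ReduceFunction consumes and produces the stream's own element type, the lambda form needs no returns(...) call, unlike the map and flatMap lambdas earlier. A sketch reusing the keyed stream above (the variable name reduced2 is made up):

// the same rolling aggregation as a lambda; the output type is already known
SingleOutputStreamOperator<YY2> reduced2 = keyed.reduce((value1, value2) -> {
    YY2 acc = new YY2();
    acc.setCity(value1.getCity());
    acc.setScore(value1.getScore() + value2.getScore());
    return acc; // note: id and name are left at their defaults, as in the version above
});

In both versions the emitted bean carries only city and score; if the other fields matter, copy them from value2 (the latest record) instead of starting from an empty bean.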