基本转换算子 说明 映射(map) 将数据流中的数据进行转换,形成新的数据流 过滤(filter) 将数据流中的数据根据条件过滤 扁平映射(flatMap) 将数据流中的整体(如:集合)拆分成个体使用。消费一个元素,产生0到多个元素
- package com.qiyu.Transformation;
-
- import org.apache.flink.api.common.functions.FilterFunction;
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.util.Collector;
-
- /**
- * @author MR.Liu
- * @version 1.0
- * @data 2023-10-19 11:00
- */
- public class Trans {
-
- /***
- * 映射 map 算子
- * @param env
- */
- public static void map(StreamExecutionEnvironment env){
- DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
-
- //将集合中的元素值都 加上 100
- DataStream<Integer> map = stream.map(new MapFunction<Integer, Integer>() {
- @Override
- public Integer map(Integer integer) throws Exception {
- return integer+100;
- }
- });
- map.print();
- }
-
- /***
- * 过滤 filter 算子
- * @param env
- */
- public static void filter(StreamExecutionEnvironment env){
- DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
- //将集合中的值取模,不等于1的通行,反之过滤
- DataStream<Integer> filter = stream.filter(new FilterFunction<Integer>() {
- @Override
- public boolean filter(Integer integer) throws Exception {
- if (integer % 2 != 1) {
- return true;
- }
- return false;
- }
- });
- filter.print();
- }
-
- /***
- * 扁平化 flatMap 算子
- * @param env
- */
- public static void flatMap(StreamExecutionEnvironment env){
- DataStream<String> stream = env.fromElements(
- "Flink is a powerful framework for stream and batch processing",
- "It provides support for event time processing"
- );
- //将字符串以空格分隔,拆成多个字符串个体
- stream.flatMap(new FlatMapFunction<String, Object>() {
- @Override
- public void flatMap(String s, Collector<Object> collector) throws Exception {
- String[] words = s.split(" ");
- for (String word : words){
- collector.collect(word);
- }
- }
- }).print();
-
-
- }
-
- /**
- * 主程序类
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- //map(env);
- //filter(env);
- flatMap(env);
- env.execute();
- }
- }
聚合算子 说明 按键分区(keyBy) 通过指定键(key),将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务,对应着任务槽(task slot) 简单聚合 sum():在输入流上,对指定的字段做叠加求和的操作。
min():在输入流上,对指定的字段求最小值。
max():在输入流上,对指定的字段求最大值。
minBy():在输入流上针对指定字段求最小值。
maxBy():在输入流上针对指定字段求最大值。
归约聚合(reduce) 可以把每一个新输入的数据和当前已经归约出来的值,做聚合计算
- package com.qiyu.Transformation;
-
import com.qiyu.Source.Student;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
-
- /**
- * @author MR.Liu
- * @version 1.0
- * @data 2023-10-19 14:45
- */
- public class Aggregation {
- public static void main(String[] args) throws Exception {
- StreamExecutionEnvironment env =
- StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
- Tuple2.of("a", 1),
- Tuple2.of("a", 3),
- Tuple2.of("b", 3),
- Tuple2.of("b", 4)
- );
-
- stream.keyBy(r -> r.f0).print();
- stream.keyBy(r -> r.f0).sum(1).print();
- stream.keyBy(r -> r.f0).min(1).print();
- stream.keyBy(r -> r.f0).max(1).print();
- stream.keyBy(r -> r.f0).maxBy(1).print();
- stream.keyBy(r -> r.f0).minBy(1).print();
-
- stream.keyBy(r -> r.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
- @Override
- public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
- return Tuple2.of(t1.f0, t1.f1 + t2.f1);
- }
- }).print();
-
- env.execute();
- }
- }