• Flink—— Flink Data transformation(转换)


            Flink数据算子转换有很多类型,各位看官看好,接下来,演示其中的十八种类型。

    1.Map(映射转换)

            DataStream → DataStream

            将函数作用在集合中的每一个元素上,并返回作用后的结果,其中输入是一个数据流,输出的也是一个数据流:

    1. DataStream<Integer> dataStream = //加载数据源
    2. dataStream.map(new MapFunction<Integer, Integer>() {
    3. @Override
    4. public Integer map(Integer age) throws Exception {
    5. return 2 + age;
    6. }
    7. });

    2.Flatmap(扁平映射转换)

            DataStream → DataStream

            FlatMap 将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果,即采用一条记录并输出零个,一个或多个记录。

    1. //加载数据源
    2. dataStream.flatMap(new FlatMapFunction<String, String>() {
    3. @Override
    4. public void flatMap(String value, Collector<String> out)
    5. throws Exception {
    6. for(String word: value.split(",")){
    7. out.collect(word);
    8. }
    9. }
    10. });

    3.Filter(过滤转换)

            DataStream → DataStream

            按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素。

    1. // 过滤出年龄大于18的数据
    2. dataStream.filter(new FilterFunction<Integer>() {
    3. @Override
    4. public boolean filter(Integer age) throws Exception {
    5. return age > 18;
    6. }
    7. });

    Keyby(分组转换)

            DataStream → KeyedStream

            按照指定的key来对流中的数据进行分组,在逻辑上是基于 key 对流进行分区。在内部,它使用 hash 函数对流进行分区。它返回 KeyedDataStream 数据流。

    1. KeyedStream<Student, Integer> keyBy = student.keyBy(new KeySelector<Student, Integer>() {
    2. @Override
    3. public Integer getKey(Student value) throws Exception {
    4. return value.age;
    5. }
    6. });

    4.Reduce(归约转换)

            KeyedStream → DataStream

            对集合中的元素进行聚合,Reduce 返回单个的结果值,并且 reduce 操作每处理一个元素总是创建一个新值。常用的方法有 average, sum, min, max, count,使用 reduce 方法都可实现。

    1. keyedStream.reduce(new ReduceFunction<Integer>() {
    2. @Override
    3. public Integer reduce(Integer value1, Integer value2)
    4. throws Exception {
    5. return value1 * value2;
    6. }
    7. });

    5.Aggregations(聚合转换)

            KeyedStream → DataStream

            在分组后的数据集上进行聚合操作,如求和、计数、最大值、最小值等。这些函数可以应用于 KeyedStream 以获得 Aggregations 聚合。

    1. DataStream<Tuple2<String, Integer>> dataStream = ...; // 加载数据源
    2. dataStream.keyBy(0) // 对元组的第一个元素进行分组
    3. .sum(1); // 对元组的第二个元素求和

    6.Window(分组开窗转换)

    KeyedStream → WindowedStream

            Flink 定义数据片段以便(可能)处理无限数据流。 这些切片称为窗口,将数据流划分为不重叠的窗口,并在每个窗口上执行转换操作,常用于对时间窗口内的数据进行处理。 此切片有助于通过应用转换处理数据块。 要对流进行窗口化,需要分配一个可以进行分发的键和一个描述要对窗口化流执行哪些转换的函数,

            要将流切片到窗口,我们可以使用 Flink 自带的窗口分配器。 我们有选项,如 tumbling windows, sliding windows, global 和 session windows。 Flink 还允许您通过扩展 WindowAssginer 类来编写自定义窗口分配器。

    inputStream.keyBy(0).window(Time.seconds(10));
    

            上述案例是数据分组后,是以 10 秒的时间窗口聚合:

    7.WindowAll(开窗转换)

            DataStream → AllWindowedStream

            类似于 Window 操作,但是对整个数据流应用窗口操作而不是对每个 key 分别应用。

            windowAll 函数允许对常规数据流进行分组。 通常,这是非并行数据转换,因为它在非分区数据流上运行。 唯一的区别是它们处理窗口数据流。 所以窗口缩小就像 Reduce 函数一样,Window fold 就像 Fold 函数一样,并且还有聚合。

    1. // 创建一个简单的数据流
    2. DataStream<Tuple2<String, Integer>> dataStream = ...; // 请在此处填充你的数据源
    3. // 定义一个 ReduceFunction,用于在每个窗口内进行求和操作
    4. ReduceFunction<Tuple2<String, Integer>> reduceFunction = new ReduceFunction>() {
    5. @Override
    6. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
    7. return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    8. }
    9. };
    10. // 使用 WindowAll 方法,指定时间窗口和 ReduceFunction
    11. dataStream.windowAll(Time.of(5, TimeUnit.SECONDS), reduceFunction)
    12. .print(); // 输出结果到 stdout (for debugging)

    8.Union(合并转换)

            DataStream* → DataStream

            将多个数据流合并为一个数据流。union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First In First Out)的模式合并,且不去重

    1. //通过一些 key 将同一个 window 的两个数据流 join 起来。
    2. inputStream.join(inputStream1)
    3. .where(0).equalTo(1)
    4. .window(Time.seconds(5))
    5. .apply (new JoinFunction () {...});
    6. // 以上示例是在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性。

    9.Connect / CoMap / CoFlatMap(连接转换)

            DataStream,DataStream → DataStream

            连接两个数据流,并对连接后的数据流进行转换操作。

    1. Connect:Connect 算子用于连接两个数据流,这两个数据流的类型可以不同。Connect 算子会将两个数据流连接成一个 ConnectedStreams 对象,但并不对元素做任何转换操作。Connect 算子通常用于需要将两个不同类型的数据流进行关联处理的场景。

    2. CoMap:CoMap 算子用于对 ConnectedStreams 中的每一个数据流应用一个 map 函数,将它们分别转换为另一种类型。CoMap 会将两个数据流中的元素分别转换为不同的类型,因此在使用 CoMap 时需要分别指定两个不同的 map 函数。

    3. CoFlatMap:CoFlatMap 算子和 CoMap 类似,也是用于对 ConnectedStreams 中的每一个数据流应用一个 flatMap 函数,将它们分别转换为另一种类型。不同之处在于,CoFlatMap 生成的元素个数可以是 0、1 或多个,因此适用于需要将每个输入元素转换为零个、一个或多个输出元素的情况。

    1. // 创建两个数据流
    2. DataStream<Type1> dataStream1 = ... // 从某个地方获取 Type1 类型的数据流
    3. DataStream<Type2> dataStream2 = ... // 从某个地方获取 Type2 类型的数据流
    4. // 使用 Connect 算子连接两个数据流
    5. ConnectedStreams<Type1, Type2> connectedStreams = dataStream1.connect(dataStream2);
    6. // 使用 CoMap 对每个数据流进行单独的转换
    7. SingleOutputStreamOperator<ResultType1> resultStream1 = connectedStreams.map(new CoMapFunction<Type1, ResultType1>() {
    8. @Override
    9. public ResultType1 map1(Type1 value) throws Exception {
    10. //Type1 数据流的转换逻辑
    11. // ...
    12. return transformedResult1;
    13. }
    14. });
    15. SingleOutputStreamOperator<ResultType2> resultStream2 = connectedStreams.map(new CoMapFunction<Type2, ResultType2>() {
    16. @Override
    17. public ResultType2 map2(Type2 value) throws Exception {
    18. //Type2 数据流的转换逻辑
    19. // ...
    20. return transformedResult2;
    21. }
    22. });
    23. // 使用 CoFlatMap 对连接后的数据流进行转换
    24. SingleOutputStreamOperator<ResultType> resultStream = connectedStreams.flatMap(new CoFlatMapFunction<Type1, Type2, ResultType>() {
    25. @Override
    26. public void flatMap1(Type1 value, Collector<ResultType> out) throws Exception {
    27. //Type1 数据流的转换逻辑
    28. // 将转换后的结果发射出去
    29. out.collect(transformedResult1);
    30. }
    31. @Override
    32. public void flatMap2(Type2 value, Collector<ResultType> out) throws Exception {
    33. //Type2 数据流的转换逻辑
    34. // 将转换后的结果发射出去
    35. out.collect(transformedResult2);
    36. }
    37. });
    38. // 执行任务
    39. env.execute("Connect and CoMap Example");

    10.Join(连接转换)

            KeyedStream,KeyedStream → DataStream

            可以使用 join 算子来实现两个数据流的连接转换操作

    1. java
    2. // 创建两个数据流
    3. DataStream<Type1> inputStream1 = ... // 从某个地方获取 Type1 类型的数据流
    4. DataStream<Type2> inputStream2 = ... // 从某个地方获取 Type2 类型的数据流
    5. // 使用 keyBy 将两个数据流按照相同的字段进行分区
    6. KeyedStream<Type1, KeyType> keyedStream1 = inputStream1.keyBy(<keySelector>);
    7. KeyedStream<Type2, KeyType> keyedStream2 = inputStream2.keyBy(<keySelector>);
    8. // 使用 join 进行连接转换
    9. SingleOutputStreamOperator<OutputType> resultStream = keyedStream1
    10. .join(keyedStream2)
    11. .where(<keySelector1>)
    12. .equalTo(<keySelector2>)
    13. .window(<windowAssigner>)
    14. .apply(new JoinFunction<Type1, Type2, OutputType>() {
    15. @Override
    16. public OutputType join(Type1 value1, Type2 value2) throws Exception {
    17. // 执行连接转换逻辑
    18. // ...
    19. return transformedResult;
    20. }
    21. });
    22. // 执行任务
    23. env.execute("Join Example");

            上述事例有两个输入数据流 inputStream1inputStream2,它们的元素类型分别为 Type1Type2。对这两个数据流进行连接转换操作,并输出连接后的结果。首先使用 keyBy 对两个数据流进行分区,然后使用 join 算子将两个分区后的数据流按照指定的条件进行连接。在 join 方法中,我们需要指定连接条件和窗口分配器,并通过 apply 方法应用一个 JoinFunction 对连接后的数据进行转换操作。在 JoinFunctionjoin 方法中,我们可以编写具体的连接转换逻辑,然后返回转换后的结果。

    Split / Select:将一个数据流拆分为多个数据流,然后对不同的数据流进行选择操作。

    此功能根据条件将流拆分为两个或多个流。 当您获得混合流并且您可能希望单独处理每个数据流时,可以使用此方法。

    11.Apply(窗口中的元素自定义转换)

            WindowedStream → DataStream
            AllWindowedStream → DataStream

            当使用 Flink 的 apply 方法时,将一个自定义的函数应用于流中的每个元素,并生成一个新的流。这个自定义的函数可以是 MapFunctionFlatMapFunctionFilterFunction 等接口的实现。

    1. // 创建输入数据流
    2. DataStream<Type1> inputStream = ... // 从某个地方获取 Type1 类型的数据流
    3. // 使用 apply 方法应用自定义函数
    4. SingleOutputStreamOperator<OutputType> resultStream = inputStream
    5. .apply(new MyMapFunction());
    6. // 定义自定义的 MapFunction
    7. public class MyMapFunction implements MapFunction<Type1, OutputType> {
    8. @Override
    9. public OutputType map(Type1 value) {
    10. // 执行转换操作
    11. OutputType transformedValue = ... // 对输入元素进行一些转换操作
    12. return transformedValue;
    13. }
    14. }
    15. // 执行任务
    16. env.execute("Apply Example");

    12.Iterate(迭代转换)

            DataStream → IterativeStream → DataStream

            允许在数据流上进行迭代计算,通常用于实现迭代算法。iterate函数允许您定义一个迭代处理的核心逻辑,并通过closeWith方法指定迭代结束的条件。

    1. // 定义迭代逻辑
    2. DataSet<Long> iteration = initialInput.iterate(1000) // 指定迭代上限
    3. .map(new MapFunction<Long, Long>() {
    4. @Override
    5. public Long map(Long value) throws Exception {
    6. // 迭代处理逻辑,这里简单地加1
    7. return value + 1;
    8. }
    9. });
    10. // 指定迭代结束条件
    11. DataSet<Long> result = iteration.closeWith(iteration.filter(value -> value >= 10));

            在这个示例中,使用iterate函数来定义迭代逻辑,其中map函数对每个元素进行加1操作。接着,我们使用closeWith方法来指定迭代结束的条件,即当元素的值大于等于10时结束迭代。

            需要注意的是,在实际的迭代处理中,需要根据具体业务逻辑来定义迭代的处理过程和结束条件。另外,还需要注意迭代过程中的性能和资源消耗,以及迭代次数的控制,避免出现无限循环等问题。

    13.CoGroup(分组连接转换)

            DataStream,DataStream → DataStream

            将两个或多个数据流中的元素进行连接操作,通常基于相同的键进行连接。

    1. import org.apache.flink.api.common.functions.CoGroupFunction;
    2. import org.apache.flink.api.java.ExecutionEnvironment;
    3. import org.apache.flink.api.java.tuple.Tuple2;
    4. import org.apache.flink.util.Collector;
    5. import java.util.ArrayList;
    6. import java.util.List;
    7. public class CoGroupExample {
    8. public static void main(String[] args) throws Exception {
    9. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    10. // 创建第一个数据集
    11. List<Tuple2<Integer, String>> firstDataSet = new ArrayList<>();
    12. firstDataSet.add(new Tuple2<>(1, "A"));
    13. firstDataSet.add(new Tuple2<>(2, "B"));
    14. // 创建第二个数据集
    15. List<Tuple2<Integer, String>> secondDataSet = new ArrayList<>();
    16. secondDataSet.add(new Tuple2<>(1, "X"));
    17. secondDataSet.add(new Tuple2<>(3, "Y"));
    18. // 将数据集转化为Flink的DataSet
    19. org.apache.flink.api.java.DataSet<Tuple2<Integer,String>> first = env.fromCollection(firstDataSet);
    20. org.apache.flink.api.java.DataSet<Tuple2<Integer,String>> second = env.fromCollection(secondDataSet);
    21. // 使用CoGroup算子进行连接
    22. first.coGroup(second)
    23. .where(0) // 第一个数据集的连接字段
    24. .equalTo(0) // 第二个数据集的连接字段
    25. .with(new MyCoGroupFunction()) // 指定自定义的CoGroupFunction
    26. .print();
    27. env.execute();
    28. }
    29. // 自定义CoGroupFunction
    30. public static class MyCoGroupFunction implements CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String> {
    31. @Override
    32. public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable> second, Collector out) {
    33. List<String> valuesFromFirst = new ArrayList<>();
    34. for (Tuple2<Integer, String> t : first) {
    35. valuesFromFirst.add(t.f1);
    36. }
    37. List<String> valuesFromSecond = new ArrayList<>();
    38. for (Tuple2<Integer, String> t : second) {
    39. valuesFromSecond.add(t.f1);
    40. }
    41. // 对两个数据集的分组进行连接操作
    42. for (String s1 : valuesFromFirst) {
    43. for (String s2 : valuesFromSecond) {
    44. out.collect(s1 + "-" + s2);
    45. }
    46. }
    47. }
    48. }
    49. }

            在这个示例中,首先创建了两个简单的数据集firstDataSetsecondDataSet,然后将它们转换为Flink的DataSet对象。接着使用CoGroup算子对这两个数据集进行分组连接操作,其中通过whereequalTo指定了连接字段,通过with方法指定了自定义的CoGroupFunction。最后,在CoGroupFunction中实现了对两个数据集分组的连接逻辑,并通过Collector将结果输出。

    14.Cross(笛卡尔积转换)

            计算两个数据流的笛卡尔积。

            DataStream,DataStream → DataStream

    1. import org.apache.flink.api.java.ExecutionEnvironment;
    2. import org.apache.flink.api.java.DataSet;
    3. import org.apache.flink.api.java.tuple.Tuple2;
    4. import java.util.ArrayList;
    5. import java.util.List;
    6. public class CrossExample {
    7. public static void main(String[] args) throws Exception {
    8. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    9. // 创建第一个数据集
    10. List<Integer> firstDataSet = new ArrayList<>();
    11. firstDataSet.add(1);
    12. firstDataSet.add(2);
    13. // 创建第二个数据集
    14. List<String> secondDataSet = new ArrayList<>();
    15. secondDataSet.add("A");
    16. secondDataSet.add("B");
    17. // 将数据集转化为Flink的DataSet
    18. DataSet<Integer> first = env.fromCollection(firstDataSet);
    19. DataSet<String> second = env.fromCollection(secondDataSet);
    20. // 使用Cross算子进行笛卡尔积操作
    21. DataSet<Tuple2<Integer, String>> result = first.cross(second);
    22. // 打印结果
    23. result.print();
    24. env.execute();
    25. }
    26. }

            在这个示例中,首先创建了两个简单的数据集firstDataSetsecondDataSet,然后将它们转换为Flink的DataSet对象。接着使用Cross算子对这两个数据集进行笛卡尔积操作,得到了一个包含所有可能组合的新数据集。

    15.Project(投影转换)

            DataStream → DataStream

            对数据集进行投影操作,选择特定的字段或属性。Project算子用于从数据集中选择或投影出特定的字段。

    1. import org.apache.flink.api.java.ExecutionEnvironment;
    2. import org.apache.flink.api.java.DataSet;
    3. import org.apache.flink.api.java.tuple.Tuple3;
    4. import java.util.ArrayList;
    5. import java.util.List;
    6. public class ProjectExample {
    7. public static void main(String[] args) throws Exception {
    8. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    9. // 创建数据集
    10. List<Tuple3<Integer, String, Double>> inputDataSet = new ArrayList<>();
    11. inputDataSet.add(new Tuple3<>(1, "Alice", 1000.0));
    12. inputDataSet.add(new Tuple3<>(2, "Bob", 1500.0));
    13. inputDataSet.add(new Tuple3<>(3, "Charlie", 2000.0));
    14. // 将数据集转化为Flink的DataSet
    15. DataSet<Tuple3<Integer, String, Double>> input = env.fromCollection(inputDataSet);
    16. // 使用Project算子进行字段投影
    17. DataSet<Tuple2<Integer, String>> projectedDataSet = input.project(0, 1); // 选择字段0和字段1
    18. // 打印结果
    19. projectedDataSet.print();
    20. env.execute();
    21. }
    22. }

            在这个示例中,首先创建了一个包含整数、字符串和双精度浮点数的元组数据集inputDataSet。然后将它们转换为Flink的DataSet对象。接着使用Project算子对数据集进行字段投影,选择了字段0和字段1。最后打印出了字段投影后的结果。

    16.Connect(连接转换)

            DataStream,DataStream → ConnectedStreams

            connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:connect只能连接两个数据流,union可以连接多个数据流。connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
            两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。

    1. import org.apache.flink.api.java.ExecutionEnvironment;
    2. import org.apache.flink.api.common.functions.MapFunction;
    3. import org.apache.flink.api.java.DataSet;
    4. import org.apache.flink.api.java.tuple.Tuple2;
    5. import java.util.ArrayList;
    6. import java.util.List;
    7. public class ConnectExample {
    8. public static void main(String[] args) throws Exception {
    9. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    10. // 创建第一个数据集
    11. List<Integer> firstDataSet = new ArrayList<>();
    12. firstDataSet.add(1);
    13. firstDataSet.add(2);
    14. firstDataSet.add(3);
    15. // 创建第二个数据集
    16. List<String> secondDataSet = new ArrayList<>();
    17. secondDataSet.add("A");
    18. secondDataSet.add("B");
    19. secondDataSet.add("C");
    20. // 将数据集转化为Flink的DataSet
    21. DataSet<Integer> first = env.fromCollection(firstDataSet);
    22. DataSet<String> second = env.fromCollection(secondDataSet);
    23. // 使用Map将Integer类型转换为Tuple2类型
    24. DataSet<Tuple2<Integer, String>> firstMapped = first.map(new MapFunction>() {
    25. @Override
    26. public Tuple2<Integer, String> map(Integer value) {
    27. return new Tuple2<>(value, "default");
    28. }
    29. });
    30. // 使用Connect算子将两个数据集连接在一起
    31. DataSet<Tuple2<Integer, String>> connectedDataSet = firstMapped.connect(second).map(new MapFunction>() {
    32. @Override
    33. public Tuple2<Integer, String> map(Integer value) {
    34. return new Tuple2<>(value, "connected");
    35. }
    36. });
    37. // 打印结果
    38. connectedDataSet.print();
    39. env.execute();
    40. }
    41. }

            在这个示例中,首先创建了两个简单的数据集firstDataSetsecondDataSet,然后将它们转换为Flink的DataSet对象。接着使用Map算子将第一个数据集中的整数类型转换为Tuple2类型。然后使用Connect算子将转换后的第一个数据集与第二个数据集连接在一起,最后再对连接后的数据集进行处理。最终打印出了连接后的结果。

    17.IntervalJoin(时间窗口连接转换)

            KeyedStream,KeyedStream → DataStream

        IntervalJoin算子用于在两个数据流之间执行基于时间窗口的连接操作。

    1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    2. import org.apache.flink.api.common.functions.JoinFunction;
    3. import org.apache.flink.streaming.api.datastream.IntervalJoinOperator;
    4. import org.apache.flink.streaming.api.windowed.TimeWindow;
    5. import org.apache.flink.streaming.api.windowing.time.Time;
    6. public class IntervalJoinExample {
    7. public static void main(String[] args) throws Exception {
    8. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    9. // 创建第一个数据流
    10. DataStream<Tuple2<String, Integer>> firstStream = ... // 从数据源获取第一个数据流
    11. // 创建第二个数据流
    12. DataStream<Tuple2<String, String>> secondStream = ... // 从数据源获取第二个数据流
    13. // 定义时间窗口大小
    14. Time windowSize = Time.seconds(10);
    15. // 使用IntervalJoin算子进行时间窗口连接
    16. IntervalJoinOperator<Tuple2<String, Integer>, Tuple2<String, String>, String> joinedStream = firstStream
    17. .intervalJoin(secondStream)
    18. .between(Time.seconds(-3), Time.seconds(3)) // 定义连接窗口范围
    19. .upperBoundExclusive() // 指定上界为不包含
    20. .lowerBoundExclusive() // 指定下界为不包含
    21. .process(new MyIntervalJoinFunction());
    22. // 打印结果
    23. joinedStream.print();
    24. // 执行任务
    25. env.execute("Interval Join Example");
    26. }
    27. // 自定义IntervalJoinFunction
    28. public static class MyIntervalJoinFunction implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, String>, String> {
    29. @Override
    30. public String join(Tuple2<String, Integer> first, Tuple2<String, String> second) {
    31. // 在这里实现连接后的处理逻辑
    32. return "Joined: " + first.toString() + " and " + second.toString();
    33. }
    34. }
    35. }

            在这个示例中,首先创建了两个数据流firstStreamsecondStream,这些数据流可以来自各种数据源(例如Kafka、Socket等)。然后使用IntervalJoin算子将这两个数据流在时间窗口上进行连接操作,通过定义连接窗口的范围来指定两个数据流之间的连接条件。最后定义了自定义的JoinFunction来处理连接后的数据。最终打印出了连接后的结果,并执行Flink任务。

    18.Split / Select(拆分和选择转换

           DataStream  → DataStream

            Split 和 Select 算子用于将单个数据流拆分为多个流,并选择其中的部分流进行处理。

    1. import org.apache.flink.api.java.functions.KeySelector;
    2. import org.apache.flink.streaming.api.datastream.DataStream;
    3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    4. public class SplitSelectExample {
    5. public static void main(String[] args) throws Exception {
    6. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    7. // 创建数据流
    8. DataStream<String> inputDataStream = ... // 从数据源获取数据流
    9. // 使用 Split 算子将数据流拆分为多个流
    10. SplitStream<String> splitStream = inputDataStream.split(new OutputSelector<String>() {
    11. @Override
    12. public Iterable<String> select(String value) {
    13. List<String> output = new ArrayList<>();
    14. if (value.contains("category1")) {
    15. output.add("category1");
    16. } else if (value.contains("category2")) {
    17. output.add("category2");
    18. } else {
    19. output.add("other");
    20. }
    21. return output;
    22. }
    23. });
    24. // 选择拆分后的流中的部分流进行处理
    25. DataStream<String> category1Stream = splitStream.select("category1");
    26. DataStream<String> category2Stream = splitStream.select("category2");
    27. // 对每个流进行进一步处理
    28. DataStream<String> processedCategory1Stream = category1Stream.map(new MyMapperFunction());
    29. DataStream<String> processedCategory2Stream = category2Stream.filter(new MyFilterFunction());
    30. // 将处理后的结果合并为一个流
    31. DataStream<String> resultStream = processedCategory1Stream.union(processedCategory2Stream);
    32. // 打印结果
    33. resultStream.print();
    34. // 执行任务
    35. env.execute("Split and Select Example");
    36. }
    37. // 自定义 Mapper 函数
    38. public static class MyMapperFunction implements MapFunction<String, String> {
    39. @Override
    40. public String map(String value) {
    41. // 在这里实现对流中元素的转换操作
    42. return "Processed Category1: " + value;
    43. }
    44. }
    45. // 自定义 Filter 函数
    46. public static class MyFilterFunction implements FilterFunction<String> {
    47. @Override
    48. public boolean filter(String value) {
    49. // 在这里实现过滤逻辑
    50. return value.length() > 10;
    51. }
    52. }
    53. }

            在这个示例中,首先创建了一个输入数据流inputDataStream,然后使用 Split 算子将数据流拆分为三个不同的流:category1category2other。接着使用 Select 算子选择了category1category2两个流,并对它们分别应用了自定义的 Mapper 函数和 Filter 函数进行处理。最后将处理后的结果合并为一个流,并打印出来。

    更多消息资讯,请访问昂焱数据(https://www.ayshuju.com)

  • 相关阅读:
    如何设置 Jenkins 流水线环境变量
    uniapp—— uni统计2.0接入记录及问题解决
    Java中八种基本数据类型及其区别、字符编码
    淘宝/天猫API:item_videolist_cat-获取淘宝直播分类id接口
    广州xx策划公司MongoDB恢复-2023.09.09
    Shell中括号的含义
    [1172]python操作odps
    Debezium 同步 MySQL 实时数据并解决数据重复消费问题
    【2023研电赛】全国技术竞赛一等奖:基于FPGA的超低时延激光多媒体终端
    webpack 开发环境一次
  • 原文地址:https://blog.csdn.net/tszc95/article/details/134300569