• Flink学习之旅:(四)Flink转换算子(Transformation)


    1.基本转换算子

    基本转换算子说明
    映射(map)将数据流中的数据进行转换,形成新的数据流
    过滤(filter)将数据流中的数据根据条件过滤
    扁平映射(flatMap)将数据流中的整体(如:集合)拆分成个体使用。消费一个元素,产生0到多个元素

    1. package com.qiyu.Transformation;
    2. import org.apache.flink.api.common.functions.FilterFunction;
    3. import org.apache.flink.api.common.functions.FlatMapFunction;
    4. import org.apache.flink.api.common.functions.MapFunction;
    5. import org.apache.flink.streaming.api.datastream.DataStream;
    6. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    7. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    8. import org.apache.flink.util.Collector;
    9. /**
    10. * @author MR.Liu
    11. * @version 1.0
    12. * @data 2023-10-19 11:00
    13. */
    14. public class Trans {
    15. /***
    16. * 映射 map 算子
    17. * @param env
    18. */
    19. public static void map(StreamExecutionEnvironment env){
    20. DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
    21. //将集合中的元素值都 加上 100
    22. DataStream<Integer> map = stream.map(new MapFunction<Integer, Integer>() {
    23. @Override
    24. public Integer map(Integer integer) throws Exception {
    25. return integer+100;
    26. }
    27. });
    28. map.print();
    29. }
    30. /***
    31. * 过滤 filter 算子
    32. * @param env
    33. */
    34. public static void filter(StreamExecutionEnvironment env){
    35. DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
    36. //将集合中的值取模,不等于1的通行,反之过滤
    37. DataStream<Integer> filter = stream.filter(new FilterFunction<Integer>() {
    38. @Override
    39. public boolean filter(Integer integer) throws Exception {
    40. if (integer % 2 != 1) {
    41. return true;
    42. }
    43. return false;
    44. }
    45. });
    46. filter.print();
    47. }
    48. /***
    49. * 扁平化 flatMap 算子
    50. * @param env
    51. */
    52. public static void flatMap(StreamExecutionEnvironment env){
    53. DataStream<String> stream = env.fromElements(
    54. "Flink is a powerful framework for stream and batch processing",
    55. "It provides support for event time processing"
    56. );
    57. //将字符串以空格分隔,拆成多个字符串个体
    58. stream.flatMap(new FlatMapFunction<String, Object>() {
    59. @Override
    60. public void flatMap(String s, Collector<Object> collector) throws Exception {
    61. String[] words = s.split(" ");
    62. for (String word : words){
    63. collector.collect(word);
    64. }
    65. }
    66. }).print();
    67. }
    68. /**
    69. * 主程序类
    70. * @param args
    71. * @throws Exception
    72. */
    73. public static void main(String[] args) throws Exception {
    74. StreamExecutionEnvironment env =
    75. StreamExecutionEnvironment.getExecutionEnvironment();
    76. env.setParallelism(1);
    77. //map(env);
    78. //filter(env);
    79. flatMap(env);
    80. env.execute();
    81. }
    82. }

    2.聚合算子

    聚合算子说明
    按键分区(keyBy)通过指定键(key),将一条流逻辑上划分为不同的分区。分区指的是并行任务的子任务,对应着任务槽(task solt)
    简单聚合

    sum():在输入流上,对指定的字段做叠加求和的操作。

    min():在输入流上,对指定的字段求最小值。

    max():在输入流上,对指定的字段求最大值。

    minBy():在输入流上针对指定字段求最小值。

    maxBy():在输入流上针对指定字段求最大值。

    归约聚合(reduce)可以把每一个新输入的数据和当前已经归约出来的值,做聚合计算

    1. package com.qiyu.Transformation;
    2. import com.qiyu.Source.Student;
    3. import org.apache.flink.api.java.functions.KeySelector;
    4. import org.apache.flink.api.java.tuple.Tuple;
    5. import org.apache.flink.api.java.tuple.Tuple2;
    6. import org.apache.flink.streaming.api.datastream.DataStream;
    7. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    8. import org.apache.flink.streaming.api.datastream.KeyedStream;
    9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    10. import java.util.ArrayList;
    11. /**
    12. * @author MR.Liu
    13. * @version 1.0
    14. * @data 2023-10-19 14:45
    15. */
    16. public class Aggregation {
    17. public static void main(String[] args) throws Exception {
    18. StreamExecutionEnvironment env =
    19. StreamExecutionEnvironment.getExecutionEnvironment();
    20. env.setParallelism(1);
    21. DataStreamSource<Tuple2<String, Integer>> stream = env.fromElements(
    22. Tuple2.of("a", 1),
    23. Tuple2.of("a", 3),
    24. Tuple2.of("b", 3),
    25. Tuple2.of("b", 4)
    26. );
    27. stream.keyBy(r -> r.f0).print();
    28. stream.keyBy(r -> r.f0).sum(1).print();
    29. stream.keyBy(r -> r.f0).min(1).print();
    30. stream.keyBy(r -> r.f0).max(1).print();
    31. stream.keyBy(r -> r.f0).maxBy(1).print();
    32. stream.keyBy(r -> r.f0).minBy(1).print();
    33. stream.keyBy(r -> r.f0).reduce(new ReduceFunction<Tuple2<String, Integer>>() {
    34. @Override
    35. public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
    36. return Tuple2.of(t1.f0, t1.f1 + t2.f1);
    37. }
    38. }).print();
    39. env.execute();
    40. }
    41. }

  • 相关阅读:
    设计模式 -- 中介者模式(17)
    通过pyinstaller将python项目打包成exe执行文件
    STM32CubeMX教程13 ADC - 单通道转换
    什么是国内生产总值(GDP)
    云计算-基础云架构(Fundamental Cloud Architectures)
    python unittest测试报告生成
    关于容器镜像那些事
    校园网页设计成品 学校班级网页制作模板 dreamweaver网页作业 简单网页课程成品 大学生静态HTML网页源码
    【无标题】
    【第十四部分】Fragment和CSSModule
  • 原文地址:https://blog.csdn.net/qq_35370485/article/details/133925939