• Flink系列文档-(YY04)-Flink编程基础API-Transformation算子


    大多处理数据的流程是   1)加载数据源数据   2)转换处理  3) 输出结果 

    1 映射类算子

    1.1 map算子

    map(new MapFunction )

    MapFunction: (x)-> y   [1条变1条]

    1. /**
    2. * @Date: 22.11.8
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class Base_API_MapFunction {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8888);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. // 加载网络数据流
    15. DataStreamSource ds = see.socketTextStream("linux01", 9999);
    16. // 使用转换算子 map 处理数据 一条一条的处理数据
    17. /**
    18. * 示例一
    19. */
    20. SingleOutputStreamOperator sos = ds.map(new MapFunction() {
    21. // 每条数据调用一次
    22. @Override
    23. public String map(String line) throws Exception {
    24. // 将每条数据转换成大写
    25. return line.toUpperCase();
    26. }
    27. });
    28. /**
    29. * 示例二
    30. */
    31. SingleOutputStreamOperator> sos2 = ds.map(new MapFunction>() {
    32. @Override
    33. public Tuple2 map(String line) throws Exception {
    34. // 将接收的数据 封装成二元组
    35. String[] split = line.split("\\s+");
    36. Tuple2 tp2 = Tuple2.of(split[0], split[1]);
    37. return tp2;
    38. }
    39. });
    40. /**
    41. * 示例三
    42. * MapFunction 接口中只有一个抽象各个 可以使用Lamda表达式的方式处理数据
    43. * public interface MapFunction extends Function, Serializable {
    44. * O map (T value) throws Exception;
    45. * }
    46. *}
    47. */
    48. SingleOutputStreamOperator> sos3= ds.map(line->{
    49. String[] arr = line.split("\\s+");
    50. return Tuple2.of(arr[0] , arr[1]) ;
    51. }) .returns(TypeInformation.of(new TypeHint>() {})) ; // 指定返回值的数据类型
    52. // 或者 .returns(new TypeHint>() {}) ; // 指定返回值的数据类型
    53. sos3.print("map后的数据: ") ;
    54. see.execute() ;
    55. }
    56. }

    如果是调用map方法时传入Lambda表达式,需要在调用map方法后,在调用returns方法指定返回的数据的类型。不然Flink无法自动推断出返回的数据类型,会出现异常。

    1.2 flatMap扁平映射 

    flatMap( new FlatMapFcuntion)

    FlatMapFunction: x-> x1, x2,x3,x4  [1条变多条,并展平]

    1. /**
    2. * @Date: 22.11.8
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class Base_API_FlatMapFunction {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8888);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. // 加载网络数据流
    15. DataStreamSource ds = see.socketTextStream("linux01", 9999);
    16. /**
    17. * 示例一
    18. * 将接收到的一行数据 扁平化处理
    19. * 组装成 (单词和1)
    20. */
    21. SingleOutputStreamOperator> soo = ds.flatMap(new FlatMapFunction>() {
    22. @Override
    23. public void flatMap(String line, Collector> out) throws Exception {
    24. String[] arr = line.split("\\s+");
    25. for (String word : arr) {
    26. Tuple2 tp = Tuple2.of(word, 1);
    27. // 接收到一条数据 ,将一条数据转成成 多条数据后 使用Collector 收集多条数据
    28. out.collect(tp);
    29. }
    30. }
    31. });
    32. /**
    33. * 示例二
    34. * 使用lambda 表达式处理数据
    35. * 不会自动推断返回值数据 类型 可以使用returns 指定返回值数据类型
    36. */
    37. SingleOutputStreamOperator> soo2 = ds.flatMap((String line, Collector> out) -> {
    38. String[] arr = line.split("\\s+");
    39. for (String s : arr) {
    40. out.collect(Tuple2.of(s, 1));
    41. }
    42. }).returns(Types.TUPLE(Types.STRING, Types.INT));
    43. soo2.print("扁平化后的数据: ");
    44. see.execute("flatMap函数示例") ;
    45. }
    46. }

    1.3 project 投影

    该算子只能对Tuple类型数据使用,project方法的功能类似sql中的"select 字段";

    该方法只有Java的API有,Scala的API没此方法。

    1. /**
    2. * @Date: 22.11.8
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class _3Base_API_Project {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8888);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. /**
    15. * project 投影(DataStream → DataStream)
    16. * 该算子只能对Tuple类型数据使用,project方法的功能类似sql中的"select 字段";
    17. * 该方法只有Java的API有,Scala的API没此方法。
    18. */
    19. DataStreamSource> ds = see.fromElements(
    20. Tuple4.of(1, "YY", "F", 100),
    21. Tuple4.of(2, "DY", "F", 99)
    22. );
    23. // 处理每条数据 ,返回每条数据中的指定位置的属性值
    24. // 只要 id 和 name
    25. SingleOutputStreamOperator res = ds.project(0, 1);
    26. res.print() ;
    27. see.execute("project函数") ;
    28. }
    29. }

    2 过滤算子

      filter过滤(DataStream → DataStream)

    filter(new FilterFunction)

    FilterFunction :  x -> true/false

    1. /**
    2. * @Date: 22.11.8
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class _4Base_API_FilterFunction {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8888);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. // 获取数据源
    15. DataStreamSource ds = see.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
    16. /**
    17. * 过滤出偶数
    18. */
    19. SingleOutputStreamOperator res = ds.filter(new FilterFunction() {
    20. @Override
    21. public boolean filter(Integer value) throws Exception {
    22. return value % 2 == 0;
    23. }
    24. });
    25. //使用 lambda表达式 过滤出奇数
    26. SingleOutputStreamOperator res2 = ds.filter(e -> {
    27. return e % 2 == 1;
    28. });
    29. /**
    30. * 示例 -----
    31. * 过滤出分数大于60的人
    32. */
    33. DataStreamSource yyds = see.fromElements(
    34. new YY(1, "DY", 100),
    35. new YY(2, "XY", 100),
    36. new YY(3, "HH", 10),
    37. new YY(4, "XH", 12)
    38. );
    39. SingleOutputStreamOperator res3 = yyds.filter(new FilterFunction() {
    40. @Override
    41. public boolean filter(YY yy) throws Exception {
    42. return yy.getScore() > 60;
    43. }
    44. });
    45. res3.print();
    46. see.execute("filter function");
    47. }
    48. }
    49. @Data
    50. @NoArgsConstructor
    51. @AllArgsConstructor
    52. @ToString
    53. class YY {
    54. private int id;
    55. private String name;
    56. private double score;
    57. }

    3 分组算子 

     keyBy按key分组(DataStream → KeyedStream)

    1. /**
    2. * @Date: 22.11.8
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class _5Base_API_KeyBy {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8888);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. // 加载网络数据流
    15. DataStreamSource ds = see.socketTextStream("linux01", 9999);
    16. SingleOutputStreamOperator> wordAndOne = ds.flatMap(new FlatMapFunction>() {
    17. @Override
    18. public void flatMap(String line, Collector> out) throws Exception {
    19. String[] arr = line.split("\\s+");
    20. for (String word : arr) {
    21. Tuple2 tp = Tuple2.of(word, 1);
    22. // 接收到一条数据 ,将一条数据转成成 多条数据后 使用Collector 收集多条数据
    23. out.collect(tp);
    24. }
    25. }
    26. });
    27. /**
    28. * 对数据流进行分组
    29. * -- 按照单词分组
    30. */
    31. // 按照单词分组
    32. wordAndOne.keyBy(0) ;
    33. // 按照单词分组 KeyedStream
    34. KeyedStream, String> res = wordAndOne.keyBy(new KeySelector, String>() {
    35. @Override
    36. public String getKey(Tuple2 value) throws Exception {
    37. return value.f0;
    38. }
    39. });
    40. //根据自定义数据类型中的某个属性进行分组
    41. DataStreamSource ds2 = see.fromElements(
    42. new YY2(1, "DY", "NM_BT", 100),
    43. new YY2(2, "XY", "NM_BT", 100),
    44. new YY2(3, "HH", "SD_HZ", 10),
    45. new YY2(4, "XH", "SD_HZ", 12)
    46. );
    47. /**
    48. * 对数据流进行分组
    49. * -- 根据Bean的属性
    50. */
    51. ds2.keyBy(new KeySelector() {
    52. @Override
    53. public String getKey(YY2 value) throws Exception {
    54. return value.getCity();
    55. }
    56. }) ;
    57. ds2.keyBy(YY2::getCity) ;
    58. res.print() ;
    59. see.execute() ;
    60. }
    61. }
    62. @Data
    63. @NoArgsConstructor
    64. @AllArgsConstructor
    65. @ToString
    66. class YY2 {
    67. private int id;
    68. private String name;
    69. private String city ;
    70. private double score;
    71. }

    4 滚动聚合算子

    1. 此处所说的滚动聚合算子,是多个聚合算子的统称,有sum、min、minBy、max、maxBy;
    2. 这些算子的底层逻辑都是维护一个聚合值,并使用每条流入的数据对聚合值进行滚动更新;
    3. 这些算子都只能在KeyedStream上调用(就是必须keyby后调用);

     4.1 sum

    1. package com.blok;
    2. import lombok.AllArgsConstructor;
    3. import lombok.Data;
    4. import lombok.NoArgsConstructor;
    5. import lombok.ToString;
    6. import org.apache.flink.api.common.functions.FlatMapFunction;
    7. import org.apache.flink.api.java.functions.KeySelector;
    8. import org.apache.flink.api.java.tuple.Tuple2;
    9. import org.apache.flink.configuration.Configuration;
    10. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    11. import org.apache.flink.streaming.api.datastream.KeyedStream;
    12. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    13. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    14. import org.apache.flink.util.Collector;
    15. /**
    16. * @Date: 22.11.8
    17. * @Author: Hang.Nian.YY
    18. * @qq: 598196583
    19. * @Tips: 学大数据 ,到多易教育
    20. * @Description:
    21. */
    22. public class _6Base_API_Sum{
    23. public static void main(String[] args) throws Exception {
    24. Configuration conf = new Configuration();
    25. conf.setInteger("rest.port", 8898);
    26. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    27. see.setParallelism(1);
    28. // 加载网络数据流
    29. DataStreamSource ds = see.socketTextStream("linux01", 9999);
    30. SingleOutputStreamOperator> wordAndOne = ds.flatMap(new FlatMapFunction>() {
    31. @Override
    32. public void flatMap(String line, Collector> out) throws Exception {
    33. String[] arr = line.split("\\s+");
    34. for (String word : arr) {
    35. Tuple2 tp = Tuple2.of(word, 1);
    36. // 接收到一条数据 ,将一条数据转成成 多条数据后 使用Collector 收集多条数据
    37. out.collect(tp);
    38. }
    39. }
    40. });
    41. /**
    42. * 对数据流进行分组
    43. * -- 按照单词分组
    44. */
    45. // 按照单词分组
    46. wordAndOne.keyBy(0) ;
    47. // 按照单词分组 KeyedStream
    48. KeyedStream, String> res = wordAndOne.keyBy(new KeySelector, String>() {
    49. @Override
    50. public String getKey(Tuple2 value) throws Exception {
    51. return value.f0;
    52. }
    53. });
    54. //------------------------------------------------------------------------------
    55. // 滚动聚合: 随着数据的流出 结果数据源源的进行数据叠加
    56. // 统计单词出现的次数
    57. SingleOutputStreamOperator> sum = res.sum("1");
    58. SingleOutputStreamOperator> sum2 = res.sum(1);
    59. //------------------------------------------------------------------------------
    60. //根据自定义数据类型中的某个属性进行分组
    61. DataStreamSource ds2 = see.fromElements(
    62. new YY2(1, "DY", "NM_BT", 100),
    63. new YY2(2, "XY", "NM_BT", 100),
    64. new YY2(3, "HH", "SD_HZ", 10),
    65. new YY2(4, "XH", "SD_HZ", 12)
    66. );
    67. /**
    68. * 对数据流进行分组
    69. * -- 根据Bean的属性
    70. */
    71. KeyedStream keyed = ds2.keyBy(new KeySelector() {
    72. @Override
    73. public String getKey(YY2 value) throws Exception {
    74. return value.getCity();
    75. }
    76. });
    77. //------------------------------------------------------------------------------
    78. // 滚动聚合: 随着数据的流出 结果数据源源的进行数据叠加
    79. // 统计每组的总分 根据组内Bean的属性
    80. SingleOutputStreamOperator score = keyed.sum("score");
    81. score.print() ;
    82. //------------------------------------------------------------------------------
    83. see.execute() ;
    84. }
    85. }

    4.2 min/minBy/max/maxBy

    这两个算子都是求最小值;min和minBy的区别在于:

    1. min的返回值,最小值字段以外,其他字段是第一条输入数据的值;
    2. minBy返回值,就是最小值字段所在的那条数据;

    底层原理:滚动更新时是更新一个字段,还是更新整条数据的区别;

    1. package com.blok;
    2. import org.apache.flink.api.common.functions.FlatMapFunction;
    3. import org.apache.flink.api.common.functions.MapFunction;
    4. import org.apache.flink.api.java.functions.KeySelector;
    5. import org.apache.flink.api.java.tuple.Tuple2;
    6. import org.apache.flink.configuration.Configuration;
    7. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    8. import org.apache.flink.streaming.api.datastream.KeyedStream;
    9. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    10. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    11. import org.apache.flink.util.Collector;
    12. /**
    13. * @Date: 22.11.8
    14. * @Author: Hang.Nian.YY
    15. * @qq: 598196583
    16. * @Tips: 学大数据 ,到多易教育
    17. * @Description:
    18. */
    19. public class _7Base_API_MaxMin {
    20. public static void main(String[] args) throws Exception {
    21. Configuration conf = new Configuration();
    22. conf.setInteger("rest.port", 8898);
    23. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    24. see.setParallelism(1);
    25. // 加载网络数据流
    26. DataStreamSource ds = see.socketTextStream("linux01", 9999);
    27. /**
    28. * public class YY2 {
    29. * private int id;
    30. * private String name;
    31. * private String city ;
    32. * private double score;
    33. * }
    34. */
    35. SingleOutputStreamOperator beans = ds.map(new MapFunction() {
    36. @Override
    37. public YY2 map(String value) throws Exception {
    38. String[] arr = value.split(",");
    39. YY2 yy = new YY2(Integer.parseInt(arr[0]), arr[1], arr[2], Double.parseDouble(arr[3]));
    40. return yy;
    41. }
    42. });
    43. //将 数据按照城市分组
    44. KeyedStream keyed = beans.keyBy(YY2::getCity);
    45. //---------------------------------------------------------------
    46. /**
    47. * min 返回的是第一条数据 但是 会修改第一条数据 指定的属性信息
    48. * max 返回的是第一条数据 但是 会修改第一条数据 指定的属性信息
    49. *
    50. * 分数最小的数据: 1,yy1,NM_BT,98
    51. * 分数最大的数据: 1,yy1,NM_BT,100
    52. */
    53. // 获取分数最低的 信息
    54. SingleOutputStreamOperator minScoreInfo = keyed.min("score");
    55. SingleOutputStreamOperator maxScoreInfo = keyed.max("score");
    56. /**
    57. * 测试数据
    58. * 1,yy1,NM_BT,99
    59. * 2,yy2,NM_BT,100
    60. * 3,yy3,NM_BT,98
    61. * 4,yy4,NM_BT,98.5
    62. * 1,hh1,SD_HZ,99
    63. * 2,hh2,SD_HZ,100
    64. * 3,hh3,SD_HZ,98
    65. * 4,hh4,SD_HZ,98.5
    66. * 5,hh5,SD_HZ,101
    67. */
    68. //---------------------------------------------------------------
    69. /**
    70. * maxBy minBy返回的就是那条指定属性最大(最小)的数据
    71. *
    72. * 分数最大的数据: > YY2(id=2, name=yy2, city=NM_BT, score=100.0)
    73. * 分数最小的数据: > YY2(id=3, name=yy3, city=NM_BT, score=98.0)
    74. */
    75. SingleOutputStreamOperator minScoreInfoBy = keyed.minBy("score");
    76. SingleOutputStreamOperator maxScoreInfoBy = keyed.maxBy("score");
    77. minScoreInfoBy.print("分数最小的数据: ") ;
    78. maxScoreInfoBy.print("分数最大的数据: ") ;
    79. see.execute() ;
    80. }
    81. }

    4.3 reduce

    它的滚动聚合逻辑没有写死,而是由用户通过ReduceFunction来传入。

    1. /**
    2. * @Date: 22.11.8
    3. * @Author: Hang.Nian.YY
    4. * @qq: 598196583
    5. * @Tips: 学大数据 ,到多易教育
    6. * @Description:
    7. */
    8. public class _8Base_API_Reduce {
    9. public static void main(String[] args) throws Exception {
    10. Configuration conf = new Configuration();
    11. conf.setInteger("rest.port", 8898);
    12. StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    13. see.setParallelism(1);
    14. // 加载网络数据流
    15. DataStreamSource ds = see.socketTextStream("linux01", 9999);
    16. /**
    17. * public class YY2 {
    18. * private int id;
    19. * private String name;
    20. * private String city ;
    21. * private double score;
    22. * }
    23. */
    24. SingleOutputStreamOperator beans = ds.map(new MapFunction() {
    25. @Override
    26. public YY2 map(String value) throws Exception {
    27. String[] arr = value.split(",");
    28. YY2 yy = new YY2(Integer.parseInt(arr[0]), arr[1], arr[2], Double.parseDouble(arr[3]));
    29. return yy;
    30. }
    31. });
    32. // 将数据beans分组
    33. KeyedStream keyed = beans.keyBy(YY2::getCity);
    34. SingleOutputStreamOperator reduced = keyed.reduce(new ReduceFunction() {
    35. @Override
    36. public YY2 reduce(YY2 value1, YY2 value2) throws Exception {
    37. YY2 yy2 = new YY2();
    38. yy2.setScore(value1.getScore() + value2.getScore());
    39. yy2.setCity(value1.getCity());
    40. return yy2;
    41. }
    42. });
    43. reduced.print("聚合后的结果");
    44. see.execute() ;
    45. }
    46. }

  • 相关阅读:
    标签类目体系(面向业务的数据资产设计方法论)-读书笔记1
    使用VBA快速完成不规则数据整理
    14、JAVA入门——方法和构造方法
    【小题练手】---Java基础
    职场小白如何将图片转文字?这个方法建议收藏使用!
    Kotlin高仿微信-第2篇-登录
    PendingIntent
    Ant-Design-Pro-V5 :ProTable自定义搜索菜单操作栏和搜索事件、列表工具栏操作。
    计算机网络原理 运输层
    Android组件化神器 --- ARouter
  • 原文地址:https://blog.csdn.net/qq_37933018/article/details/127741870