• Flink Series Docs - (YY05) - Flink Programming API - Multi-Stream Operators


    1 Connecting streams: connect

      connect (DataStream, DataStream → ConnectedStreams)

    The connect method joins two DataStreams, whose element types may be the same or different, into a new ConnectedStreams. Note that connect differs from union: although the call produces a single ConnectedStreams, the two streams inside it remain independent. The biggest benefit of this method is that the two streams can share state; state is covered in a later chapter.

    DataStreamSource<String> ds1 = see.fromElements("a", "b", "c", "d");
    DataStreamSource<Integer> ds2 = see.fromElements(1, 2, 3, 4, 5, 6);
    ConnectedStreams<String, Integer> wordAndNumber = ds1.connect(ds2);

    Calling map on a ConnectedStreams requires a CoMapFunction:

    The interface takes three type parameters:

    • the element type of the first input DataStream;
    • the element type of the second input DataStream;
    • the result type.

    The interface requires two methods to be implemented:

    • map1, the map logic applied to elements of the first stream;
    • map2, the map logic applied to elements of the second stream.

    Both methods must return the same type, matching the declared output type.

    package com.blok;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.ConnectedStreams;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoMapFunction;

    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description:
     */
    public class _9Base_API_ConnectFunction {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            DataStreamSource<String> ds1 = see.fromElements("a", "b", "c", "d");
            DataStreamSource<Integer> ds2 = see.fromElements(1, 2, 3, 4, 5, 6);
            ConnectedStreams<String, Integer> wordAndNumber = ds1.connect(ds2);
            // operators called on a ConnectedStreams take Co*Function implementations
            SingleOutputStreamOperator<String> connectMaped = wordAndNumber.map(new CoMapFunction<String, Integer, String>() {
                // map1 handles elements of the left stream (strings)
                @Override
                public String map1(String value) throws Exception {
                    return value.toUpperCase();
                }
                // map2 handles elements of the right stream (integers)
                @Override
                public String map2(Integer value) throws Exception {
                    return String.valueOf(value);
                }
            });
            connectMaped.print();
            see.execute("connect operator");
        }
    }

    Calling flatMap on a ConnectedStreams takes a CoFlatMapFunction.

    This interface requires two methods to be implemented:

    • flatMap1, the flatMap logic applied to the first stream;
    • flatMap2, the flatMap logic applied to the second stream.

    Both methods must emit the same output type.

    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description:
     */
    public class _9Base_API_ConnectFunction02 {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            DataStreamSource<String> ds1 = see.fromElements("a b c d", "e f g h", "j k l", "o p l a");
            DataStreamSource<String> ds2 = see.fromElements("1 2 3 4 5", "6 7 8 9 10");
            ConnectedStreams<String, String> connectedStreams = ds1.connect(ds2);
            connectedStreams.flatMap(new CoFlatMapFunction<String, String, String>() {
                @Override
                public void flatMap1(String value, Collector<String> out) throws Exception {
                    for (String word : value.split("\\s+")) {
                        out.collect(word);
                    }
                }
                @Override
                public void flatMap2(String value, Collector<String> out) throws Exception {
                    for (String word : value.split("\\s+")) {
                        out.collect(word);
                    }
                }
            }).print();
            see.execute("connect operator");
        }
    }

    2 Merging streams: union

    The union method merges two or more DataStreams of the same element type into a single DataStream. Its signature, DataStream union(DataStream... streams), shows that it takes varargs, so any number of type-compatible streams can be merged at once.

    The example below uses fromElements to build two DataStreams, one of odd numbers and one of even numbers, and merges them into a single DataStream.

    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description:
     */
    public class _10Base_API_Union {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            DataStreamSource<Integer> odd = see.fromElements(1, 3, 5, 7, 9);
            DataStreamSource<Integer> even = see.fromElements(2, 4, 6, 8, 10);
            // merge the two streams into one
            DataStream<Integer> union = odd.union(even);
            union.print("all elements: ");
            see.execute("union operator");
        }
    }

    3 Splitting a stream: side output

    The following functions support emitting selected records to a side output; in general, every process* function has access to side outputs:

    1. ProcessFunction
    2. KeyedProcessFunction
    3. CoProcessFunction
    4. KeyedCoProcessFunction
    5. ProcessWindowFunction
    6. ProcessAllWindowFunction
    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description:
     */
    public class _11Base_API_SideOut {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            DataStreamSource<YY2> ds = see.fromElements(
                    new YY2(1, "DY", "NM_BT", 100),
                    new YY2(2, "XY", "NM_BT", 100),
                    new YY2(3, "HH", "SD_HZ", 10),
                    new YY2(4, "XH", "SD_HZ", 12)
            );
            final OutputTag<YY2> sideOut = new OutputTag<YY2>("not good") {};
            // use a side output to separate failing records from passing ones;
            // the process method supports side outputs
            SingleOutputStreamOperator<YY2> processed = ds.process(new ProcessFunction<YY2, YY2>() {
                @Override
                public void processElement(YY2 value, ProcessFunction<YY2, YY2>.Context ctx, Collector<YY2> out) throws Exception {
                    if (value.getScore() < 60) { // route failing records to the side output
                        ctx.output(sideOut, value);
                    } else { // passing records go to the main stream
                        out.collect(value);
                    }
                }
            });
            DataStream<YY2> sideOutput = processed.getSideOutput(sideOut);
            sideOutput.print("side output (score < 60): ");
            processed.print("main stream (score >= 60): ");
            see.execute("side output operator");
        }
    }

    4 Co-grouping: coGroup

    coGroup groups the two streams separately by a specified key, then processes the matching groups of both streams together.

    package com.blok;

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description: coGroup - co-grouping:
     * groups both streams by a given key; records of the two streams with the same key are brought together
     */
    public class _12Base_API_Cogroup {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            // 1. load the raw data
            // input format: id,name
            DataStreamSource<String> ds1 = see.socketTextStream("linux01", 8899);
            // input format: id,event,city
            DataStreamSource<String> ds2 = see.socketTextStream("linux01", 9988);
            // 2. parse the input lines into tuples
            // id,name
            SingleOutputStreamOperator<Tuple2<String, String>> users = ds1.map(line -> {
                String[] arr = line.split(",");
                return Tuple2.of(arr[0], arr[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // id,event,city
            SingleOutputStreamOperator<Tuple3<String, String, String>> events = ds2.map(line -> {
                String[] arr = line.split(",");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }).returns(new TypeHint<Tuple3<String, String, String>>() {
            });
            // use coGroup to window-join the two streams on equal ids
            // (inner, left, right, and full outer joins can all be expressed this way)
            DataStream<String> res = users.coGroup(events)
                    .where(tp -> tp.f0)
                    .equalTo(tp -> tp.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                    // apply is only available after a window has been defined
                    .apply(new CoGroupFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<String, String>> users, Iterable<Tuple3<String, String, String>> events, Collector<String> out) throws Exception {
                            // implement a left join
                            for (Tuple2<String, String> user : users) {
                                boolean flag = false;
                                for (Tuple3<String, String, String> event : events) {
                                    out.collect(user.f0 + "," + user.f1 + "," + event.f0 + "," + event.f1 + "," + event.f2);
                                    flag = true;
                                }
                                // no matching events for this user
                                if (!flag) {
                                    out.collect(user.f0 + "," + user.f1 + ",null,null,null");
                                }
                            }
                        }
                    });
            res.print("left_join");
            see.execute();
        }
    }
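    The left-join loop inside the CoGroupFunction above is ordinary Java over the two per-key groups, so it can be sketched and tested outside Flink. The sketch below assumes both lists already belong to the same join key, as coGroup guarantees, and represents a user as a {id, name} array and an event as a {id, event, city} array; the class name is illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;

public class LeftJoinSketch {
    // Models the left-join logic of the CoGroupFunction: for each user,
    // emit one line per matching event, or a null-padded line when the
    // user has no events in the window.
    static List<String> leftJoin(List<String[]> users, List<String[]> events) {
        List<String> out = new ArrayList<>();
        for (String[] user : users) {
            boolean matched = false;
            for (String[] event : events) {
                out.add(user[0] + "," + user[1] + "," + event[0] + "," + event[1] + "," + event[2]);
                matched = true;
            }
            if (!matched) {
                out.add(user[0] + "," + user[1] + ",null,null,null");
            }
        }
        return out;
    }
}
```

    Swapping the roles of the two loops gives a right join; emitting unmatched rows from both sides gives a full outer join, which is why coGroup is more flexible than the join operator below.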

    5 The join operator

    join associates two streams (similar to a SQL join); the join condition must be specified, and the post-join logic runs inside a window.

    package com.blok;

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description: join - associate two streams
     */
    public class _13Base_API_Join {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            // 1. load the raw data
            // input format: id,name
            DataStreamSource<String> ds1 = see.socketTextStream("linux01", 8899);
            // input format: id,event,city
            DataStreamSource<String> ds2 = see.socketTextStream("linux01", 9988);
            // 2. parse the input lines into tuples
            // id,name
            SingleOutputStreamOperator<Tuple2<String, String>> users = ds1.map(line -> {
                String[] arr = line.split(",");
                return Tuple2.of(arr[0], arr[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // id,event,city
            SingleOutputStreamOperator<Tuple3<String, String, String>> events = ds2.map(line -> {
                String[] arr = line.split(",");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }).returns(new TypeHint<Tuple3<String, String, String>>() {
            });
            // 3. associate the two streams with the join operator
            DataStream<String> res = users.join(events)
                    .where(tp -> tp.f0)
                    .equalTo(tp -> tp.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // window in which the join is evaluated
                    .apply(new JoinFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                        @Override
                        public String join(Tuple2<String, String> t1, Tuple3<String, String, String> t2) throws Exception {
                            return t1.f0 + "," + t1.f1 + "," + t2.f0 + "," + t2.f1 + "," + t2.f2;
                        }
                    });
            res.print("join-res: ");
            see.execute();
        }
    }

    Only records that fall into the same window trigger the corresponding join computation.

    Configuring different window types in the code changes when the computation fires.
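    Which records meet in a join is decided purely by window assignment. For a tumbling processing-time window, Flink computes the start of the window an element falls into via TimeWindow.getWindowStartWithOffset; with a zero offset this reduces to rounding the timestamp down to a multiple of the window size. A minimal plain-Java sketch of that assignment (no Flink dependency; two records can join only when this function returns the same start for both):

```java
public class WindowStartSketch {
    // Start of the tumbling window a timestamp falls into, mirroring
    // Flink's TimeWindow.getWindowStartWithOffset(timestamp, 0, size).
    static long tumblingWindowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs + windowSizeMs) % windowSizeMs;
    }
}
```

    For a 20-second window, timestamps 5 000 ms and 19 000 ms both map to window start 0 and can join; 25 000 ms maps to 20 000 and lands in the next window.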

    // tumbling window
    // .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
    // .window(TumblingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
    // sliding window
    .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
    // session window
    // .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
    • tumbling window

    DataStream<String> res = users.join(events)
            .where(tp -> tp.f0)
            .equalTo(tp -> tp.f0)
            // tumbling window
            .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
            .apply(new JoinFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                @Override
                public String join(Tuple2<String, String> t1, Tuple3<String, String, String> t2) throws Exception {
                    return t1.f0 + "," + t1.f1 + "," + t2.f0 + "," + t2.f1 + "," + t2.f2;
                }
            });

    • sliding window

    DataStream<String> res = users.join(events)
            .where(tp -> tp.f0)
            .equalTo(tp -> tp.f0)
            // sliding window
            .window(SlidingProcessingTimeWindows.of(Time.seconds(30), Time.seconds(10)))
            .apply(new JoinFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                @Override
                public String join(Tuple2<String, String> t1, Tuple3<String, String, String> t2) throws Exception {
                    return t1.f0 + "," + t1.f1 + "," + t2.f0 + "," + t2.f1 + "," + t2.f2;
                }
            });
    • session window

    DataStream<String> res = users.join(events)
            .where(tp -> tp.f0)
            .equalTo(tp -> tp.f0)
            // session window
            .window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
            .apply(new JoinFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                @Override
                public String join(Tuple2<String, String> t1, Tuple3<String, String, String> t2) throws Exception {
                    return t1.f0 + "," + t1.f1 + "," + t2.f0 + "," + t2.f1 + "," + t2.f2;
                }
            });

    6 Broadcast streams

    Broadcast State is a feature introduced in Flink 1.5.

    When you need to push/broadcast a low-throughput stream of configuration, rules, or similar events to all downstream tasks, use the Broadcast State feature. Downstream tasks receive the configuration or rules, store them as BroadcastState, and apply them when processing another data stream.

      API overview and key points

    1. Call broadcast on the stream to be broadcast to obtain a BroadcastStream.
    2. Call connect on the main stream to connect it with the broadcast stream (so the broadcast state can be shared).
    3. Call process on the connected stream; the resulting ProcessFunction provides one method for each of the two streams, and the "broadcast state" is shared between them inside that function.
    package com.blok;

    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    /**
     * @Date: 22.11.8
     * @Author: Hang.Nian.YY
     * @qq: 598196583
     * @Tips: Learn big data at Duoyi Education
     * @Description: broadcast - share a low-throughput stream with all downstream tasks
     */
    public class _14Base_API_BroadCast {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger("rest.port", 8888);
            StreamExecutionEnvironment see = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            see.setParallelism(1);
            // 1. load the raw data
            // input format: id,event
            DataStreamSource<String> ds1 = see.socketTextStream("linux01", 8899);
            // input format: id,name,city
            DataStreamSource<String> ds2 = see.socketTextStream("linux01", 9988);
            // 2. parse the input lines into tuples
            // id,event
            SingleOutputStreamOperator<Tuple2<String, String>> events = ds1.map(line -> {
                String[] arr = line.split(",");
                return Tuple2.of(arr[0], arr[1]);
            }).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // id,name,city
            SingleOutputStreamOperator<Tuple3<String, String, String>> users = ds2.map(line -> {
                String[] arr = line.split(",");
                return Tuple3.of(arr[0], arr[1], arr[2]);
            }).returns(new TypeHint<Tuple3<String, String, String>>() {
            });
            /*
             * Example scenario:
             *   stream 1: user behaviour events - a user may produce many events, at unpredictable times
             *   stream 2: user profiles - each user's profile arrives exactly once, at an unpredictable time
             * The user profile stream is turned into a broadcast stream.
             */
            MapStateDescriptor<String, Tuple2<String, String>> userInfo = new MapStateDescriptor<>(
                    "userInfo",
                    TypeInformation.of(String.class),
                    TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
            BroadcastStream<Tuple3<String, String, String>> broadcast = users.broadcast(userInfo);
            // to use the broadcast stream, connect the main stream to it
            BroadcastConnectedStream<Tuple2<String, String>, Tuple3<String, String, String>> connected = events.connect(broadcast);
            // use process to handle the connected stream that carries the broadcast state
            SingleOutputStreamOperator<String> res = connected.process(
                    new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                // handles elements of the main stream
                @Override
                public void processElement(Tuple2<String, String> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // the broadcast state obtained from this context is read-only
                    ReadOnlyBroadcastState<String, Tuple2<String, String>> bc = ctx.getBroadcastState(userInfo);
                    Tuple2<String, String> user = bc.get(value.f0);
                    if (user != null) {
                        out.collect(value.f0 + "," + user.f0 + "," + user.f1 + "," + value.f1);
                    } else { // no profile for this user in the broadcast state yet
                        out.collect(value.f0 + ",null,null," + value.f1);
                    }
                }
                // handles broadcast elements: store each one in the shared broadcast state
                @Override
                public void processBroadcastElement(Tuple3<String, String, String> value, Context ctx, Collector<String> out) throws Exception {
                    // read-write view of the broadcast state
                    BroadcastState<String, Tuple2<String, String>> bc = ctx.getBroadcastState(userInfo);
                    bc.put(value.f0, Tuple2.of(value.f1, value.f2));
                }
            });
            res.print("enriched");
            see.execute("broadcast stream");
        }
    }
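    The enrichment logic inside the two process methods can be modeled outside Flink with an ordinary map standing in for the broadcast state. The class and method names below are illustrative, not Flink API; onProfile mirrors processBroadcastElement and onEvent mirrors processElement:

```java
import java.util.HashMap;
import java.util.Map;

public class BroadcastEnrichSketch {
    // Stands in for the broadcast state: user id -> {name, city}.
    private final Map<String, String[]> userInfo = new HashMap<>();

    // Mirrors processBroadcastElement: store the profile in the shared state.
    void onProfile(String id, String name, String city) {
        userInfo.put(id, new String[]{name, city});
    }

    // Mirrors processElement: enrich the event with the stored profile,
    // or pad with nulls when the profile has not arrived yet.
    String onEvent(String id, String event) {
        String[] user = userInfo.get(id);
        if (user != null) {
            return id + "," + user[0] + "," + user[1] + "," + event;
        }
        return id + ",null,null," + event;
    }
}
```

    The ordering caveat is visible in this sketch too: an event that arrives before its user's profile is emitted with nulls, since neither Flink nor the sketch buffers main-stream records waiting for broadcast data.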

  • Original article: https://blog.csdn.net/qq_37933018/article/details/127751269