• Flink核心API之DataStream


    DataStream API主要分为3块:DataSource、Transformation、DataSink

    • DataSource是程序的输入数据源。
    • Transformation是具体的操作,它对一个或多个输入数据源进行计算处理,例如map、flatMap和filter 等操作。
    • DataSink是程序的输出,它可以把Transformation处理之后的数据输出到指定的存储介质中。

    一、常见的DataStream API之Transformation

    • map 输入一个元素进行处理,返回一个元素
    • flatMap 输入一个元素进行处理,可以返回多个元素
    • filter 对数据进行过滤,符合条件的数据会被留下
    • keyBy 根据key分组,相同key的数据会进入同一个分区
    • reduce 对当前元素和上一次的结果进行聚合操作
    • aggregations sum(),min(),max()等
    • union 合并多个流,多个流的数据类型必须一致
    • connect 只能连接两个流,两个流的数据类型可以不同
    • split 根据规则把一个数据流切分为多个流
    • shuffle 随机分区
    • rebalance 对数据集进行再平衡,重分区,消除数据倾斜
    • rescale 重分区
    • partitionCustom 自定义分区

    1.1、union 示例

    代码:

    1. def main(args: Array[String]): Unit = {
    2. val env = StreamExecutionEnvironment.getExecutionEnvironment
    3. import org.apache.flink.api.scala._
    4. val text1 = env.fromCollection(Array(1, 2, 3, 4, 5))
    5. val text2 = env.fromCollection(Array(6, 7, 8, 9, 10))
    6. val unionStream = text1.union(text2)
    7. unionStream.print().setParallelism(1)
    8. env.execute("StreamUnionScala")
    9. }

    结果:

     

    1.2、connect:只能连接两个流,两个流的数据类型可以不同

    两个流被connect之后,只是被放到了同一个流中,它们内部依然保持各自的数据和形式不发生任何变 化,两个流相互独立。 connect方法会返回connectedStream,在connectedStream中需要使用CoMap、CoFlatMap这种函 数,类似于map和flatmap        

    1. val text1 = env.fromElements("user:libaowen,age:18")
    2. val text2 = env.fromElements("user:jack_age:20")
    3. val connectionStream = text1.connect(text2)
    4. connectionStream.map(new CoMapFunction[String, String, String] {
    5. override def map1(value: String): String = {
    6. value.replace(",", "-")
    7. }
    8. override def map2(value: String): String = {
    9. value.replace("_ ", "-")
    10. }
    11. }).print().setParallelism(1)

    结果:

    1.3、split和side output

    split切分的流无法进行二次切分,并且split方法已经标记为过时了,官方不推荐使用,现在官方推荐 使用side output的方式实现。

    1. def main(args: Array[String]): Unit = {
    2. val env = StreamExecutionEnvironment.getExecutionEnvironment
    3. val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    4. //按照数据的奇偶性对数据进行分流
    5. //首先定义两个sideoutput来准备保存切分出来的数据
    6. val outputTag1 = new OutputTag[Int]("even") {} //保存偶数
    7. val outputTag2 = new OutputTag[Int]("odd") {} //保存奇数
    8. val subOutputStream = text.process(new ProcessFunction[Int, Int] {
    9. override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context, collector: Collector[Int]): Unit = {
    10. if (value <= 5) {
    11. ctx.output(outputTag1, value)
    12. } else {
    13. ctx.output(outputTag2, value)
    14. }
    15. }
    16. })
    17. val lowStream = subOutputStream.getSideOutput(outputTag1)
    18. //获取大于5的数据流
    19. val highStream = subOutputStream.getSideOutput(outputTag2)
    20. lowStream.print().setParallelism(1)
    21. highStream.print().setParallelism(1)
    22. env.execute("StreamUnionScala")
    23. }

    结果:

     

    1.4、random、rebalance、rescale、broadcast

    • random  随机分区,它表示将上游数据随机分发到下游算子实例的每个分区中,在代码层面体现是调用 shuffle()函数
    • rebalance:重新平衡分区(循环分区),我觉得叫循环分区更好理解,它表示对数据集进行再平衡,消除 数据倾斜,为每个分区创建相同的负载,其实就是通过循环的方式给下游算子实例的每个分区分配数据, 在代码层面体现是调用rebalance()函数
    • rescale重分区 
    • broadcast:广播分区,将上游算子实例中的数据输出到下游算子实例的每个分区中,适合用于大数据集

    rescale与rebalance的区别是rebalance会产生全量重分区,而rescale不会。

            

     

     

  • 相关阅读:
    309. 最佳买卖股票时机含冷冻期
    记录一次线上zookeeper连接数耗尽拒绝连接的问题处理
    小米路由器青春版(R1CL)刷高恪软路由系统
    Java新特性(2):Java 10以后
    画图带你彻底弄懂三级缓存和循环依赖的问题
    [计算机入门] Windows附件程序介绍(工具类)
    webGoat目录访问控制路径
    哈希表相关知识
    [JAVAee]Spring项目的创建与基本使用
    使用JPA和Hibernate查询分页
  • 原文地址:https://blog.csdn.net/libaowen609/article/details/126445206