• Flink 流程处理和批处理开发


    流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理。

    批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点。

    Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型:

    • Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟
    • 如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量
    • 同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量 
       

    一、流处理系统

    流处理Streaming
    • StreamExecutionEnvironment
    • DataStreaming

    需求 通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合 统计,并且把时间窗口内计算的结果打印出来

    代码:

    1. def main(args: Array[String]): Unit = {
    2. val env = StreamExecutionEnvironment.getExecutionEnvironment
    3. val text = env.socketTextStream("192.168.221.131", 9001)
    4. import org.apache.flink.api.scala._
    5. val wordCount = text.flatMap(_.split(" "))//将每一行数据根据空格切分单词
    6. .map((_,1)) //每一个单词转换为tuple2的形式(单词,1)
    7. .keyBy(tup=>tup._1) //官方推荐使用keyselector选择器选择数据
    8. .timeWindow(Time.seconds(2)) //时间窗口为2秒,表示每隔2秒钟计算一次接收到的数
    9. .sum(1)
    10. wordCount.print().setParallelism(1)
    11. //执行程序
    12. env.execute("SocketWindowWord")
    13. }

    在服务器上输出nc - l 9001,如图所示,然后启动项目

    数据hello you  hello me hello you

    idea控制台可以看到如下效果

    二、批处理系统

    批处理Streaming
    • ExecutionEnvironment

    需求:统计指定文件中单词出现的总次数

    代码:

    1. def main(args: Array[String]): Unit = {
    2. val env = ExecutionEnvironment.getExecutionEnvironment
    3. val inputPath="hdfs://192.168.221.131:9000/hello.txt"
    4. val outPath="hdfs://192.168.221.131:9000/out082001"
    5. val text = env.readTextFile(inputPath)
    6. import org.apache.flink.api.scala._
    7. val wordCount= text.flatMap(_.split(" "))
    8. .map((_,1)).groupBy(0).sum(1).setParallelism(1)
    9. wordCount.writeAsCsv(outPath,"\n","") //指定行分割符和字段分割符
    10. env.execute("BatchWordCount")
    11. }

    结果:

     

    三、异常处理

    3.1 flink编译报错

    错误信息:Error:(18, 33) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]

    解决方法:

    推荐的做法是在代码中引入以下包:

    import org.apache.flink.streaming.api.scala._

    但是没有解决问题,最后发现是Scala版本问题。自己的idea的scala版本和pom.xml中的不一致!!!

    我的版本为 2.11.12

    然后看下pom.xml文件,flink对应的scala版本是2.12系列,所以报错。flink对应的scala版本是2.11系列即可解决问题,如下图所示

    3.2 启动异常

    异常信息:

    Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/.
     

    解决方法:在pom文件引入

    
        org.apache.hadoop
        hadoop-client
        3.2.0
    

    即可解决问题

  • 相关阅读:
    elementui修改message消息提示颜色
    skywalking9.2.0源码修改
    多人协作多版本开发冲突的正确解决姿势
    H5+Vue3编写官网,并打包发布到同一个域名下
    前缀树及AC自动机
    原码、反码、补码
    [Windows环境]nvm工具的介绍和安装
    2022最新android设备uuid、udid使用教程​
    ts的函数
    ElasticSearch入门到入土手册请查阅
  • 原文地址:https://blog.csdn.net/libaowen609/article/details/126433947