• Flink中DataStream、DataSet和Table之间的互相转换


    一、DataStream转Table

    通过TableEnvironment ,可 以 把 DataStream 或 者 DataSet 注 册 为 Table , 这 样 就 可 以 使 用 Table API 和 SQL 查 询 了 。 通 过 TableEnvironment 也可以把Table对象转换为DataStream或者DataSet,这样就可以使用DataStream 或者DataSet中的相关API了

       1.1 使用DataStream创建view视图

    1. val ssENV = StreamExecutionEnvironment.getExecutionEnvironment
    2. val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
    3. .inStreamingMode().build()
    4. val ssTableEnv = StreamTableEnvironment.create(ssENV, ssSettings)
    5. //获取DataStream
    6. import org.apache.flink.api.scala._
    7. val stream = ssENV.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))
    8. import org.apache.flink.table.api._
    9. ssTableEnv.createTemporaryView("myTable",stream,fields = 'id,'name)
    10. ssTableEnv.sqlQuery("select * from myTable where id>1") .execute().print()

    结果:

    1.2 使用DataStream创建table对象

    1. val ssENV = StreamExecutionEnvironment.getExecutionEnvironment
    2. val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
    3. .inStreamingMode().build()
    4. val ssTableEnv = StreamTableEnvironment.create(ssENV, ssSettings)
    5. //获取DataStream
    6. import org.apache.flink.api.scala._
    7. val stream = ssENV.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))
    8. import org.apache.flink.table.api._
    9. val table = ssTableEnv.fromDataStream(stream, fields = 'id,'name)
    10. table.select($"id", $"name").filter($"id" > 1).execute().print()

    结果:

    二、DataSet转Table

    此时只能使用旧的执行引擎,新的blink执行引擎不支持和DataSet转换

      1.1 使用DataSet创建view视图

    1. def main(args: Array[String]): Unit = {
    2. val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    3. val bbTable= BatchTableEnvironment.create(bbEnv);
    4. import org.apache.flink.api.scala._
    5. val stream = bbEnv.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))
    6. //第一种 将DataStream 转换成view试图
    7. import org.apache.flink.table.api._
    8. bbTable.createTemporaryView("myTable",stream,fields = 'id,'name)
    9. bbTable.sqlQuery("select * from myTable where id>1") .execute().print()
    10. }

    1.2 使用DataSet创建table对象

    1. def main(args: Array[String]): Unit = {
    2. val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    3. val bbTableEvc= BatchTableEnvironment.create(bbEnv);
    4. import org.apache.flink.api.scala._
    5. val stream = bbEnv.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))
    6. import org.apache.flink.table.api._
    7. val table = bbTableEvc.fromDataSet(stream, fields = 'id,'name)
    8. table.select($"id", $"name").filter($"id" > 1).execute().print()
    9. }

    三、将table转换成 DataStream

    流式查询的结果Table会被动态地更新,即每个新的记录到达输入流时结果就会发生变化。因此,转换此 动态查询的DataStream需要对表的更新进行编码。 有几种模式可以将Table转换为DataStream。

    • Append Mode:这种模式只适用于当动态表仅由INSERT更改修改时(仅附加),之前添加的数据不会被更新。
    • Retract Mode:可以始终使用此模式,它使用一个Boolean标识来编码INSERT和DELETE更改。
    1. def main(args: Array[String]): Unit = {
    2. val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
    3. val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    4. val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)
    5. ssTableEnv.executeSql("" +
    6. "create table myTable(\n" +
    7. "id int,\n" +
    8. "name string\n" +
    9. ") with (\n" +
    10. "'connector.type' = 'filesystem',\n" +
    11. "'connector.path' = 'D:\\bigdata\\source',\n" +
    12. "'format.type' = 'csv'\n" +
    13. ")")
    14. val table = ssTableEnv.from("myTable")
    15. import org.apache.flink.api.scala._
    16. //如果只有新增(追加)操作,可以使用toAppendStream
    17. val appStream = ssTableEnv.toAppendStream[Row](table)
    18. appStream.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString)).print()
    19. //如果有增加操作,还有删除操作,则使用toRetractStream
    20. val retStream = ssTableEnv.toRetractStream[Row](table)
    21. retStream.map(tup=>{
    22. val flag = tup._1
    23. val row = tup._2
    24. val id = row.getField(0).toString.toInt
    25. val name = row.getField(1).toString
    26. (flag,id,name)
    27. }).print()
    28. //注意:将table对象转换为DataStream之后,就需要调用StreamExecutionEnvironment
    29. ssEnv.execute("TableToDataStreamScala")
    30. }

     四、将table转换成 DataSet

    1. def main(args: Array[String]): Unit = {
    2. val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    3. val bbTableEnv = BatchTableEnvironment.create(bbEnv)
    4. bbTableEnv.executeSql("" +
    5. "create table myTable(\n" +
    6. "id int,\n" +
    7. "name string\n" +
    8. ") with (\n" +
    9. "'connector.type' = 'filesystem',\n" +
    10. "'connector.path' = 'D:\\bigdata\\source',\n" +
    11. "'format.type' = 'csv'\n" +
    12. ")")
    13. //获取table
    14. val table = bbTableEnv.from("myTable")
    15. //table转换为DataSet
    16. import org.apache.flink.api.scala._
    17. val set = bbTableEnv.toDataSet[Row](table)
    18. set.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString))
    19. .print()
    20. }

     

  • 相关阅读:
    LaneNet 论文阅读
    Java手写背包问题算法
    bootz启动 Linux内核过程中涉及的 do_bootm_states 函数
    PHP foreach 循环跳过本次循环
    SpringBoot主启动类使用@ComponentScans、@ComponentScan扫描组件类,注意避坑
    修过书上的建网站用的CMS程序源码,增加在文章中插入图片功能
    【数之道 08】走进“卷积神经网络“,了解图像识别背后的原理
    网安学习-Python3
    前端开发中基于Web Speech API(speechSynthesis接口)实现文字转语音功能
    MS SQL Server问题汇总
  • 原文地址:https://blog.csdn.net/libaowen609/article/details/126466886