通过TableEnvironment,可以把DataStream或者DataSet注册为Table,这样就可以使用Table API和SQL查询了。通过TableEnvironment也可以把Table对象转换为DataStream或者DataSet,这样就可以使用DataStream或者DataSet中的相关API了。
// Register a DataStream as a temporary view and query it with SQL.
val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment

// Blink planner, streaming mode.
val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
  .inStreamingMode().build()

val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

// Build a DataStream from an in-memory collection.
import org.apache.flink.api.scala._
val stream = ssEnv.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))

import org.apache.flink.table.api._
// FIX: `fields` is a vararg parameter — "fields = 'id,'name" puts a positional
// argument after a named one and does not compile; pass the field expressions
// positionally instead.
ssTableEnv.createTemporaryView("myTable", stream, 'id, 'name)
// TableResult.execute() triggers the job itself; no ssEnv.execute() needed here.
ssTableEnv.sqlQuery("select * from myTable where id>1").execute().print()
结果:
// Convert a DataStream into a Table object and query it with the Table API.
val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment

// Blink planner, streaming mode.
val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
  .inStreamingMode().build()

val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

// Build a DataStream from an in-memory collection.
import org.apache.flink.api.scala._
val stream = ssEnv.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))

import org.apache.flink.table.api._
// FIX: `fields` is a vararg parameter — "fields = 'id,'name" puts a positional
// argument after a named one and does not compile; pass the field expressions
// positionally instead.
val table = ssTableEnv.fromDataStream(stream, 'id, 'name)
table.select($"id", $"name").filter($"id" > 1).execute().print()
结果:
此时只能使用旧的执行引擎(planner),新的Blink执行引擎不支持与DataSet之间的转换。
// Approach 1: register a DataSet as a temporary view and query it with SQL.
// Batch <-> Table conversion only works with the legacy planner's
// BatchTableEnvironment (the Blink planner does not support DataSet).
def main(args: Array[String]): Unit = {

  val bbEnv = ExecutionEnvironment.getExecutionEnvironment
  // Renamed from `bbTable` for consistency with the sibling batch example.
  val bbTableEnv = BatchTableEnvironment.create(bbEnv)

  import org.apache.flink.api.scala._
  // fromCollection on an ExecutionEnvironment yields a DataSet, not a stream,
  // so the local is named accordingly (was misleadingly called `stream`).
  val dataSet = bbEnv.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))

  import org.apache.flink.table.api._
  // FIX: `fields` is a vararg parameter — "fields = 'id,'name" puts a positional
  // argument after a named one and does not compile; pass the field expressions
  // positionally instead.
  bbTableEnv.createTemporaryView("myTable", dataSet, 'id, 'name)
  bbTableEnv.sqlQuery("select * from myTable where id>1").execute().print()
}
// Approach 2: convert a DataSet into a Table object and query it with the
// Table API (legacy planner's BatchTableEnvironment only).
def main(args: Array[String]): Unit = {

  val bbEnv = ExecutionEnvironment.getExecutionEnvironment
  // Renamed from `bbTableEvc` for consistency with the sibling batch example.
  val bbTableEnv = BatchTableEnvironment.create(bbEnv)

  import org.apache.flink.api.scala._
  // fromCollection on an ExecutionEnvironment yields a DataSet, not a stream.
  val dataSet = bbEnv.fromCollection(Array((1, "java"), (2, "tom"), (3, "mac")))

  import org.apache.flink.table.api._
  // FIX: `fields` is a vararg parameter — "fields = 'id,'name" puts a positional
  // argument after a named one and does not compile; pass the field expressions
  // positionally instead.
  val table = bbTableEnv.fromDataSet(dataSet, 'id, 'name)
  table.select($"id", $"name").filter($"id" > 1).execute().print()
}
流式查询的结果Table会被动态地更新,即每当新的记录到达输入流时,结果就会发生变化。因此,承载此动态查询结果的DataStream需要对表的更新(新增、撤回)进行编码。有几种模式可以将Table转换为DataStream。
// Demonstrates both ways of converting a Table into a DataStream:
// toAppendStream for insert-only tables, toRetractStream when updates/deletes
// must be encoded as (flag, row) pairs.
def main(args: Array[String]): Unit = {
  val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
  val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)

  // Register a csv-backed filesystem source table via DDL.
  ssTableEnv.executeSql("" +
    "create table myTable(\n" +
    "id int,\n" +
    "name string\n" +
    ") with (\n" +
    "'connector.type' = 'filesystem',\n" +
    "'connector.path' = 'D:\\bigdata\\source',\n" +
    "'format.type' = 'csv'\n" +
    ")")

  val table = ssTableEnv.from("myTable")

  import org.apache.flink.api.scala._

  // Append mode: valid when the table only ever receives inserts.
  val appendStream = ssTableEnv.toAppendStream[Row](table)
  appendStream
    .map(row => (row.getField(0).toString.toInt, row.getField(1).toString))
    .print()

  // Retract mode: each element is (flag, row) where the flag marks add vs delete.
  val retractStream = ssTableEnv.toRetractStream[Row](table)
  retractStream.map { case (changeFlag, row) =>
    (changeFlag, row.getField(0).toString.toInt, row.getField(1).toString)
  }.print()

  // NOTE: once a Table has been converted to a DataStream, the job must be
  // launched through the StreamExecutionEnvironment.
  ssEnv.execute("TableToDataStreamScala")
}

// Demonstrates converting a Table into a DataSet (legacy batch planner only).
def main(args: Array[String]): Unit = {

  val bbEnv = ExecutionEnvironment.getExecutionEnvironment
  val bbTableEnv = BatchTableEnvironment.create(bbEnv)

  // Register a csv-backed filesystem source table via DDL.
  bbTableEnv.executeSql("" +
    "create table myTable(\n" +
    "id int,\n" +
    "name string\n" +
    ") with (\n" +
    "'connector.type' = 'filesystem',\n" +
    "'connector.path' = 'D:\\bigdata\\source',\n" +
    "'format.type' = 'csv'\n" +
    ")")

  // Look up the registered table...
  val table = bbTableEnv.from("myTable")

  // ...then materialize it as a DataSet[Row] and project to a typed tuple.
  import org.apache.flink.api.scala._
  val rowDataSet = bbTableEnv.toDataSet[Row](table)
  rowDataSet
    .map(row => (row.getField(0).toString.toInt, row.getField(1).toString))
    .print()
}
