• Flink核心API之Table API和SQL


    Table API和SQL是一种关系型 API,用户可以像操作 Mysql 数据库表一样的操作数据

    一、在pom中引入依赖

    如果你想要使用Table API 和SQL的话,需要添加下面的依赖

    1. <dependency>
    2.   <groupId>org.apache.flink</groupId>
    3.   <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    4.   <version>1.11.0</version>
    5. </dependency>
    6. <dependency>
    7.   <groupId>org.apache.flink</groupId>
    8.   <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    9.   <version>1.11.0</version>
    10. </dependency>


    如果你想在 本地 IDE中运行程序,还需要添加下面的依赖

    1. <dependency>
    2.   <groupId>org.apache.flink</groupId>
    3.   <artifactId>flink-table-planner-blink_2.11</artifactId>
    4.   <version>1.11.0</version>
    5. </dependency>
    1. <dependency>
    2.     <groupId>org.apache.flink</groupId>
    3.     <artifactId>flink-table-planner_2.11</artifactId>
    4.     <version>1.11.1</version>
    5. </dependency>


    如果你用到了老的执行引擎,还需要添加下面这个依赖

        org.apache.flink
        flink-table-planner_2.11
        1.11.1

    二、创建tableEnviroment对象

    2.1 如果Table API和SQL不需要和DataStream或者DataSet互相转换

    则针对stream和batch都可以使用TableEnvironment

    1. //指定底层使用blink引擎,及数据处理模式-stream
    2. val sSetting = EnvironmentSettings.newInstance().useBlinkPlanner()
    3. .inStreamingMode().build()
    4. val sTableEnv = TableEnvironment.create(sSetting)
    5. //指定底层使用blink引擎,及数据处理模式-batch
    6. val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner()
    7. .inBatchMode().build()
    8. //创建TableEnvironment对象
    9. val sTableEnv = TableEnvironment.create(sSettings)

    2.2 如果Table API和SQL需要和DataStream或者DataSet互相转换

    •  针对stream需要使用StreamTableEnvironment
    •  针对batch需要使用BatchTableEnvironment
    1. #针对stream需要使用StreamTableEnvironment
    2. val ssEnv = StreamExecutionEnvironment.getExecutionEnvironment
    3. val ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().
    4. inStreamingMode().build()
    5. val ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings)
    6. #针对batch需要使用BatchTableEnvironment
    7. val bbEnv = ExecutionEnvironment.getExecutionEnvironment
    8. val bb = BatchTableEnvironment.create(bbEnv);

    三、Table API和 SQL的使用

    3.1 创建表

    1. sTableEnv.executeSql("create table myTable(id int,name String)\n" +
    2. ") with(\n" +
    3. "'connector.type'='filesystem',\n" +
    4. "'connector.path'='D:\\bigdata/source',\n" +
    5. "'connector.type'='csv'\n"+
    6. ")"
    7. )
    • connector.type’ = ‘filesystem’说明外部连接器的类型为文件系统,
    • ‘connector.path’ ='D:\\bigdata/source’指定了外部文件系统连接器的文件路径,
    • ‘format.type’ = 'csv’则说明文件系统的数据格式为CSV。
    • csv是以纯文本的形式存储表格数据,逗号分隔值文件格式

    3.2 使用sql 实现数据查询和过滤操作

      val result = sTableEnv.sqlQuery("select id,name from myTable where id>1")

     3.3 使用table实现数据查询和过滤操作

    1. import org.apache.flink.table.api._
    2. val result = sTableEnv.from("myTable").select($"id",$"name").filter($"id" >1)

    四、异常:Static methods in interface require -target:jvm-1.8

     解决办法:在File -> Settings中按图设置即可:-target:jvm-1.8

     

  • 相关阅读:
    Springboot毕设项目基于Java的Cisco网络安全设备采购平台wl7jy(java+VUE+Mybatis+Maven+Mysql)
    栈(Stack)的概念+MyStack的实现+栈的应用
    Cocos2dx 安装运行
    zabbix基础环境部署
    Vue3.3指北(一)
    前端随笔0:URL与状态的双向绑定
    计算机毕业设计java+ssm医院医疗救助系统的设计与实现
    Guava Cache介绍-面试用
    北大青鸟培训后端培训第二天
    事件循环 Event Loop
  • 原文地址:https://blog.csdn.net/libaowen609/article/details/126463352