• FlinkSql之TableAPI详解


    一、FlinkSql的概念

    核心概念

    Flink 的 Table APISQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.

    动态表和连续查询

    动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。

    与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

    TableAPI

    首先需要导入依赖

     <dependency>
         <groupId>org.apache.flinkgroupId>
         <artifactId>flink-table-planner-blink_${scala.binary.version}artifactId>
         <version>${flink.version}version>
         <scope>providedscope>
     dependency>
     <dependency>
         <groupId>org.apache.flinkgroupId>
         <artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
         <version>${flink.version}version>
         <scope>providedscope>
     dependency>
     <dependency>
         <groupId>org.apache.flinkgroupId>
         <artifactId>flink-csvartifactId>
         <version>${flink.version}version>
     dependency>
     <dependency>
         <groupId>org.apache.flinkgroupId>
         <artifactId>flink-jsonartifactId>
         <version>${flink.version}version>
     dependency>
     
     
     <dependency>
         <groupId>org.apache.commonsgroupId>
         <artifactId>commons-compressartifactId>
         <version>1.21version>
     dependency>
     /**
      * 使用TableAPI的基本套路:
      * 1.创建表的执行环境
      * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
      * 3.对动态表进行查询
      * 4.把动态表转换为流
      */

    这里需要注意的问题:

    1.TableAPI 中将动态表转换为流时有两种方法

     DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);

    toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句

     DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);

    toRetractStream则可以使用

    2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class

    代码实现:

     package net.cyan.FlinkSql;
     
     import net.cyan.POJO.WaterSensor;
     import org.apache.flink.api.common.functions.FilterFunction;
     import org.apache.flink.api.java.tuple.Tuple2;
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.streaming.api.datastream.DataStream;
     import org.apache.flink.streaming.api.datastream.DataStreamSource;
     import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     import org.apache.flink.table.api.Table;
     import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
     import org.apache.flink.types.Row;
     
     import static org.apache.flink.table.api.Expressions.$;
     
     /**
      * 使用TableAPI的基本套路:
      * 1.创建表的执行环境
      * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
      * 3.对动态表进行查询
      * 4.把动态表转换为流
      */
     public class Demo1 {
         public static void main(String[] args) {
             Configuration configuration=new Configuration();
             configuration.setInteger("rest.port",3333);
             //创建执行环境
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
             env.setParallelism(1);
             //模拟数据
             DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements(
                     new WaterSensor("S1", 1000L, 10),
                     new WaterSensor("S1", 1000L, 10),
                     new WaterSensor("S2", 2000L, 20),
                     new WaterSensor("S3", 3000L, 30),
                     new WaterSensor("S4", 4000L, 40),
                     new WaterSensor("S5", 5000L, 50)
            );
             //创建表的执行环境
             StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
             //创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取
             Table table = tableEnvironment.fromDataStream(WaterSensorSource);
             //对表进行查询
             //1、过时的查询书写
             Table result = table
                    .where("id='S1'")
                    .select("*");
             //2、不过时的书写
             Table result1 = table
     //               .where($("id").isEqual("S1"))
                    .select($("id"), $("ts"), $("vc"));
             //3.聚合函数
             Table select = table
                    .groupBy($("id"))
                    .aggregate($("vc").sum().as("sum_vc"))
                    .select($("id"), $("sum_vc"));
             //把动态表转换为流,使用到了之前创建的表运行环境
     
             SingleOutputStreamOperator<Row> tuple2DataStream = tableEnvironment
                    .toRetractStream(select, Row.class)
                    .filter(t -> t.f0)
                    .map(t -> t.f1);
     //       DataStream rowDataStream = tableEnvironment.toAppendStream(result, Row.class);
     //       DataStream rowDataStream1 = tableEnvironment.toAppendStream(result1, Row.class);
     //       rowDataStream.print();
     //       rowDataStream1.print();
             tuple2DataStream.print();
     
     
             try {
                 //启动执行环境
                 env.execute();
            } catch (Exception e) {
                 e.printStackTrace();
            }
     
     
     
        }
     }

     

    二、TableAPI读取文件

    使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名

     package net.cyan.FlinkSql;
     
     import org.apache.flink.configuration.Configuration;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     
     import org.apache.flink.table.api.DataTypes;
     import org.apache.flink.table.api.Table;
     import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
     import org.apache.flink.table.descriptors.Csv;
     import org.apache.flink.table.descriptors.FileSystem;
     import org.apache.flink.table.descriptors.Schema;
     import org.apache.flink.table.types.DataType;
     
     import static org.apache.flink.table.api.Expressions.$;
     
     public class Demo2_readWriteText {
         public static void main(String[] args) {
             //创建执行环境
     //       Configuration configuration = new Configuration();
     //       configuration.setInteger("rest.port", 3333);
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             env.setParallelism(1);
             StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);
             //创建查询的数据结果封装类型
             Schema schema = new Schema()
                    .field("id", DataTypes.STRING())
                    .field("ts", DataTypes.BIGINT())
                    .field("vc", DataTypes.INT());
             talEnv
                    .connect(new FileSystem().path("input/sensor.txt"))  //读取文件路径
                    .withFormat(new Csv()) //读取文件的数据格式
                    .withSchema(schema) //读取出来的数据格式
                    .createTemporaryTable("sensor");//定义结果表名
     
             //进行查询
             Table select = talEnv.from("sensor")
                    .where($("id").isEqual("sensor_1"))
                    .select($("id"), $("ts"), $("vc"));
     
     
             //将查询结果写入到新文件中
             //同样建立一个动态表连接
             talEnv
                    .connect(new FileSystem().path("input/b.txt"))  //写入路径
                    .withFormat(new Csv()) //写入文件的数据格式
                    .withSchema(schema) //写入的数据格式
                    .createTemporaryTable("abc");//定义写入表名
             //进行写入操作
     
             select.executeInsert("abc");
     
     //       try {
     //           //启动执行环境
     //           env.execute();
     //       } catch (Exception e) {
     //           e.printStackTrace();
     //       }
     
        }
     }

     

    三、TableAPI 读取、写入Kakfa

    基本流程

    1>需要创建表的运行环境

     //创建表的运行环境
     StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

    2>创建查询出的数据写出结构

     //创建表结构
     Schema schema=new Schema()
            .field("id",DataTypes.STRING())
            .field("ts",DataTypes.BIGINT())
            .field("vc",DataTypes.INT());

    3> 创建kafka连接

     //创建kafka连接
     tabEnv.connect(
             new Kafka()
            .version("universal")// 版本号
            .property("bootstrap.servers","hadoop102:9092")//地址
            .property("group.id","cy")//消费者组
            .topic("first")//消费主题
     
      )
            .withFormat(new Json())//写入的格式
            .withSchema(schema)
            .createTemporaryTable("a");//临时表

    4> 进行查询

     //创建表
     Table select = tabEnv.from("a").select("*");

    5> 创建写入kafka连接

     //创建写入主题
     tabEnv.connect(
             new Kafka()
                    .version("universal")// 版本号
                    .property("bootstrap.servers","hadoop102:9092")//地址
                    .topic("first1")//消费主题
                    .sinkPartitionerRoundRobin()//随机分区
     
     )
            .withFormat(new Json())//写入的格式
            .withSchema(schema)
            .createTemporaryTable("c");

    6> 写入

     //写入
     select.executeInsert("c");

    完整代码如下

     package net.cyan.FlinkSql;
     
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
     import org.apache.flink.table.api.DataTypes;
     import org.apache.flink.table.api.Table;
     import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
     import org.apache.flink.table.descriptors.Json;
     import org.apache.flink.table.descriptors.Kafka;
     import org.apache.flink.table.descriptors.Schema;
     
     public class Demo5_readWriteKafka {
         public static void main(String[] args) {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
             //创建表的运行环境
             StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
             //创建表结构
             Schema schema=new Schema()
                    .field("id",DataTypes.STRING())
                    .field("ts",DataTypes.BIGINT())
                    .field("vc",DataTypes.INT());
             //创建kafka连接
             tabEnv.connect(
                     new Kafka()
                    .version("universal")// 版本号
                    .property("bootstrap.servers","hadoop102:9092")//地址
                    .property("group.id","cy")//消费者组
                    .topic("first")//消费主题
     
              )
                    .withFormat(new Json())//写入的格式
                    .withSchema(schema)
                    .createTemporaryTable("a");
             //创建表
             Table select = tabEnv.from("a").select("*");
             //创建写入主题
             tabEnv.connect(
                     new Kafka()
                            .version("universal")// 版本号
                            .property("bootstrap.servers","hadoop102:9092")//地址
                            .topic("first1")//消费主题
                            .sinkPartitionerRoundRobin()//随即分区
     
            )
                    .withFormat(new Json())//写入的格式
                    .withSchema(schema)
                    .createTemporaryTable("c");
     
             //写入
             select.executeInsert("c");
     
     
        }
     }
     
  • 相关阅读:
    Power BI 傻瓜入门 6. 从动态数据源获取数据
    网络安全(黑客技术)自学笔记
    微服务项目:尚融宝(32)(后端搭建:会员列表搭建(3))
    H264基本原理
    仅11w粉涨800w播放,UP主在B站新分区带货变现!
    Overload和Override的区别说明
    基于单片机的声光控制节能灯设计
    【自学开发之旅】Flask-数据查询-数据序列化-数据库关系(四)
    软件测试的就业前景到底怎么样?
    ESP32 之 ESP-IDF 教学(十七)——组件依赖
  • 原文地址:https://www.cnblogs.com/CYan521/p/16845742.html