• FlinkSQL系列03-表定义


    表定义要素:表定义包含表名和表描述器(TableDescriptor),其中表名包含3个部分:
    catalog_name
    database_name
    object_name

    一 TableDescriptor

    TableDescriptor 包含4个方面:

    • Schema 表结构
    • Format 数据格式
    • Connector 连接器
    • Option 连接器参数

    1 Schema

    表结构中的字段分为:物理字段、表达式字段、元数据字段、主键约束

    1.1 physical column

    物理字段:源自于“外部存储”系统本身 schema 中的字段
    
    • 1

    kafka 消息的 key、value(json 格式)中的字段;
    mysql 表中的字段;hive 表中的字段;parquet 文件中的字段……

    1.2 computed column

    表达式字段(逻辑字段):在物理字段上施加一个 sql 表达式,并将表达式结果定义为一个字段
    
    • 1
    • TableApi 中的定义方式

      Schema.newBuilder()
      // 声明表达式字段 age_exp, 它来源于物理字段 age+10
      .columnByExpression("age_exp", "age+10")
      
      • 1
      • 2
      • 3
    • Sql DDL 中的定义方式

      CREATE TABLE MyTable (
      `user_id` BIGINT,
      `price` DOUBLE,
      `quantity` DOUBLE,
      `cost` AS price * quantity, -- cost 来源于: price*quantity
      ) WITH (
      'connector' = 'kafka'
      ...
      );
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    1.3 metadata column

    元数据字段:来源于 connector 从外部存储系统中获取到的“外部系统元信息”
    
    • 1

    比如,kafka 的消息,通常意义上的数据内容是在 record 的 key 和 value 中的,而实质上(底层角度来看),kafka 中的每一条 record,不光带了 key 和 value 数据内容,还带了这条 record 所属的 topic,所属的 partition,所在的 offset,以及 record 的 timetamp 和 timestamp 类型等“元信息”。而 flink 的 connector 可以获取并暴露这些元信息,并允许用户将这些信息定义成 flinksql 表中的字段;

    • TableApi 中的定义方式

      Schema.newBuilder()
      .columnByMetadata("topic",DataTypes.STRING())
      
      • 1
      • 2
    • Sql DDL 中的定义方式

      CREATE TABLE MyTable (
      `user_id` BIGINT,
      `name` STRING,
      -- 元数据字段, 来源于 kafka record 的 timestamp
      `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
      ) WITH (
      'connector' = 'kafka'
      ...
      );
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9

    1.4 主键约束

    • 单字段主键约束语法:

      id INT PRIMARY KEY NOT ENFORCED,
      name STRING
      
      • 1
      • 2
    • 多字段主键约束语法:

      id,
      name,
      PRIMARY KEY(id,name) NOT ENFORCED
      
      • 1
      • 2
      • 3

    完整示例
    Table API

    // 建表(数据源表)
    // {"id":4,"name":"zs","nick":"tiedan","age":18,"gender":"male"}
    tenv.createTable("t_person",
            TableDescriptor
                    .forConnector("kafka")
                    .schema(Schema.newBuilder()
                            .column("id", DataTypes.INT())   // column是声明物理字段到表结构中来
                            .column("name", DataTypes.STRING())   // column是声明物理字段到表结构中来
                            .column("nick", DataTypes.STRING())   // column是声明物理字段到表结构中来
                            .column("age", DataTypes.INT())   // column是声明物理字段到表结构中来
                            .column("gender", DataTypes.STRING())   // column是声明物理字段到表结构中来
                            .columnByExpression("guid","id")  // 声明表达式字段
                            /*.columnByExpression("big_age",$("age").plus(10))*/     // 声明表达式字段
                            .columnByExpression("big_age","age + 10")  // 声明表达式字段
                            // isVirtual 是表示: 当这个表被sink表时,该字段是否出现在schema中
                            .columnByMetadata("offs",DataTypes.BIGINT(),"offset",true)  // 声明元数据字段
                            .columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)  // 声明元数据字段
                            /*.primaryKey("id","name")*/
                            .build())
                    .format("json")
                    .option("topic","mytopic")
                    .option("properties.bootstrap.servers","hdp01:9092")
                    .option("properties.group.id","g1")
                    .option("scan.startup.mode","earliest-offset")
                    .option("json.fail-on-missing-field","false")
                    .option("json.ignore-parse-errors","true")
                    .build()
    );
    
    tenv.executeSql("select * from t_person").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    SQL DDL

    // 建表(数据源表)
    // {"id":4,"name":"zs","nick":"tiedan","age":18,"gender":"male"}
    tenv.executeSql(
            "create table t_person                                          "
                    + " (                                                   "
                    + "   id int ,                                          "  // -- 物理字段
                    + "   name string,                                      "  // -- 物理字段
                    + "   nick string,                                      "
                    + "   age int,                                          "
                    + "   gender string ,                                   "
                    + "   guid as id,                                       "  // -- 表达式字段(逻辑字段)
                    + "   big_age as age + 10 ,                             "  // -- 表达式字段(逻辑字段)
                    + "   offs bigint metadata from 'offset' ,             "   // -- 元数据字段
                    + "   ts TIMESTAMP_LTZ(3) metadata from 'timestamp',    "   // -- 元数据字段
                    /*+ "   PRIMARY KEY(id,name) NOT ENFORCED                 "*/    // -- 主键约束
                    + " )                                                   "
                    + " WITH (                                              "
                    + "  'connector' = 'kafka',                             "
                    + "  'topic' = 'mytopic',                              "
                    + "  'properties.bootstrap.servers' = 'hdp01:9092',   "
                    + "  'properties.group.id' = 'g1',                      "
                    + "  'scan.startup.mode' = 'earliest-offset',           "
                    + "  'format' = 'json',                                 "
                    + "  'json.fail-on-missing-field' = 'false',            "
                    + "  'json.ignore-parse-errors' = 'true'                "
                    + " )                                                   "
    );
    
    tenv.executeSql("desc t_person").print();
    tenv.executeSql("select * from t_person where id>2").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    2 format

    connector 连接器在对接外部存储时,根据外部存储中的数据格式不同,需要用到不同的 format 组件。
    format 组件的作用就是:告诉连接器,如何解析外部存储中的数据及映射到表 schema。

    format 组件的使用要点

    • 导入 format 组件的 jar 包依赖
    • 指定 format 组件的名称
    • 设置 format 组件所需的参数(不同 format 组件有不同的参数配置需求)

    2.1 json format 详解

    所需依赖

    <dependency>
    	<groupId>org.apache.flinkgroupId>
    	<artifactId>flink-jsonartifactId>
    	<version>1.15.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可用参数
    format 组件名:json
    json.fail-on-missing-field 缺失字段是否失败
    json.ignor-parse-errors 是否忽略 json 解析错误
    json.timestamp-format.standard json 中的 timestamp 类型字段的格式
    json.map-null-key.mode 可取: FAIL ,DROP, LITERAL
    json.map-null-key.literal 替换 null 的字符串
    json.encode.decimal-as-plain-number
    ……

    json 示例
    样例数据: {“id”:12,“name”:{“nick”:“doe3”,“formal”:“doit edu3”,“height”:170}}

    Table API

    tenv.createTable("t_json",
            TableDescriptor
                    .forConnector("filesystem")
                    .schema(Schema.newBuilder()
                            .column("id", DataTypes.INT())
                            .column("name", DataTypes.ROW(
                                    DataTypes.FIELD("nick", DataTypes.STRING()),
                                    DataTypes.FIELD("formal", DataTypes.STRING()),
                                    DataTypes.FIELD("height", DataTypes.INT())
                            ))
                            .build())
                    .format("json")
                    .option("path","data/json/qiantao")
                    .build());
    tenv.executeSql("select id, name.formal, name.height from t_json2").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    样例数据:{"id":1,"friends":[{"name":"a","info":{"addr":"bj","gender":"male"}},{"name":"b","info":{"addr":"sh","gender":"female"}}]}
    
    • 1

    SQL DDL

    tenv.executeSql(
            "create table t_json(                                                 "
                    + "   id  int,                                                "
                    + "   friends array>> "
                    + ")with(                                      "
                    + " 'connector' = 'filesystem',                "
                    + " 'path' = 'data/json/qiantao2/',            "
                    + " 'format'='json'                            "
                    + ")                                           "
    );
    tenv.executeSql("select id," +
            "friends[1].name as name1, friends[1].info['addr'] as addr1, friends[1].info['gender'] as gender1,   " +
            "friends[2].name as name2, friends[2].info['addr'] as addr2, friends[2].info['gender'] as gender2    " +
            "from  t_json")/*.print()*/;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    2.2 csv format 详解

    所需依赖

    <dependency>
    	<groupId>org.apache.flinkgroupId>
    	<artifactId>flink-csvartifactId>
    	<version>1.15.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可用参数
    format = csv
    csv.field-delimiter = ‘,’
    csv.disable-quote-character = false
    csv.quote-character = ’ " ’
    csv.allow-comments = false
    csv.ignore-parse-erros = false 是否忽略解析错误
    csv.array-element-delimiter = ’ ; ’ 数组元素之间的分隔符
    csv.escape-character = none 转义字符
    csv.null-literal = none null 的字面量字符串

    csv 示例

    tenv.executeSql(
            "create table t_csv(                                   "
                    + "  id int,                                   "
                    + "  name string,                              "
                    + "  age  string                               "
                    + ") with (                                    "
                    + " 'connector' = 'filesystem',                "
                    + " 'path' = 'data/csv/',                      "
                    + " 'format'='csv',                            "
                    + " 'csv.disable-quote-character' = 'false',   "
                    + " 'csv.quote-character' = '|',               "
                    + "  'csv.ignore-parse-errors' = 'true' ,      "
                    + "  'csv.null-literal' = 'AA'    ,            "   // 将数据中的 AA 当成空处理
                    + "  'csv.allow-comments' = 'true'             "
                    + ")                                           "
    );
    
    tenv.executeSql("select * from  t_csv").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    3 connector

    connector 概述

    • connector 通常是用于对接外部存储建表(源表或目标表) 时的映射器、 桥接器
    • connector 本质上是对 flink 的 table source /table sink 算子的封装

    连接器使用的核心要素:

    1. 导入连接器 jar 包依赖
    2. 指定连接器类型名
    3. 指定连接器所需的参数(不同连接器有不同的参数配置需求)
    4. 获取连接器所提供的元数据

    FlinkSql 内置支持的 connector
    在这里插入图片描述

    3.1 kafka connector

    所需依赖

    <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-kafkaartifactId>
                <version>${flink.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    示例

     /**
     *  对应的kafka中的数据:
     *      key: {"k1":100,"k2":200}
     *      value: {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"}
     *      headers:
     *          h1 ->  vvvv
     *          h2 ->  tttt
     */
    tenv.executeSql(
            " CREATE TABLE t_kafka_connector (                       "
                    + "     guid   int,                                        "
                    + "     eventId string,                                    "
                    + "     eventTime bigint,                                  "
                    + "     pageId    string,                                  "
                    + "     k1        int,                                     "
                    + "     k2        int,                                     "
                    + " 	rec_ts   timestamp(3) metadata from 'timestamp' ,  "
                    + " 	`offset` bigint metadata ,                         "
                    + " 	headers map metadata,                "
                    + " 	rt as to_timestamp_ltz(eventTime,3) ,              "
                    + " 	watermark for rt as rt - interval '0.001' second   "
                    + " ) WITH (                                               "
                    + "  'connector' = 'kafka',                                "
                    + "  'topic' = 'hdp-kafka',                             "
                    + "  'properties.bootstrap.servers' = 'hdp:9092',      "
                    + "  'properties.group.id' = 'testGroup',                  "
                    + "  'scan.startup.mode' = 'earliest-offset',           "
                    + "  'key.format'='json',                               "
                    + "  'key.json.ignore-parse-errors' = 'true',           "
                    + "  'key.fields'='k1;k2',                              "
                    /* + "  'key.fields-prefix'='',                   "     */
                    + "  'value.format'='json',                             "
                    + "  'value.json.fail-on-missing-field'='false',        "
                    + "  'value.fields-include' = 'EXCEPT_KEY'              "
                    + " )                                                   "
    
    );
    
    tenv.executeSql("select * from t_kafka_connector")/*.print()*/;
    tenv.executeSql("select guid, eventId, cast(headers['h1'] as string) as h1, cast(headers['h2'] as string) as h2 from t_kafka_connector ").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    3.2 upsert-kafka-connector

    • 作为 source
      根据所定义的主键,将读取到的数据转换为 +I/-U/+U 记录,如果读到 null,则转换为-D 记录
    • 作为 sink
      对于 -U/+U/+I 记录,都以正常的 append 消息写入 kafka;
      对于-D 记录,则写入一个 null 到 kafka 来表示 delete 操作。

    示例(作为 sink)

     // 流转表
    tenv.createTemporaryView("bean1",bean1);
    tenv.createTemporaryView("bean2",bean2);
    
    //tenv.executeSql("select gender,count(1) as cnt from bean1 group by gender").print();
    
    // 创建目标 kafka映射表
    tenv.executeSql(
            " create table t_upsert_kafka(                 "
                    + "    id int primary key not enforced,        "
                    + "    gender string,                          "
                    + "    name string                             "
                    + " ) with (                                    "
                    + "  'connector' = 'upsert-kafka',              "
                    + "  'topic' = 'hdp-upsert',                "
                    + "  'properties.bootstrap.servers' = 'hdp:9092',  "
                    + "  'key.format' = 'csv',                             "
                    + "  'value.format' = 'csv'                            "
                    + " )                                                  "
    
    );
    // 查询每种性别的数据行数,并将结果插入到目标表
    tenv.executeSql(
            "insert into t_upsert_kafka " +
            "select bean1.id, bean1.gender, bean2.name from bean1 left join bean2 on bean1.id=bean2.id"
    );
    
    tenv.executeSql("select  *  from t_upsert_kafka").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    3.3 jdbc connector

    jdbc connector 有如下特性

    • 可作为 scan source , 底层产生 Bounded Stream
    • 可作为 lookup source, 底层是“事件驱动”式查询
    • 可作为 Batch 模式的 sink
    • 可作为 Stream 模式下的 append sink 和 upsert sink

    所需依赖

    <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-jdbcartifactId>
                <version>${flink.version}version>
    dependency>
    <dependency>
    			<groupId>mysqlgroupId>
    			<artifactId>mysql-connector-javaartifactId>
    			<version>8.0.21version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    示例(作为sink)

    tenv.executeSql(
            "create table flink_stu(                " +
                    "   id  int  primary key,       " +
                    "   gender string,              " +
                    "   name string                 " +
                    ") with (                       " +
                    "  'connector' = 'jdbc',        " +
                    "  'url' = 'jdbc:mysql://hdp:3306/flinktest'," +
                    "  'table-name' = 'stu2',       " +
                    "  'username' = 'root',         " +
                    "  'password' = 'root'          " +
                    ")"
    );
     // 流转表
    tenv.createTemporaryView("bean1", bean1);
    tenv.createTemporaryView("bean2", bean2);
    
    tenv.executeSql("insert into flink_stu " +
            "select bean1.id, bean1.gender, bean2.name from bean1 left join bean2 on bean1.id=bean2.id");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.4 filesystem connector

    filesystem connector 表特性

    • 可读可写
    • 作为 source 表时,支持持续监视读取目录下新文件,且每个新文件只会被读取一次
    • 作为 sink 表时,支持多种文件格式、分区、文件滚动、压缩设置等功能

    示例(将数据流作为source,写入 filesystem 表)

    // 建表 fs_table 来映射 mysql中的flinktest.stu
    tenv.executeSql(
            "CREATE TABLE fs_table (" +
                    "  user_id STRING," +
                    "  order_amount DOUBLE," +
                    "  dt STRING," +
                    "  `hour` STRING" +
                    ") PARTITIONED BY (dt, `hour`) WITH (" +
                    "  'connector'='filesystem'," +
                    "  'path'='file:///d:/filetable/'," +
                    "  'format'='json'," +
                    "  'sink.partition-commit.delay'='1 h'," +
                    "  'sink.partition-commit.policy.kind'='success-file'," +
                    "  'sink.rolling-policy.file-size' = '8M'," +
                    "  'sink.rolling-policy.rollover-interval'='30 min'," +
                    "  'sink.rolling-policy.check-interval'='10 second'" +
                    ")"
    );
    
    // u01,88.8,2022-06-13,14
    SingleOutputStreamOperator<Tuple4<String, Double, String, String>> stream = env
            .socketTextStream("hdp01", 9999)
            .map(s -> {
                String[] split = s.split(",");
                return Tuple4.of(split[0], Double.parseDouble(split[1]), split[2], split[3]);
            }).returns(new TypeHint<Tuple4<String, Double, String, String>>() {
            });
    
    tenv.createTemporaryView("orders", stream);
    
    tenv.executeSql("insert into fs_table select * from orders");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    watermark与时间属性

    1 流转表

    数据流 转为 数据表的过程中, 无论“源流”是否存在 watermark, 都不会自动传递 watermark。
    如需时间运算(如时间窗口等),需要在转换定义中显式声明 watermark 策略。
    
    • 1
    • 2

    问题:那么流转表的过程中,如何传承 事件时间 和 watermark 呢?

    1. 需要一个 timestamp(3) 类型字段(可以是物理字段,也可以是表达式字段,也可以是元数据字段)

    先设法定义一个 timestamp(3) 或者 timestamp_ltz(3) 类型的字段(可以来自于数据字段, 也可以来自于一个元数据字段:rowtime

    • rt as to_timestamp_ltz(eventTime) —> 从一个 bigint 得到一个timestamp(3)类型的字段
    • rt timestamp(3) metadata from ‘rowtime’ —> 从一个元数据 rowtime 得到一个 timestamp(3)类型的字段
    1. 需要用一个 watermarkExpression 来指定 watermark 策略

    两种方式:

    • watermark for rt AS rt - interval ‘1’ second
    • watermark for rt AS source_watermark() —> 代表使用底层流的 watermark 策略

    Table API 示例

    // {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"}
    DataStreamSource<String> s1 = env.socketTextStream("doitedu", 9999);
    
    SingleOutputStreamOperator<Event> s2 = s1.map(s -> JSON.parseObject(s, Event.class))
         .assignTimestampsAndWatermarks(WatermarkStrategy
                 .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                     @Override
                     public long extractTimestamp(Event element, long recordTimestamp) {
                         return element.eventTime;
                     }
                 })
         );
    // 这样,直接把流 转成 表,会丢失watermark
    tenv.createTemporaryView("t_events", s2);
    
    // 可以在  流  转 表 时,显式声明 watermark策略
    tenv.createTemporaryView("t_events2", s2, Schema.newBuilder()
            .column("guid", DataTypes.INT())
            .column("eventId", DataTypes.STRING())
            .column("eventTime", DataTypes.BIGINT())
            .column("pageId", DataTypes.STRING())
    
            .columnByExpression("rt","to_timestamp_ltz(eventTime,3)")  // 重新利用一个bigint转成 timestamp后,作为事件时间属性
            .columnByMetadata("rt", DataTypes.TIMESTAMP_LTZ(3), "rowtime")  // 利用底层流连接器暴露的 rowtime 元数据(代表的就是底层流中每条数据上的eventTime),声明成事件时间属性字段
    
            .watermark("rt","rt - interval '1' second ")  // 重新定义表上的watermark策略
            //.watermark("rt", "source_watermark()") // 声明 watermark 直接 引用 底层流的watermark
            .build());
    
    tenv.executeSql("select guid, eventId, eventTime, pageId, rt, current_watermark(rt) as wm from t_events2").print();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    SQL DDL 示例

    // 只有 TIMESTAMP 或 TIMESTAMP_LTZ 类型的字段可以被声明为rowtime(事件时间属性)
    tenv.executeSql(
            " create table t_events(                                          "
                    + "   guid int,                                                     "
                    + "   eventId string,                                               "
                    /*+ "   eventTime timestamp(3),                                     "*/ // timestamp(3) 类型可以直接使用,否则需要使用表达式字段转换后使用
                    + "   eventTime bigint,                                             "
                    + "   pageId  string,                                               "
                    + "   pt AS proctime(),                                             "  // 利用一个表达式字段,来声明 processing time属性
                    + "   rt as to_timestamp_ltz(eventTime,3),                          "
                    + "   watermark for rt as rt - interval '0.001' second             "  // 用watermark for xxx,来将一个已定义的TIMESTAMP/TIMESTAMP_LTZ字段声明成 eventTime属性及指定watermark策略
                    + " )                                                               "
                    + " with (                                                          "
                    + "   'connector' = 'kafka',                                        "
                    + "   'topic' = 'doit30-events2',                                   "
                    + "   'properties.bootstrap.servers' = 'doitedu:9092',              "
                    + "   'properties.group.id' = 'g1',                                 "
                    + "   'scan.startup.mode' = 'earliest-offset',                      "
                    + "   'format' = 'json',                                            "
                    + "   'json.fail-on-missing-field' = 'false',                       "
                    + "   'json.ignore-parse-errors' = 'true'                           "
                    + " )                                                               "
    );
    
    tenv.executeSql("desc t_events")/*.print()*/;
    tenv.executeSql("select guid, eventId, eventTime, pageId, pt, rt, CURRENT_WATERMARK(rt) as wm from t_events").print();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    2 表转流

    源表定义了 wartermark 策略。则将表转成流时,将会自动传递源表的 watermark。

  • 相关阅读:
    从零开发短视频电商 Spring事务嵌套问题
    【SpringMVC】SpringMVC接受请求参数和数据回显
    MySQL-DDL语句
    ClickHouse多种安装方式
    AI二次开发C#分组
    广电5G正式启航,黄金频段将如何应用引关注
    mmpose关键点(二):构建自己的训练集
    TSINGSEE青犀视频平台Linux云存储挂载工具使最新配置与部署方式
    暑假算法7.28,Day27
    使用docker搭建homarr
  • 原文地址:https://blog.csdn.net/qq_17310871/article/details/126561782