Elements of a table definition: a table definition consists of a table name and a table descriptor (TableDescriptor). The table name has 3 parts:
catalog_name
database_name
object_name
The TableDescriptor covers 4 aspects: the connector type, the table schema, the format, and the connector options (all of which appear in the complete examples below).
Columns in a table schema fall into: physical columns, expression columns, metadata columns, plus primary key constraints.
Physical columns: columns that originate from the schema of the "external storage" system itself,
e.g. fields inside the key/value (JSON format) of a kafka message;
columns of a mysql table; columns of a hive table; fields of a parquet file, and so on.
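In the Table API, a physical column is declared with column(); a minimal sketch (the field names here are illustrative):
Schema.newBuilder()
    // each column() call declares one physical column mapping onto a field of the external data
    .column("id", DataTypes.INT())
    .column("name", DataTypes.STRING())
    .build();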
Expression columns (logical/computed columns): apply a SQL expression on top of physical columns and define the expression result as a column.
How to define it in the Table API
Schema.newBuilder()
// declare an expression column age_exp, derived from the physical column age: age + 10
.columnByExpression("age_exp", "age+10")
How to define it in SQL DDL
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quantity -- cost is derived from: price * quantity
) WITH (
'connector' = 'kafka'
...
);
Metadata columns: derived from the "external system metadata" that the connector obtains from the external storage system.
For example, the data content of a kafka message is, conventionally, carried in the record's key and value. But at the lower level, every kafka record carries not only the key and value, but also the topic it belongs to, the partition it sits in, its offset, its timestamp and timestamp type, and other "metadata". Flink's connector can obtain and expose this metadata and lets users declare it as columns of a FlinkSQL table.
How to define it in the Table API
Schema.newBuilder()
.columnByMetadata("topic",DataTypes.STRING())
How to define it in SQL DDL
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- metadata column, taken from the kafka record's timestamp
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
Single-column primary key constraint syntax:
id INT PRIMARY KEY NOT ENFORCED,
name STRING
Multi-column (composite) primary key constraint syntax:
id,
name,
PRIMARY KEY(id,name) NOT ENFORCED
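A hedged sketch of how the composite primary key fits into a full CREATE TABLE; the connector options are illustrative (borrowed from the upsert-kafka example later in this section, with a hypothetical topic name):
tenv.executeSql(
    " create table t_pk_demo ( "
  + "   id int, "
  + "   name string, "
  + "   gender string, "
  + "   PRIMARY KEY (id, name) NOT ENFORCED " // composite primary key; Flink only accepts NOT ENFORCED
  + " ) with ( "
  + "   'connector' = 'upsert-kafka', "        // upsert-kafka requires a primary key
  + "   'topic' = 'pk-demo', "                 // hypothetical topic
  + "   'properties.bootstrap.servers' = 'hdp:9092', "
  + "   'key.format' = 'csv', "
  + "   'value.format' = 'csv' "
  + " ) "
);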
Complete example
Table API
// create the table (a source table)
// {"id":4,"name":"zs","nick":"tiedan","age":18,"gender":"male"}
tenv.createTable("t_person",
TableDescriptor
.forConnector("kafka")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT()) // column() declares a physical column in the table schema
.column("name", DataTypes.STRING()) // physical column
.column("nick", DataTypes.STRING()) // physical column
.column("age", DataTypes.INT()) // physical column
.column("gender", DataTypes.STRING()) // physical column
.columnByExpression("guid","id") // declare an expression column
/*.columnByExpression("big_age",$("age").plus(10))*/ // expression column via the Expression API
.columnByExpression("big_age","age + 10") // declare an expression column
// isVirtual indicates whether the column is virtual, i.e. not persisted when this table is used as a sink table
.columnByMetadata("offs",DataTypes.BIGINT(),"offset",true) // declare a metadata column
.columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true) // declare a metadata column
/*.primaryKey("id","name")*/
.build())
.format("json")
.option("topic","mytopic")
.option("properties.bootstrap.servers","hdp01:9092")
.option("properties.group.id","g1")
.option("scan.startup.mode","earliest-offset")
.option("json.fail-on-missing-field","false")
.option("json.ignore-parse-errors","true")
.build()
);
tenv.executeSql("select * from t_person").print();
SQL DDL
// create the table (a source table)
// {"id":4,"name":"zs","nick":"tiedan","age":18,"gender":"male"}
tenv.executeSql(
"create table t_person "
+ " ( "
+ " id int , " // -- physical column
+ " name string, " // -- physical column
+ " nick string, "
+ " age int, "
+ " gender string , "
+ " guid as id, " // -- expression column (computed column)
+ " big_age as age + 10 , " // -- expression column (computed column)
+ " offs bigint metadata from 'offset' , " // -- metadata column
+ " ts TIMESTAMP_LTZ(3) metadata from 'timestamp' " // -- metadata column (no trailing comma, since the line below is commented out)
/*+ " , PRIMARY KEY(id,name) NOT ENFORCED "*/ // -- primary key constraint
+ " ) "
+ " WITH ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'mytopic', "
+ " 'properties.bootstrap.servers' = 'hdp01:9092', "
+ " 'properties.group.id' = 'g1', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'format' = 'json', "
+ " 'json.fail-on-missing-field' = 'false', "
+ " 'json.ignore-parse-errors' = 'true' "
+ " ) "
);
tenv.executeSql("desc t_person").print();
tenv.executeSql("select * from t_person where id>2").print();
When a connector talks to external storage, it needs a different format component depending on the data format in that storage.
The format component's job is to tell the connector how to parse the data in the external storage and map it onto the table schema.
Key points for using a format component
Required dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.15.0</version>
</dependency>
Available options
format component name: json
json.fail-on-missing-field          whether to fail when a field is missing
json.ignore-parse-errors            whether to ignore JSON parse errors
json.timestamp-format.standard      format of timestamp-type fields in the JSON ('SQL' or 'ISO-8601')
json.map-null-key.mode              one of: FAIL, DROP, LITERAL
json.map-null-key.literal           the literal string used to replace null map keys
json.encode.decimal-as-plain-number whether to encode decimals as plain numbers (no scientific notation)
...
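A sketch of where these json format options go in a DDL WITH clause (the table name, path and option values are purely illustrative; the serialization-related options only take effect when the table is written to):
tenv.executeSql(
    " create table t_json_opts ( "
  + "   id int, "
  + "   tags map<string,string> "
  + " ) with ( "
  + "   'connector' = 'filesystem', "
  + "   'path' = 'data/json/opts/', "                   // hypothetical path
  + "   'format' = 'json', "
  + "   'json.ignore-parse-errors' = 'true', "
  + "   'json.timestamp-format.standard' = 'ISO-8601', " // or 'SQL'
  + "   'json.map-null-key.mode' = 'LITERAL', "
  + "   'json.map-null-key.literal' = 'null_key' "
  + " ) "
);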
json example
Sample data: {"id":12,"name":{"nick":"doe3","formal":"doit edu3","height":170}}
Table API
tenv.createTable("t_json",
TableDescriptor
.forConnector("filesystem")
.schema(Schema.newBuilder()
.column("id", DataTypes.INT())
.column("name", DataTypes.ROW(
DataTypes.FIELD("nick", DataTypes.STRING()),
DataTypes.FIELD("formal", DataTypes.STRING()),
DataTypes.FIELD("height", DataTypes.INT())
))
.build())
.format("json")
.option("path","data/json/qiantao")
.build());
tenv.executeSql("select id, name.formal, name.height from t_json").print();
Sample data: {"id":1,"friends":[{"name":"a","info":{"addr":"bj","gender":"male"}},{"name":"b","info":{"addr":"sh","gender":"female"}}]}
SQL DDL
tenv.executeSql(
"create table t_json( "
+ " id int, "
+ " friends array<row<name string, info map<string,string>>> "
+ ")with( "
+ " 'connector' = 'filesystem', "
+ " 'path' = 'data/json/qiantao2/', "
+ " 'format'='json' "
+ ") "
);
tenv.executeSql("select id," +
"friends[1].name as name1, friends[1].info['addr'] as addr1, friends[1].info['gender'] as gender1, " +
"friends[2].name as name2, friends[2].info['addr'] as addr2, friends[2].info['gender'] as gender2 " +
"from t_json")/*.print()*/;
Required dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.15.0</version>
</dependency>
Available options
format = csv
csv.field-delimiter = ','
csv.disable-quote-character = false
csv.quote-character = '"'
csv.allow-comments = false
csv.ignore-parse-errors = false     whether to ignore parse errors
csv.array-element-delimiter = ';'   delimiter between array elements
csv.escape-character = none         escape character
csv.null-literal = none             the literal string treated as null
csv example
tenv.executeSql(
"create table t_csv( "
+ " id int, "
+ " name string, "
+ " age string "
+ ") with ( "
+ " 'connector' = 'filesystem', "
+ " 'path' = 'data/csv/', "
+ " 'format'='csv', "
+ " 'csv.disable-quote-character' = 'false', "
+ " 'csv.quote-character' = '|', "
+ " 'csv.ignore-parse-errors' = 'true' , "
+ " 'csv.null-literal' = 'AA' , " // treat the string AA in the data as null
+ " 'csv.allow-comments' = 'true' "
+ ") "
);
tenv.executeSql("select * from t_csv").print();
connector overview
Core elements of using a connector:
- import the connector's jar dependency
- specify the connector type name
- specify the options the connector needs (different connectors require different options)
- access the metadata the connector exposes
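As a minimal illustration of these core elements, here is a hedged sketch using the built-in datagen connector (the table and field names are made up):
// core element 2: connector type name; core element 3: connector-specific options
// (core element 1, the jar dependency, is unnecessary here because datagen ships with Flink;
//  core element 4, connector metadata, is demonstrated in the kafka example below)
tenv.executeSql(
    " create table t_datagen ( "
  + "   id int, "
  + "   name string "
  + " ) with ( "
  + "   'connector' = 'datagen', "
  + "   'rows-per-second' = '1' "
  + " ) "
);
tenv.executeSql("select * from t_datagen")/*.print()*/;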
Connectors built into FlinkSQL
Required dependency
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
Example
/**
* The corresponding data in kafka:
* key: {"k1":100,"k2":200}
* value: {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"}
* headers:
* h1 -> vvvv
* h2 -> tttt
*/
tenv.executeSql(
" CREATE TABLE t_kafka_connector ( "
+ " guid int, "
+ " eventId string, "
+ " eventTime bigint, "
+ " pageId string, "
+ " k1 int, "
+ " k2 int, "
+ " rec_ts timestamp(3) metadata from 'timestamp' , "
+ " `offset` bigint metadata , "
+ " headers map<string, bytes> metadata, "
+ " rt as to_timestamp_ltz(eventTime,3) , "
+ " watermark for rt as rt - interval '0.001' second "
+ " ) WITH ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'hdp-kafka', "
+ " 'properties.bootstrap.servers' = 'hdp:9092', "
+ " 'properties.group.id' = 'testGroup', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'key.format'='json', "
+ " 'key.json.ignore-parse-errors' = 'true', "
+ " 'key.fields'='k1;k2', "
/* + " 'key.fields-prefix'='', " */
+ " 'value.format'='json', "
+ " 'value.json.fail-on-missing-field'='false', "
+ " 'value.fields-include' = 'EXCEPT_KEY' "
+ " ) "
);
tenv.executeSql("select * from t_kafka_connector")/*.print()*/;
tenv.executeSql("select guid, eventId, cast(headers['h1'] as string) as h1, cast(headers['h2'] as string) as h2 from t_kafka_connector ").print();
Example (as a sink)
// convert the streams to tables
tenv.createTemporaryView("bean1",bean1);
tenv.createTemporaryView("bean2",bean2);
//tenv.executeSql("select gender,count(1) as cnt from bean1 group by gender").print();
// create the target kafka mapping table
tenv.executeSql(
" create table t_upsert_kafka( "
+ " id int primary key not enforced, "
+ " gender string, "
+ " name string "
+ " ) with ( "
+ " 'connector' = 'upsert-kafka', "
+ " 'topic' = 'hdp-upsert', "
+ " 'properties.bootstrap.servers' = 'hdp:9092', "
+ " 'key.format' = 'csv', "
+ " 'value.format' = 'csv' "
+ " ) "
);
// join bean1 and bean2 and write the result into the target table (an upsert sink)
tenv.executeSql(
"insert into t_upsert_kafka " +
"select bean1.id, bean1.gender, bean2.name from bean1 left join bean2 on bean1.id=bean2.id"
);
tenv.executeSql("select * from t_upsert_kafka").print();
The jdbc connector has the following characteristics:
- it can act as a scan source; the underlying stream it produces is a bounded stream
- it can act as a lookup source; the underlying queries are issued on demand, per incoming event (see the sketch right after this list)
- it can act as a sink in batch mode
- it can act as an append sink or an upsert sink in streaming mode
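For the lookup-source usage just listed, a jdbc table is queried through a lookup join on a processing-time attribute. A hedged sketch, assuming a hypothetical orders table that has a processing-time column pt (e.g. declared as pt AS proctime()) and reusing the mysql options from the sink example further below:
// jdbc dimension table used as a lookup source
tenv.executeSql(
    "create table stu_dim( " +
    "  id int, " +
    "  name string, " +
    "  primary key (id) not enforced " +
    ") with ( " +
    "  'connector' = 'jdbc', " +
    "  'url' = 'jdbc:mysql://hdp:3306/flinktest', " +
    "  'table-name' = 'stu2', " +
    "  'username' = 'root', " +
    "  'password' = 'root' " +
    ")"
);
// lookup join: every incoming row of orders triggers a point query against the jdbc table
tenv.executeSql(
    "select o.id, o.gender, d.name " +
    "from orders as o " +
    "join stu_dim for system_time as of o.pt as d " +
    "on o.id = d.id "
)/*.print()*/;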
Required dependencies
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.21</version>
</dependency>
Example (as a sink)
tenv.executeSql(
"create table flink_stu( " +
" id int primary key not enforced, " +
" gender string, " +
" name string " +
") with ( " +
" 'connector' = 'jdbc', " +
" 'url' = 'jdbc:mysql://hdp:3306/flinktest'," +
" 'table-name' = 'stu2', " +
" 'username' = 'root', " +
" 'password' = 'root' " +
")"
);
// convert the streams to tables
tenv.createTemporaryView("bean1", bean1);
tenv.createTemporaryView("bean2", bean2);
tenv.executeSql("insert into flink_stu " +
"select bean1.id, bean1.gender, bean2.name from bean1 left join bean2 on bean1.id=bean2.id");
filesystem connector table characteristics
- readable and writable
- as a source table, it supports continuously monitoring the directory for new files, and each new file is read only once
- as a sink table, it supports multiple file formats, partitioning, file rolling, compaction settings, and more
Example (using a data stream as the source and writing into a filesystem table)
// create the table fs_table, mapped onto a partitioned filesystem path
tenv.executeSql(
"CREATE TABLE fs_table (" +
" user_id STRING," +
" order_amount DOUBLE," +
" dt STRING," +
" `hour` STRING" +
") PARTITIONED BY (dt, `hour`) WITH (" +
" 'connector'='filesystem'," +
" 'path'='file:///d:/filetable/'," +
" 'format'='json'," +
" 'sink.partition-commit.delay'='1 h'," +
" 'sink.partition-commit.policy.kind'='success-file'," +
" 'sink.rolling-policy.file-size' = '8M'," +
" 'sink.rolling-policy.rollover-interval'='30 min'," +
" 'sink.rolling-policy.check-interval'='10 second'" +
")"
);
// u01,88.8,2022-06-13,14
SingleOutputStreamOperator<Tuple4<String, Double, String, String>> stream = env
.socketTextStream("hdp01", 9999)
.map(s -> {
String[] split = s.split(",");
return Tuple4.of(split[0], Double.parseDouble(split[1]), split[2], split[3]);
}).returns(new TypeHint<Tuple4<String, Double, String, String>>() {
});
tenv.createTemporaryView("orders", stream);
tenv.executeSql("insert into fs_table select * from orders");
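Since a filesystem table is readable as well, the same table can be queried back as a source once data has been written (a minimal sketch; the partition value is illustrative):
tenv.executeSql("select * from fs_table where dt = '2022-06-13'")/*.print()*/;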
When a data stream is converted into a table, the watermark is never passed along automatically, regardless of whether the "source stream" carries one.
If you need time-based operations (such as time windows), you must explicitly declare a watermark strategy in the conversion definition.
Question: so how are event time and watermark carried over when converting a stream into a table?
First, arrange for a field of type timestamp(3) or timestamp_ltz(3) (it can come from a data field, or from the metadata field rowtime):
- rt as to_timestamp_ltz(eventTime, 3)  -> derives a timestamp_ltz(3) field from a bigint
- rt timestamp(3) metadata from 'rowtime'  -> derives a timestamp(3) field from the rowtime metadata
Two ways to declare the watermark:
- watermark for rt AS rt - interval '1' second
- watermark for rt AS source_watermark()  -> means: use the underlying stream's watermark strategy
Table API example
// {"guid":1,"eventId":"e02","eventTime":1655017433000,"pageId":"p001"}
DataStreamSource<String> s1 = env.socketTextStream("doitedu", 9999);
SingleOutputStreamOperator<Event> s2 = s1.map(s -> JSON.parseObject(s, Event.class))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.eventTime;
}
})
);
// converting the stream into a table directly like this loses the watermark
tenv.createTemporaryView("t_events", s2);
// instead, you can declare a watermark strategy explicitly while converting the stream into a table
tenv.createTemporaryView("t_events2", s2, Schema.newBuilder()
.column("guid", DataTypes.INT())
.column("eventId", DataTypes.STRING())
.column("eventTime", DataTypes.BIGINT())
.column("pageId", DataTypes.STRING())
.columnByExpression("rt","to_timestamp_ltz(eventTime,3)") // turn the bigint into a timestamp and use it as the event-time attribute
//.columnByMetadata("rt", DataTypes.TIMESTAMP_LTZ(3), "rowtime") // alternative: use the rowtime metadata exposed by the underlying stream connector (i.e. each record's eventTime); it cannot coexist with the expression column of the same name above
.watermark("rt","rt - interval '1' second ") // redefine the watermark strategy on the table
//.watermark("rt", "source_watermark()") // alternative: declare the watermark to directly reuse the underlying stream's watermark
.build());
tenv.executeSql("select guid, eventId, eventTime, pageId, rt, current_watermark(rt) as wm from t_events2").print();
SQL DDL example
// only TIMESTAMP or TIMESTAMP_LTZ fields can be declared as rowtime (the event-time attribute)
tenv.executeSql(
" create table t_events( "
+ " guid int, "
+ " eventId string, "
/*+ " eventTime timestamp(3), "*/ // a timestamp(3) field could be used directly; otherwise it must first be converted via an expression column
+ " eventTime bigint, "
+ " pageId string, "
+ " pt AS proctime(), " // declare a processing-time attribute via an expression column
+ " rt as to_timestamp_ltz(eventTime,3), "
+ " watermark for rt as rt - interval '0.001' second " // watermark for xxx declares an existing TIMESTAMP/TIMESTAMP_LTZ field as the event-time attribute and sets the watermark strategy
+ " ) "
+ " with ( "
+ " 'connector' = 'kafka', "
+ " 'topic' = 'doit30-events2', "
+ " 'properties.bootstrap.servers' = 'doitedu:9092', "
+ " 'properties.group.id' = 'g1', "
+ " 'scan.startup.mode' = 'earliest-offset', "
+ " 'format' = 'json', "
+ " 'json.fail-on-missing-field' = 'false', "
+ " 'json.ignore-parse-errors' = 'true' "
+ " ) "
);
tenv.executeSql("desc t_events")/*.print()*/;
tenv.executeSql("select guid, eventId, eventTime, pageId, pt, rt, CURRENT_WATERMARK(rt) as wm from t_events").print();
If the source table defines a watermark strategy, then when the table is converted into a stream, the source table's watermark is passed along automatically.
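A minimal sketch of this behavior, assuming env/tenv and the t_events table (with its watermark declaration) from the DDL example above; after toDataStream, downstream operators see the watermark without re-assigning one:
Table t = tenv.from("t_events");
DataStream<Row> ds = tenv.toDataStream(t); // the rowtime attribute and the table's watermark are carried over
ds.process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
        // the current watermark here is the one inherited from the source table
        out.collect(row + " | wm=" + ctx.timerService().currentWatermark());
    }
}).print();
env.execute();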