1. Versions
Component   Version
MySQL       5.7.20-log
Flink       1.13.5
Kafka       2.0.0.3
Hudi        0.10
2. Architecture
MySQL binlog → Flink CDC → Hudi (MERGE_ON_READ, with Hive sync) → Kafka.
3. Flink SQL MySQL CDC table
3.1 MySQL table schema
CREATE TABLE `Flink_cdc` (
  `id` bigint(64) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(20) DEFAULT NULL,
  `birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=69 DEFAULT CHARSET=utf8mb4;
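So that the CDC snapshot has something to pick up, the table can be seeded with a few rows; `ts` takes its default and `id` auto-increments. The values below are illustrative, not from the original post:

INSERT INTO wudldb.Flink_cdc (name, age, birthday) VALUES
  ('zhangsan', 25, '2022-09-01 10:00:00'),
  ('lisi',     30, '2022-09-02 11:30:00');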
3.2 Flink SQL MySQL CDC source table
Flink SQL> CREATE TABLE source_mysql (
>   id BIGINT PRIMARY KEY NOT ENFORCED,
>   name STRING,
>   age INT,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3)
> ) WITH (
>   'connector' = 'mysql-cdc',
>   'hostname' = '192.168.1.162',
>   'port' = '3306',
>   'username' = 'root',
>   'password' = '123456',
>   'server-time-zone' = 'Asia/Shanghai',
>   'debezium.snapshot.mode' = 'initial',
>   'database-name' = 'wudldb',
>   'table-name' = 'Flink_cdc'
> );
[INFO] Execute statement succeed.
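A quick sanity check is to query the source directly from the SQL client; with 'debezium.snapshot.mode' = 'initial', the job first emits the existing rows as a snapshot and then keeps tailing the binlog. A sketch, output elided:

Flink SQL> SELECT * FROM source_mysql;
-- emits the current snapshot of wudldb.Flink_cdc, then +I/-U/+U/-D changelog rows as the binlog advances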
3.3 Create the Hudi table
Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive_wudl (
>   id BIGINT,
>   name STRING,
>   age INT,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3),
>   part STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> )
> PARTITIONED BY (part)
> WITH (
>   'connector' = 'hudi',
>   'path' = 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive_wudl',
>   'table.type' = 'MERGE_ON_READ',
>   'hoodie.datasource.write.recordkey.field' = 'id',
>   'write.precombine.field' = 'ts',
>   'write.tasks' = '1',
>   'write.rate.limit' = '2000',
>   'compaction.tasks' = '1',
>   'compaction.async.enabled' = 'true',
>   'compaction.trigger.strategy' = 'num_commits',
>   'compaction.delta_commits' = '1',
>   'changelog.enabled' = 'true',
>   'read.streaming.enabled' = 'true',
>   'read.streaming.check-interval' = '3',
>   'hive_sync.enable' = 'true',
>   'hive_sync.mode' = 'hms',
>   'hive_sync.metastore.uris' = 'thrift://node02.com:9083',
>   'hive_sync.jdbc_url' = 'jdbc:hive2://node02.com:10000',
>   'hive_sync.table' = 'flink_cdc_sink_hudi_hive_wudl',
>   'hive_sync.db' = 'db_hive',
>   'hive_sync.username' = 'root',
>   'hive_sync.password' = '123456',
>   'hive_sync.support_timestamp' = 'true'
> );
[INFO] Execute statement succeed.
3.4 Insert the CDC table data into the Hudi table
Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive_wudl SELECT id, name, age, birthday, ts, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM source_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8a6e4869c43e57d57357c1767e7c2b38
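Because this is a continuously running streaming job, changes made in MySQL after submission also flow through to the Hudi table. For example (an illustrative statement run on the MySQL side; the row id is made up):

UPDATE wudldb.Flink_cdc SET age = 26 WHERE id = 1;
-- the CDC source picks the change up from the binlog, and the Hudi table is upserted on the `id` record key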
4. Viewing the data
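Because hive_sync is enabled on the sink, the table is also registered in the Hive metastore. For a MERGE_ON_READ table, Hudi's Hive sync typically registers two tables, suffixed _ro (read optimized) and _rt (real time). A sketch of checking the data from beeline, assuming those default suffixes:

-- connected via jdbc:hive2://node02.com:10000
USE db_hive;
SHOW TABLES;  -- expect flink_cdc_sink_hudi_hive_wudl_ro and flink_cdc_sink_hudi_hive_wudl_rt
SELECT id, name, age, birthday, ts, part
FROM flink_cdc_sink_hudi_hive_wudl_rt
LIMIT 10;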
5. Batch output from the Hudi table to Kafka
5.1 Create the Hudi source table
Flink SQL> CREATE TABLE hudi_flink_kafka_source (
>   id BIGINT,
>   name STRING,
>   age INT,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3),
>   part STRING,
>   PRIMARY KEY (id) NOT ENFORCED
> )
> PARTITIONED BY (part)
> WITH (
>   'connector' = 'hudi',
>   'path' = 'hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive20220905',
>   'table.type' = 'MERGE_ON_READ',
>   'write.operation' = 'upsert',
>   'hoodie.datasource.write.recordkey.field' = 'id',
>   'write.precombine.field' = 'ts',
>   'write.tasks' = '1',
>   'compaction.tasks' = '1',
>   'compaction.async.enabled' = 'true',
>   'compaction.trigger.strategy' = 'num_commits',
>   'compaction.delta_commits' = '1'
> );
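Note that, unlike the sink table in 3.3, this definition leaves 'read.streaming.enabled' unset (it defaults to false), so a query against it runs as a bounded batch read of the table's latest snapshot. A quick check (sketch, output elided):

Flink SQL> SELECT * FROM hudi_flink_kafka_source;
-- runs as a bounded (batch) scan of the latest commit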
Flink SQL> CREATE TABLE kakfa_sink6 (
>   id BIGINT,
>   name STRING,
>   age INT,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'wudl2022flink03',
>   'properties.bootstrap.servers' = '192.168.1.161:6667',
>   'properties.group.id' = 'wudl20220905',
>   'format' = 'json',
>   'json.fail-on-missing-field' = 'false',
>   'json.ignore-parse-errors' = 'true'
> );
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO kakfa_sink6 SELECT id, name, age, birthday, ts FROM hudi_flink_kafka_source;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 005ee1b8011319d235c6485c2abb3efb
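To verify the messages landed in the topic, one option that stays inside the SQL client is to define a second table over the same topic and read it back. The table name kafka_check is made up for this check:

Flink SQL> CREATE TABLE kafka_check (
>   id BIGINT,
>   name STRING,
>   age INT,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3)
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'wudl2022flink03',
>   'properties.bootstrap.servers' = '192.168.1.161:6667',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
Flink SQL> SELECT * FROM kafka_check;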
6. Viewing the table schema and data
7. Time conversion functions
7.1 Flink SQL LOCALTIMESTAMP: getting the current system time
Flink SQL> SELECT DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss');
+----+--------------------------------+
| op |                         EXPR$0 |
+----+--------------------------------+
| +I |            2022-09-05 19:19:42 |
+----+--------------------------------+
Received a total of 1 row
Flink SQL>
Flink SQL> SELECT TO_TIMESTAMP(DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd HH:mm:ss'));
+----+-------------------------+
| op |                  EXPR$0 |
+----+-------------------------+
| +I | 2022-09-05 19:20:30.000 |
+----+-------------------------+
Received a total of 1 row
Flink SQL>
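Two related conversions that come up in the same pipelines are UNIX_TIMESTAMP and FROM_UNIXTIME, which move between epoch seconds and formatted strings in the session time zone (Asia/Shanghai here). The result rows shown in the comments are illustrative:

Flink SQL> SELECT UNIX_TIMESTAMP();
-- e.g. 1662376782 (current epoch seconds)
Flink SQL> SELECT FROM_UNIXTIME(1662376782, 'yyyy-MM-dd HH:mm:ss');
-- '2022-09-05 19:19:42'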