a. Dependency preparation
flink-connector-postgres-cdc-*.jar
b. Synchronizing Tables
By using PostgresSyncTableAction in a Flink DataStream job, or directly through flink run, you can synchronize one or more tables from PostgreSQL into one Paimon table.
/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
postgres_sync_table \
--warehouse <warehouse-path> \
--database <database-name> \
--table <table-name> \
[--partition_keys <partition-keys>] \
[--primary_keys <primary-keys>] \
[--type_mapping <option1,option2...>] \
[--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
[--metadata_column <metadata-column>] \
[--postgres_conf <postgres-cdc-source-conf> [--postgres_conf <postgres-cdc-source-conf> ...]] \
[--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] \
[--table_conf <paimon-table-sink-conf> [--table_conf <paimon-table-sink-conf> ...]]
The configuration options are as follows:
Configuration | Description |
---|---|
--warehouse | The path to Paimon warehouse. |
--database | The database name in Paimon catalog. |
--table | The Paimon table name. |
--partition_keys | The partition keys for Paimon table. If there are multiple partition keys, connect them with a comma, for example "dt,hh,mm". |
--primary_keys | The primary keys for Paimon table. If there are multiple primary keys, connect them with a comma, for example "buyer_id,seller_id". |
--type_mapping | Specifies how to map PostgreSQL data types to Paimon types. Supported option: "to-string" maps all PostgreSQL types to STRING. |
--computed_column | The definitions of computed columns. The argument field comes from a PostgreSQL table field name. See here for a complete list of configurations. |
--metadata_column | Specifies which metadata columns to include in the output schema of the connector. Metadata columns provide additional information about the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See its document for a complete list of available metadata. |
--postgres_conf | The configuration for Flink CDC Postgres sources. Each configuration should be specified in the format "key=value". hostname, username, password, database-name, schema-name, table-name and slot.name are required configurations; others are optional. See its document for a complete list of configurations. |
--catalog_conf | The configuration for Paimon catalog. Each configuration should be specified in the format "key=value". See here for a complete list of catalog configurations. |
--table_conf | The configuration for Paimon table sink. Each configuration should be specified in the format "key=value". See here for a complete list of table configurations. |
If the specified Paimon table does not exist, it will be created automatically, and its schema will be derived from all of the specified PostgreSQL tables.
If the Paimon table already exists, its schema will be compared against the schemas of all the specified PostgreSQL tables.
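As a rough mental model (not Paimon's actual derivation code), the derived schema covers the union of the matched source tables' columns. A toy shell sketch with hypothetical column names:

```shell
# Hypothetical column lists of two source tables matched by the same sync job
t1_cols="uid pt name"
t2_cols="uid pt age"

# Conceptually, the derived Paimon schema contains the union of all source columns
merged=$(printf '%s\n' $t1_cols $t2_cols | sort -u | tr '\n' ' ')
echo "derived columns: $merged"
```

In reality Paimon must also reconcile the column types of same-named fields; this sketch only illustrates the column union.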
Example 1: synchronize tables into one Paimon table
/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
postgres_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--primary_keys pt,uid \
--computed_column '_year=year(age)' \
--postgres_conf hostname=127.0.0.1 \
--postgres_conf username=root \
--postgres_conf password=123456 \
--postgres_conf database-name='source_db' \
--postgres_conf schema-name='public' \
--postgres_conf table-name='source_table1|source_table2' \
--postgres_conf slot.name='paimon_cdc' \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
As the example shows, the table-name of postgres_conf supports regular expressions, so that multiple tables matching the regular expression can be monitored. The schemas of all the matched tables will be merged into one Paimon table schema.
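The matching behavior of a table-name pattern can be checked locally. A hedged sketch using grep -E with the hypothetical table names from the example (Flink CDC evaluates the pattern as a Java regex, but for a simple alternation like this, POSIX ERE agrees):

```shell
# The table-name value from the example, interpreted as a regular expression
pattern='source_table1|source_table2'

# -x requires a whole-line match, mirroring full-name matching of table names
for t in source_table1 source_table2 other_table; do
  if echo "$t" | grep -Eqx "$pattern"; then
    echo "$t: monitored"
  else
    echo "$t: ignored"
  fi
done
```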
Example 2: synchronize sharded tables into one Paimon table
Set "schema-name" with a regular expression to capture multiple schemas.
A typical scenario: table "source_table" is split across schemas "source_schema1", "source_schema2", ..., and the data of all the "source_table" shards is synchronized into one Paimon table.
/bin/flink run \
/path/to/paimon-flink-action-0.7.0-incubating.jar \
postgres_sync_table \
--warehouse hdfs:///path/to/warehouse \
--database test_db \
--table test_table \
--partition_keys pt \
--primary_keys pt,uid \
--computed_column '_year=year(age)' \
--postgres_conf hostname=127.0.0.1 \
--postgres_conf username=root \
--postgres_conf password=123456 \
--postgres_conf database-name='source_db' \
--postgres_conf schema-name='source_schema.+' \
--postgres_conf table-name='source_table' \
--postgres_conf slot.name='paimon_cdc' \
--catalog_conf metastore=hive \
--catalog_conf uri=thrift://hive-metastore:9083 \
--table_conf bucket=4 \
--table_conf changelog-producer=input \
--table_conf sink.parallelism=4
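The schema-name regex from this example can likewise be sanity-checked locally. A small sketch with hypothetical schema names (again assuming the simple pattern behaves the same under Java regex and POSIX ERE):

```shell
# Hypothetical schemas present in source_db
schemas='source_schema1
source_schema2
public'

# Count the schemas captured by the schema-name regex from the example
captured=$(printf '%s\n' "$schemas" | grep -Ecx 'source_schema.+')
echo "captured schemas: $captured"
```

Only the "source_schema*" shards are captured; an unrelated schema such as "public" is left out.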