• Apache Paimon 使用 Postgres CDC 获取数据


    a.依赖准备

    flink-connector-postgres-cdc-*.jar
    
    • 1

    b.Synchronizing Tables(同步表)

    在Flink DataStream作业中使用 PostgresSyncTableAction 或直接通过flink run,可以将PostgreSQL中的一个或多个表同步到一个Paimon表中。

    /bin/flink run \
        /path/to/paimon-flink-action-0.7.0-incubating.jar \
        postgres_sync_table
        --warehouse  \
        --database  \
        --table  \
        [--partition_keys ] \
        [--primary_keys ] \
        [--type_mapping ] \
        [--computed_column <'column-name=expr-name(args[, ...])'> [--computed_column ...]] \
        [--metadata_column ] \
        [--postgres_conf  [--postgres_conf  ...]] \
        [--catalog_conf  [--catalog_conf  ...]] \
        [--table_conf  [--table_conf  ...]]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    配置信息如下

    ConfigurationDescription
    –warehouseThe path to Paimon warehouse.
    –databaseThe database name in Paimon catalog.
    –tableThe Paimon table name.
    –partition_keysThe partition keys for Paimon table. If there are multiple partition keys, connect them with comma, for example “dt,hh,mm”.
    –primary_keysThe primary keys for Paimon table. If there are multiple primary keys, connect them with comma, for example “buyer_id,seller_id”.
    –type_mappingIt is used to specify how to map PostgreSQL data type to Paimon type. Supported options:“to-string”: maps all PostgreSQL types to STRING.
    –computed_columnThe definitions of computed columns. The argument field is from PostgreSQL table field name. See here for a complete list of configurations.
    –metadata_column–metadata_column is used to specify which metadata columns to include in the output schema of the connector. Metadata columns provide additional information related to the source data, for example: --metadata_column table_name,database_name,schema_name,op_ts. See its document for a complete list of available metadata.
    –postgres_confThe configuration for Flink CDC Postgres sources. Each configuration should be specified in the format “key=value”. hostname, username, password, database-name, schema-name, table-name and slot.name are required configurations, others are optional. See its document for a complete list of configurations.
    –catalog_confThe configuration for Paimon catalog. Each configuration should be specified in the format “key=value”. See here for a complete list of catalog configurations.
    –table_confThe configuration for Paimon table sink. Each configuration should be specified in the format “key=value”. See here for a complete list of table configurations.

    如果指定的Paimon表不存在,将自动创建该表,表结构将从所有指定的PostgreSQL表中派生出来。

    如果Paimon表已经存在,其表结构将与所有指定PostgreSQL表的结构进行比较。

    示例1:将表同步到一个Paimon表中

    /bin/flink run \
        /path/to/paimon-flink-action-0.7.0-incubating.jar \
        postgres_sync_table \
        --warehouse hdfs:///path/to/warehouse \
        --database test_db \
        --table test_table \
        --partition_keys pt \
        --primary_keys pt,uid \
        --computed_column '_year=year(age)' \
        --postgres_conf hostname=127.0.0.1 \
        --postgres_conf username=root \
        --postgres_conf password=123456 \
        --postgres_conf database-name='source_db' \
        --postgres_conf schema-name='public' \
        --postgres_conf table-name='source_table1|source_table2' \
        --postgres_conf slot.name='paimon_cdc' \
        --catalog_conf metastore=hive \
        --catalog_conf uri=thrift://hive-metastore:9083 \
        --table_conf bucket=4 \
        --table_conf changelog-producer=input \
        --table_conf sink.parallelism=4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    如示例所示,postgres_conf的表名支持正则表达式,以监控满足正则表达式的多个表。所有表的结构将合并到一个Paimon表结构中。

    示例2:将分片的表同步到一个Paimon表中

    使用正则表达式设置“schema-name”来捕获多个schemas。

    典型场景:表“source_table”被拆分为模式“source_schema1”,“source_schema2”…,然后将所有“source_table”的数据同步到一个Paimon表中。

    /bin/flink run \
        /path/to/paimon-flink-action-0.7.0-incubating.jar \
        postgres_sync_table \
        --warehouse hdfs:///path/to/warehouse \
        --database test_db \
        --table test_table \
        --partition_keys pt \
        --primary_keys pt,uid \
        --computed_column '_year=year(age)' \
        --postgres_conf hostname=127.0.0.1 \
        --postgres_conf username=root \
        --postgres_conf password=123456 \
        --postgres_conf database-name='source_db' \
        --postgres_conf schema-name='source_schema.+' \
        --postgres_conf table-name='source_table' \
        --postgres_conf slot.name='paimon_cdc' \
        --catalog_conf metastore=hive \
        --catalog_conf uri=thrift://hive-metastore:9083 \
        --table_conf bucket=4 \
        --table_conf changelog-producer=input \
        --table_conf sink.parallelism=4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
  • 相关阅读:
    凯文凯利10条人生建议,送给迷茫的你!(上)
    LeetCode53. 最大子数组和
    鸡得关节炎有哪些症状 鸡喂什么药预防球菌病
    中国首个接入大模型的Linux操作系统;ChatGPT支持图片和语音输入;抖音上线方言自动翻译功能丨RTE开发者日报 Vol.57
    等保测评 —— 安全控制点
    EMNLP 2023 | DeepMind提出大模型In-Context Learning的可解释理论框架
    学习笔记-TP5反序列化利用
    中小型企业选择CRM系统时应该注意哪些?
    Python:实现radix sort基数排序算法(附完整源码)
    <MySQL> 什么是JDBC?如何使用JDBC进行编程?
  • 原文地址:https://blog.csdn.net/m0_50186249/article/details/136730815