• Flink PostgreSQL CDC配置和常见问题


    部分内容来源:流计算 Oceanus 数据库 PostgreSQL CDC-SQL 开发指南-文档中心-腾讯云

    介绍

    Postgres 的 CDC 源表(即 Postgres 的流式源表)用于依次读取 PostgreSQL 数据库全量快照数据和变更数据,保证不多读也不少读一条数据。即使发生故障,也能采用 Exactly Once 方式处理。

    版本说明

    Flink 版本说明
    1.11支持
    1.13支持
    1.14不支持

    使用范围

    PostgreSQL CDC 只支持作为源表。支持的 PostgreSQL 版本为9.6及以上版本。

    PostgreSQL 数据库配置准备

    postgresql.conf 配置

    1. # 更改wal发送最大进程数(默认值为10),这个值和上面的solts设置一样
    2. max_wal_senders = 10 # max number of walsender processes
    3. wal_keep_segments = 2 # in logfile segments; 0 disables
    4. # 中断那些停止活动超过指定毫秒数的复制连接,可以适当设置大一点(默认60s)
    5. wal_sender_timeout = 60s # in milliseconds; 0 disables
    6. #更改solts最大数量(默认值为10),flink-cdc默认一张表占用一个slots
    7. max_replication_slots = 10 # max number of replication slots
    8. #指定为logical
    9. wal_level = logical # minimal, replica, or logical
    10. # - Archiving -
    11. archive_mode = on
    12. archive_command = 'test ! -f /var/lib/pgsql/11/arch/%f && cp %p /var/lib/pgsql/11/arch/%f'

    pg_hba.conf

     添加

    1. # IPv4 local connections:
    2. host    all                 all              0.0.0.0/0               md5
    3. host    replication     all             0.0.0.0/0               md5

    FlinkCDC DDL 定义

    1. CREATE TABLE postgres_cdc_source_table (
    2. id INT,
    3. name STRING,
    4. PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    5. ) WITH (
    6. 'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
    7. 'hostname' = 'yourHostname', -- 数据库的 IP
    8. 'port' = '5432', -- 数据库的访问端口
    9. 'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
    10. 'password' = 'yourPassWord', -- 数据库访问的密码
    11. 'database-name' = 'yourDatabaseName', -- 需要同步的数据库
    12. 'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
    13. 'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
    14. 'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
    15. );

    WITH 参数

    参数说明是否必填备注
    connector源表类型固定值为 postgres-cdc
    hostnamePostgres 数据库的 IP 地址或者 Hostname-
    usernamePostgres 数据库服务的用户名有特定权限(包括 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT)的 Postgres 用户
    passwordPostgres 数据库服务的密码-
    database-namePostgres 数据库名称-
    schema-namePostgres Schema 名称Schema 名称支持正则表达式以读取多个 Schema 的数据
    table-namePostgres 表名表名支持正则表达式以读取多个表的数据
    portPostgres 数据库服务的端口号默认值为5432
    decoding.plugin.namePostgres Logical Decoding 插件名称根据 Postgres 服务上安装的插件确定。支持的插件列表如下:
    • decoderbufs(默认值)
    • wal2json
    • wal2json_rds
    • wal2json_streaming
    • wal2json_rds_streaming
    • pgoutput
    debezium.*Debezium 属性参数从更细粒度控制 Debezium 客户端的行为。例如'debezium.slot.name' = 'xxxx',以避免出现 PSQLException: ERROR: replication slot "dl_test" is active for PID 19997 详情请参见 配置属性

    类型映射

    Postgres CDC 和 Flink 字段类型对应关系如下:

    Postgres CDC 字段类型Flink 字段类型
    SMALLINTSMALLINT
    INT2
    SMALLSERIAL
    SERIAL2
    INTEGERINT
    SERIAL
    BIGINTBIGINT
    BIGSERIAL
    REALFLOAT
    FLOAT4
    FLOAT8DOUBLE
    DOUBLE PRECISION
    NUMERIC(p, s)DECIMAL(p, s)
    DECIMAL(p, s)
    BOOLEANBOOLEAN
    DATEDATE
    TIME [(p)] [WITHOUT TIMEZONE]TIME [(p)] [WITHOUT TIMEZONE]
    TIMESTAMP [(p)] [WITHOUT TIMEZONE]TIMESTAMP [(p)] [WITHOUT TIMEZONE]
    CHAR(n)STRING
    CHARACTER(n)
    VARCHAR(n)
    CHARACTER VARYING(n)
    TEXT
    BYTEABYTES

    代码示例

    1. CREATE TABLE postgres_cdc_source_table (
    2. id INT,
    3. name STRING,
    4. PRIMARY KEY (`id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义
    5. ) WITH (
    6. 'connector' = 'postgres-cdc', -- 固定值 'postgres-cdc'
    7. 'hostname' = 'yourHostname', -- 数据库的 IP
    8. 'port' = '5432', -- 数据库的访问端口
    9. 'username' = 'yourUserName', -- 数据库访问的用户名(需要提供 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT权限)
    10. 'password' = 'yourPassWord', -- 数据库访问的密码
    11. 'database-name' = 'yourDatabaseName', -- 需要同步的数据库
    12. 'schema-name' = 'yourSchemaName', -- 需要同步的数据表所属schema (支持正则表达式)
    13. 'table-name' = 'yourTableName', -- 需要同步的数据表名 (支持正则表达式)
    14. 'debezium.slot.name' = 'customslotname' -- 定义一个唯一slot名称,可以包含小写字母、数字和下划线字符
    15. );
    16. CREATE TABLE `print_table` (
    17. `id` INT,
    18. `name` STRING
    19. ) WITH (
    20. 'connector' = 'print'
    21. );
    22. insert into print_table select * from postgres_cdc_source_table;

    来源: 流计算 Oceanus 数据库 PostgreSQL CDC-SQL 开发指南-文档中心-腾讯云

    注意事项

    用户权限

    用来同步的用户至少具有 REPLICATION、LOGIN、SCHEMA、DATABASE、SELECT 权限。

    1. CREATE ROLE debezium_user REPLICATION LOGIN;
    2. GRANT USAGE ON SCHEMA schema_name TO debezium_user;
    3. GRANT USAGE ON DATABASE schema_name TO debezium_user;
    4. GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;

    postgres CDC 遇到的错误:

    1. number of requested standby connections exceeds max_wal_senders (currently 1)。

    max_wal_senders 须大于 wal_keep_segments

    2.org.postgresql.util.PSQLException: ERROR: could not access file "decoderbufs": No such file or directory。

    如果是 table sql 方式,pgoutput是 PostgreSQL 10+ 中的标准逻辑解码输出插件。需要设置一下。添加如下配置

    WITH(

           'decoding.plugin.name' = 'pgoutput'

    )

    3 .io.debezium.DebeziumException: Failed to start replication stream at LSN{0/1100AA50}; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.

    org.postgresql.util.PSQLException: ERROR: replication slot "flink" is active for PID 26632

    同一个数据源存在两个复制槽,因此需要为每个指定不同的名称。

    WITH(

           'debezium.slot.name' = '自定义名称'

    )

  • 相关阅读:
    Android四大组件详解
    char *, char **,char a[] ,char *a[]啥啥分不清楚?
    android13 RK356X 预装第三方apk失败
    limux环境配置文件
    软件工程测试与度量课程学习---基本测试过程----线性模型
    CMM—软件企业走向世界的通行证
    MySQL之账号管理
    Cassandra介绍(二)
    Java面试干货:关于数组查找的几个常用实现算法
    具有计算功能的模拟信号平均值采集隔离放大器
  • 原文地址:https://blog.csdn.net/xiweiller/article/details/126030209