• 用 flink 插件chunjun实现全量+增量同步-达梦数据库到postgresql


    flink 插件chunjun实现全量+增量同步,这里以达梦数据库同步到postgresql数据库为例。

    纯钧下载地址:纯钧

    纯钧是一款稳定、易用、高效、批流一体的数据集成框架,目前基于实时计算引擎Flink实现多种异构数据源之间的数据同步与计算,已在上千家公司部署且稳定运行。

    达梦表ddl:

    1. CREATE TABLE SYSDBA.SOURCE_TABLE (
    2. ID INT NOT NULL,
    3. NAME VARCHAR(100),
    4. CREATE_TIME INT,
    5. CONSTRAINT PK_SOURCE_TABLE_ID PRIMARY KEY (ID)
    6. );
    7. CREATE UNIQUE INDEX INDEX33555468 ON SYSDBA.SOURCE_TABLE (ID);

    postgresql ddl:

    1. CREATE TABLE public.SINK_TABLE (
    2. id int4 NOT NULL,
    3. "name" varchar(100) NULL,
    4. create_time int4 NULL,
    5. CONSTRAINT pk_SINK_TABLE_id2 PRIMARY KEY (id)
    6. );

    纯钧的sql:

    1. create table SOURCE_TABLE(
    2. ID INT,
    3. NAME varchar(200),
    4. CREATE_TIME INT
    5. )
    6. with (
    7. 'connector' = 'dm-x',
    8. 'url' = 'jdbc:dm://11.0.24.107:5236',
    9. 'schema' = 'SYSDBA',
    10. 'table-name' = 'SOURCE_TABLE',
    11. 'username' = 'SYSDBA',
    12. 'password' = 'SYSDBA001',
    13. 'scan.increment.column' = 'CREATE_TIME',
    14. 'scan.increment.column-type' = 'int',
    15. 'scan.polling-interval' = '3000',
    16. 'scan.fetch-size' = '200',
    17. 'scan.query-timeout' = '10'
    18. );
    19. CREATE TABLE SINK_TABLE (
    20. id INT,
    21. name varchar(200),
    22. create_time INT,
    23. PRIMARY KEY (id) NOT ENFORCED)
    24. with (
    25. 'password'='sys',
    26. 'connector'='postgresql-x',
    27. 'sink.buffer-flush.interval'='1000',
    28. 'sink.all-replace'='true',
    29. 'sink.buffer-flush.max-rows'='100',
    30. 'table-name'='SINK_TABLE',
    31. 'sink.parallelism'='1',
    32. 'url'='jdbc:postgresql://11.0.101.10:39001/sys',
    33. 'username'='sys'
    34. );
    35. insert into SINK_TABLE select ID,NAME,CREATE_TIME from SOURCE_TABLE;

    原理就是根据create_time这个字段的更新而增量更新修改、添加操作。

    参数解释:

    ,'scan.increment.column' = 'create_time' -- 增量字段,根据这个字段判断是否更新

    ,'scan.increment.column-type' = 'int'  -- 增量字段类型

    ,'scan.polling-interval' = '3000' --间隔轮训时间。非必填(不填为离线任务,执行一次就技术),无默认

          'sink.all-replace' = 'true', -- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句

                                      -- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。

                                      -- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。

  • 相关阅读:
    通过 ffmpeg命令行 调节视频播放速度
    软件架构设计(十二) 构件与中间件技术-构件概念
    再学Blazor——概述
    MapReduce入门实战
    协同编辑中使用的 OT 算法是什么?
    [附源码]计算机毕业设计校园便携系统Springboot程序
    MySQL中比较运算符的使用
    springboot在filter中设置跨域
    李m圆申论
    【C语言】指针作为参数传值常见问题
  • 原文地址:https://blog.csdn.net/leidengyan/article/details/133030883