• flinksql postgres到mysql案例


    1.flink版本我用的是1.13.6版本,注意flink版本要与驱动包版本相对应,版本不对应会导致任务启动失败,1.13.6版本对应驱动如下。

    2.postgres数据库和mysql数据库我都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客

    搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。

     mysql表结构sql

    CREATE TABLE `sync_test_1` (
      `total_gmv` bigint(20) DEFAULT NULL,
      `day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
      PRIMARY KEY (`day_time`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

    postgres表结构

    CREATE TABLE "public"."ball" (
      "total_gmv" int8,
      "day_time" varchar(32) COLLATE "pg_catalog"."default"
    );

    ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");

    Flink ddl语句postgres实时同步到mysql如下

    --源表

    CREATE TABLE pgtest (

      day_time VARCHAR,

        total_gmv bigint,

        PRIMARY KEY (day_time) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义

    ) WITH (

      'connector' = 'postgres-cdc',  -- 必须为 'postgres-cdc'

      'hostname' = 'localhost',     -- 数据库的 IP

      'port' = '5432',     -- 数据库的访问端口

      'username' = 'postgres',           -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)

      'password' = '123456',    -- 数据库访问使用的密码

      'database-name' = 'postgres',  -- 需要同步的数据库名

      'schema-name' = 'public',      -- 需要同步的数据库模式 (Schema)

      'table-name' = 'ball' ,      -- 需要同步的数据表名

      'decoding.plugin.name' = 'pgoutput',  -- pgoutput,必须

       'debezium.slot.name' = 'pgtestflink'  -- 指定slot名称,必须

    );

    --结果表

    create table mysqltest ( day_time VARCHAR,

        total_gmv bigint,

        PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

        'connector' = 'jdbc',

        'url' = 'jdbc:mysql://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',

        'table-name' = 'sync_test_1',

        'username' = 'root',

        'password' = '123456'

    );

    INSERT INTO mysqltest  

    SELECT day_time,total_gmv

    FROM pgtest ;

    这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表连接用的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。 

    反过来mysql到postgres

    --源表

    create table mysqltest ( day_time VARCHAR,

        total_gmv bigint,

        PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

           'connector' = 'mysql-cdc',

           'hostname' = '172.18.11.224',

           'port' = '3306',

           'username' = 'root',

           'password' = '123456',

           'database-name' = 'flinktest',

           'table-name' = 'sync_test_1'

    );

    //结果表

    create table pgtest ( day_time VARCHAR,

        total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (

        'connector' = 'jdbc',

        'url' = 'jdbc:postgresql://localhost:5432/postgres',

        'table-name' = 'public.ball',

        'username' = 'postgres',

        'password' = '123456'

    );

    INSERT INTO pgtest  

    SELECT day_time,total_gmv

    FROM mysqltest ;

    注意:必须指定主键PRIMARY KEY (主键字段) NOT ENFORCED,与数据表中的主键要对应。

  • 相关阅读:
    未来各职业的人,都会涌入Python和AI大潮中,老教授深度解析
    利用RVB2601开发板的温湿度采样
    【JavaEE进阶系列 | 从小白到工程师】Java中的构造代码块和静态代码块详解
    Linux并发与竞争(一)
    新基建智慧铁路:高铁沿线综合视频监控及风险智能预警管理方案
    用LightningChart .NET 数据可视化控件制作多线程应用程序
    CSS必学:元素之间的空白与行内块的幽灵空白问题
    和力链携手纷享销客推动CRM业财一体化,引领大健康产业数智化发展
    B树、B+树与磁盘读取的关系
    10.1网站编写(Tomcat和servlet基础)
  • 原文地址:https://blog.csdn.net/letterss/article/details/126501054