• flinksql postgres到mysql案例


    1.flink版本我用的是1.13.6版本,注意flink版本要与驱动包版本相对应,版本不对应会导致任务启动失败,1.13.6版本对应驱动如下。

    2.postgres数据库和mysql数据库我都是用docker搭建的,搭建postgres数据库参考:docker部署postgres数据库_今朝花落悲颜色的博客-CSDN博客

    搭建好了一定要修改postgres.conf配置文件,在挂载目录/docker/postgresql/data/下面找到postgres.conf,修改wal_level=logical,然后重启postgres。

     mysql表结构sql

    CREATE TABLE `sync_test_1` (
      `total_gmv` bigint(20) DEFAULT NULL,
      `day_time` varchar(255) COLLATE utf8mb4_bin NOT NULL,
      PRIMARY KEY (`day_time`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

    postgres表结构

    CREATE TABLE "public"."ball" (
      "total_gmv" int8,
      "day_time" varchar(32) COLLATE "pg_catalog"."default"
    );

    ALTER TABLE "public"."ball" ADD CONSTRAINT "ball_pkey" PRIMARY KEY ("day_time");

    Flink ddl语句postgres实时同步到mysql如下

    --源表

    CREATE TABLE pgtest (

      day_time VARCHAR,

        total_gmv bigint,

        PRIMARY KEY (day_time) NOT ENFORCED  -- 如果要同步的数据库表定义了主键, 则这里也需要定义

    ) WITH (

      'connector' = 'postgres-cdc',  -- 必须为 'postgres-cdc'

      'hostname' = 'localhost',     -- 数据库的 IP

      'port' = '5432',     -- 数据库的访问端口

      'username' = 'postgres',           -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例)

      'password' = '123456',    -- 数据库访问使用的密码

      'database-name' = 'postgres',  -- 需要同步的数据库名

      'schema-name' = 'public',      -- 需要同步的数据库模式 (Schema)

      'table-name' = 'ball' ,      -- 需要同步的数据表名

      'decoding.plugin.name' = 'pgoutput',  -- pgoutput,必须

       'debezium.slot.name' = 'pgtestflink'  -- 指定slot名称,必须

    );

    --结果表

    create table mysqltest ( day_time VARCHAR,

        total_gmv bigint,

        PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

        'connector' = 'jdbc',

        'url' = 'jdbc:mysql://172.18.11.224:3306/flinktest?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',

        'table-name' = 'sync_test_1',

        'username' = 'root',

        'password' = '123456'

    );

    INSERT INTO mysqltest  

    SELECT day_time,total_gmv

    FROM pgtest ;

    这里要说明一下cdc和jdbc,cdc是实时捕获源表数据变化的,jdbc是sink表连接用的,所以要读哪张表用cdc连接器,写哪张表用jdbc连接器。 

    反过来mysql到postgres

    --源表

    create table mysqltest ( day_time VARCHAR,

        total_gmv bigint,

        PRIMARY KEY (day_time) NOT ENFORCED ) WITH (

           'connector' = 'mysql-cdc',

           'hostname' = '172.18.11.224',

           'port' = '3306',

           'username' = 'root',

           'password' = '123456',

           'database-name' = 'flinktest',

           'table-name' = 'sync_test_1'

    );

    //结果表

    create table pgtest ( day_time VARCHAR,

        total_gmv bigint,PRIMARY KEY (day_time) NOT ENFORCED) WITH (

        'connector' = 'jdbc',

        'url' = 'jdbc:postgresql://localhost:5432/postgres',

        'table-name' = 'public.ball',

        'username' = 'postgres',

        'password' = '123456'

    );

    INSERT INTO pgtest  

    SELECT day_time,total_gmv

    FROM mysqltest ;

    注意:必须指定主键PRIMARY KEY (主键字段) NOT ENFORCED,与数据表中的主键要对应。

  • 相关阅读:
    Linux应用 防止程序重复发起
    动态内存开辟(上)
    02_Alibaba微服务组件Nacos注册中心
    四种质数筛选算法总结(c++)
    acwing算法提高之数据结构--树状数组
    【28-业务开发-基础业务-属性管理-SKU和SPU基本概念-SKU和SPU关联关系-属性实体之间的关联关系-批量菜单创建】
    32.JavaScript类数组(Array-like)和可迭代对象(Iterable-object)的实现原理
    Nomad 系列-Nomad 挂载存储卷
    御神楽的学习记录之基于FPGA的AHT10温湿度数据采集
    Kotlin高仿微信-第35篇-支付-二维码收款(二维码)
  • 原文地址:https://blog.csdn.net/letterss/article/details/126501054