• 5 Flink CDC Synchronization


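    Goal of this article

    Use Flink CDC to synchronize a source table to a target table in real time.

    To keep dependencies to a minimum, this article uses only MySQL and Flink CDC. Both the source table and the target table live in MySQL. Flink extracts the data and writes it to the target directly, without going through Kafka.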
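Conceptually, a pipeline like this turns row-level changes on the source table into writes against the target table, keyed by primary key. The Python sketch below models that idea with an in-memory dict. It is a simplified mental model under that assumption, not how Flink or its connectors are implemented, and the names `apply_change`, `events`, and `target_table` are hypothetical.

```python
def apply_change(target, op, row):
    """Apply one change event to `target`, a dict keyed by primary key f1."""
    key = row["f1"]
    if op in ("insert", "update"):
        # Inserts and updates both become an upsert on the primary key.
        target[key] = dict(row)
    elif op == "delete":
        # Deleting a key that is absent is treated as a no-op.
        target.pop(key, None)
    else:
        raise ValueError(f"unknown operation: {op}")
    return target


# Hypothetical change events, in the order they happened on the source.
events = [
    ("insert", {"f1": 1, "f2": "a"}),
    ("insert", {"f1": 2, "f2": "b"}),
    ("update", {"f1": 2, "f2": "b2"}),
    ("delete", {"f1": 1, "f2": "a"}),
]

target_table = {}
for op, row in events:
    apply_change(target_table, op, row)

print(target_table)  # {2: {'f1': 2, 'f2': 'b2'}}
```

Because every write is keyed by the primary key, replaying the same event twice leaves the target unchanged, which is one reason both tables in this article declare a primary key.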

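    Deployment and startup

    Required components

    flink-1.14.6-bin-scala_2.12.tgz # Flink distribution
    flink-sql-connector-mysql-cdc-2.2.1.jar # reads the MySQL binlog for synchronization
    flink-connector-jdbc_2.12-1.14.6.jar # writes to the target table
    Extract the tgz and copy the two jars into its lib directory.
    Run start-cluster.sh to start the Flink cluster.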
    

    Test procedure

    -- Create table t1 in MySQL as the data source
    CREATE TABLE `t1`  (
      `f1` bigint(0) NOT NULL,
      `f2` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
      `f3` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
      PRIMARY KEY (`f1`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
    
    -- Create table t1_cp in MySQL as the synchronization target
    CREATE TABLE `t1_cp`  (
      `f1` bigint(0) NOT NULL,
      `f2` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL,
      `f3` datetime(0) NULL DEFAULT CURRENT_TIMESTAMP(0),
      PRIMARY KEY (`f1`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
    
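    -- Start sql-client.sh to open the Flink SQL command line
    -- Create the mapping tables in Flink. Note that the source table uses the mysql-cdc connector and the target table uses the jdbc connector.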
    CREATE TABLE t1 (
     f1 BIGINT NOT NULL,
     f2 STRING,
     f3 TIMESTAMP(0),
     PRIMARY KEY(f1) NOT ENFORCED
    ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'localhost',
     'port' = '6306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'test',
     'table-name' = 't1'
    );
    
    CREATE TABLE t1_cp (
     f1 BIGINT NOT NULL,
     f2 STRING,
     f3 TIMESTAMP(0),
     PRIMARY KEY(f1) NOT ENFORCED
    ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://127.0.0.1:6306/test',
     'username' = 'test',
     'password' = 'test',
     'driver' = 'com.mysql.cj.jdbc.Driver',
     'table-name' = 't1_cp'
    );
    
    -- Query the source table. Insert, update, or delete a few rows in MySQL now; the changes should show up here in real time.
    SELECT f1, f2, f3 FROM t1;
    
    -- Synchronize directly
    Flink SQL> insert into t1_cp(f1, f2, f3) select f1, f2, f3 from t1;
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 685d277246a140e1d845e96a5b66f1d4
    Flink SQL> 
    -- The job started successfully. Changes made to t1 in MySQL are now synchronized to t1_cp in real time by the job that the INSERT above created.
    
    -- Synchronize only rows that match a condition
    Flink SQL> insert into t1_cp(f1, f2, f3) select f1, f2, f3 from t1 where f1>1;
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 685d277246a140e1d845e96a5b66f1d4
    Flink SQL> 
    -- Only rows whose primary key f1 is greater than 1 are synchronized.
    
    -- Split field f2 on a delimiter
    Flink SQL> insert into t1_cp(f1, f2, f3) select f1, substr(f2, position('|' in f2)+1), f3 from t1 where f1>1;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 685d277246a140e1d845e96a5b66f1d4
    Flink SQL> 
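    -- Only rows whose primary key f1 is greater than 1 are synchronized, and f2 is split on "|": everything up to and including the first vertical bar is discarded.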
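To make the effect of `substr(f2, position('|' in f2)+1)` concrete, the following Python sketch mimics the expression on plain strings. It only illustrates the semantics, assuming 1-based positions and a `position` result of 0 when the delimiter is absent. It is not Flink code, and the function name `strip_prefix` is hypothetical.

```python
def strip_prefix(f2, delimiter="|"):
    """Mimic substr(f2, position(delimiter in f2) + 1) on a Python string."""
    if f2 is None:
        # A NULL input yields a NULL result.
        return None
    # str.find is 0-based and returns -1 when absent; adding 1 gives the
    # 1-based SQL position, with 0 meaning "not found".
    position = f2.find(delimiter) + 1
    start = position + 1  # 1-based start index passed to substr
    # Convert the 1-based start back to a 0-based slice index.
    return f2[start - 1:]


for value in ["a|b", "a|b|c", "no-delimiter", "|x", None]:
    print(repr(value), "->", repr(strip_prefix(value)))
```

Under these assumptions, a value with no "|" is kept unchanged, because the position is 0 and the substring starts at the first character. Only the first "|" matters, so "a|b|c" becomes "b|c".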
    
  • Original article: https://blog.csdn.net/hryyx/article/details/128166414