• Flink CDC: Errors Encountered While Syncing MySQL to MySQL


    0. Required JARs

    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.16/

    https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/

    Or download them from mvnrepository.com:
    https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc

    https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc

    1. Start the Flink SQL client

    [appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh
    


    2. Create the source table

    Problem 1: Encountered "("
    Fix: drop the (11) display width and declare the columns as plain int

    Flink SQL> CREATE TABLE `t_user` (
    >   `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
    >   `did` int(11) DEFAULT NULL COMMENT 'dept id',
    >   `username` varchar(14) DEFAULT NULL,
    >   `add_time` datetime DEFAULT NULL,
    >   PRIMARY KEY (`uid`) NOT ENFORCED
    > ) WITH (
    >       'connector' = 'mysql-cdc',
    >       'hostname' = '192.25.34.2',
    >       'port' = '3306',
    >       'username' = '*******',
    >       'password' = '*******',
    >       'database-name' = 'test',
    >       'table-name' = 't_user'
    > );
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.sql.parser.impl.ParseException: Encountered "(" at line 2, column 12.
    Was expecting one of:
        "CONSTRAINT" ...
        "NOT" ...
        "NULL" ...
        "PRIMARY" ...
        "UNIQUE" ...
        "COMMENT" ...
        "METADATA" ...
        ")" ...
        "," ...
        "MULTISET" ...
        "ARRAY" ...
    
    Flink SQL> 
    

    Problem 2: Encountered "AUTO_INCREMENT"
    Fix: remove AUTO_INCREMENT (Flink DDL has no such clause; auto-increment remains a property of the underlying MySQL table)

    Flink SQL> CREATE TABLE `t_user` (
    >   `uid` int NOT NULL AUTO_INCREMENT COMMENT 'user id',
    >   `did` int DEFAULT NULL COMMENT 'dept id',
    >   `username` varchar(14) DEFAULT NULL,
    >   `add_time` datetime DEFAULT NULL,
    >   PRIMARY KEY (`uid`) NOT ENFORCED
    > ) WITH (
    >       'connector' = 'mysql-cdc',
    >       'hostname' = '192.25.34.2',
    >       'port' = '3306',
    >       'username' = '*******',
    >       'password' = '*******',
    >       'database-name' = 'test',
    >       'table-name' = 't_user'
    > );
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.sql.parser.impl.ParseException: Encountered "AUTO_INCREMENT" at line 2, column 22.
    Was expecting one of:
        "CONSTRAINT" ...
        "PRIMARY" ...
        "UNIQUE" ...
        "COMMENT" ...
        "METADATA" ...
        ")" ...
        "," ...
        "MULTISET" ...
        "ARRAY" ...
    
    Flink SQL> 
    

    Problem 3: Encountered "DEFAULT"
    Fix: remove the DEFAULT clauses

    Flink SQL> CREATE TABLE `t_user` (
    >   `uid` int NOT NULL COMMENT 'user id',
    >   `did` int DEFAULT NULL COMMENT 'dept id',
    >   `username` varchar(14) DEFAULT NULL,
    >   `add_time` datetime DEFAULT NULL,
    >   PRIMARY KEY (`uid`) NOT ENFORCED
    > ) WITH (
    >       'connector' = 'mysql-cdc',
    >       'hostname' = '192.25.34.2',
    >       'port' = '3306',
    >       'username' = '*******',
    >       'password' = '*******',
    >       'database-name' = 'test',
    >       'table-name' = 't_user'
    > );
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.sql.parser.impl.ParseException: Encountered "DEFAULT" at line 3, column 13.
    Was expecting one of:
        "CONSTRAINT" ...
        "NOT" ...
        "NULL" ...
        "PRIMARY" ...
        "UNIQUE" ...
        "COMMENT" ...
        "METADATA" ...
        ")" ...
        "," ...
        "MULTISET" ...
        "ARRAY" ...
    
    Flink SQL> 
    

    Problem 4: Unknown identifier 'datetime'
    Fix: use TIMESTAMP(3) instead of MySQL's datetime

    Flink SQL> CREATE TABLE `t_user` (
    >   `uid` int NOT NULL COMMENT 'user id',
    >   `did` int COMMENT 'dept id',
    >   `username` varchar(14) ,
    >   `add_time` datetime ,
    >   PRIMARY KEY (`uid`) NOT ENFORCED
    > ) WITH (
    >       'connector' = 'mysql-cdc',
    >       'hostname' = '192.25.34.2',
    >       'port' = '3306',
    >       'username' = '*******',
    >       'password' = '*******',
    >       'database-name' = 'test',
    >       'table-name' = 't_user'
    > );
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'datetime'
    
    Flink SQL> 
    

    Created successfully:

    Flink SQL> CREATE TABLE `t_user` (
    >   `uid` int NOT NULL COMMENT 'user id',
    >   `did` int COMMENT 'dept id',
    >   `username` varchar(14) ,
    >   `add_time` TIMESTAMP(3),
    >   PRIMARY KEY (`uid`) NOT ENFORCED
    > ) WITH (
    >       'connector' = 'mysql-cdc',
    >       'hostname' = '192.25.34.2',
    >       'port' = '3306',
    >       'username' = '*******',
    >       'password' = '*******',
    >       'database-name' = 'test',
    >       'table-name' = 't_user'
    > );
    [INFO] Execute statement succeed.
    
    Flink SQL> 
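    As an optional sanity check (not part of the original session), the CDC source can be queried straight from the SQL client before the sink exists; this launches a small streaming job that snapshots the table and then tails the binlog:

    Flink SQL> SELECT * FROM t_user;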
    

    3. Create the target table

    Flink SQL> CREATE TABLE `ods_t_user` (
    >     `uid` int NOT NULL COMMENT 'user id',
    >     `did` int COMMENT 'dept id',
    >     `username` varchar(14) ,
    >     `add_time` TIMESTAMP(3),
    >     PRIMARY KEY (`uid`) NOT ENFORCED
    >  ) WITH (
    >      'connector' = 'jdbc',
    >      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    >      'driver' = 'com.mysql.cj.jdbc.Driver',
    >      'username' = '*******',
    >      'password' = '*******',
    >      'table-name' = 'ods_t_user'
    > );
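    At this point both tables exist only in the SQL client's in-memory default catalog (which is why they have to be re-created after the restart further down). As a quick check that is not in the original post, the client's standard statements can list and describe them:

    Flink SQL> SHOW TABLES;
    Flink SQL> DESC ods_t_user;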
    

    4. Load the source table into the target table

    Error 1: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.

    Flink SQL> insert into t_user select * from ods_t_user;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: Connector 'mysql-cdc' can only be used as a source. It cannot be used as a sink.
    
    Flink SQL> 
    

    Cause: the direction was reversed; the table after INSERT INTO must be the target (sink) table, and the CDC source table belongs in the SELECT.

    Flink SQL> insert into ods_t_user select * from t_user;
    [ERROR] Could not execute SQL statement. Reason:
    java.io.StreamCorruptedException: unexpected block data
    
    Flink SQL> 
    

    Error 2: unexpected block data
    Solution:
    (1) Update the JARs under lib/ as follows

    [appuser@whtpjfscpt01 flink-1.17.1]$ ll lib/
    total 223320
    -rw-r--r-- 1 appuser appuser    196491 May 19 18:56 flink-cep-1.17.1.jar
    -rw-r--r-- 1 appuser appuser    542620 May 19 18:59 flink-connector-files-1.17.1.jar
    -rw-r--r-- 1 appuser appuser    266420 Sep 25 14:21 flink-connector-jdbc-3.1.1-1.17.jar
    -rw-r--r-- 1 appuser appuser    345711 Sep 25 15:45 flink-connector-mysql-cdc-2.4.1.jar
    -rw-r--r-- 1 appuser appuser    102472 May 19 19:02 flink-csv-1.17.1.jar
    -rw-r--r-- 1 appuser appuser 135975541 May 19 19:13 flink-dist-1.17.1.jar
    -rw-r--r-- 1 appuser appuser   8452171 Sep 19 10:20 flink-doris-connector-1.17-1.4.0.jar
    -rw-r--r-- 1 appuser appuser    180248 May 19 19:02 flink-json-1.17.1.jar
    -rw-r--r-- 1 appuser appuser  21043319 May 19 19:12 flink-scala_2.12-1.17.1.jar
    -rw-r--r-- 1 appuser appuser  15407424 May 19 19:13 flink-table-api-java-uber-1.17.1.jar
    -rw-r--r-- 1 appuser appuser  38191226 May 19 19:08 flink-table-planner-loader-1.17.1.jar
    -rw-r--r-- 1 appuser appuser   3146210 May 19 18:56 flink-table-runtime-1.17.1.jar
    -rw-r--r-- 1 appuser appuser    208006 May 17 18:07 log4j-1.2-api-2.17.1.jar
    -rw-r--r-- 1 appuser appuser    301872 May 17 18:07 log4j-api-2.17.1.jar
    -rw-r--r-- 1 appuser appuser   1790452 May 17 18:07 log4j-core-2.17.1.jar
    -rw-r--r-- 1 appuser appuser     24279 May 17 18:07 log4j-slf4j-impl-2.17.1.jar
    -rw-r--r-- 1 appuser appuser   2462364 Sep 19 11:30 mysql-connector-java-8.0.26.jar
    [appuser@whtpjfscpt01 flink-1.17.1]$
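    As an aside not taken from the original post: recent SQL clients can also load a jar into the current session without a restart, for example with an illustrative path as below. Whether this is sufficient for a given connector varies; copying the jars into lib/ and restarting, as done next, is what actually worked here.

    Flink SQL> ADD JAR '/path/to/flink-connector-mysql-cdc-2.4.1.jar';
    Flink SQL> SHOW JARS;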
    

    (2) Restart Flink

    [appuser@whtpjfscpt01 flink-1.17.1]$ bin/stop-cluster.sh 
    Stopping taskexecutor daemon (pid: 41993) on host whtpjfscpt01.
    Stopping standalonesession daemon (pid: 41597) on host whtpjfscpt01.
    [appuser@whtpjfscpt01 flink-1.17.1]$ bin/start-cluster.sh 
    Starting cluster.
    Starting standalonesession daemon on host whtpjfscpt01.
    Starting taskexecutor daemon on host whtpjfscpt01.
    [appuser@whtpjfscpt01 flink-1.17.1]$ bin/sql-client.sh
    

    (3) Re-run the statements

    Flink SQL> SET execution.checkpointing.interval = 3s;
    [INFO] Execute statement succeed.
    
    Flink SQL> CREATE TABLE `t_user` (
    >     `uid` int NOT NULL COMMENT 'user id',
    >     `did` int COMMENT 'dept id',
    >     `username` varchar(14) ,
    >     `add_time` TIMESTAMP(3),
    >     PRIMARY KEY (`uid`) NOT ENFORCED
    > ) WITH (
    >       'connector' = 'mysql-cdc',
    >       'hostname' = '192.25.34.2',
    >       'port' = '3306',
    >       'username' = '*******',
    >       'password' = '*******',
    >       'database-name' = 'test',
    >       'table-name' = 't_user'
    >  );
    [INFO] Execute statement succeed.
    
    Flink SQL> CREATE TABLE `ods_t_user` (
    >     `uid` int NOT NULL COMMENT 'user id',
    >     `did` int COMMENT 'dept id',
    >     `username` varchar(14) ,
    >     `add_time` TIMESTAMP(3),
    >     PRIMARY KEY (`uid`) NOT ENFORCED
    >  ) WITH (
    >      'connector' = 'jdbc',
    >      'url' = 'jdbc:mysql://192.25.34.2:3306/demo?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=UTC',
    >      'driver' = 'com.mysql.cj.jdbc.Driver',
    >      'username' = '*******',
    >      'password' = '*******',
    >      'table-name' = 'ods_t_user'
    > );
    [INFO] Execute statement succeed.
    
    Flink SQL>
    

    (4) Successful execution

    Flink SQL> insert into ods_t_user select * from t_user;
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: c2e69d061f3777c031b0acb4ec03d13a
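    The running job can be watched in the Flink web UI, or, since Flink 1.17's SQL client supports job statements, listed directly (shown here as an extra check, not from the original post):

    Flink SQL> SHOW JOBS;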
    


    Error 3: the target table does not exist in MySQL
    Fix: create the physical table ods_t_user in the demo database first; the Flink JDBC sink writes into an existing table and does not create it:

    CREATE TABLE demo.ods_t_user (
      `uid` int(11) NOT NULL AUTO_INCREMENT COMMENT 'user id',
      `did` int(11) DEFAULT NULL COMMENT 'dept id',
      `username` varchar(14) DEFAULT NULL,
      `add_time` datetime DEFAULT NULL,
      PRIMARY KEY (`uid`)
    );
    

    Insert a new record into the source table:

    INSERT INTO test.t_user (did, username) VALUES (3, 'test');
    

    The target table syncs the new row automatically.
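    To confirm the sync without relying on a screenshot, query the target table on the MySQL side; the row inserted above should show up within a few seconds:

    SELECT * FROM demo.ods_t_user;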

  • Original post: https://blog.csdn.net/chengyuqiang/article/details/133275963