• 【Flink CDC(一)】实现mysql整表与增量读取


    MySQL CDC 连接器允许从 MySQL 数据库读取快照数据(比如:flink任务消费时刻的整表数据)和增量数据。本文描述了如何设置 MySQL CDC 连接器来对 MySQL 数据库运行 SQL 查询。

    本篇只关注mysql整表与增量读取的实现,对于并发读取等能力后续再探索。

     

    一. 运行前准备

    1. 依赖

    1.1. Maven dependency

    <dependency>
      <groupId>com.ververicagroupId>
      <artifactId>flink-connector-mysql-cdcartifactId>
      
      <version>2.4.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

     

    1.2. SQL Client JAR(推荐)

    下载 flink-sql-connector-mysql-cdc-2.4.0.jar/lib/ 目录下。

     

    2. 配置 MySQL 服务器(必须)

    你必须定义一个 MySQL 用户,该用户对 MySQL CDC 连接器监视的所有数据库都应该具有所需的权限。

    # 创建用户
    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    
    # 赋权
    mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
    
    # 刷新权限
    mysql> FLUSH PRIVILEGES;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    注意:

    scan.incremental.snapshot.enabled 参数已启用时(默认情况下已启用)时,不再需要授予 reload 权限。

     

    二. 功能说明

    1. 启动模式

    配置选项scan.startup.mode指定 MySQL CDC 使用者的启动模式。有效枚举包括:

    • initial (默认):在第一次启动时对受监视的数据库表执行初始快照,并继续读取最新的 binlog

    • earliest-offset:跳过快照阶段,从可读取的最早 binlog 位点开始读取

    • latest-offset:首次启动时,从不对受监视的数据库表执行快照, 连接器仅从 binlog
      的结尾处开始读取,这意味着连接器只能读取在连接器启动之后的数据更改

    • specific-offset:跳过快照阶段,从指定的 binlog 位点开始读取。位点可通过 binlog 文件名和位置指定,或者在 GTID 在集群上启用时通过 GTID 集合指定。

    • timestamp:跳过快照阶段,从指定的时间戳开始读取 binlog 事件。

    
    
    MySQLSource.builder()
        .startupOptions(StartupOptions.earliest()) // 从最早位点启动
        .startupOptions(StartupOptions.latest()) // 从最晚位点启动
        .startupOptions(StartupOptions.specificOffset("mysql-bin.000003", 4L) // 从指定 binlog 文件名和位置启动
        .startupOptions(StartupOptions.specificOffset("24DA167-0C0C-11E8-8442-00059A3C7B00:1-19")) // 从 GTID 集合启动
        .startupOptions(StartupOptions.timestamp(1667232000000L) // 从时间戳启动
        ...
        .build()
    
    
    
    
    CREATE TABLE mysql_source (...) WITH (
        'connector' = 'mysql-cdc',
        'scan.startup.mode' = 'earliest-offset', -- 从最早位点启动
        'scan.startup.mode' = 'latest-offset', -- 从最晚位点启动
        'scan.startup.mode' = 'specific-offset', -- 从特定位点启动
     
        'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位点启动模式下指定 binlog 文件名
        'scan.startup.specific-offset.pos' = '4', -- 在特定位点启动模式下指定 binlog 位置
        'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位点启动模式下指定 GTID 集合
    
        'scan.startup.mode' = 'timestamp', -- 从特定位点启动
        'scan.startup.timestamp-millis' = '1667232000000' -- 在时间戳启动模式下指定启动时间戳
        ...
    )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

     

    2. 全量阶段支持 checkpoint

    增量快照读取提供了在区块级别执行检查点的能力。它使用新的快照读取机制解决了以前版本中的检查点超时问题。

     

    3. 关于无主键表

    从2.4.0 版本开始支持无主键表,使用无主键表必须设置 scan.incremental.snapshot.chunk.key-column,且只能选择非空类型的一个字段。

    在使用无主键表时,需要注意以下两种情况。

    1. 配置 scan.incremental.snapshot.chunk.key-column 时,如果表中存在索引,请尽量使用索引中的列来加快 select 速度。

    2. 无主键表的处理语义由 scan.incremental.snapshot.chunk.key-column 指定的列的行为决定:

    • 如果指定的列不存在更新操作,此时可以保证 Exactly once 语义。
    • 如果指定的列存在更新操作,此时只能保证 At least once 语义。但可以结合下游,通过指定下游主键,结合幂等性操作来保证数据的正确性。

     

    Exactly-Once 处理

    MySQL CDC 连接器是一个 Flink Source 连接器,它将首先读取表快照块,然后继续读取 binlog, 无论是在快照阶段还是读取 binlog 阶段,MySQL CDC 连接器都会在处理时准确读取数据,即使任务出现了故障。

     

    三. 实战

    1. 实现mysql整表与增量表同步

    -- 'scan.startup.mode'= 'initial' 
    -- 
    CREATE TABLE tjy_sql1  
    (  
      `id` int,  
      `name` string,  
      `face` string  
     ,PRIMARY KEY(id) NOT ENFORCED  
    ) WITH (  
            'connector' = 'mysql-cdc',  
            'hostname' = 'xxx',  
            'port' = '3306',  
            'username' = 'middle_test',  
            'password' = '123456',  
            'database-name' = 'middle_test',  
            'table-name' = 'tjy_fortest1'  
           -- ,'scan.incremental.snapshot.enabled' = 'false'  
           --  initial: 默认值,全表同步,然后进行增量同步;
           --  'scan.startup.mode'= 'initial'  
           -- 'debezium.snapshot.mode' = 'initial'      );  
      
      
     CREATE TABLE tjy_sql1_sink  
     (  
      `id` int,  
      `name` string,  
      `face` string  
      ,PRIMARY KEY(id) NOT ENFORCED  
     ) WITH (  
               'connector' = 'mysql-x',  
               'url' = 'jdbc:mysql://xxx:3306/middle_test?useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true',  
               'username' = 'middle_test',  
               'password' = '123456',  
               'table-name' = 'flink_type',  
               'table-name' = 'tjy_fortest2'  
           );  
      
      
    insert into tjy_sql1_sink select * from tjy_sql1;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

     

    FAQ

    相关问题:https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)

    可能涉及到的问题

    在这里插入图片描述

     

    参考:
    官网:https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mysql-cdc%28ZH%29.html

  • 相关阅读:
    GO微服务实战第三十一节 案例:如何在微服务中集成 Zipkin 组件?
    fastadmin/thinkPHPQueue消息队列详细教程
    Chapter5.1:线性系统的频域分析法
    牛血清白蛋白/人血清白蛋白/卵清白蛋白/小鼠血清白蛋白纳米粒改性Angiopep血管肽(2022已更新)
    文本挖掘day6 基于文本挖掘的化工事故致因网络分析
    自学Java很困难?那是你没找到方法
    OpenCV图像处理学习九,双边滤波器 (Bilateral Filter)和中位数滤波器 (Median Filter)
    easyexcel和poi版本冲突报错深入解析v2
    form组件的封装(element ui ) 简单版本
    Arduino开发实例-DIY电能表
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/136299132