• Flink CDC-MySQL CDC配置及DataStream API实现代码...可实现监控采集多个数据库的多个表


    MySQL CDC配置

    第一步: 启用binlog

    1. 检查MySQL的binlog是否已启用

    show variables like '%log_bin%';
    
    • 1

    2. 若未启用binlog

    1. 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
    2. 找到[mysqld]部分,添加如下配置
      log-bin=mysql-bin    # 指定二进制日志文件的名称前缀
      server-id=1          # 唯一标识MySQL服务器的数字
      expire_logs_days=30  # binlog日志过期时间(按实际情况配置)
      
      • 1
      • 2
      • 3
    3. 保存并关闭配置文件, 并重启MySQL服务使配置生效
      sudo systemctl restart mysqld
      
      • 1

    第二步: 设置binlog格式为row

    因为要监控表记录变更前后的具体数据, 需要将binlog格式设置为row.

    1. 确保MySQL的binlog格式设置为ROW

    show variables like '%binlog_format%';
    
    • 1

    2. 若未设置为row

    1. 打开MySQL配置文件my.cnf(MySQL安装目录的etc文件夹下)
    2. 找到[mysqld]部分,添加如下配置
      binlog_format=ROW
      
      • 1
    3. 保存并关闭配置文件, 并重启MySQL服务使配置生效
      sudo systemctl restart mysqld
      
      • 1

    第三步: 创建CDC用户

    创建一个具备合适权限的MySQL用户, 使得Debezium MySQL connector可以监控数据库的变化.

    • 创建MySQL用户, 用于Flink CDC连接到MySQL数据库

      CREATE USER 'flinkcdc'@'%' IDENTIFIED BY 'FlinkCDC_123456';
      
      • 1
    • 授予该用户适当的权限以访问要采集的数据库和表。

      GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkcdc' IDENTIFIED BY 'FlinkCDC_123456';
      
      • 1
    • 使权限生效

      FLUSH PRIVILEGES;
      
      • 1

    MySQL CDC DataStream API实现

    所使用软件的版本

    • java 1.8
    • Scala 2.11
    • Flink 1.14.2
    • Flink CDC 2.3.0
    • Source MySQL 5.7
    • Sink MySQL 5.7
    • jackson 2.10.2

    MySQL CDC DataStream API可实现一个job监控采集多个数据库、多个表.

    1. 定义MySqlSource

    //源数据库连接配置文件
    Properties dbProps = DbConfigUtil.loadConfig("mysql.properties");
    
    //Debezium配置
    Properties debeziumProps = new Properties();
    //decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string
    //precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)
    //以double值来表示它们,这可能会到值精度丢失
    //string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
    debeziumProps.setProperty("decimal.handling.mode", "string");
    //Time、date和timestamps可以以不同的精度表示,包括:
    //adaptive_time_microseconds(默认值):精确地捕获date、datetime和timestamp的值,使用毫秒、微秒或纳秒精度值,具体取决于数据库列的类型,但 TIME 类型字段除外,它们始终以微秒表示。
    //adaptive(不建议使用):以数据库列类型为基础,精确地捕获时间和时间戳值,使用毫秒、微秒或纳秒精度值。
    //connect:总是使用 Kafka Connect 内置的 Time、Date 和 Timestamp 表示法表示时间和时间戳值,无论数据库列的精度如何,都使用毫秒精度。
    debeziumProps.setProperty("time.precision.mode", "connect");
    
    //MySQL CDC数据源
    MySqlSource<String> sourceFunction = MySqlSource.<String>builder()
            .hostname(dbProps.getProperty("host"))
            .port(Integer.parseInt(dbProps.getProperty("port")))
            .databaseList(dbProps.getProperty("database_list").split(","))
            .tableList(dbProps.getProperty("table_list").split(","))
            .username(dbProps.getProperty("username"))
            .password(dbProps.getProperty("password"))
            .connectionPoolSize(2)
            .serverTimeZone("Asia/Shanghai")
            .debeziumProperties(debeziumProps)
            .deserializer(new JsonDebeziumDeserializationSchema())
            .serverId("6001")
            .startupOptions(StartupOptions.initial())
            .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    2. 数据处理

    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    // 启用Checkpoint
    env.enableCheckpointing(60000);
    // 默认即为EXACTLY_ONCE。设置Checkpoint模式为EXACTLY_ONCE,每条记录在恢复的时候都是精确一次地处理的
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 设置状态后端
    env.setStateBackend(new HashMapStateBackend());
    // 设置Checkpoint状态存储系统及目录
    env.getCheckpointConfig().setCheckpointStorage("hdfs://ns/flink/checkpoint/mysql_cdc");
    // 两次Checkpoint之间的最小暂停时间是500 ms
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // checkpoints必须在指定的时间内完成,否则被丢弃
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    //只允许checkpoint连续失败两次
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
    // 设置最大并行运行的Checkpoint数量
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // 在作业取消时保留外部检查点
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // 启用非对齐Checkpoint,可以极大减少背压情况的下Checkpoint次数
    env.getCheckpointConfig().enableUnalignedCheckpoints();
    
    
    //获取数据源
    SingleOutputStreamOperator<String> dataStreamSource = env
            .addSource(sourceFunction)
            .uid("source-01").name("read-from-source");
    
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
    
    //JSON字符串转JsonNode
    SingleOutputStreamOperator<JsonNode> dataStreamJsonNode = dataStreamSource
            .map(line -> mapper.readTree(line))
            .uid("map-01").name("source-to-JsonNode");
        
    
    // 从监控的多个表中过滤出'订单表', 并解析Json的after数据
    SingleOutputStreamOperator<OrderInfo> orderOperator = dataStreamJsonNode
      .filter(line -> "order_info".equalsIgnoreCase(line.get("source").get("table").asText()))
      .uid("order-info-filter-01").name("filter-order-info")
      .map(line -> line.get("after").toString())
      .uid("order-info-map-01").name("parse-order-info-after")
      .map(line -> mapper.readValue(line, OrderInfo.class))
      .uid("order-info-map-02").name("order-info-to-pojo");
      
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    3. sink到MySQL

    
    // 定义JdbcSink
    SinkFunction<OrderInfo> orderInfoSink = JdbcSink.sink(
            UPSERT_SQL,
            (JdbcStatementBuilder<OrderInfo>) (ps, order) -> new OrderInfoPreparedStatementSetter().setParams(ps, order),
            JdbcExecutionOptions.builder()
                    .withBatchSize(100)
                    .withBatchIntervalMs(2000)
                    .withMaxRetries(3)
                    .build(),
            JdbcSinkConnUtil.getConnOptions("sink-mysql.properties")
    );
    
    orderOperator.addSink(orderInfoSink);
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    参考

    1. https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-property-decimal-handling-mode
    2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/mysql-cdc.html
    3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
  • 相关阅读:
    .NET的求复杂类型集合的差集、交集、并集
    【dgl学习】dgl处理图中的节点/边的属性/特征/类型
    代码随想录一一一链表一一一设计链表
    表单识别(五)——票据识别-论文研读:基于深度学习的票据识别系统设计与实现,卞飞飞(上)
    Java基本语法2
    rabbitMQ的Topic模式的生产者与消费者使用案例
    基于C语言设计的植物大战僵尸小游戏
    Magicodes.Pay已支持Volo Abp
    Kubernetes:(九)coredns(浪不动了)
    【DNS系列-K8S排错】busybox 中使用 nslookup 进行域名解析时好时坏
  • 原文地址:https://blog.csdn.net/lovetechlovelife/article/details/132817272