• Big Data (9j) Flink CDC


    CDC Overview

    • Change Data Capture
      Monitors and captures database changes (inserts, updates, and deletes of data or tables…) and writes them, complete and in order of occurrence, to message middleware.

    CDC approaches compared:
    Type                   Query-based    Binlog-based
    Tools                  Sqoop          Canal, Maxwell, Debezium
    Processing mode        Batch          Streaming
    Captures all changes?  No             Yes
    Latency                High           Low
    Adds database load?    Yes            No
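    Why can't a query-based tool capture all changes? It diffs periodic snapshots, so anything that happens between two polls is invisible. A minimal sketch (plain Java with made-up rows, no real database; the class and method names are ours):

    ```java
    import java.util.HashMap;
    import java.util.Map;

    public class QueryBasedCdcSketch {
        // Diff two snapshots of a table (key -> row value), the way a
        // query-based CDC tool would between two polls.
        public static Map<String, String> diff(Map<String, String> prev, Map<String, String> curr) {
            Map<String, String> changes = new HashMap<>();
            for (Map.Entry<String, String> e : curr.entrySet()) {
                String old = prev.get(e.getKey());
                if (old == null) changes.put(e.getKey(), "insert:" + e.getValue());
                else if (!old.equals(e.getValue())) changes.put(e.getKey(), "update:" + e.getValue());
            }
            for (String k : prev.keySet())
                if (!curr.containsKey(k)) changes.put(k, "delete");
            return changes;
        }

        public static void main(String[] args) {
            Map<String, String> poll1 = new HashMap<>();
            poll1.put("2", "ab");
            // Between the two polls, a row (3,'x') was inserted AND deleted,
            // and row 2 was updated twice: ab -> tmp -> cd.
            Map<String, String> poll2 = new HashMap<>();
            poll2.put("2", "cd");
            // The short-lived row and the intermediate value are never observed;
            // a binlog-based tool would capture all four events.
            System.out.println(diff(poll1, poll2)); // {2=update:cd}
        }
    }
    ```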

    Flink-CDC

    • The Flink community developed and open-sourced the flink-cdc-connectors component,
      which can read full snapshots and incremental changes directly from MySQL, PostgreSQL, and other databases.

    Comparing the Maxwell and Flink-CDC pipelines:
    Maxwell:   MySQL binlog → Kafka → Flink
    Flink-CDC: MySQL binlog → Flink

    Flink-CDC Code Test

    1. Enable the MySQL 8 Binlog

    1. Edit the MySQL configuration

    vim /etc/my.cnf
    

    2. Add the following

    server-id=1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=db1
    
    Parameter notes:
    server-id      A unique ID for each instance in MySQL replication
    log-bin        Prefix of the generated binlog files
    binlog_format  Binlog format
    binlog-do-db   Databases to write to the binlog; if unset, all databases are logged

    binlog_format values:
    Value      Meaning                                                  Space usage  Data consistency
    statement  Statement-level: logs each executed write statement      Small        Replaying the binlog at a different time may yield inconsistent data
    row        Row-level: logs each affected row's value after the op   Large        Fully consistent
    mixed      An upgraded statement mode                               Small        May still be inconsistent in extreme cases

    3. Restart MySQL

    sudo systemctl restart mysqld
    

    4. Verify the configuration (the log_bin variable should show ON)

    mysql -uroot -e'SHOW variables LIKE "%log_bin%"' -p
    

    2. Prepare MySQL Data

    1. Data to be monitored

    DROP DATABASE IF EXISTS db1;
    CREATE DATABASE db1;
    CREATE TABLE db1.t
    (a INT PRIMARY KEY,b VARCHAR(255),c TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
    INSERT db1.t(a,b,c) VALUES (2,'ab','2022-10-24 00:00:00');
    

    2. DML (run only when testing)

    INSERT db1.t(a,b) VALUES (3,'bc');
    UPDATE db1.t SET a=2,b='cd' WHERE a=2;
    SELECT * FROM db1.t;
    

    3. Set Up the Development Environment

    Windows 10 + JDK 1.8 + IDEA 2021; create a Maven project and add the following dependencies to pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.13.6</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.19</version>
        </dependency>
    </dependencies>
    

    4. Java Code

    4.1. DataStream-style Example

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class FlinkStreamCDC {
        public static void main(String[] args) throws Exception {
            //TODO 1 Create the stream execution environment and set the parallelism
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            //TODO 2 Create the Flink MySQL CDC source
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname("host address")
                    .port(3306)
                    .username("root")
                    .password("password")
                    //Databases to capture
                    .databaseList("db1")
                    //Tables to capture (the database prefix cannot be omitted)
                    .tableList("db1.t")
                    //Deserialize each received SourceRecord into a JSON string
                    .deserializer(new JsonDebeziumDeserializationSchema())
                    //Startup options:
                    // initial: take an initial snapshot of the monitored tables, then keep reading the latest binlog
                    // earliest: read from the beginning of the binlog
                    // latest: read from the end of the binlog
                    // specificOffset and timestamp also exist; see the source code for details
                    .startupOptions(StartupOptions.initial())
                    .build();
            //TODO 3 Read the data and print it
            env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "sourceName").print();
            //TODO 4 Execute
            env.execute();
        }
    }
    

    Test output

    {"before":null,"after":{"a":2,"b":"ab","c":"2022-10-24T00:00:00Z"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993379274,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":0,"gtid":null,"file":"","pos":0,"row":0,"thread":null,"query":null},"op":"r","ts_ms":1669993379280,"transaction":null}
    
    //After executing the DML
    
    {"before":null,"after":{"a":3,"b":"bc","c":"2022-12-02T15:09:55Z"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993795000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":1851,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1669993794980,"transaction":null}
    {"before":{"a":2,"b":"ab","c":"2022-10-23T16:00:00Z"},"after":{"a":2,"b":"cd","c":"2022-10-23T16:00:00Z"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1669993795000,"snapshot":"false","db":"db1","sequence":null,"table":"t","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2145,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1669993794987,"transaction":null}
    

    4.2. Flink SQL Example

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    public class FlinkSqlCdc {
        public static void main(String[] args) throws Exception {
            // TODO 1. Prepare the environments
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // TODO 2. Create a dynamic table
            tableEnv.executeSql("CREATE TABLE t (" +
                    " a INT," +
                    " b STRING," +
                    " c TIMESTAMP," +
                    " PRIMARY KEY(a) NOT ENFORCED" +
                    ") WITH (" +
                    " 'connector' = 'mysql-cdc'," +
                    " 'hostname' = 'host address'," +
                    " 'port' = '3306'," +
                    " 'username' = 'root'," +
                    " 'password' = 'password'," +
                    " 'database-name' = 'db1'," +
                    " 'table-name' = 't'" +
                    ")");
            tableEnv.executeSql("SELECT * FROM t").print();
            // TODO 3. Execute
            env.execute();
        }
    }
    

    Test output

    +----+----+-----+----------------------------+
    | op |  a |   b |                          c |
    +----+----+-----+----------------------------+
    | +I |  2 |  ab | 2022-10-24 00:00:00.000000 |
    
    //After executing the DML
    
    | +I |  3 |  bc | 2022-12-02 15:15:14.000000 |
    | -U |  2 |  ab | 2022-10-23 16:00:00.000000 |
    | +U |  2 |  cd | 2022-10-23 16:00:00.000000 |
    
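    The +I/-U/+U flags in the op column are Flink's changelog row kinds (insert, update-before, update-after; a delete would print as -D). A minimal sketch (plain Java with the rows printed above; the class and method names are ours) of how such a changelog folds back into table state:

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ChangelogReplay {
        // Apply one changelog row (row kind + key + value) to materialized state.
        public static void apply(Map<Integer, String> state, String kind, int key, String value) {
            switch (kind) {
                case "+I": // insert
                case "+U": // update-after: the new version of the row
                    state.put(key, value);
                    break;
                case "-U": // update-before: retract the old version
                case "-D": // delete
                    state.remove(key);
                    break;
            }
        }

        public static void main(String[] args) {
            Map<Integer, String> state = new LinkedHashMap<>();
            // The changelog rows from the test output, replayed in order:
            apply(state, "+I", 2, "ab");
            apply(state, "+I", 3, "bc");
            apply(state, "-U", 2, "ab");
            apply(state, "+U", 2, "cd");
            System.out.println(state); // {3=bc, 2=cd} -- matches SELECT * FROM db1.t
        }
    }
    ```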

    JSON output at a glance (StartupOptions.initial)

    Database operation  op          before    after
    insert              c           null      row data
    update              d, then c   row data  row data
    delete              d           row data  null

    (The update tested below changes the primary key, so it is logged as a delete plus a create; an in-place update produces a single event with op "u" and both before and after set, as seen in section 4.1.)
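    The mapping above is just a lookup on the op code of each Debezium change event. A minimal sketch (plain Java; the class and method names are ours):

    ```java
    public class DebeziumOp {
        // Map a Debezium "op" code to the operation it encodes, together with
        // the expected nullability of the "before"/"after" fields.
        public static String describe(String op) {
            switch (op) {
                case "r": return "snapshot read (before=null, after=row)"; // emitted by StartupOptions.initial
                case "c": return "insert (before=null, after=row)";
                case "u": return "update (before=row, after=row)";
                case "d": return "delete (before=row, after=null)";
                default:  return "unknown";
            }
        }

        public static void main(String[] args) {
            // An UPDATE that changes the primary key arrives as two events:
            System.out.println(describe("d"));
            System.out.println(describe("c"));
        }
    }
    ```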

    1. Initial snapshot of the monitored table

    -- Create the database
    DROP DATABASE IF EXISTS db1;CREATE DATABASE db1;
    -- Create the table
    CREATE TABLE db1.t(a INT PRIMARY KEY,b TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
    -- Insert a row
    INSERT db1.t(a,b) VALUES (1,'2022-10-24 00:00:00');
    

    JSON

    {
    	"before": null,
    	"after": {
    		"a": 1,
    		"b": "2022-10-24T00:00:00Z"
    	},
    	"source": {
    		"version": "1.5.4.Final",
    		"connector": "mysql",
    		"name": "mysql_binlog_source",
    		"ts_ms": 1670656489808,
    		"snapshot": "false",
    		"db": "db1",
    		"sequence": null,
    		"table": "t",
    		"server_id": 0,
    		"gtid": null,
    		"file": "",
    		"pos": 0,
    		"row": 0,
    		"thread": null,
    		"query": null
    	},
    	"op": "r",
    	"ts_ms": 1670656489815,
    	"transaction": null
    }
    

    2. Insert data

    INSERT db1.t(a) VALUES (2);
    

    JSON

    {
    	"before": null,
    	"after": {
    		"a": 2,
    		"b": "2022-12-10T07:15:52Z"
    	},
    	"source": {
    		"version": "1.5.4.Final",
    		"connector": "mysql",
    		"name": "mysql_binlog_source",
    		"ts_ms": 1670656552000,
    		"snapshot": "false",
    		"db": "db1",
    		"sequence": null,
    		"table": "t",
    		"server_id": 1,
    		"gtid": null,
    		"file": "mysql-bin.000001",
    		"pos": 5152,
    		"row": 0,
    		"thread": null,
    		"query": null
    	},
    	"op": "c",
    	"ts_ms": 1670656552743,
    	"transaction": null
    }
    

    3. Update data (changing the primary key)

    UPDATE db1.t SET a=3 WHERE a=1;
    SELECT * FROM db1.t;
    

    JSON

    {
    	"before": {
    		"a": 1,
    		"b": "2022-10-23T16:00:00Z"
    	},
    	"after": null,
    	"source": {
    		"version": "1.5.4.Final",
    		"connector": "mysql",
    		"name": "mysql_binlog_source",
    		"ts_ms": 1670656602000,
    		"snapshot": "false",
    		"db": "db1",
    		"sequence": null,
    		"table": "t",
    		"server_id": 1,
    		"gtid": null,
    		"file": "mysql-bin.000001",
    		"pos": 5434,
    		"row": 0,
    		"thread": null,
    		"query": null
    	},
    	"op": "d",
    	"ts_ms": 1670656602253,
    	"transaction": null
    }
    
    
    {
    	"before": null,
    	"after": {
    		"a": 3,
    		"b": "2022-10-23T16:00:00Z"
    	},
    	"source": {
    		"version": "1.5.4.Final",
    		"connector": "mysql",
    		"name": "mysql_binlog_source",
    		"ts_ms": 1670656602000,
    		"snapshot": "false",
    		"db": "db1",
    		"sequence": null,
    		"table": "t",
    		"server_id": 1,
    		"gtid": null,
    		"file": "mysql-bin.000001",
    		"pos": 5434,
    		"row": 0,
    		"thread": null,
    		"query": null
    	},
    	"op": "c",
    	"ts_ms": 1670656602253,
    	"transaction": null
    }
    

    4. Delete data

    DELETE FROM db1.t WHERE a=3;
    

    JSON

    {
    	"before": {
    		"a": 3,
    		"b": "2022-10-23T16:00:00Z"
    	},
    	"after": null,
    	"source": {
    		"version": "1.5.4.Final",
    		"connector": "mysql",
    		"name": "mysql_binlog_source",
    		"ts_ms": 1670656744000,
    		"snapshot": "false",
    		"db": "db1",
    		"sequence": null,
    		"table": "t",
    		"server_id": 1,
    		"gtid": null,
    		"file": "mysql-bin.000001",
    		"pos": 5717,
    		"row": 0,
    		"thread": null,
    		"query": null
    	},
    	"op": "d",
    	"ts_ms": 1670656744059,
    	"transaction": null
    }
    
  • Original article (Chinese): https://blog.csdn.net/Yellow_python/article/details/127896953