• 一文进入Flink CDC 的世界


    前言

    CDC有两种方式,一种是离线的,一种是实时的,也就是一种是基于查询的,一种是Binlog的这种方式。

    为什么要学FlinkCDC

    我们用传统的CDC工具检测到一个数据后,我们要对数据进行计算,常规的方案是采用CDC工具将数据采集到消息队列中,用Spark 或者 Flink 进行计算,加工, 有了Flink CDC 我们可以读取数据和加工数据用Flink一起完成,这就是学习Flink CDC 的一个根本原因。

    什么是CDC

    CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

    CDC 的种类

    CDC 主要分为基于查询和基于 Binlog 两种方式,

    在这里插入图片描述

    Flink CDC

    Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的 source 组件。

    开源地址:https://github.com/ververica/flink-cdc-connectors

    Flink CDC 案例实操

    依赖准备

    <dependencies>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-javaartifactId>
                <version>1.12.0version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-streaming-java_2.12artifactId>
                <version>1.12.0version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-clients_2.12artifactId>
                <version>1.12.0version>
            dependency>
            <dependency>
                <groupId>org.apache.hadoopgroupId>
                <artifactId>hadoop-clientartifactId>
                <version>3.1.3version>
            dependency>
            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
                <version>5.1.49version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-table-planner-blink_2.12artifactId>
                <version>1.12.0version>
            dependency>
            <dependency>
                <groupId>com.ververicagroupId>
                <artifactId>flink-connector-mysql-cdcartifactId>
                <version>2.0.0version>
            dependency>
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.75version>
            dependency>
        dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-assembly-pluginartifactId>
                    <version>3.0.0version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependenciesdescriptorRef>
                        descriptorRefs>
                    configuration>
                    <executions>
                        <execution>
                            <id>make-assemblyid>
                            <phase>packagephase>
                            <goals>
                                <goal>singlegoal>
                            goals>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    编码(DataStream)

    package com.bigdata.cdc;
    
    import com.ververica.cdc.connectors.mysql.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.DebeziumSourceFunction;
    import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class FlinkStreamCDC {
        public static void main(String[] args) throws Exception {
    
            //创建流处理执行环境
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
    
            //设置并行度
            env.setParallelism( 1 );
    
            //创建 Flink-MySQL-CDC 的 Source
            DebeziumSourceFunction<String> mysqlSource = MySqlSource
                    .<String>builder()
                    .hostname( "hadoop102" ) //主机名
                    .port( 3306 ) //端口号
                    .username( "root" ) //用户名
                    .password( "000000" ) //密码
                    .databaseList( "cdc_test" ) //数据库名
                    .tableList( "cdc_test.user_info" ) //表名
                    .deserializer( new StringDebeziumDeserializationSchema() ) // 反序列化
                    .startupOptions( StartupOptions.initial() )  //初始化 (拍快照)
                    .build();
    
            //添加数据源
            DataStreamSource<String> dataStream = env.addSource( mysqlSource );
    
            //做一个打印
            dataStream.print();
    
            //执行
            env.execute( "Flink_CDD" );
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    本地测试

    本地测试的前提得有环境,刚刚在编码中测试的数据库是 cdc_test 表名是user_info,我们现在得把它们创建出来,但是前提是,一定要在MySQL 配置文件中,添加binlog 这样才能有效的进行测试。

    配置MySQL中的binlog

    进入MySQL配置文件 my.cnf

    [root@hadoop102 ~]# vim /etc/my.cnf
    
    • 1

    将如下内容添加 [mysqld] 下中

    #binlog日志名称前缀
    log-bin=mysql-bin
    ##默认值未0,如果使用默认值则不能和从节点通信,这个值的区间是:1到(2^32)-1
    server-id=1
    binlog-do-db=cdc_test
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    重启MySQL服务

    [root@hadoop102 ~]# systemctl restart mysqld
    
    • 1

    创建数据库,建表,添加一个语句

    create database cdc_test;
    
    use cdc_test;
    
    create table user_info(
    	id int,
    	name varchar(200),
    	age varchar(200)
    );
    
    insert into user_info values(1,"张三","24");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    执行代码查看

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-v8MfxJcH-1661149264947)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302183745037.png)]

    下面添加一个数据

    insert into user_info values(2,"李四","24");
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GpojriCW-1661149264949)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302183836996.png)]

    下面修改数据,将 年龄 24 改成 26

     update user_info set age = "26" where id = 2;
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iDzopbHI-1661149264953)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302184057021.png)]

    添加王五 并删除

    insert into user_info values(3,"王五","23");
    
    delete from user_info where id = 3;
    
    • 1
    • 2
    • 3

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FRNW9PCd-1661149264955)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302184315253.png)]

    假如说,我们这个挂掉了,再次开启的时候,它会不会读取之前的状态呢,就是说,当前我们有张三和和李四,那么会不会将它们读取到呢?

    其实还是能读取到的,只是以前的操作看不到了,op 也变成了 r ,还是能将之前的数据展示出来

    集群测试

    代码中添加如下配置

    /**
             * Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,
             * 需要从 Checkpoint 或者 Savepoint 启动程序
             */
            //开启 Checkpoint,每隔 5 秒钟做一次 CK
            env.enableCheckpointing(5000L);
            //指定 CK 的一致性语义
            env.getCheckpointConfig().setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE);
            //设置CK 的超时时间 10秒钟
            env.getCheckpointConfig().setCheckpointTimeout( 10000L );
            //并发检查点尝试的最大次数。
            env.getCheckpointConfig().setMaxConcurrentCheckpoints( 1 );
    
            //设置状态后端
            env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc_test/ck"));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    进行打包,将打包后的jar 上传到Linux 下的 Flink 目录中

    启动集群测试之前,先启动Hadoop集群和Flink集群

    Flink 集群

    start-cluster.sh
    
    • 1

    输入如下命令

    bin/flink run -m hadoop102:8081 -c com.bigdata.cdc.FlinkDreamCDC_Cluster ./Flink_CDC-1.0-SNAPSHOT-jar-with-dependencies.jar 
    
    • 1

    数据库中的数据

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-L0kVjQtE-1661149264957)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302221144455.png)]

    Flink 集群页面如下,读取两组数据

    在这里插入图片描述

    开始尝试添加一组数据

    在这里插入图片描述

    此时Flink 集群中读取的数据 如下:

    在这里插入图片描述

    断点续传做数据的恢复,输入如下命令

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vPDNOcmF-1661149264965)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302222248369.png)]

    flink savepoint 8876a5be5f6e0a754aa662bff41fc03d hdfs://hadoop102:8020/cdc_test/savepoint
    
    • 1

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cosu0B9O-1661149264967)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302222855707.png)]

    此时我杀掉Flink当年前的任务,并且做一下 增加 ,修改以及删除的操作。

    添加人员张栋,修改欢欢年龄20,删除姓名为李四的人

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-9Di9Ed62-1661149264968)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220302223718649.png)]

    在这里插入图片描述

    开始再次启动Flink 查看,断电传续功能

    bin/flink run -m hadoop102:8081 -s hdfs://hadoop102:8020/cdc_test/savepoint/savepoint-8876a5-aa9882ca253c -c com.bigdata.cdc.FlinkDreamCDC_Cluster ./Flink_CDC-1.0-SNAPSHOT-jar-with-dependencies.jar
    
    • 1

    注意:-s 后面加入的是上述断点传续命令中如下图片中圈起来的模块。

    在这里插入图片描述

    结果如下:

    在这里插入图片描述

    前6行是原有的功能,从第8行开始,操作的数据都是在Flink 挂掉之后操作的,添加了 张呆呆,将欢欢年龄改成了20,又删除了李四,这个就是断点续传功能

    其他参数测试

    //创建 Flink-MySQL-CDC 的 Source
            DebeziumSourceFunction<String> mysqlSource = MySqlSource
                    .<String>builder()
                    .hostname( "hadoop102" ) //主机名
                    .port( 3306 ) //端口号
                    .username( "root" ) //用户名
                    .password( "000000" ) //密码
                    .databaseList( "cdc_test" ) //数据库名
                    .tableList( "cdc_test.user_info" ) //表名
                    .deserializer( new StringDebeziumDeserializationSchema() ) // 反序列化
                    .startupOptions( StartupOptions.initial() )  //初始化 (拍快照)
                    .build();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1,当我们把表名注销掉之后,运行时,依然也能看到信息,说明,只要有数据库名称,哪怕是不写上表的名字,依然也能监控表中的信息

    2,如果 参数是 startupOptions(StartupOptions.latest()); 那么不会输出历史信息,只会讲再次操作的信息展示出来。

    前言

    FlinkCDC 用的是2.0版本的,但是2.0版本不支持1.12版本的FlinkSQL , 只支持1.13版本的FlinkSQL,如果要使用FlinkSQL 编程 CDC 的话,需要引入1.13版本的Flink

    在这里插入图片描述

    依赖准备

    <properties>
            <flink-version>1.13.0flink-version>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-javaartifactId>
                <version>${flink-version}version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-streaming-java_2.12artifactId>
                <version>${flink-version}version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-clients_2.12artifactId>
                <version>${flink-version}version>
            dependency>
            <dependency>
                <groupId>org.apache.hadoopgroupId>
                <artifactId>hadoop-clientartifactId>
                <version>3.1.3version>
            dependency>
            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
                <version>5.1.27version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-table-planner-blink_2.12artifactId>
                <version>${flink-version}version>
            dependency>
            <dependency>
                <groupId>com.ververicagroupId>
                <artifactId>flink-connector-mysql-cdcartifactId>
                <version>2.0.0version>
            dependency>
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.75version>
            dependency>
        dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-assembly-pluginartifactId>
                    <version>3.0.0version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependenciesdescriptorRef>
                        descriptorRefs>
                    configuration>
                    <executions>
                        <execution>
                            <id>make-assemblyid>
                            <phase>packagephase>
                            <goals>
                                <goal>singlegoal>
                            goals>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-B58his7i-1661149264977)(C:\Users\栾昊\AppData\Roaming\Typora\typora-user-images\image-20220303161834107.png)]

    scan.startup.mode 这个参数只有两个值 "initial" and "latest-offset". (初始化),initial 代表 初始化拍快照,lateset-offset 代表不拍快照

    官网是有的,想配置相应的参数参考官网:

    https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html

    编码(FlinkSQL

    package com.bigdata.cdc;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQLCDC {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = 		 		
            			StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setParallelism( 1 );
            //创建表环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create( env );
    
            //用FlinkSQL 构建CDC
            tableEnv.executeSql( "CREATE TABLE user_info(" +
                    "id INT," +
                    "name STRING," +
                    "age STRING" +
                    ") WITH (" +
                    "'connector' = 'mysql-cdc'," +
                    "'scan.startup.mode' = 'latest-offset'," +
                    "'hostname' = 'hadoop102'," +
                    "'port' = '3306'," +
                    "'username' = 'root'," +
                    "'password' = '000000'," +
                    "'database-name' = 'cdc_test'," +
                    "'table-name' = 'user_info'," +
                    "'scan.incremental.snapshot.enabled' = 'false'" +
                    ")"
            );
    
    
            //查询输出并转换流输出
            Table table = tableEnv.sqlQuery( "select * from user_info" );
            DataStream<Tuple2<Boolean, Row>> rowDataStream = 
            					tableEnv.toRetractStream( table, Row.class );
            rowDataStream.print();
    
            //执行
            env.execute( "Job" );
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加’scan.incremental.snapshot.enabled’ = 'false’否则会报错:

    [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

    自定义反序列化器

    1,实现DebeziumDeserializationSchema接口

    2,重写deserialize,getProducedType两个方法

    3,进入StringDebeziumDeserializationSchema类中将getProducedType返回值写进刚刚重写的getProducedType 方法中

    如果用官方的序列化的话,打印的数据是如下这样子的。

    SourceRecord{
    	sourcePartition={server=mysql_binlog_source}, 
    	sourceOffset={ts_sec=1646306260, file=mysql-bin.000002,pos=443,snapshot=true}
    } 
    ConnectRecord{
    	topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null,key=null, 			  keySchema=null, 
    	value=Struct{
    		after=Struct{id=3,name=欢欢,age=20},
     source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1646306260302,snapshot=true,db=cdc_test,table=user_info,server_id=0,file=mysql-bin.000002,pos=443,row=0},op=r,ts_ms=1646306260302}, 
     valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    使用自定义反序列化器,打印出如下格式

    {

    ​ “db”:" ",

    ​ “tableName”:" ",

    ​ “before”:{“id”:“1”,“name”:“…”…},

    ​ “after”:{“id”:“1”,“name”:“…”…},

    ​ “op”:" "

    }

    代码展示:同目录下function目录两组文件

    自定义序列化之后,展示的数据如下:

    {
        "op":"READ",
        "before":{},
        "after":{"name":"欢欢","id":3,"age":"20"},
        "db":"cdc_test",
        "tableName":"user_info"
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总结

    1,FlinkDream 做 CDC 在1.12版本和1.13版本中都能用 ,而FlinkSQL 只能在1.13版本使用

    2,FlinkDream 可以监控多表,而FlinkSQL 只能监控单表

    • FlinkDream 的缺点就是,自带的反序列化的数据,太复杂了,想简单化需要自己编写,而FlinkSQL已经是最好的Row对象展示,很方便,如果一定要使用FlinkDream 做CDC 的话,需要自定义一个反序列化器,要考虑到这一点。
    • FlinkSQL的缺点就是只能监控单表,其他的比较完美,后续等官方更新,看是否支持监控多表
  • 相关阅读:
    Golang入门笔记(5)—— 流程控制之switch分支
    全局前置路由守卫(beforeEach)
    物联网开发笔记(48)- 使用Micropython开发ESP32开发板之控制OLED ssd1306屏幕
    到底该怎么学python啊?
    C语言内存函数
    机器学习---拉格朗日乘子法、Huber Loss、极大似然函数取对数的原因
    10 Debezium Oracle xstream
    Springboot毕设项目购物网站3ztkv(java+VUE+Mybatis+Maven+Mysql)
    趣味证书制作生成微信小程序源码
    uqrcode+uni-app 微信小程序生成二维码
  • 原文地址:https://blog.csdn.net/weixin_45417821/article/details/126465210