CDC is short for Change Data Capture.
Types of CDC
CDC mainly falls into two categories: query-based and binlog-based.
| | Query-based CDC | Binlog-based CDC |
| --- | --- | --- |
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Load on the database | Yes | No |
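To make the contrast concrete, here is a minimal sketch (not taken from any of the tools above) of what query-based CDC boils down to: polling the table on a schedule with a plain JDBC query. The table and the `update_time` column are assumptions for illustration, and the connection settings mirror the examples later in this post. Because each poll only sees the latest state, it misses intermediate updates and deletes, puts extra query load on the database, and its latency is bounded by the poll interval.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;

public class QueryBasedCdcSketch {
    public static void main(String[] args) throws Exception {
        Timestamp lastPoll = new Timestamp(0L);  // high-water mark of the previous poll
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://hadoop102:3306/cdc_test", "root", "******")) {
            while (true) {
                try (PreparedStatement ps = conn.prepareStatement(
                        "SELECT id, name, sex, update_time FROM user_info WHERE update_time > ?")) {
                    ps.setTimestamp(1, lastPoll);
                    try (ResultSet rs = ps.executeQuery()) {
                        while (rs.next()) {
                            System.out.println(rs.getString("id") + "," + rs.getString("name"));
                            lastPoll = rs.getTimestamp("update_time");  // advance the high-water mark
                        }
                    }
                }
                Thread.sleep(5000);  // batch-style polling: latency is at least the poll interval
            }
        }
    }
}
```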
1. pom dependencies (runtime environment: Flink 1.13)
<properties>
    <flink.version>1.13.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
2. Code
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism
        env.setParallelism(1);

        //2.1 Enable checkpointing. Flink CDC stores the binlog read position as state in the checkpoint;
        //    to resume from where it left off, restart the job from a checkpoint or savepoint.
        //    Here a checkpoint is taken every 5 seconds.
        env.enableCheckpointing(5000);
        //2.1.1 Checkpoint timeout: 10 seconds
        env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //2.2 Checkpoint consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2.3 Maximum number of concurrent checkpoints
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //2.4 Checkpoint storage path
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //3. Build a SourceFunction with Flink CDC
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("******")
                .databaseList("cdc_test")          //database(s) to capture (varargs, multiple allowed)
                .tableList("cdc_test.user_info")   //table(s) to capture, given as db.table
                .deserializer(new StringDebeziumDeserializationSchema()) //deserializer
                .startupOptions(StartupOptions.initial()) //initial: snapshot the existing data via queries, then capture changes from the binlog
                //earliest: read the binlog from its beginning; requires the binlog to have been enabled before the database was created
                //latest: read only new binlog entries
                //specificOffset: start consuming the binlog from a specific offset
                //timestamp: start consuming the binlog from a given timestamp
                .build();

        //4. Read from MySQL with the CDC source
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //5. Print the data
        dataStreamSource.print();

        env.execute("FlinkCDC");
    }
}
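The other startup modes listed in the comments can be passed to `.startupOptions(...)` in the builder above in place of `StartupOptions.initial()`. A quick sketch; the binlog file name, position, and timestamp below are placeholder values borrowed from the sample record shown later in this post, and which modes are actually usable can depend on the connector version:

```java
// Drop-in alternatives for .startupOptions(StartupOptions.initial()) in the builder above
StartupOptions fromEarliest  = StartupOptions.earliest();                               // read the binlog from its start
StartupOptions fromLatest    = StartupOptions.latest();                                 // only new changes
StartupOptions fromOffset    = StartupOptions.specificOffset("mysql-bin.000032", 5107); // a specific binlog file + position
StartupOptions fromTimestamp = StartupOptions.timestamp(1661421973000L);                // changes at/after this epoch-millis timestamp
```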
3. Testing
1) Package the job and upload the jar to the Linux server
2) Make sure the MySQL binlog is enabled
sudo vim /etc/my.cnf
# server id
server-id = 1
# enable the binlog; this value is used as the prefix of the binlog file names
log-bin=mysql-bin
# binlog format; CDC tools such as Maxwell and Debezium require the row format
binlog_format=row
# databases for which the binlog is written; adjust to your environment
binlog-do-db=test
binlog-do-db=cdc_test
binlog-do-db=gmall2022
# enable GTID mode
gtid-mode=on
# enforce consistency between GTIDs and transactions
enforce-gtid-consistency=1
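Optionally, before moving on, the settings can be verified over JDBC. A small sketch, not part of the original steps; the connection settings mirror the examples in this post:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BinlogCheck {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://hadoop102:3306/cdc_test", "root", "******");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(
                    "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")) {
            while (rs.next()) {
                // expect log_bin = ON and binlog_format = ROW
                System.out.println(rs.getString("Variable_name") + " = " + rs.getString("Value"));
            }
        }
    }
}
```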
3) Start the Hadoop cluster
4) Start the Flink cluster
Use yarn-session mode
bin/yarn-session.sh -nm test
5) Submit the job
bin/flink run -c <fully qualified class name> <jar name>
6) Insert, update, and delete rows in the MySQL table

7) Check the checkpoints

8) Create a savepoint for the running Flink job; the argument after `savepoint` is the job ID

bin/flink savepoint 09d9ca7d1bf3a59db96608a1caf13e57 hdfs://hadoop102:8020/cdc-test/savepoint

9) Cancel the job, then restart it from the savepoint

Perform insert/update/delete operations on the MySQL table.
Restart the Flink CDC job; the -s argument is the savepoint path:
bin/flink run -c <fully qualified class name> -s hdfs://hadoop102:8020/cdc-test/savepoint/savepoint-09d9ca-0167f4c15631 FlinkCDC-1.0-SNAPSHOT-jar-with-dependencies.jar

FlinkSQL CDC
1. Code
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2. Create the Flink-MySQL-CDC source table
        //2.1 The SQL table needs no deserializer
        //2.2 Startup mode: defaults to initial; valid startup modes for the MySQL CDC consumer are "initial" and "latest-offset"
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id STRING PRIMARY KEY NOT ENFORCED," +
                "  name STRING," +
                "  sex STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'hostname' = 'hadoop102'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '000000'," +
                "  'database-name' = 'cdc_test'," +
                "  'table-name' = 'user_info*'" +   // a regular expression can be used to match sharded tables
                ")");

        //3. Query and print the change stream; executeSql submits the job, so no extra env.execute() is needed
        tableEnv.executeSql("select * from user_info").print();
    }
}
2. Insert, update, delete, and query rows in the table

The output can be changed by supplying a custom deserializer.
First, decide on the content and format the custom deserializer should emit:

{
  "db": "",
  "tableName": "",
  "before": {"id": "1001", "name": "zhangsan", "sex": "male"},
  "after": {"id": "1002", "name": "lisi", "sex": "female"},
  "op": ""    // operation type
}
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;

public class FlinkCDC2 {
    public static void main(String[] args) throws Exception {
        //1. Get the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the parallelism
        env.setParallelism(1);

        //2. Build a SourceFunction that uses the custom deserializer
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("hadoop102")
                .port(3306)
                .username("root")
                .password("000000")
                .databaseList("cdc_test")              //database(s) to capture (varargs, multiple allowed)
                // .tableList("cdc_test.user_info")    //table(s) to capture, given as db.table
                .deserializer(new MyDeserializationSchema()) //custom deserializer
                .startupOptions(StartupOptions.initial()) //initial: snapshot the existing data via queries, then capture changes from the binlog
                //earliest: read the binlog from its beginning; requires the binlog to have been enabled before the database was created
                //latest: read only new binlog entries
                //specificOffset: start consuming the binlog from a specific offset
                //timestamp: start consuming the binlog from a given timestamp
                .build();

        //3. Read from MySQL with the CDC source
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //4. Print the data
        dataStreamSource.print();

        env.execute("FlinkCDC");
    }

    /**
     * Custom deserializer: implement the DebeziumDeserializationSchema interface
     */
    public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {

        /**
         * @param sourceRecord input, e.g. SourceRecord{sourcePartition={server=mysql_binlog_source},
         *                     sourceOffset={..., file=mysql-bin.000032, pos=5107, ...}}
         *                     ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', key=Struct{id=1006},
         *                     value=Struct{before=Struct{id=1006,name=aa,sex=aa},after=Struct{id=1006,name=aaa,sex=female},
         *                     source=Struct{...,db=cdc_test,table=user_info,...},op=u,ts_ms=1661421974171}, ...}
         * @param collector    output, e.g. {"db":"", "tableName":"", "before":{...}, "after":{...}, "op":""}
         * @throws Exception
         */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            //1. Output JSON
            //1.1 Create a JSON object to hold the result
            JSONObject jsonObject = new JSONObject();

            //2.1 Get db & tableName: split the topic on "." and take the second and third fields
            String topic = sourceRecord.topic(); //topic='mysql_binlog_source.cdc_test.user_info'
            String[] dbNAndTblN = topic.split("\\.");
            jsonObject.put("db", dbNAndTblN[1]);
            jsonObject.put("tableName", dbNAndTblN[2]);

            //2.2 Get the "before" data
            //2.2.1 The data lives inside a Struct, so cast the record value to Struct
            //      value=Struct{before=Struct{id=1006,name=aa,sex=aa},after=Struct{id=1006,name=aaa,sex=female},...}
            Struct value = (Struct) sourceRecord.value();
            //2.2.2 before=Struct{id=1006,name=aa,sex=aa} is itself a Struct; fetch it with getStruct
            Struct before = value.getStruct("before");
            //2.2.3 Create a JSON object to hold the before image
            JSONObject beforeJson = new JSONObject();
            if (before != null) {
                //2.2.4 Get the schema (metadata) of the before Struct
                Schema schema = before.schema();
                //2.2.5 Get the column names
                List<Field> fields = schema.fields();
                for (Field field : fields) {
                    beforeJson.put(field.name(), before.get(field));
                }
            }
            jsonObject.put("before", beforeJson);

            //2.3 Get the "after" data the same way
            Struct after = value.getStruct("after");
            JSONObject afterJson = new JSONObject();
            if (after != null) {
                Schema schema = after.schema();
                List<Field> fields = schema.fields();
                for (Field field : fields) {
                    afterJson.put(field.name(), after.get(field));
                }
            }
            jsonObject.put("after", afterJson);

            //2.4 Get the operation type: READ / DELETE / UPDATE / CREATE
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            jsonObject.put("op", operation);

            //3. Emit the record
            collector.collect(jsonObject.toJSONString());
        }

        /**
         * Type of the produced records
         * @return String type information
         */
        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
}
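As a usage sketch (not part of the original code), the JSON strings emitted by MyDeserializationSchema can be consumed directly inside main() after step 3, for example to keep only the after-image of non-delete events; the "after>" print prefix is just for illustration:

```java
// Sketch: parse the custom JSON, drop delete events, and keep only the after-image.
// Place inside main(), after dataStreamSource has been created.
dataStreamSource
        .map(com.alibaba.fastjson.JSON::parseObject)             // String -> JSONObject
        .filter(json -> !"DELETE".equals(json.getString("op")))  // "op" was written from Envelope.Operation
        .map(json -> json.getJSONObject("after").toJSONString()) // keep the after-image only
        .print("after>");
```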

Comparison of the two approaches:
1. Supported versions: the FlinkSQL approach does not support Flink versions earlier than 1.13.
2. The DataStream API can capture multiple databases and tables at once; FlinkSQL can only capture a single table.
3. The DataStream API's default deserializer output is not practical to work with, so a custom deserializer is needed; FlinkSQL returns Row objects directly, which can be converted straight into a JavaBean (see the sketch below).
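A sketch of what point 3 means in practice, assuming the same user_info CDC table as above. The UserInfo POJO and the job name are hypothetical, and toRetractStream is just one of several ways to bridge from the Table API to a DataStream:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDCToBean {
    // Hypothetical JavaBean matching the three columns of user_info
    public static class UserInfo {
        public String id;
        public String name;
        public String sex;
        public UserInfo() { }
        public UserInfo(String id, String name, String sex) {
            this.id = id; this.name = name; this.sex = sex;
        }
        @Override
        public String toString() {
            return "UserInfo{id=" + id + ", name=" + name + ", sex=" + sex + "}";
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same CDC source table as in FlinkSQLCDC above
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id STRING PRIMARY KEY NOT ENFORCED," +
                "  name STRING," +
                "  sex STRING" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'hostname' = 'hadoop102'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '000000'," +
                "  'database-name' = 'cdc_test'," +
                "  'table-name' = 'user_info'" +
                ")");

        Table table = tableEnv.sqlQuery("select * from user_info");
        // Retract stream: Tuple2<Boolean, Row>; false marks the retracted (old) row of an update/delete
        tableEnv.toRetractStream(table, Row.class)
                .filter(t -> t.f0)  // keep only insert / update-after rows
                .map(t -> new UserInfo((String) t.f1.getField(0),
                        (String) t.f1.getField(1),
                        (String) t.f1.getField(2)))
                .returns(UserInfo.class)  // help Flink's type extraction for the lambda
                .print();

        env.execute("FlinkSQLCDCToBean");
    }
}
```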