CDC is short for Change Data Capture. The core idea is to monitor and capture changes in a database (inserts, updates, and deletes of rows and tables), record those changes completely and in the order they occur, and write them to a message queue so that other services can subscribe to and consume them. For example, an update to a single row can be captured as a change event carrying the database and table name, the row's before and after images, and the operation type, which is exactly the shape the custom deserializer at the end of this article produces.
| | Query-based CDC | Binlog-based CDC |
|---|---|---|
| Open-source products | Sqoop, Kafka JDBC Source | Canal, Maxwell, Debezium |
| Execution mode | Batch | Streaming |
| Captures every data change | No | Yes |
| Latency | High | Low |
| Adds load on the database | Yes | No |
Flink CDC is a binlog-based source component with Debezium built in; it can read both the full snapshot and the incremental changes directly from databases such as MySQL and PostgreSQL. Source code: https://github.com/ververica/flink-cdc-connectors
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
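With the assembly plugin's single goal bound to the package phase as configured above, the fat jar used in the deployment steps later on can be built with the standard Maven command (the exact jar name depends on your project's artifactId and version):

mvn clean package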
//imports below assume Flink 1.12.x and the flink-connector-mysql-cdc 2.0.0 package layout
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;

public class FlinkCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //Flink CDC stores the binlog position it has read as state in checkpoints; to resume from where it
        //left off, the job must be started from a checkpoint or savepoint
        //1.1 Enable checkpointing, one checkpoint every 5 seconds
        env.enableCheckpointing(5000L);
        //1.2 Set the checkpointing consistency semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //1.3 Retain the last checkpoint when the job is cancelled
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //1.4 Configure the restart strategy used when recovering from checkpoints
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
        //1.5 Set the state backend
        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
        //1.6 Set the user name used to access HDFS
        System.setProperty("HADOOP_USER_NAME", "lgb");
        //2. Create the Flink CDC source
        /*
        StartupOptions has 5 modes:
        1. initial: the default; first reads all existing rows of the table with a snapshot query, then switches to reading the binlog from the latest position
        2. earliest: reads the binlog from its very beginning; requires that binlog was already enabled before the database was created
        3. latest: reads only new changes from the latest binlog position
        4. specificOffset: reads the binlog from a specified position
        5. timestamp: reads the binlog starting from a specified timestamp
        */
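        // For reference, and assuming the 2.0.0 StartupOptions API, these modes map to factory methods such as
        // StartupOptions.initial(), StartupOptions.earliest(), StartupOptions.latest(),
        // StartupOptions.specificOffset("mysql-bin.000003", 4) and StartupOptions.timestamp(1640995200000L);
        // the binlog file/position and timestamp values here are purely illustrative.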
        DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
                .hostname("hadoop102")                                    //host of the MySQL server
                .port(3306)                                               //MySQL port
                .username("root")                                         //MySQL user name
                .password("123456")                                       //MySQL password
                .databaseList("cdc_test")                                 //databases to monitor (varargs)
                .tableList("cdc_test.user_info")                          //tables to monitor; if omitted, all tables in the databases are monitored
                .deserializer(new StringDebeziumDeserializationSchema())  //deserializer
                .startupOptions(StartupOptions.initial())                 //startup/read mode
                .build();
        //3. Create a DataStream from the Flink CDC source
        DataStream<String> dataStream = env.addSource(mysqlSource);
        //4. Print the stream
        dataStream.print();
        //5. Start the job
        env.execute("FlinkCDC");
    }
}
Package the FlinkCDC program and upload the jar to the cluster.
Start the Hadoop, ZooKeeper, and Flink clusters.
Run the FlinkCDC program:
bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
Create a savepoint for the running Flink job:
bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
Stop the FlinkCDC program (see the command sketch below).
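One way to do this from the Flink CLI, assuming you simply want to cancel the job after the savepoint has already been taken in the previous step:
bin/flink cancel [JobId]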
Perform some insert, update, and delete operations on the MySQL table.
Restart the program from the savepoint and check its output:
bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
Using Flink CDC 2.0.0 through Flink SQL requires Flink 1.13 or later (note that the pom above pins Flink 1.12.0, so the Flink dependencies would need to be bumped to a 1.13.x version for the following example).
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSQLCDC {
    public static void main(String[] args) throws Exception {
        //1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2. Create the Flink SQL table environment
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //3. Configure the Flink SQL CDC source; it monitors a single table, needs no deserializer,
        //   and only supports the 'initial' and 'latest-offset' startup modes
        tableEnv.executeSql(
                "CREATE TABLE user_info (" +
                " id STRING," +
                " name STRING," +
                " sex STRING," +
                " PRIMARY KEY (id) NOT ENFORCED" + //Flink SQL only accepts NOT ENFORCED primary key constraints
                ") WITH (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'initial'," +
                " 'hostname' = 'hadoop102'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'cdc_test'," +
                " 'table-name' = 'user_info'" +
                ")"
        );
        //4. Query the table and print the result stream
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);
        dataStream.print();
        //5. Start the job
        env.execute("FlinkSqlCDC");
    }
}
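In the retract stream printed above, the Boolean flag indicates whether a Row is being added (true) or retracted (false): an UPDATE on the source table therefore shows up as a retract of the old row followed by an add of the new row, and a DELETE as a single retract.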
To make downstream parsing easier, we can normalize the output format with a custom deserializer.
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Custom deserializer: implement the DebeziumDeserializationSchema interface
 * and its deserialize and getProducedType methods.
 */
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
    /*
    Desired output format:
    {
        "dbName":"",
        "tableName":"",
        "before":{"field1":"value1",...},
        "after":{"field1":"value1",...},
        "op":""
    }
    */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        JSONObject result = new JSONObject();
        //1. Get the database and table name from the record topic (format: serverName.databaseName.tableName)
        String topic = sourceRecord.topic();
        String[] topicParts = topic.split("\\.");
        //2. Extract the before image
        Struct value = (Struct) sourceRecord.value();
        Struct before = value.getStruct("before");
        JSONObject beforeJSON = new JSONObject();
        if (before != null) {
            Schema schema = before.schema();
            List<Field> fields = schema.fields();
            for (Field field : fields) {
                beforeJSON.put(field.name(), before.get(field));
            }
        }
        //3. Extract the after image
        Struct after = value.getStruct("after");
        JSONObject afterJSON = new JSONObject();
        if (after != null) {
            Schema schema = after.schema();
            List<Field> fields = schema.fields();
            for (Field field : fields) {
                afterJSON.put(field.name(), after.get(field));
            }
        }
        //4. Get the operation type: READ, DELETE, UPDATE or CREATE
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        result.put("dbName", topicParts[1]);
        result.put("tableName", topicParts[2]);
        result.put("before", beforeJSON);
        result.put("after", afterJSON);
        result.put("op", operation);
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }
}
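To emit this normalized JSON instead of the default string output, the custom schema is plugged into the source builder from the DataStream example above; a minimal sketch, reusing the same connection settings:

DebeziumSourceFunction<String> mysqlSource = MySqlSource.<String>builder()
        .hostname("hadoop102")
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("cdc_test")
        .tableList("cdc_test.user_info")
        .deserializer(new MyDeserializationSchema()) //custom deserializer instead of StringDebeziumDeserializationSchema
        .startupOptions(StartupOptions.initial())
        .build();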