• FlinkCDC 2.0


    CDC is short for Change Data Capture.

    Types of CDC

    CDC mainly falls into two categories: query-based CDC and binlog-based CDC.

                                  Query-based CDC             Binlog-based CDC
    Open-source products          Sqoop, Kafka JDBC Source    Canal, Maxwell, Debezium
    Execution mode                Batch                       Streaming
    Captures every data change    No                          Yes
    Latency                       High                        Low
    Adds load to the database     Yes                         No
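
    For intuition, the query-based approach boils down to periodically re-querying the source table, for example on an update-timestamp column. The sketch below is only an illustration (plain JDBC; it assumes a hypothetical update_time column on user_info that is not used elsewhere in this article): every poll is a batch query, changes made between two polls collapse into one result, deletes are never observed, and each poll adds load to the database.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.Timestamp;
    import java.time.Instant;

    // Illustrative sketch of query-based CDC (not Flink CDC): poll user_info on a
    // hypothetical update_time column and emit rows changed since the last poll.
    public class QueryBasedCdcSketch {
        public static void main(String[] args) throws Exception {
            Timestamp lastSeen = Timestamp.from(Instant.EPOCH);
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://hadoop102:3306/cdc_test", "root", "******")) {
                while (true) {
                    try (PreparedStatement ps = conn.prepareStatement(
                            "SELECT id, name, sex, update_time FROM user_info WHERE update_time > ?")) {
                        ps.setTimestamp(1, lastSeen);
                        try (ResultSet rs = ps.executeQuery()) {
                            while (rs.next()) {
                                lastSeen = rs.getTimestamp("update_time");
                                System.out.println(rs.getString("id") + "," + rs.getString("name") + "," + rs.getString("sex"));
                            }
                        }
                    }
                    Thread.sleep(5000); // polling interval: the lower bound on latency
                }
            }
        }
    }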

    1. CDC in Practice

    1.1 Using the DataStream API

    1. pom dependencies (runtime environment: Flink 1.13)

    <properties>
        <flink.version>1.13.0</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    2. Code

    import com.ververica.cdc.connectors.mysql.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.DebeziumSourceFunction;
    import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FlinkCDC {
        public static void main(String[] args) throws Exception {
            // 1. Get the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            env.setParallelism(1);
            // 2.1 Enable checkpointing. Flink CDC stores the binlog reading position as state in the
            //     checkpoint; to resume from where it left off, start the job from a Checkpoint or Savepoint.
            //     Take a checkpoint every 5 seconds.
            env.enableCheckpointing(5000);
            // 2.1.1 Set the checkpoint timeout to 10 seconds
            env.getCheckpointConfig().setCheckpointTimeout(10000L);
            // 2.2 Checkpoint consistency semantics
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            // 2.3 At most one checkpoint in flight at a time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
            // 2.4 Checkpoint storage path
            env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));
            // 3. Build the SourceFunction with Flink CDC
            DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                    .hostname("hadoop102")
                    .port(3306)
                    .username("root")
                    .password("******")
                    .databaseList("cdc_test")            // databases to capture (varargs, several allowed)
                    .tableList("cdc_test.user_info")     // tables to capture, given as "db.table"
                    .deserializer(new StringDebeziumDeserializationSchema()) // deserializer
                    .startupOptions(StartupOptions.initial()) // initial: take a snapshot via query first, then follow the binlog
                    // earliest: read the binlog from the beginning (binlog must have been enabled before the database was created)
                    // latest: read only new binlog entries
                    // specificOffset: start from a given binlog offset
                    // timestamp: start from a given timestamp in the binlog
                    .build();
            // 4. Read from MySQL with the CDC source
            DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
            // 5. Print the data
            dataStreamSource.print();
            env.execute("FlinkCDC");
        }
    }

    3. Testing

    1) Package the job and upload the jar to the Linux host

    2) Make sure the MySQL binlog is enabled

    sudo vim /etc/my.cnf
    
    # server id
    server-id = 1
    # enable the binlog; this value is used as the prefix of the binlog file names
    log-bin=mysql-bin
    # binlog format; row format is required (Maxwell also requires row)
    binlog_format=row
    # databases for which the binlog is written; adjust to your environment
    binlog-do-db=test
    binlog-do-db=cdc_test
    binlog-do-db=gmall2022
    # enable GTID mode
    gtid-mode=on
    # enforce consistency between GTIDs and transactions
    enforce-gtid-consistency=1

     

    3) Start the Hadoop cluster

    4) Start the Flink cluster

    Use yarn-session mode

    bin/yarn-session.sh -nm test

    5) Submit the job

    bin/flink run -c <fully-qualified class name> <jar file>
    

    6) Perform insert, update, and delete operations on the MySQL table

    7) Check the checkpoint files (under hdfs://hadoop102:8020/cdc-test/ck, as configured above)

    8) Create a Savepoint for the running Flink job (the argument after savepoint is the job ID)

     bin/flink savepoint 09d9ca7d1bf3a59db96608a1caf13e57 hdfs://hadoop102:8020/cdc-test/savepoint
    

    9) Stop the job, then restart it from the Savepoint

    Perform insert, update, and delete operations on the MySQL table while the job is stopped

    Restart the Flink CDC job, passing the savepoint path with the -s option

    bin/flink run -c <fully-qualified class name> -s hdfs://hadoop102:8020/cdc-test/savepoint/savepoint-09d9ca-0167f4c15631 FlinkCDC-1.0-SNAPSHOT-jar-with-dependencies.jar
    

    1.2 Using the Flink SQL API

    1. Code

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkSQLCDC {
        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // 2. Create the Flink-MySQL-CDC source table
            // 2.1 The SQL approach needs no deserializer
            // 2.2 Startup mode: defaults to initial; the valid modes for the MySQL CDC consumer are "initial" and "latest-offset"
            tableEnv.executeSql("CREATE TABLE user_info (" +
                    " id STRING PRIMARY KEY NOT ENFORCED," +
                    " name STRING," +
                    " sex STRING" +
                    ") WITH (" +
                    " 'connector' = 'mysql-cdc'," +
                    " 'scan.startup.mode' = 'latest-offset'," +
                    " 'hostname' = 'hadoop102'," +
                    " 'port' = '3306'," +
                    " 'username' = 'root'," +
                    " 'password' = '000000'," +
                    " 'database-name' = 'cdc_test'," +
                    " 'table-name' = 'user_info*'" + // regular expression, so sharded tables can also be matched
                    ")");
            tableEnv.executeSql("select * from user_info").print();
            env.execute("FlinkSQLCDC");
        }
    }

    2. Perform insert, update, delete, and query operations on the table and watch the printed output

    1.3 Custom deserializer

    A custom deserializer lets you change the content of the output.

    First decide on the content and format that the custom deserializer should emit:

    {
        "db": "",
        "tableName": "",
        "before": {"id": "1001", "name": "zhangsan", "sex": "male"},
        "after": {"id": "1002", "name": "lisi", "sex": "female"},
        "op": ""    // operation type
    }

    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.connectors.mysql.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import com.ververica.cdc.debezium.DebeziumSourceFunction;
    import io.debezium.data.Envelope;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    import java.util.List;

    public class FlinkCDC2 {
        public static void main(String[] args) throws Exception {
            // 1. Get the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Set the parallelism
            env.setParallelism(1);
            DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                    .hostname("hadoop102")
                    .port(3306)
                    .username("root")
                    .password("000000")
                    .databaseList("cdc_test")            // databases to capture (varargs, several allowed)
                    // .tableList("cdc_test.user_info")  // tables to capture, given as "db.table"
                    .deserializer(new MyDeserializationSchema()) // custom deserializer
                    .startupOptions(StartupOptions.initial())    // initial: take a snapshot via query first, then follow the binlog
                    // earliest: read the binlog from the beginning (binlog must have been enabled before the database was created)
                    // latest: read only new binlog entries
                    // specificOffset: start from a given binlog offset
                    // timestamp: start from a given timestamp in the binlog
                    .build();
            // 4. Read from MySQL with the CDC source
            DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);
            // 5. Print the data
            dataStreamSource.print();
            env.execute("FlinkCDC");
        }

        /**
         * Custom deserializer: implements the DebeziumDeserializationSchema interface.
         */
        public static class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
            /**
             * @param sourceRecord input, e.g.
             *   SourceRecord{sourcePartition={server=mysql_binlog_source},
             *     sourceOffset={transaction_id=null, ts_sec=1661421973, file=mysql-bin.000032, pos=5107,
             *       gtids=186ee13a-e180-11ec-b32b-000c29d17377:1-38, row=1, server_id=1, event=2}}
             *   ConnectRecord{topic='mysql_binlog_source.cdc_test.user_info', kafkaPartition=null,
             *     key=Struct{id=1006}, keySchema=Schema{mysql_binlog_source.cdc_test.user_info.Key:STRUCT},
             *     value=Struct{before=Struct{id=1006,name=aa,sex=aa},after=Struct{id=1006,name=aaa,sex=female},
             *       source=Struct{version=1.5.2.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1661421973000,
             *         db=cdc_test,table=user_info,server_id=1,gtid=186ee13a-e180-11ec-b32b-000c29d17377:39,
             *         file=mysql-bin.000032,pos=5239,row=0},op=u,ts_ms=1661421974171},
             *     valueSchema=Schema{mysql_binlog_source.cdc_test.user_info.Envelope:STRUCT}, timestamp=null,
             *     headers=ConnectHeaders(headers=)}
             * @param collector output, e.g.
             *   {"db":"", "tableName":"",
             *    "before":{"id":"1001","name":"zhangsan","sex":"male"},
             *    "after":{"id":"1002","name":"lisi","sex":"female"},
             *    "op":""}   // operation type
             * @throws Exception
             */
            @Override
            public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
                // 1. Output as JSON: create a JSON object to hold the result
                JSONObject jsonObject = new JSONObject();
                // 2.1 Get db & tableName: split the topic on "." and take the 2nd and 3rd parts
                String topic = sourceRecord.topic(); // e.g. mysql_binlog_source.cdc_test.user_info
                String[] dbAndTableName = topic.split("\\.");
                jsonObject.put("db", dbAndTableName[1]);
                jsonObject.put("tableName", dbAndTableName[2]);
                // 2.2 Get the "before" data
                // 2.2.1 The payload is a Struct, e.g. value=Struct{before=Struct{...}, after=Struct{...}, ...}
                Struct value = (Struct) sourceRecord.value();
                // 2.2.2 "before" is itself a Struct, obtained directly with getStruct
                Struct before = value.getStruct("before");
                // 2.2.3 Create a JSON object to hold the "before" row
                JSONObject beforeJson = new JSONObject();
                if (before != null) {
                    // 2.2.4 Read the schema of the "before" Struct
                    Schema schema = before.schema();
                    // 2.2.5 Iterate over its fields (column names)
                    List<Field> fields = schema.fields();
                    for (Field field : fields) {
                        beforeJson.put(field.name(), before.get(field));
                    }
                }
                jsonObject.put("before", beforeJson);
                // 2.3 Get the "after" data in the same way
                Struct after = value.getStruct("after");
                JSONObject afterJson = new JSONObject();
                if (after != null) {
                    Schema schema = after.schema();
                    List<Field> fields = schema.fields();
                    for (Field field : fields) {
                        afterJson.put(field.name(), after.get(field));
                    }
                }
                jsonObject.put("after", afterJson);
                // Operation type: READ / DELETE / UPDATE / CREATE
                Envelope.Operation operation = Envelope.operationFor(sourceRecord);
                jsonObject.put("op", operation);
                // Emit the result
                collector.collect(jsonObject.toJSONString());
            }

            /**
             * Produced type
             * @return String
             */
            @Override
            public TypeInformation<String> getProducedType() {
                return BasicTypeInfo.STRING_TYPE_INFO;
            }
        }
    }

    1.4 Differences between the DataStream and Flink SQL approaches

    1. Supported versions: the Flink SQL approach is not supported on Flink versions earlier than 1.13.

    2. The DataStream approach can capture multiple databases and multiple tables at the same time; the Flink SQL approach can only capture a single table.

    3. The default deserializer of the DataStream approach is not very usable, so a custom deserializer is needed; the Flink SQL approach returns rows directly, which can be converted straight into Java beans, as sketched below.
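
    A minimal sketch of that last point: the CDC table from section 1.2 is converted with toRetractStream into a stream of a Java bean. The class FlinkSQLCDCToStream and the UserInfo bean are assumptions made for this example only; the bean's public field names must match the table's column names.

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkSQLCDCToStream {
        // Hypothetical bean; its public fields must match the columns of the CDC table.
        public static class UserInfo {
            public String id;
            public String name;
            public String sex;
            @Override
            public String toString() {
                return id + "," + name + "," + sex;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            // Same CDC source table as in section 1.2 (DDL shortened here).
            tableEnv.executeSql("CREATE TABLE user_info (" +
                    " id STRING PRIMARY KEY NOT ENFORCED, name STRING, sex STRING" +
                    ") WITH (" +
                    " 'connector' = 'mysql-cdc', 'hostname' = 'hadoop102', 'port' = '3306'," +
                    " 'username' = 'root', 'password' = '000000'," +
                    " 'database-name' = 'cdc_test', 'table-name' = 'user_info')");
            Table table = tableEnv.sqlQuery("select * from user_info");
            // Retract stream: (true, row) = insert / update-after, (false, row) = delete / update-before.
            DataStream<Tuple2<Boolean, UserInfo>> stream = tableEnv.toRetractStream(table, UserInfo.class);
            stream.print();
            env.execute("FlinkSQLCDC-ToStream");
        }
    }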

  • Original article: https://blog.csdn.net/asd623444055/article/details/126528213