• Flink 1.13(八)CDC


    一.简介

    CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费

    CDC的种类

    在这里插入图片描述

    Flink-CDC

    Flink 社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据增量变更数据的 source 组件。目前也已开源

    二.DataStream方式

    1.MySQL binlog开启

    修改/etc/my.cnf 文件
    sudo vim /etc/my.cnf

    server-id = 1
    log-bin=mysql-bin
    binlog_format=row
    binlog-do-db=database-name // 数据库名字
    
    • 1
    • 2
    • 3
    • 4

    binlog-do-db 根据自己的情况进行修改,指定具体要同步的数据库

    重启 MySQL 使配置生效

    sudo systemctl restart mysqld
    
    • 1

    2.相关依赖

    <dependencies>
    
      <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-javaartifactId>
        <version>1.13.0version>
     dependency>
     <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-streaming-java_2.12artifactId>
        <version>1.13.0version>
     dependency>
     <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-clients_2.12artifactId>
        <version>1.13.0version>
     dependency>
     <dependency>
        <groupId>org.apache.hadoopgroupId>
        <artifactId>hadoop-clientartifactId>
        <version>3.1.3version>
     dependency>
     <dependency>
        <groupId>mysqlgroupId>
        <artifactId>mysql-connector-javaartifactId>
        <version>5.1.49version>
     dependency>
    
     <dependency>
        <groupId>com.alibaba.ververicagroupId>
        <artifactId>flink-connector-mysql-cdcartifactId>
        <version>1.3.0version>
     dependency>
     <dependency>
        <groupId>com.alibabagroupId>
        <artifactId>fastjsonartifactId>
        <version>1.2.75version>
     dependency>
     
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    3.编写代码

    public class FlinkCDC {
        public static void main(String[] args) throws Exception {
            // 1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 开启状态后端 1.管理状态,提供状态的查询和访问2.状态的保存位置是内存
            env.setStateBackend(new HashMapStateBackend());
    
            // 开启检查点 5秒一次
            env.enableCheckpointing(5000);
            // 设置检查点的保存位置
            env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/flink-cdc");
            // 检查点为精准一次
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            env.getCheckpointConfig().setCheckpointTimeout(10000L);
    
            // 2.通过Flink CDC 构建MySQL SOURCE
            DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                    .hostname("175.178.154.194")
                    .port(3306)
                    .username("root")
                    .password("xxxx")
                    .databaseList("rtdw-flink") // 库名
                    .tableList("rtdw-flink.base_category1") // 表名,可以有多个,因此用库名.表名来规定表名,防止不同库之间有相同的表名
                    .deserializer(new StringDebeziumDeserializationSchema()) // 数据输出格式(反序列化),这里暂时选择默认
                    .startupOptions(StartupOptions.initial()) // 读取方式,initial表示从头开始读取,还有一种latest方式,表示从启动CDC程序后,读取此时刻以后数据库变化的数据
                    .build();
    
            DataStreamSource<String> streamSource = env.addSource(sourceFunction);
            // 3.打印数据
            streamSource.print();
    
            // 4.启动任务
            env.execute("CDC TASK ");
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    4.打包

    下面的打包插件可以将依赖一起打包

    <build>
       <plugins>
           <plugin>
               <artifactId>maven-compiler-pluginartifactId>
               <version>3.6.1version>
               <configuration>
                   <source>1.8source>
                   <target>1.8target>
               configuration>
           plugin>
           <plugin>
               <artifactId>maven-assembly-pluginartifactId>
               <configuration>
                   <descriptorRefs>
                       <descriptorRef>jar-with-dependenciesdescriptorRef>
                   descriptorRefs>
               configuration>
               <executions>
                   <execution>
                       <id>make-assemblyid>
                       <phase>packagephase>
                       <goals>
                           <goal>singlegoal>
                       goals>
                   execution>
               executions>
           plugin>
       plugins>
    build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    5.测试

    开启flink集群

    bin/start-cluster.sh
    
    • 1

    输入web页面

    http://hadoop102:8081

    添加jar包
    在这里插入图片描述

    指定启动类提交即可

    在这里插入图片描述
    可以看到输出

    在这里插入图片描述

    复制任务ID

    在这里插入图片描述

    执行命令 将该任务的检查点保存到hdfs(自定义目录)

    bin/flink savepoint 224f7761b07bcee76bcaa74a5248a0e3 hdfs://hadoop102:8020/savepoint
    
    • 1

    在这里插入图片描述

    取消任务 去MySQL修改表
    在这里插入图片描述

    在这里插入图片描述

    找到保存点文件
    在这里插入图片描述
    在这里插入图片描述
    实现了断点续传
    在这里插入图片描述

    三.自定义反序列化

    我们可以看到,我们看到的数据格式十分复杂,不利于我们后期对数据的使用,因此我们需要自定义反序列化器,来实现数据的可用性和可读性
    在这里插入图片描述

    public class CusDeserialization implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            // 创建JSON对象
            JSONObject res = new JSONObject();
            // 获取topic 里面有数据库名和表名
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            // 1.获取表名和数据库名
            String database = fields[1];
            String tableName = fields[2];
    
            // 2.获取value 里面会有before(修改数据会有)和after
            Struct value = (Struct)sourceRecord.value();
            // 获取before的结构
            Struct beforeStruct = value.getStruct("before");
            // 将before对象放到JSON对象
            JSONObject beforeJSON = new JSONObject();
            // 非修改数据是不会有before的,所以要判断
            if(beforeStruct != null){
                // 获取元数据
                Schema beforeSchema = beforeStruct.schema();
                // 通过元数据获取字段名
                List<Field> fieldList = beforeSchema.fields();
                for (Field field : fieldList) {
                    // 获取字段值
                    Object beforeValue = beforeStruct.get(field);
                    // 放入JSON对象
                    beforeJSON.put(field.name(),beforeValue);
                }
            }
    
            // 3.获取after
            Struct after = value.getStruct("after");
            JSONObject afterJSON = new JSONObject();
            if(after != null){
                Schema afterSchema = after.schema();
                List<Field> fieldList = afterSchema.fields();
                for (Field field : fieldList) {
                    Object afterValue = after.get(field);
                    afterJSON.put(field.name(),afterValue);
                }
            }
    
            // 4.获取操作类型
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toLowerCase();
            if("create".equals(type)){  // 类型crud的全称
                type = "insert";
            }
    
            // 5.数据字段写入JSON
            res.put("database",database);
            res.put("tableName",tableName);
            res.put("before",beforeJSON);
            res.put("after",afterJSON);
            res.put("type",type);
    
            // 6.发送数据至下游
            collector.collect(res.toJSONString());
    
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    更改反序列化器

    public class FlinkCDC {
        public static void main(String[] args) throws Exception {
            // 1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
    
            // 2.通过Flink CDC 构建MySQL SOURCE
            DebeziumSourceFunction sourceFunction = MySQLSource.<String>builder()
                    .hostname("175.178.154.194")
                    .port(3306)
                    .username("root")
                    .password("zks123456")
                    .databaseList("rtdw-flink")
                    .tableList("rtdw-flink.base_sale_attr")
                    .deserializer(new CusDes())
                    .startupOptions(StartupOptions.initial())
                    .build();
    
            DataStreamSource<String> streamSource = env.addSource(sourceFunction);
            // 3.打印数据
            streamSource.print();
    
            // 4.启动任务
            env.execute("CDC TASK ");
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在这里插入图片描述

  • 相关阅读:
    多线程编程【条件变量】
    算法练习13——H 指数
    想转行学软件测试担心哪些问题?
    jQuery 入门-----第二节:jQuery 常用API
    vue3代码编写
    【JavaWeb】WEB容器
    解决非controller使用@Autowired注解注入为null问题
    解决老版本Oracle VirtualBox 此应用无法在此设备上运行问题
    五分钟教你使用GitHub寻找优质项目
    李代数求导
  • 原文地址:https://blog.csdn.net/ks_1998/article/details/125784516