• 详解 Flink CDC 的介绍和入门案例


    一、Flink CDC 简介

    1. CDC 介绍

    ​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

    2. CDC 种类

    基于查询的 CDC基于 Binlog 的 CDC
    开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
    执行模式BatchStreaming
    是否可以捕获所有数据变化
    延迟性高延迟低延迟
    是否增加数据库压力

    3. Flink CDC 介绍

    ​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

    二、Flink CDC 案例实操

    1. DataStream 实现

    1.1 导入依赖
    <dependencies>
    	 <dependency>
    		 <groupId>org.apache.flinkgroupId>
    		 <artifactId>flink-javaartifactId>
    		 <version>1.12.0version>
    	 dependency>
    	 <dependency>
    		 <groupId>org.apache.flinkgroupId>
    		 <artifactId>flink-streaming-java_2.12artifactId>
    		 <version>1.12.0version>
    	 dependency>
    	 <dependency>
    		 <groupId>org.apache.flinkgroupId>
    		 <artifactId>flink-clients_2.12artifactId>
    		 <version>1.12.0version>
    	 dependency>
    	 <dependency>
    		 <groupId>org.apache.hadoopgroupId>
    		 <artifactId>hadoop-clientartifactId>
    		 <version>3.1.3version>
    	 dependency>
    	 <dependency>
    		 <groupId>mysqlgroupId>
    		 <artifactId>mysql-connector-javaartifactId>
    		 <version>5.1.49version>
    	 dependency>
    	 <dependency>
    		 <groupId>org.apache.flinkgroupId>
    		 <artifactId>flink-table-planner-blink_2.12artifactId>
    		 <version>1.12.0version>
    	 dependency>
    	 <dependency>
    		 <groupId>com.ververicagroupId>
    		 <artifactId>flink-connector-mysql-cdcartifactId>
    		 <version>2.0.0version>
    	 dependency>
    	 <dependency>
    		 <groupId>com.alibabagroupId>
    		 <artifactId>fastjsonartifactId>
    		 <version>1.2.75version>
    	 dependency>
    dependencies>
    <build>
    	 <plugins>
    		 <plugin>
    			 <groupId>org.apache.maven.pluginsgroupId>
    			 <artifactId>maven-assembly-pluginartifactId>
    			 <version>3.0.0version>
    			 <configuration>
    				 <descriptorRefs>
    				 	<descriptorRef>jar-with-dependenciesdescriptorRef>
    				 descriptorRefs>
    			 configuration>
    			 <executions>
    				 <execution>
    					 <id>make-assemblyid>
    					 <phase>packagephase>
    					 <goals>
    					 	<goal>singlegoal>
    					 goals>
    				 execution>
    			 executions>
    		 plugin>
    	 plugins>
    build>
    
    1.2 编写程序代码
    public class FlinkCDC {
        public static void main(String[] args) throws Exception {
            //1. 创建 Flink 执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            
            //Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序
             //1.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
             env.enableCheckpointing(5000L);
             //1.2 指定 CK 的一致性语义
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
             //1.3 设置任务关闭的时候保留最后一次 CK 数据
     env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
             //1.4 指定从 CK 自动重启策略
             env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
             //1.5 设置状态后端
             env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));
             //1.6 设置访问 HDFS 的用户名
             System.setProperty("HADOOP_USER_NAME", "lgb");
            
            //2. 创建 FlinkCDC Source
            /*
            	StartupOptions 有 5 种类型:
            	1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取
            	2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog
            	3. latest:从 binlog 的最近位置监控读取
    			4. specificOffset:从 binlog 的指定位置读取
    			5. timestamp:从 binlog 的指定时间戳读取
            */
            DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder()
                .hostname("hadoop102") //Mysql所在主机名
                .port(3306) //mysql端口号
                .username("root") //登录mysql用户名
                .password("123456") //登录mysql密码
                .databaseList("cdc_test") //监控的数据库列表,可变参数
                .tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表
                .deserializer(new StringDebeziumDeserializationSchema()) //反序列化器
                .startupOptions(StartupOptions.initial()) //指定读取策略
                .build();
            
            //3. 通过 FlinkCDC Source 创建 DataStream
            DataStream<String> dataStream = env.addSource(mysqlSource);
            
            //4. 打印输出流
            dataStream.print();
            
            //5. 启动任务
            env.execute("FlinkCDC");
        }
    }
    
    1.3 测试
    1.3.1 本地测试
    • 开启 MySQL Binlog 并重启 MySQL
    • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
    • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
    • 在数据表中进行增删改操作,查看程序控制台输出结果
    1.3.2 集群测试
    • 将 FlinkCDC 程序进行打包并上传到集群

    • 启动 Hadoop、zookeeper 和 Flink 集群

    • 运行 FlinkCDC 程序

      bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
      
    • 给当前的 Flink 程序创建 Savepoint

      bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
      
      
    • 停止 FlinkCDC 程序

    • 在Mysql数据表中进行增删改操作

    • 从 Savepoint 重启程序查看程序输出结果

      bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
      

    2. Flink SQL 实现

    2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持

    public class FlinkSQLCDC {
        public static void main(String[] args) throws Exception {
            //1. 创建 Flink 执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            
            //2. 创建 FlinkSQL 表环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            
            //3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offset
            tableEnv.executeSql(
            	"create table user_info (" +
                "id String primary key, name String, sex String) with (" +
                " 'connector' = 'mysql-cdc'," +
                " 'scan.startup.mode' = 'initial'," +
                " 'hostname' = 'hadoop102'," +
                " 'port' = '3306'," +
                " 'username' = 'root'," +
                " 'password' = '123456'," +
                " 'database-name' = 'cdc_test'," +
                " 'table-name' = 'user_info'" +
                ")"
            );
            
            //4. 查询输出表中数据
            Table table = tableEnv.sqlQuery("select * from user_info");
            DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);
            dataStream.print();
            
            //5. 启动任务
            env.execute("FlinkSqlCDC");
        }
    }
    

    3. 自定义反序列化器

    规范化数据输出格式,方便后续解析

    /**
    	自定义反序列化器:实现 DebeziumDeserializationSchema 接口并实现 deserialize 和 getProducedType 方法 
    */
    public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {
        /*
        	想要展示的数据格式:
        	{
        		"dbName":"",
        		"tableName":"",
        		"before":{"field1":"value1",...},
        		"after":{"field1":"value1",...},
        		"op":""
        	}
        */
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            JSONObject result = new JSONObject();
            
            //1.获取库名和表名
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            
            //2. 获取 before 数据
            Struct value = (Struct) sourceRecord.value();
            Struct before = value.getStruct("before");
            JSONObject beforeJSON = new JSONObject();
            if(before != null) {
                Schema schema = before.schema();
                List<Field> fields = schema.fields();
                
                for(Field field : fields) {
                    beforeJSON.put(field.name(), before.get(field));
                }
            }
            
            //3. 获取 after 数据
            Struct after = value.getStruct("after");
            JSONObject afterJSON = new JSONObject();
            if(after != null) {
                Schema schema = after.schema();
                List<Field> fields = schema.fields();
                
                for(Field field : fields) {
                    afterJSON.put(field.name(), after.get(field));
                }
            }
            
            //4. 获取操作类型 READ DELETE UPDATE CREATE
     		Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            
            result.put("dbName", fields[1]);
            result.put("tableName", fields[2]);
            result.put("before", beforeJSON);
            result.put("after", afterJSON);
            result.put("op", operation);
            
            collcetor.collect(result.toJSONString());
        }
        
        
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }	
    }
    
  • 相关阅读:
    【Git】git多人共享协作添加成员并授权与git拉取或提交冲突解决
    【前端实例代码】Html5+css3+JavaScript实现新拟态新拟物风格(Neumorphism)气泡图标泡泡网页效果!手把手教学!新手必会!超简单 ~
    HDFS的存储原理
    开发、部署系统环境 - docker 各常用镜像的使用
    小型ATC显示系统mini ATC Display
    108.firefly-sdk下生成recovery.img
    蓝桥杯第1390题——A Careful Approach
    css横向滚动条支持鼠标滚轮
    固态硬盘开盘数据恢复的方法
    谷粒商城一
  • 原文地址:https://blog.csdn.net/weixin_44480009/article/details/139678647