• Flink_CDC搭建及简单使用


    Flink_CDC搭建及简单使用

    1.CDC简介:

    CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。

    目前市面上的CDC技术非常多,常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求。这种作业方式无法保证数据的一致性,实时性也较差。Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术。这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据。

    2.Flink_CDC简介:

    目前公司主要是通过canal监控mysql的binlog日志,然后将日志数据实时发送到kafka中,通过flink程序,将日志数据实时下发到其他服务中。这种方式,数据链路长,实时性效果较差,运维也比较复杂。

    Flink_CDC技术的出现,解决了传统数据库实时同步的痛点。Flink_CDC通过伪装成msql的slave节点,实时读取master节点全量和增量数据,它能够捕获所有数据的变化,捕获完整的变更记录,无需像查询CDC那样发起全表的扫描过滤,高效且无需入侵代码,完全与业务解耦,运维及其简单。

    3.Flink_CDC部署:

    3.1 依赖版本
    环境:Linux(Centos7)
    Flink : 1.31.1
    Flink_CDC: flink-sql-connector-mysql-cdc-2.1.0.jar
    mysql版本:8.0.13
    mysql驱动包:mysql-connector-java-8.0.13.jar
    
    • 1
    • 2
    • 3
    • 4
    • 5
    3.2环境搭建

    3.2.1安装java环境(不再赘述);

    3.2.2安装数据库(不在赘述);

    3.2.3搭建Flink环境(单机模式);

    1.获取flink版本。

      cd /home
      wget https://archive.apache.org/dist/flink/flink-1.9.0/flink-1.13.1-bin-scala_2.11.tgz
    
    • 1
    • 2

    2.解压flink:

     tar -zxvf flink-1.13.1-bin-scala_2.11.tgz 
    
    • 1

    3.编辑flink配置文件,配置java环境

    cd flink-1.13.1
    vim conf/flink-conf.yaml
    添加配置:env.java.home=/home/jdk/jdk1.8.0_291
    
    • 1
    • 2
    • 3

    4.上传flink_CDC驱动包和mysql驱动包:

    cd flink-1.13.1/lib
    上传:
    flink-sql-connector-mysql-cdc-2.1.0.jar
    mysql-connector-java-8.0.13.jar
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    5.启动flink集群:

    /bin/start-cluster.sh
    
    • 1
    3.3创建mysql表:
    CREATE TABLE `products` (
       `id` int NOT NULL,
       `name` varchar(45) DEFAULT NULL,
       `description` varchar(45) DEFAULT NULL,
       `weight` decimal(10,3) DEFAULT NULL,
       PRIMARY KEY (`id`)
     ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    3.4启动flink-sql-client

    本次主要是通过flink的sql客户端来测试的。

    ./flink-1.13.1/bin/sql-client.sh
    
    • 1
    3.5创建Flink_CDC虚拟表:
    CREATE TABLE `products_cdc` (
     id INT NOT NULL,
     name varchar(32),
     description varchar(45),
     weight DECIMAL(10,3)
    ) WITH (
     'scan.incremental.snapshot.enabled' = 'false',
     'connector' = 'mysql-cdc',
     'hostname' = '0.0.0.0',
     'port' = '3306',
     'username' = 'root',
     'password' = '123456',
     'database-name' = 'test_db',
     'table-name' = 'products'
    );
    
    
    ###如果未设置'scan.incremental.snapshot.enabled' = 'false',会报错:
    
    Caused by: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.validatePrimaryKeyIfEnableParallel(MySqlTableSourceFactory.java:186)
        at com.alibaba.ververica.cdc.connectors.mysql.table.MySqlTableSourceFactory.createDynamicTableSource(MySqlTableSourceFactory.java:85)
        at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:134)
        ... 30 more
        
    ###报错原因:
    
      MySQL CDC源表在Flink 1.13版本会进行语法检查,在MySQL CDC DDL WITH参数中,未设置主键(Primary Key)信息。因为Flink 1.13版本,新增支持按PK分片,进行多并发读取数据的功能。
      
    ###解决方案:
    
    如果在Flink 1.13版本,您需要多并发读取MySQL数据,则在DDL中添加PK信息。
    如果在Flink 1.13版本,您不需要多并发读取MySQL数据,则在DDL中添加scan.incremental.snapshot.enabled 参数,且把该参数值设置为false,无需设置PK信息。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    3.6查询CDC表数据:

    3.6.1 查看表数据
    select * from products_cdc;
    在这里插入图片描述
    3.6.2 在数据库中新增一条数据:
    insert into products(id,name,description,weight) values(5,‘gg’,‘haha’,60);
    在这里插入图片描述
    3.6.3观察products_cdc表数据变化;
    在这里插入图片描述

    到此,通过flink-sql-client来增量获取mysql全量和增量数据变化的样例已结束。

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    【分享】“钉钉第三方“ 在集简云集成应用的常见问题与解决方案
    使用vscode实现远程开发,并通过内网穿透在公网环境下远程连接
    培美曲塞肌白蛋白纳米粒|磺胺地索辛豆清白蛋白纳米粒|茴香酰胺蓖麻蛋白纳米粒(化学品)
    Taurus.MVC WebAPI 入门开发教程4:控制器方法及参数定义、获取及基础校验属性【Require】。
    Springboot毕设项目码头船只出行及配套货柜码放管理4ab7d(java+VUE+Mybatis+Maven+Mysql)
    PHP代码审计工具
    LINUX安装openssl
    数据结构与算法【Java】03---栈
    (DenseNet)Densely Connected Convolutional Networks--Gao Huang
    深度解析纽约时报个人叙事赛
  • 原文地址:https://blog.csdn.net/m0_67394006/article/details/126083066