【Infrastructure】Deploying and Configuring Flink / Flink CDC (MySQL / ES)


    Introduction

    Resource Bundle

    Method / Steps

    1: Deploying Flink

    • Add Flink to the environment variables
    # flink
    export FLINK_HOME=/usr/local/flink/flink-1.15.0/
    export PATH=$FLINK_HOME/bin:$PATH
    
    # Reload the environment configuration
    source /etc/profile
    
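    A quick way to verify the PATH change took effect (the flink launcher ships in the distribution's bin directory):

    # Confirm the flink launcher resolves and print the version
    which flink
    flink --version
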
    • Flink configuration (conf/flink-conf.yaml)
    # Allow access from outside localhost
    rest.bind-address: 0.0.0.0
    
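    If the Web UI is still unreachable, the related REST keys in conf/flink-conf.yaml are worth checking; the values below are stock defaults and assumptions about your setup:

    # REST / Web UI settings (values are illustrative)
    rest.port: 8081             # port served by the Web UI / REST API
    rest.bind-address: 0.0.0.0  # bind on all interfaces so external hosts can connect
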
    • Start Flink
    # Start the Flink cluster (the scripts live in $FLINK_HOME/bin)
    ./start-cluster.sh
    
    # Stop the Flink cluster
    #./stop-cluster.sh
    
    
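    To confirm the cluster came up, check the JVM processes; a default standalone cluster runs one JobManager and one TaskManager:

    # jps should list the standalone cluster processes
    jps
    # Expected entries (default standalone setup):
    #   StandaloneSessionClusterEntrypoint   (JobManager)
    #   TaskManagerRunner                    (TaskManager)
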

    Once the cluster has started, visit port 8081 of the server to see the Flink Web UI:
    [Screenshot: Flink Web UI]

    2: Configure the Flink CDC Sync Connector and Set Up a Demo

    2.1 Upload the Flink CDC connector JAR and the MySQL driver JAR: place them in Flink's lib directory

    The Elasticsearch connector JAR carries the Flink version as its suffix, which must match the Flink release in use. The same applies to the ES SQL connector JAR.
    [Screenshot: connector JARs in the lib directory]
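    A minimal sketch of placing the JARs, assuming Flink CDC 2.2.1 and connectors built for Flink 1.15.0; the exact file names and versions are assumptions, so match them to your environment:

    # Copy the connector/driver JARs into Flink's lib directory (names/versions are assumptions)
    cp flink-sql-connector-mysql-cdc-2.2.1.jar       $FLINK_HOME/lib/
    cp flink-sql-connector-elasticsearch7-1.15.0.jar $FLINK_HOME/lib/
    cp mysql-connector-java-8.0.27.jar               $FLINK_HOME/lib/

    # Restart the cluster so the new JARs are picked up
    $FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh
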

    2.2 Streaming ETL job demo

    # Check whether binlog is enabled; OFF means it is not
    show variables like 'log_bin';
    

    If it is not enabled, locate the MySQL configuration file (my.cnf) and add:

    [mysqld]
    # Enable binlog
    log-bin = mysql-bin
    
    # Use row-based binlog format
    binlog-format = ROW
    
    # Needed for MySQL replication; must not collide with another replication
    # client's server id (e.g. Canal's slaveId)
    server_id = 1 
    
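    After editing the configuration, restart MySQL so the binlog settings take effect; the service name below is an assumption that depends on your installation:

    # Restart MySQL (service name is an assumption)
    sudo systemctl restart mysqld

    # Re-run the check; log_bin should now show ON
    mysql -uroot -p -e "show variables like 'log_bin';"
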

    2.2.1 In MySQL, create the database and the tables products and orders, and insert data

    -- MySQL
    CREATE DATABASE flinkcdc;
    USE flinkcdc;
    CREATE TABLE products (
      id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      name VARCHAR(255) NOT NULL,
      description VARCHAR(512)
    );
    ALTER TABLE products AUTO_INCREMENT = 101;
    
    INSERT INTO products
    VALUES (default,"scooter","Small 2-wheel scooter"),
           (default,"car battery","12V car battery"),
           (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
           (default,"hammer","12oz carpenter's hammer"),
           (default,"hammer","14oz carpenter's hammer"),
           (default,"hammer","16oz carpenter's hammer"),
           (default,"rocks","box of assorted rocks"),
           (default,"jacket","water resistent black wind breaker"),
           (default,"spare tire","24 inch spare tire");
    
    CREATE TABLE orders (
      order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
      order_date DATETIME NOT NULL,
      customer_name VARCHAR(255) NOT NULL,
      price DECIMAL(10, 5) NOT NULL,
      product_id INTEGER NOT NULL,
      order_status BOOLEAN NOT NULL -- Whether order has been placed
    ) AUTO_INCREMENT = 10001;
    
    INSERT INTO orders
    VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
           (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
           (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
    
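    A quick sanity check in the MySQL client; the statements above seed 9 products and 3 orders:

    -- Verify the seed data landed
    SELECT COUNT(*) FROM products;  -- expect 9
    SELECT COUNT(*) FROM orders;    -- expect 3
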

    3: Create the Flink CDC Virtual Tables

    3.1 Start the Flink SQL CLI

    ./bin/sql-client.sh
    

    [Screenshot: Flink SQL CLI startup screen]

    • Enable checkpointing, with a checkpoint every 3 seconds (the CDC source commits its progress and the Elasticsearch sink flushes on checkpoints, so results will not appear downstream without it):
    Flink SQL> SET 'execution.checkpointing.interval' = '3s';
    
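    Running SET with no argument prints the session configuration, a quick way to confirm the property was applied:

    Flink SQL> SET;
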

    3.2 Create the CDC virtual tables in the SQL CLI

    • Use the Flink SQL CLI to create the corresponding tables, which will sync the data of the underlying database tables:
    CREATE TABLE products (
        id INT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.3.23',
        'port' = '3307',
        'username' = 'root',
        'password' = 'My123456',
        'database-name' = 'flinkcdc',
        'table-name' = 'products'
      );
    
    CREATE TABLE orders (
        order_id INT,
        order_date TIMESTAMP(0),
        customer_name STRING,
        price DECIMAL(10, 5),
        product_id INT,
        order_status BOOLEAN,
        PRIMARY KEY (order_id) NOT ENFORCED
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '192.168.3.23',
        'port' = '3307',
        'username' = 'root',
        'password' = 'My123456',
        'database-name' = 'flinkcdc',
        'table-name' = 'orders'
      );
    
    
    
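    Before querying, you can confirm both tables are registered in the current session:

    Flink SQL> SHOW TABLES;
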
    • Query the tables to check they were created successfully; each SELECT launches a streaming job and opens a live result view (press 'Q' to close it):
    select * from orders;
    select * from products;
    

    4: Write the Data to Elasticsearch with Flink CDC

    4.1 Create the Elasticsearch sink table

    • Create the enriched_orders table, used to write the joined order data into Elasticsearch:
    CREATE TABLE enriched_orders (
       order_id INT,
       order_date TIMESTAMP(0),
       customer_name STRING,
       price DECIMAL(10, 5),
       product_id INT,
       order_status BOOLEAN,
       product_name STRING,
       product_description STRING,
       PRIMARY KEY (order_id) NOT ENFORCED
     ) WITH (
         'connector' = 'elasticsearch-7',
         'hosts' = 'http://192.168.1.71:9200',
         'index' = 'enriched_orders'
     );
    
    • Start the continuous sync job that joins orders with products and writes the enriched rows into enriched_orders:
    insert into enriched_orders
    select 
    	o.order_id      as order_id,
    	o.order_date    as order_date,
    	o.customer_name as customer_name,
    	o.price         as price,
    	o.product_id    as product_id,
    	o.order_status  as order_status,
    	p.name          as product_name,
    	p.description   as product_description
    from orders as o
    left join products as p on o.product_id=p.id;
    
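    Once the job is running, you can also query the index directly, using the ES host from the sink DDL above:

    # Query the enriched_orders index (host taken from the sink definition)
    curl 'http://192.168.1.71:9200/enriched_orders/_search?pretty'
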


    4.2 Visit Kibana to see the enriched order table's data:

    [Screenshot: enriched_orders data in Kibana]
    Next, modify the table data in MySQL; the order data shown in Kibana updates in real time:
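    For example, run the following statements against the flinkcdc database in MySQL (the values are illustrative, and order_id 10004 assumes the seed data above, whose three orders take ids 10001-10003); each change propagates to Elasticsearch in near real time:

    -- Insert a new order: a new document appears in the enriched_orders index
    INSERT INTO orders VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

    -- Update the order's status: the matching document is updated
    UPDATE orders SET order_status = true WHERE order_id = 10004;

    -- Delete the order: the document is removed from the index
    DELETE FROM orders WHERE order_id = 10004;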

