The Next-Generation Big Data Transformation: Hudi Data Lake Technology, Principles and Practice Hand in Hand (Continued)


    Integrating with Flink

    Programming Example

    This section walks through a simple example of writing to a Hudi table from Flink, which you can later extend to your own business scenario. First create a Maven project; this time the Flink program is written in Java.

    Since the Scala 2.12 build of this bundle is not available in the central repository and the jar was already compiled in the previous article, install hudi-flink1.15-bundle-0.12.1.jar into the local Maven repository manually:

    mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.1 -Dpackaging=jar -Dfile=./hudi-flink1.15-bundle-0.12.1.jar
    

    Add the following to the pom.xml file:

    <?xml version="1.0" encoding="UTF-8"?>
    
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>cn.itxs</groupId>
      <artifactId>hudi-flink-demo</artifactId>
      <version>1.0</version>
    
      <name>hudi-flink-demo</name>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hoodie.version>0.12.1</hoodie.version>
        <hadoop.version>3.3.4</hadoop.version>
        <flink.version>1.15.1</flink.version>
        <slf4j.version>2.0.5</slf4j.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
          <scope>provided</scope>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb</artifactId>
          <version>${flink.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.hudi</groupId>
          <artifactId>hudi-flink_${scala.binary.version}</artifactId>
          <version>${hoodie.version}</version>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.10.1</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
              <encoding>${project.build.sourceEncoding}</encoding>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
              <execution>
                <phase>package</phase>
                <goals>
                  <goal>shade</goal>
                </goals>
                <configuration>
                  <filters>
                    <filter>
                      <artifact>*:*</artifact>
                      <excludes>
                        <exclude>META-INF/*.SF</exclude>
                        <exclude>META-INF/*.DSA</exclude>
                        <exclude>META-INF/*.RSA</exclude>
                      </excludes>
                    </filter>
                  </filters>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>
    

    Create a Java class named HudiDemo that implements a simple flow for writing into a Hudi table:

    package cn.itxs;
    
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.contrib.streaming.state.PredefinedOptions;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import java.util.concurrent.TimeUnit;
    
    public class HudiDemo
    {
        public static void main( String[] args )
        {
            //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // start a local environment with the Flink Web UI enabled
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    
            EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);
            embeddedRocksDBStateBackend.setDbStoragePath("file:///D:/rocksdb");
            embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
            env.setStateBackend(embeddedRocksDBStateBackend);
    
            env.enableCheckpointing(TimeUnit.SECONDS.toMillis(5), CheckpointingMode.EXACTLY_ONCE);
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointStorage("hdfs://hadoop1:9000/checkpoints/flink");
            checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(2));
            checkpointConfig.setTolerableCheckpointFailureNumber(5);
            checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
            checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            tableEnv.executeSql("CREATE TABLE source_a2 (\n" +
                    " uuid varchar(20),\n" +
                    " name varchar(10),\n" +
                    " age int,\n" +
                    " ts timestamp(3),\n" +
                    " `partition` varchar(20),\n" +
                    " PRIMARY KEY(uuid) NOT ENFORCED\n" +
                    " ) WITH (\n" +
                    " 'connector' = 'datagen',\n" +
                    " 'rows-per-second' = '1'\n" +
                    ")"
            );
    
            tableEnv.executeSql("CREATE TABLE a2 (\n" +
                    " uuid varchar(20),\n" +
                    " name varchar(10),\n" +
                    " age int,\n" +
                    " ts timestamp(3),\n" +
                    " `partition` varchar(20),\n" +
                    "PRIMARY KEY(uuid) NOT ENFORCED\n" +
                    " ) WITH (\n" +
                    " 'connector' = 'hudi',\n" +
                    " 'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/a2',\n" +
                    " 'table.type' = 'MERGE_ON_READ'\n" +
                    ")"
            );
    
            tableEnv.executeSql("insert into a2 select * from source_a2");
    
        }
    }
    

    Using createLocalEnvironmentWithWebUI starts a local Flink cluster with the Web UI enabled, so the Flink web page can be viewed locally.


    The local RocksDB state backend directory also holds the corresponding state data.


    The Hudi table that was just created can also be seen on HDFS.

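    To verify the written data beyond browsing HDFS, one option is to read the table back from the Flink SQL client. A minimal sketch, assuming the Hudi Flink bundle is on the SQL client classpath; the DDL simply mirrors the table definition used in the Java job:

    CREATE TABLE a2(
      uuid VARCHAR(20),
      name VARCHAR(10),
      age INT,
      ts TIMESTAMP(3),
      `partition` VARCHAR(20),
      PRIMARY KEY(uuid) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/a2',
      'table.type' = 'MERGE_ON_READ'
    );

    select * from a2;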

    Packaging and Running

    Make a few small changes to the code above: in the environment setup at the top, comment out createLocalEnvironmentWithWebUI and setDbStoragePath and uncomment getExecutionEnvironment, then change the table name to a3. Run mvn package to build the jar and upload it to the server:

    flink run -t yarn-per-job -c cn.itxs.HudiDemo /home/commons/flink-1.15.1/otherjars/hudi-flink-demo-1.0.jar
    

    Watch the run log to confirm the job starts successfully.


    In the YARN UI, check the corresponding application, application_1669357770610_0019.


    The newly created Hudi table can again be seen on HDFS.


    CDC Ingestion into the Lake

    Overview

    CDC (Change Data Capture) captures changes made to a source table (Insert, Update and Delete) and delivers them to downstream systems as a data stream. Each captured change carries an operation marker that indicates whether the record was inserted, updated or deleted:

    • +I: a newly inserted record.
    • -U: an update produces two U-marked records; -U is the record before the update.
    • +U: the record after the update.
    • -D: a deleted record.

    CDC data preserves the full history of database changes and can be ingested into Hudi in either of the following ways:

    • Consume CDC-formatted data from Kafka and write it into Hudi. The debezium-json, canal-json and maxwell-json formats are supported. This approach is highly extensible, but it depends on Kafka and a data synchronization tool such as Debezium.
    • Use the Flink CDC connector to read the database binlog directly and write it into Hudi. This approach is lightweight and has fewer component dependencies (a brief sketch of it follows the notes below).

    Notes

    • If the ordering of upstream data cannot be guaranteed, specify the write.precombine.field option.
    • In CDC scenarios, enable changelog mode by setting changelog.enabled to true.
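    The second approach listed above is not demonstrated later in this article, so here is a minimal sketch of it, assuming the flink-sql-connector-mysql-cdc jar described in the next section is on the classpath. The table names and Hudi path are hypothetical, the mysql-cdc connection options mirror the ones used later, and the two notes above are applied in the sink definition:

    -- source: read the MySQL binlog directly with the Flink CDC connector
    CREATE TABLE student_cdc (
      id INT NOT NULL,
      name STRING,
      age INT,
      class STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
      'hostname' = 'mysqlserver',
      'port' = '3308',
      'username' = 'root',
      'password' = '123456',
      'database-name' = 'test',
      'table-name' = 'student'
    );

    -- sink: Hudi table with changelog mode enabled and a precombine field
    CREATE TABLE student_hudi_direct (
      id INT NOT NULL,
      name STRING,
      age INT,
      class STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/student_hudi_direct',
      'table.type' = 'MERGE_ON_READ',
      'changelog.enabled' = 'true',
      'write.precombine.field' = 'class'  -- placeholder; ideally a version or event-time column
    );

    INSERT INTO student_hudi_direct SELECT * FROM student_cdc;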


    The following demonstrates the first approach.

    Enabling binlog in MySQL

    MySQL 5.7 is used as an example below. Edit my.cnf and add:

    server_id=1
    log_bin=mysql-bin
    binlog_format=ROW
    expire_logs_days=30
    
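    After restarting MySQL, you can confirm that binlog is active and in ROW format with standard MySQL statements (nothing Hudi-specific is assumed here):

    SHOW VARIABLES LIKE 'log_bin';
    SHOW VARIABLES LIKE 'binlog_format';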

    Initializing the MySQL source table

    First create a demo database named test and a student table:

    create database test;
    use test;
    CREATE TABLE `student` (
    	`id` INT NOT NULL AUTO_INCREMENT,
    	`name` varchar(10) NOT NULL,
    	`age` int NOT NULL,
    	`class` varchar(10) DEFAULT NULL,
    	PRIMARY KEY (`id`)
    ) ENGINE = InnoDB CHARSET = utf8;
    

    Preparing jar dependencies

    Upload flink-sql-connector-mysql-cdc-2.3.0.jar and flink-sql-connector-kafka-1.15.1.jar to Flink's lib directory.

    flink-sql-connector-mysql-cdc-2.3.0.jar can be downloaded from GitHub: https://github.com/ververica/flink-cdc-connectors

    flink-sql-connector-kafka-1.15.1.jar can be downloaded directly from the Maven repository.


    Reading the MySQL binlog with Flink and writing it to Kafka

    • Create the MySQL CDC source table in the Flink SQL client:
    CREATE TABLE student_binlog (
     id INT NOT NULL,
     name STRING,
     age INT,
     class STRING,
     PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'mysqlserver',
     'port' = '3308',
     'username' = 'root',
     'password' = '123456',
     'database-name' = 'test',
     'table-name' = 'student'
    );
    
    • Create the Kafka sink table:
    create table student_binlog_sink_kafka(
     id INT NOT NULL,
     name STRING,
     age INT,
     class STRING,
     PRIMARY KEY (`id`) NOT ENFORCED
    ) with (
      'connector'='upsert-kafka',
      'topic'='data_test',
      'properties.bootstrap.servers' = 'kafka1:9092',
      'properties.group.id' = 'testGroup',
      'key.format'='json',
      'value.format'='json'
    );
    


    • Write the MySQL binlog stream into Kafka:
    insert into student_binlog_sink_kafka select * from student_binlog;
    


    In the Flink Web UI you can see the job that was just submitted.


    Switch the SQL client to tableau result mode and query the table:

    set 'sql-client.execution.result-mode' = 'tableau';
    select * from student_binlog_sink_kafka;
    

    Insert and update a few rows in the MySQL student table to test:

    INSERT INTO student VALUES(1,'张三',16,'高一3班');
    COMMIT;
    INSERT INTO student VALUES(2,'李四',18,'高三3班');
    COMMIT;
    UPDATE student SET NAME='李四四' WHERE id = 2;
    COMMIT;
    


    Reading the Kafka data with Flink and writing it into the Hudi data lake

    • Create the Kafka source table:
    CREATE TABLE student_binlog_source_kafka (
     id INT NOT NULL,
     name STRING,
     age INT,
     class STRING
    )
    WITH(
        'connector' = 'kafka',
        'topic'='data_test',
        'properties.bootstrap.servers' = 'kafka1:9092',
        'properties.group.id' = 'testGroup',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    );
    
    • Create the Hudi target table:
    CREATE TABLE student_binlog_sink_hudi (
     id INT NOT NULL,
     name STRING,
     age INT,
     class STRING,
     PRIMARY KEY (`id`) NOT ENFORCED
    )
    PARTITIONED BY (`class`)
    WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/student_binlog_sink_hudi',
      'table.type' = 'MERGE_ON_READ',
      'write.operation' = 'insert',
      'write.precombine.field' = 'class'
    );
    
    • Write the Kafka data into the Hudi table:
    insert into student_binlog_sink_hudi select * from student_binlog_source_kafka;
    

    Add two more rows to the student table in MySQL:

    INSERT INTO student VALUES(3,'韩梅梅',16,'高二2班');
    INSERT INTO student VALUES(4,'李雷',16,'高二2班');
    COMMIT;
    

    On HDFS you can see that the corresponding partitions and data files have been created.

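    Besides checking HDFS, a quick sanity check is to query the Hudi table back in the same SQL client session (a plain snapshot read; no extra read options assumed):

    select * from student_binlog_sink_hudi;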

    Tuning

    Memory

    | Parameter | Description | Default | Notes |
    | --- | --- | --- | --- |
    | write.task.max.size | Maximum memory each write task may use; data is flushed once it is exceeded | 1024MB | Write-buffer memory = write.task.max.size - compaction.max_memory; when the total write-buffer memory exceeds the limit, the largest buffer is flushed |
    | write.batch.size | Batch size for writing data | 64MB | The default is recommended |
    | write.log_block.size | Hudi's log writer buffers data and flushes it to disk as a LogBlock once this size is reached | 128MB | The default is recommended |
    | write.merge.max_memory | For COW tables, the maximum heap size available for merging incremental data with data files | 100MB | The default is recommended |
    | compaction.max_memory | Maximum heap size each write task may use for compaction | 100MB | For online compaction with sufficient resources this can be increased, e.g. to 1024MB |

    Parallelism

    | Parameter | Description | Default | Notes |
    | --- | --- | --- | --- |
    | write.tasks | Parallelism of the write tasks; each write task writes 1~N buckets sequentially | 4 | Increasing this value has no effect on the number of small files |
    | write.bucket_assign.tasks | Parallelism of the bucket assigner operators | Flink's parallelism.default | Increasing this value increases the number of buckets and therefore the number of small files |
    | write.index_bootstrap.tasks | Parallelism of index bootstrap | Flink's parallelism.default | |
    | read.tasks | Parallelism of the read operators | 4 | |
    | compaction.tasks | Parallelism of online compaction | 4 | Offline compaction is recommended |

    Compaction

    The following options apply to online compaction only.

    | Parameter | Description | Default | Notes |
    | --- | --- | --- | --- |
    | compaction.schedule.enabled | Whether to generate compaction plans periodically | true | Recommended to keep enabled even when compaction.async.enabled = false |
    | compaction.async.enabled | Async compaction, enabled by default for MOR tables | true | false disables online compaction |
    | compaction.trigger.strategy | Strategy for triggering compaction | num_commits | Options: 1. num_commits: a given number of delta commits has accumulated; 2. time_elapsed: a given number of seconds has passed since the last compaction; 3. num_and_time: both num_commits and time_elapsed are satisfied; 4. num_or_time: either num_commits or time_elapsed is satisfied |
    | compaction.delta_commits | Number of delta commits that triggers compaction (num_commits strategy) | 5 | |
    | compaction.delta_seconds | Seconds since the last compaction that triggers compaction (time_elapsed strategy) | 3600 | |
    | compaction.target_io | Target combined read/write IO per compaction, i.e. 500GB by default | 512000 | |
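    As a rough illustration of how the options in the three tables above are applied, they are set per table in the WITH clause. A sketch with illustrative values only; the table name and path are hypothetical:

    CREATE TABLE hudi_tuned (
      id INT,
      ts TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/hudi_tuned',
      'table.type' = 'MERGE_ON_READ',
      -- memory
      'write.task.max.size' = '2048',
      'compaction.max_memory' = '1024',
      -- parallelism
      'write.tasks' = '8',
      -- online compaction
      'compaction.schedule.enabled' = 'true',
      'compaction.trigger.strategy' = 'num_commits',
      'compaction.delta_commits' = '5'
    );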

    Integrating with Hive

    A Hudi source table corresponds to a copy of data on HDFS. Through Spark, Flink, or the Hudi client, a Hudi table can be mapped to a Hive external table; based on that external table, Hive can conveniently run real-time view, read-optimized view, and incremental view queries.
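    For reference, when a MERGE_ON_READ table is synced to Hive, Hudi registers two Hive tables suffixed _ro (read-optimized view) and _rt (real-time view), while a COPY_ON_WRITE table is registered as a single table. A brief sketch of querying the two views, assuming a MOR table was synced under the hypothetical name h_demo:

    -- read-optimized view: only compacted base files
    SELECT * FROM h_demo_ro;
    -- real-time view: base files merged with log files at query time
    SELECT * FROM h_demo_rt;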

    Integration steps

    Hive 3.1.3 (see the earlier articles for more on Hive) and Hudi 0.12.1 are used as an example here; other versions are similar.

    Copy hudi-hadoop-mr-bundle-0.12.1.jar and hudi-hive-sync-bundle-0.12.1.jar into the lib directory of the HiveServer node:

    cd /home/commons/apache-hive-3.1.3-bin
    cp -rf /home/commons/hudi-release-0.12.1/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.1.jar lib/
    cp -rf /home/commons/hudi-release-0.12.1/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.1.jar lib/
    

    Restart the Hive services in whichever way fits your deployment:

    nohup hive --service metastore &
    nohup hive --service hiveserver2 &
    


    Connect over JDBC (hive2) to test and list all databases.


    Syncing to Hive from Flink

    Flink Hive sync currently supports two modes: hms and jdbc. The hms mode only requires the metastore URIs to be configured, while jdbc mode requires both the JDBC properties and the metastore URIs. A configuration example:

    CREATE TABLE t7(
      id int,
      num int,
      ts int,
      primary key (id) not enforced
    )
    PARTITIONED BY (num)
    with(
      'connector'='hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t7',
      'table.type'='COPY_ON_WRITE', 
      'hive_sync.enable'='true', 
      'hive_sync.table'='h7', 
      'hive_sync.db'='default', 
      'hive_sync.mode' = 'hms',
      'hive_sync.metastore.uris' = 'thrift://hadoop2:9083'
    );
    insert into t7 values(1,1,1);
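    For the jdbc sync mode mentioned above, the sync options additionally carry the HiveServer2 JDBC settings alongside the metastore URIs. A sketch under the assumption that HiveServer2 listens on hadoop2:10000 with the default hive/hive credentials; the table name and path are hypothetical, and hive_sync.jdbc_url, hive_sync.username and hive_sync.password are the relevant options:

    CREATE TABLE t7_jdbc(
      id int,
      num int,
      ts int,
      primary key (id) not enforced
    )
    PARTITIONED BY (num)
    with(
      'connector'='hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t7_jdbc',
      'table.type'='COPY_ON_WRITE',
      'hive_sync.enable'='true',
      'hive_sync.table'='h7_jdbc',
      'hive_sync.db'='default',
      'hive_sync.mode' = 'jdbc',
      'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop2:10000',
      'hive_sync.username' = 'hive',
      'hive_sync.password' = 'hive',
      'hive_sync.metastore.uris' = 'thrift://hadoop2:9083'
    );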
    

    Hive Catalog

    On the Flink website, open the documentation for your version and find connector-hive; download flink-sql-connector-hive-3.1.2_2.12-1.15.1.jar and upload it to Flink's lib directory. A table-creation example:

    CREATE CATALOG hive_catalog WITH (
        'type' = 'hive',
        'default-database' = 'default',
        'hive-conf-dir' = '/home/commons/apache-hive-3.1.3-bin/conf/'
    );
    
    use catalog hive_catalog;
    CREATE TABLE t8(
      id int,
      num int,
      ts int,
      primary key (id) not enforced
    )
    PARTITIONED BY (num)
    with(
      'connector'='hudi',
      'path' = 'hdfs://hadoop1:9000/tmp/hudi_flink/t8',
      'table.type'='COPY_ON_WRITE', 
      'hive_sync.enable'='true', 
      'hive_sync.table'='h8', 
      'hive_sync.db'='default', 
      'hive_sync.mode' = 'hms',
      'hive_sync.metastore.uris' = 'thrift://hadoop2:9083'
    );
    

    My blog: IT小神, www.itxiaoshen.com

    Original article: https://blog.csdn.net/qq_20949471/article/details/128156640