Flink operations on Hudi tables


    Based on Flink 1.14, Spark 3.2, and Hudi 0.11, this article demonstrates streaming writes from Flink into a Hudi data lake, and reading that data back from the lake with Hive and Spark.

    I. Add Hudi storage format support to Hadoop, Hive, and Flink

    1. Build the Hudi bundles

    cd /apps/src/hudi-release-0.11.1
    export JAVA_HOME=/apps/svr/jdk1.8.0_144
    export MAVEN_OPTS="-Xss64m -Xmx4g -XX:ReservedCodeCacheSize=2g -Dhadoop.version=2.9.2 -Dhive.version=2.3.6"
    mvn -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Pflink-bundle-shade-hive2 -Pscala-2.11 package
    

    The jars for each bundle are produced under the packaging directory.

    2. Copy hudi-flink1.14-bundle_2.11-0.11.1.jar into Flink's lib directory, then redeploy a Flink on YARN instance.

    3. Copy hudi-hadoop-mr-bundle-0.11.1.jar into Hadoop's share/hadoop/hdfs/lib directory, then restart all NodeManager and ResourceManager nodes.

    4. Copy hudi-hadoop-mr-bundle-0.11.1.jar and hudi-hive-sync-bundle-0.11.1.jar into Hive's auxlib directory, then restart the HiveServer2 service.

    II. Writing to Hudi from Flink

    After redeploying the Flink on YARN instance, use the Flink sql-client to create a Hudi table. The table metadata can be synced into a table definition that Hive can parse, so Hive MR can also read the same columnar Parquet data:

    -- Create the Kafka dynamic source table
    CREATE TABLE wzp.kafka_monitor (
      `partition` INT METADATA VIRTUAL,
      `offset` BIGINT METADATA VIRTUAL,
      `timestamp` TIMESTAMP(3) METADATA VIRTUAL,
      `svc_id` STRING,
      `type` STRING,
      `app` STRING,
      `url` STRING,
      `span` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'monitor-log',
      'properties.bootstrap.servers' = '10.0.0.1:9092,10.0.0.2:9092',
      'properties.group.id' = 'wzp_test_flink',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json',
      'json.fail-on-missing-field' = 'false',
      'json.ignore-parse-errors' = 'true'
    );
     
    -- Create the Hudi sink table
    USE hudi_flink;
    CREATE TABLE rest_response_mor(
      `svc_id` STRING PRIMARY KEY NOT ENFORCED,
      `url` STRING,
      `span` INT,
      `timestamp` TIMESTAMP(3),
      `app` STRING
    )
    PARTITIONED BY (`app`)
    WITH (
      'connector' = 'hudi',
      'path' = 'hdfs://flink/huditables/rest_response_mor',
      'table.type' = 'MERGE_ON_READ', -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
      'hoodie.datasource.write.recordkey.field' = 'svc_id',
      'hoodie.datasource.write.hive_style_partitioning' = 'true',
      'hoodie.datasource.write.partitionpath.urlencode' = 'true',
      'hoodie.parquet.compression.codec'= 'snappy',
      'write.operation' = 'upsert',
      'write.precombine' = 'true',
      'write.precombine.field' = 'timestamp',
      'compaction.async.enabled' = 'true',
      'compaction.trigger.strategy' = 'num_and_time',
      'compaction.delta_commits' = '3',
      'compaction.delta_seconds' = '30',
      'hive_sync.enable' = 'true',
      'hive_sync.use_jdbc' = 'false',
      'hive_sync.mode' = 'hms',
      'hive_sync.metastore.uris' = 'thrift://machine1:9083',
      'hive_sync.db' = 'hudi_external',
      'hive_sync.table' = 'rest_response_mor',
      'hive_sync.assume_date_partitioning' = 'false',
      'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.HiveStylePartitionValueExtractor', -- override the default extractor, which assumes date partitions
      'hive_sync.partition_fields' = 'app',
      'hive_sync.support_timestamp'= 'true'
    );
    
    -- Data is only committed to the Hudi sink when checkpointing is enabled
    set execution.checkpointing.interval=10sec;
    insert into hudi_flink.rest_response_mor(`svc_id`,`url`,`span`,`timestamp`,`app`) select `svc_id`,`url`,`span`,`timestamp`,`app` from wzp.kafka_monitor;
    

    You can see that two additional external tables, rest_response_mor_ro and rest_response_mor_rt, were created automatically in the metastore's hudi_external database, and the partition information was synced to the metastore as well, so other engines (Hive MR, Spark) can read the Hudi-format data that Flink writes to HDFS.
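    This can be spot-checked from any Hive client (a quick sketch; the database and table names follow the hive_sync options above):

    USE hudi_external;
    SHOW TABLES;                            -- expect rest_response_mor_ro and rest_response_mor_rt
    SHOW PARTITIONS rest_response_mor_ro;   -- hive-style partitions such as app=<value>
    SHOW CREATE TABLE rest_response_mor_rt; -- the external table definition registered by hive sync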

    III. Querying Hudi from Flink

    (1) Streaming Query

    Read change records in real time:

    USE hudi_flink;
    SET 'execution.runtime-mode' = 'streaming';
    select * from rest_response_mor/*+ OPTIONS('read.streaming.enabled'='true', 'read.start-commit'='20220718000000')*/;
    
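    The streaming source discovers new data by periodically polling the Hudi timeline for fresh commits; the polling interval (in seconds, 60 by default) can be tightened with an extra hint option, roughly as in this sketch:

    -- a sketch: same streaming read, but check for new commits every 4 seconds
    select * from rest_response_mor
    /*+ OPTIONS(
      'read.streaming.enabled' = 'true',
      'read.start-commit' = '20220718000000',
      'read.streaming.check-interval' = '4'
    ) */;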

    (2) Snapshot Query

    Read the latest snapshot of the data; hoodie.datasource.query.type defaults to snapshot:

    USE hudi_flink;
    SET 'execution.runtime-mode' = 'batch';
    select * from rest_response_mor/*+ OPTIONS('hoodie.datasource.query.type'='snapshot')*/;
    

    (3) Read Optimized Query

    Read only the columnar (base file) data:

    USE hudi_flink;
    SET 'execution.runtime-mode' = 'batch';
    select * from rest_response_mor/*+ OPTIONS('hoodie.datasource.query.type'='read_optimized')*/;
    

    (4) Incremental Query

    This enables time travel, i.e. reading only the data committed within a specified historical time range:

    USE hudi_flink;
    SET 'execution.runtime-mode' = 'batch';
    select * from rest_response_mor/*+ OPTIONS('read.start-commit'='20220719101500','read.end-commit'='20220719101900')*/;
    
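    The start/end values are commit instants in the same yyyyMMddHHmmss form shown above. If 'read.end-commit' is omitted, the read should cover everything from the start commit up to the latest commit (an unverified sketch on the same table):

    -- a sketch: incremental read from a start commit up to the latest commit
    select * from rest_response_mor/*+ OPTIONS('read.start-commit'='20220719101500')*/;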

    IV. Querying Hudi from Hive

    hudi-hadoop-mr-bundle-0.11.1.jar must already be in Hadoop's share/hadoop/hdfs/lib directory, with all NodeManager and ResourceManager nodes restarted (step 3 above).

    set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
    -- Query only the read-optimized (columnar base file) data
    SELECT * FROM hudi_external.rest_response_mor_ro;
    -- Query the latest data, i.e. columnar base files merged with row-based log files
    SELECT * FROM hudi_external.rest_response_mor_rt;
    
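    Because hive-style partitioning is enabled on the writer, the synced tables are partitioned by app and ordinary partition pruning applies. A sketch, where 'some_app' is a placeholder partition value:

    -- a sketch: prune to a single app partition (replace 'some_app' with a real value)
    SELECT svc_id, url, span, `timestamp`
    FROM hudi_external.rest_response_mor_rt
    WHERE app = 'some_app';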

    V. Querying Hudi from Spark

    (1) Rebuild the Hudi bundle used by Spark:

    mvn -DskipTests -DskipITs -Dcheckstyle.skip=true -Drat.skip=true -Pspark-bundle-shade-hive -Pspark3.2 -Pscala-2.12 package
    

    Copy hudi-spark3.2-bundle_2.12-0.11.1.jar into Spark's jars directory.

    (2) Prepare the spark-sql startup parameters:

    In conf/spark-defaults.conf under the Spark installation directory, add the startup parameters required by Hudi:

    spark.sql.catalogImplementation=hive
    # Metastore service address
    spark.hadoop.hive.metastore.uris=thrift://machine1:9083
    # NameNode (nameservice) of the Hive tables; must match the one used by HiveServer2
    spark.hadoop.fs.defaultFS=hdfs://hive
    # Storage path of the Hive tables
    spark.sql.warehouse.dir=/hivetables
     
    # Memory for the CLI tool (driver)
    spark.driver.memory=1g
    # Memory for the YARN application master
    spark.yarn.am.memory=2g
    spark.executor.memory=3g
     
    # Configuration required by Hudi
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
    spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
    

    Spark can then directly use the table definitions that were automatically synced into the hudi_external database of the Hive metastore to read the Hudi-format data that Flink wrote to HDFS.
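    For example, a minimal spark-sql session against the synced tables might look like this (a sketch; the _ro/_rt names follow the hive_sync settings above):

    -- read-optimized view: compacted Parquet base files only
    SELECT * FROM hudi_external.rest_response_mor_ro LIMIT 10;
    -- realtime view: base files merged with the latest log files
    SELECT * FROM hudi_external.rest_response_mor_rt LIMIT 10;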

  • Original article: https://blog.csdn.net/wzp1986/article/details/125894473