• HUDI (detailed setup walkthrough, with compiled bundle jars attached)




    Preface

    Apache Hudi (pronounced "hoodie") is the next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly to the data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexing, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping your data in open-source file formats.

    Apache Hudi is not only well suited to streaming workloads, it also lets you build efficient incremental batch pipelines. Read the documentation for more use case descriptions, and see who is using Hudi to learn how some of the world's largest data lakes, including those at Uber, Amazon, ByteDance, Robinhood, and others, have transformed their production data lakes with Hudi.

    Apache Hudi can easily be used on any cloud storage platform. Hudi's advanced performance optimizations make analytical workloads faster with any of the popular query engines, including Apache Spark, Flink, Presto, Trino, Hive, and more.


    I. Hudi Core Concepts

    If you are relatively new to Apache Hudi, it is important to get familiar with a few core concepts: the timeline, file layouts, table types (Copy on Write and Merge on Read), query types, indexing, and write operations.

    II. Usage Steps

    1. Clone the code and build it

    # Checkout code and build
    git clone https://gitee.com/apache/Hudi.git && cd Hudi
    mvn clean package -DskipTests
    If the build fails, you can switch to a release branch:
    

    1. Fetch the remote release branch

    git pull origin release-0.11.0:release-0.11.0

    2. Check out the branch

    git checkout release-0.11.0

    3. Build (profile options can be passed here)

    mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12

    The build took several hours on my fairly slow machine, so I have also uploaded the compiled bundles; download them from the links below if needed:

    hudi-spark3.2-bundle_2.12-0.11.0.jar (CSDN download)

    hudi-hadoop-mr-bundle-0.11.0.jar (CSDN download)

    hudi-flink1.14-bundle_2.12-0.11.0.jar (CSDN download)

    Once the build finishes, the bundle jars can be found under the packaging/ directory (cd packaging/).
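    For reference, each bundle jar is produced in its module's target directory. The paths below are a sketch assuming release 0.11.0 and the Spark 3.2 / Scala 2.12 profiles used above; adjust the directory and jar names to match your build:

    # Check where the bundle jars landed after the build (names depend on the chosen profiles)
    ls packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.11.0.jar
    ls packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.0.jar
    ls packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle_2.12-0.11.0.jar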

    Deploying Hadoop, Hive, Spark, and so on is not covered here.


    Building with different Spark versions

    The default supported Spark version is 2.4.4. Refer to the table below for building against other Spark and Scala versions.

    Maven build options      | Expected Spark bundle jar name                | Notes
    (empty)                  | hudi-spark-bundle_2.11 (legacy bundle name)   | For Spark 2.4.4 and Scala 2.11 (default options)
    -Dspark2.4               | hudi-spark2.4-bundle_2.11                     | For Spark 2.4.4 and Scala 2.11 (same as default)
    -Dspark2.4 -Dscala-2.12  | hudi-spark2.4-bundle_2.12                     | For Spark 2.4.4 and Scala 2.12
    -Dspark3.1 -Dscala-2.12  | hudi-spark3.1-bundle_2.12                     | For Spark 3.1.x and Scala 2.12
    -Dspark3.2 -Dscala-2.12  | hudi-spark3.2-bundle_2.12                     | For Spark 3.2.x and Scala 2.12
    -Dspark3                 | hudi-spark3-bundle_2.12 (legacy bundle name)  | For Spark 3.2.x and Scala 2.12
    -Dscala-2.12             | hudi-spark-bundle_2.12 (legacy bundle name)   | For Spark 2.4.4 and Scala 2.12

     For example:

    # Build against Spark 3.2.x
    mvn clean package -DskipTests -Dspark3.2 -Dscala-2.12
    # Build against Spark 3.1.x
    mvn clean package -DskipTests -Dspark3.1 -Dscala-2.12
    # Build against Spark 2.4.4 and Scala 2.12
    mvn clean package -DskipTests -Dspark2.4 -Dscala-2.12


    What about the "spark-avro" module?

    Starting with the 0.11 release, spark-avro no longer needs to be specified via --packages when using Hudi.

    Building with different Flink versions

    The default supported Flink version is 1.14. Refer to the table below for building against other Flink and Scala versions.

    Maven build options       | Expected Flink bundle jar name | Notes
    (empty)                   | hudi-flink1.14-bundle_2.11     | For Flink 1.14 and Scala 2.11 (default options)
    -Dflink1.14               | hudi-flink1.14-bundle_2.11     | For Flink 1.14 and Scala 2.11 (same as default)
    -Dflink1.14 -Dscala-2.12  | hudi-flink1.14-bundle_2.12     | For Flink 1.14 and Scala 2.12
    -Dflink1.13               | hudi-flink1.13-bundle_2.11     | For Flink 1.13 and Scala 2.11
    -Dflink1.13 -Dscala-2.12  | hudi-flink1.13-bundle_2.12     | For Flink 1.13 and Scala 2.12
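    For example, mirroring the Spark build commands above, a Flink build can be invoked with the options from this table (pick the row matching your Flink and Scala versions):

    # Build against Flink 1.14 and Scala 2.12
    mvn clean package -DskipTests -Dflink1.14 -Dscala-2.12
    # Build against Flink 1.13 and Scala 2.12
    mvn clean package -DskipTests -Dflink1.13 -Dscala-2.12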

    2. Hudi + Hive

    1. Copy the compiled bundle into hive/lib

    cp /apps/Hudi/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.11.0.jar /apps/hive/lib

    2. Start the Hive Metastore and HiveServer2 services

    nohup hive --service metastore >> metastore.log 2>&1 &

    nohup hive --service hiveserver2 >> hiveserver2.log 2>&1 &
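    To check that both services came up, you can optionally connect with beeline. This is a sketch: the JDBC URL assumes HiveServer2 is listening on the default localhost:10000; adjust the host, port, and credentials to your deployment:

    # Quick sanity check against HiveServer2 (URL is an assumption for a local default setup)
    beeline -u jdbc:hive2://localhost:10000 -e "show databases;"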

    3. Query the data

    With the bundle on Hive's classpath, Hudi tables registered in the Hive metastore can be queried like ordinary Hive tables.
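    A minimal sketch, assuming a Hudi table has already been synced into the Hive metastore (for example via Hudi's Hive sync); the table name hudi_trips_cow and the connection URL are illustrative, not something created by the steps above:

    # Query a synced Hudi table through HiveServer2 (table name and URL are assumptions)
    beeline -u jdbc:hive2://localhost:10000 -e "select * from hudi_trips_cow limit 10;"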

    3. Hudi + Spark

    Hudi supports writing and reading data with Spark SQL through the HoodieSparkSessionExtension SQL extension. From the extracted directory, run Spark SQL with Hudi:

    The command is as follows (example):

    # Spark 3.2
    spark-sql --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

    While launching, any problems at this point usually come from the --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1 configuration, since the bundle has to be resolved and downloaded at startup.

    You can instead copy the jar into spark/jars ahead of time and drop that parameter; the copy step might look like the sketch below, after which spark-sql can be launched without --packages:
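    A sketch of the copy step, assuming Spark is installed under /apps/spark (mirroring the /apps layout used for Hive above) and the bundle was built or downloaded as described earlier:

    # Put the Hudi Spark bundle on Spark's classpath (source and destination paths are assumptions)
    cp /apps/Hudi/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.11.0.jar /apps/spark/jars/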

    spark-sql --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
    --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

     # Spark 3.1
    spark-shell \
      --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.11.1 \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

    # Spark 2.4
    spark-shell \
      --packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.11.1 \
      --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

    Please note the following:

    • For Spark 3.2, the additional spark_catalog config is required: --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
    • We use the hudi-spark-bundle built for Scala 2.12, since the spark-avro module used can also depend on Scala 2.12.

    Getting started

    // spark-shell
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode._
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._

    val tableName = "hudi_trips_cow"
    val basePath = "file:///tmp/hudi_trips_cow"
    val dataGen = new DataGenerator

    Insert data

    // spark-shell
    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Overwrite).
      save(basePath)

    Query data

    // spark-shell
    val tripsSnapshotDF = spark.
      read.
      format("hudi").
      load(basePath)
    tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

    spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()
    spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from  hudi_trips_snapshot").show()

    Time Travel Query

    spark.read.
      format("hudi").
      option("as.of.instant", "20210728141108100").
      load(basePath)

    spark.read.
      format("hudi").
      option("as.of.instant", "2021-07-28 14:11:08.200").
      load(basePath)

    // It is equal to "as.of.instant = 2021-07-28 00:00:00"
    spark.read.
      format("hudi").
      option("as.of.instant", "2021-07-28").
      load(basePath)
     

    Update data 

    // spark-shell
    val updates = convertToStringList(dataGen.generateUpdates(10))
    val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)

    Incremental query

     

    // spark-shell
    // reload data
    spark.
      read.
      format("hudi").
      load(basePath).
      createOrReplaceTempView("hudi_trips_snapshot")

    val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from  hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
    val beginTime = commits(commits.length - 2) // commit time we are interested in

    // incrementally query data
    val tripsIncrementalDF = spark.read.format("hudi").
      option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      load(basePath)
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from  hudi_trips_incremental where fare > 20.0").show()

     Point in time query

     

    // spark-shell
    val beginTime = "000" // Represents all commits > this time.
    val endTime = commits(commits.length - 2) // commit time we are interested in

    //incrementally query data
    val tripsPointInTimeDF = spark.read.format("hudi").
      option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
      option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
      option(END_INSTANTTIME_OPT_KEY, endTime).
      load(basePath)
    tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
    spark.sql("select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts from hudi_trips_point_in_time where fare > 20.0").show()

     Delete data

    // spark-shell
    // fetch total records count
    spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
    // fetch two records to be deleted
    val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)

    // issue deletes
    val deletes = dataGen.generateDeletes(ds.collectAsList())
    val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION_OPT_KEY,"delete").
      option(PRECOMBINE_FIELD_OPT_KEY, "ts").
      option(RECORDKEY_FIELD_OPT_KEY, "uuid").
      option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
      option(TABLE_NAME, tableName).
      mode(Append).
      save(basePath)

    // run the same read query as above.
    val roAfterDeleteViewDF = spark.
      read.
      format("hudi").
      load(basePath)

    roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
    // fetch should return (total - 2) records
    spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

    Insert Overwrite 

     

    // spark-shell
    spark.
      read.format("hudi").
      load(basePath).
      select("uuid","partitionpath").
      sort("partitionpath","uuid").
      show(100, false)

    val inserts = convertToStringList(dataGen.generateInserts(10))
    val df = spark.
      read.json(spark.sparkContext.parallelize(inserts, 2)).
      filter("partitionpath = 'americas/united_states/san_francisco'")
    df.write.format("hudi").
      options(getQuickstartWriteConfigs).
      option(OPERATION.key(),"insert_overwrite").
      option(PRECOMBINE_FIELD.key(), "ts").
      option(RECORDKEY_FIELD.key(), "uuid").
      option(PARTITIONPATH_FIELD.key(), "partitionpath").
      option(TBL_NAME.key(), tableName).
      mode(Append).
      save(basePath)

    // Should have different keys now for San Francisco alone, from query before.
    spark.
      read.format("hudi").
      load(basePath).
      select("uuid","partitionpath").
      sort("partitionpath","uuid").
      show(100, false)

     

  • Original article: https://blog.csdn.net/hxiaowang/article/details/125392644