• Hudi从内核到实战介绍


    Hudi实战

    Hudi名称概念

    • Time Line

    Hudi的核心是维护不同时间对表执行的所有操作的事件表,这有助于提供表的即时视图,同时还有效地支持按到达顺序进行数据检索。Hudi包含以下组件:

    (1)Instant action:在表上的操作类型

    (2)Instant time:操作开始的一个时间戳,该时间戳会按照开始时间顺序单调递增

    (3)state:即时状态

    Hudi保证在时间轴上执行的操作都是原先性的,所有执行的操作包括:

    (1)commits:原子的写入一张表的操作

    (2)cleans:后台消除了表中的旧版本数据,即表中不再需要的数据

    (3)delta_commit:增量提交,将一批数据原子写入到MergeOnRead表中,并且只记录到增量日志中

    (4)compaction:后台协调Hudi中的差异数据

    (5)rollback:回滚,删除在写入过程中的数据

    (6)savepoint:将某些文件标记“已保存”,以便清理数据时不会删除它们,一般用于表的还原,可以将数据还原到某个时间点

    任何操作都可以处于以下状态:

    (1)Requested:表示已安排操作行为,但是尚未开始

    (2)Inflight:表示正在执行当前操作

    (3)Completed:表示已完成操作

    • File Managerment

    Hudi将表组织成DFS上基本路径下的目录结构。表分为几个分区,与hive类似,每个分区均有唯一标示。

    在每个分区内,有多个数据组,每个数据组包含几个文件片,其中文件片包含基本文件和日志文件。Hudi采用MVCC设计,其中压缩操作将日志文件和基本数据文件合并成新的文件片,而清除操作则将未使用的文件片去除。

    • 索引

    Hudi通过使用索引机制,生成hoodie密钥映射对应文件ID,从而提供高效upsert操作。

    • 表类型

    1. Copy on Write:仅使用列式存储,例如parquet。仅更新版本号,通过写入过程中执行同步合并来重写文件。

    2. Merge on Read:基于列式存储(parquet)和行式存储(arvo)结合的文件更始进行存储。更新记录到增量文件,压缩同步和异步生成新版本的文件。

    以下是对比:

    • 查询类型

    快照查询(Snapshot Queries):查询操作将查询最新快照的表数据。如果是Merge on Read类型的表,它将动态合并最新文件版本的基本数据和增量数据用于显示查询。如果是Copy On Write类型的表,它直接查询parquet表,同时提供upsert/delete操作。

    增量查询(Incremental Queries):查询只能看到写入表的新数据。这有效地提供了changestreams来启用增量数据管道。

    优化读查询(Read Optimized Queries):查询将查看给定提交/压缩操作表的最新快照。

    以下是对比:

    Hudi操作

    • pom.xml
    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.hudigroupId>
    4. <artifactId>hudi-clientartifactId>
    5. <version>0.5.3version>
    6. dependency>
    7. <dependency>
    8. <groupId>org.apache.hudigroupId>
    9. <artifactId>hudi-hiveartifactId>
    10. <version>0.5.3version>
    11. dependency>
    12. <dependency>
    13. <groupId>org.apache.hudigroupId>
    14. <artifactId>hudi-spark-bundle_2.11artifactId>
    15. <version>0.5.3version>
    16. dependency>
    17. <dependency>
    18. <groupId>org.apache.hudigroupId>
    19. <artifactId>hudi-commonartifactId>
    20. <version>0.5.3version>
    21. dependency>
    22. <dependency>
    23. <groupId>org.apache.hudigroupId>
    24. <artifactId>hudi-hadoop-mr-bundleartifactId>
    25. <version>0.5.3version>
    26. dependency>
    27. <dependency>
    28. <groupId>org.apache.sparkgroupId>
    29. <artifactId>spark-core_2.11artifactId>
    30. <version>2.4.5version>
    31. dependency>
    32. <dependency>
    33. <groupId>org.apache.sparkgroupId>
    34. <artifactId>spark-sql_2.11artifactId>
    35. <version>2.4.5version>
    36. dependency>
    37. <dependency>
    38. <groupId>org.apache.sparkgroupId>
    39. <artifactId>spark-hive_2.11artifactId>
    40. <version>2.4.5version>
    41. dependency>
    42. <dependency>
    43. <groupId>org.apache.sparkgroupId>
    44. <artifactId>spark-avro_2.11artifactId>
    45. <version>2.4.5version>
    46. dependency>
    47. <dependency>
    48. <groupId>org.scala-langgroupId>
    49. <artifactId>scala-libraryartifactId>
    50. <version>${scala.version}version>
    51. dependency>
    52. <dependency>
    53. <groupId>org.apache.hadoopgroupId>
    54. <artifactId>hadoop-clientartifactId>
    55. <version>2.7.2version>
    56. dependency>
    57. <dependency>
    58. <groupId>com.alibabagroupId>
    59. <artifactId>fastjsonartifactId>
    60. <version>1.2.47version>
    61. dependency>
    62. <dependency>
    63. <groupId>org.apache.sparkgroupId>
    64. <artifactId>spark-hive_2.11artifactId>
    65. <version>2.4.5version>
    66. dependency>
    67. <dependency>
    68. <groupId>org.spark-project.hivegroupId>
    69. <artifactId>hive-jdbcartifactId>
    70. <version>1.2.1.spark2version>
    71. dependency>
    72. dependencies>
    • case class
    1. packagecom.atguigu.bean
    2. case class DwsMember(
    3. uid: Int,
    4. ad_id: Int,
    5. var fullname:String,
    6. iconurl: String,
    7. dt: String,
    8. dn: String
    9. )
    • 配置文件

    将集群配置文件复制到,项目resource源码包下,使本地环境可以访问hadoop集群。

    • Hudi写入Hdfs
    1. packagecom.atguigu.hudi.test
    2. import com.atguigu.bean.DwsMember
    3. import com.atguigu.hudi.util.ParseJsonData
    4. import org.apache.spark.SparkConf
    5. import org.apache.spark.sql.{SaveMode, SparkSession}
    6. object HoodieDataSourceExample {
    7. def main(args: Array[String]): Unit = {
    8. System.setProperty("HADOOP_USER_NAME", "root")
    9. val sparkConf = newSparkConf().setAppName("dwd_member_import")
    10. .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    11. .setMaster("local[*]")
    12. val sparkSession =SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    13. val ssc = sparkSession.sparkContext
    14. ssc.hadoopConfiguration.set("fs.defaultFS","hdfs://mycluster")
    15. ssc.hadoopConfiguration.set("dfs.nameservices","mycluster")
    16. // insertData(sparkSession)
    17. queryData(sparkSession)
    18. }
    19. /**
    20. * 读取hdfs日志文件通过hudi写入hdfs
    21. *
    22. * @param sparkSession
    23. */
    24. def insertData(sparkSession:SparkSession) = {
    25. import org.apache.spark.sql.functions._
    26. import sparkSession.implicits._
    27. val commitTime =System.currentTimeMillis().toString //生成提交时间
    28. val df =sparkSession.read.text("/user/atguigu/ods/member.log")
    29. .mapPartitions(partitions => {
    30. partitions.map(item => {
    31. val jsonObject =ParseJsonData.getJsonData(item.getString(0))
    32. DwsMember(jsonObject.getIntValue("uid"),jsonObject.getIntValue("ad_id")
    33. ,jsonObject.getString("fullname"),jsonObject.getString("iconurl")
    34. , jsonObject.getString("dt"),jsonObject.getString("dn"))
    35. })
    36. })
    37. val result =df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列
    38. .withColumn("uuid",col("uid")) //添加uuid列如果数据中uuid相同hudi会进行去重
    39. .withColumn("hudipartition", concat_ws("/",col("dt"), col("dn"))) //增加hudi分区列
    40. result.write.format("org.apache.hudi")
    41. // .options(org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs)
    42. .option("hoodie.insert.shuffle.parallelism", 12)
    43. .option("hoodie.upsert.shuffle.parallelism",12)
    44. .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交时间列
    45. .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列
    46. .option("hoodie.table.name", "testTable")
    47. // .option(DataSourceWriteOptions.DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,"dt") // 发现api方式不起作用分区列
    48. .option("hoodie.datasource.write.partitionpath.field","hudipartition") //分区列
    49. .mode(SaveMode.Overwrite)
    50. .save("/user/atguigu/hudi")
    51. }

    测试发现,Hudi不能指定多分区列,所以代码上分区列采用两列拼接成一列的方式,提交操作时必须指定ts和uuid。写入成功后查看hadoop路径上的文件。

    询Hdfs上Hudi数据

    1. /**
    2. * 查询hdfs上的hudi数据
    3. *
    4. * @param sparkSession
    5. */
    6. def queryData(sparkSession: SparkSession) = {
    7. val df =sparkSession.read.format("org.apache.hudi")
    8. .load("/user/atguigu/hudi/*/*")
    9. df.show()
    10. }
    • 修改Hdfs上Hudi数据
    1. def updateData(sparkSession: SparkSession) = {
    2. import org.apache.spark.sql.functions._
    3. import sparkSession.implicits._
    4. val commitTime =System.currentTimeMillis().toString //生成提交时间
    5. val df =sparkSession.read.text("/user/atguigu/ods/member.log")
    6. .mapPartitions(partitions => {
    7. partitions.map(item => {
    8. val jsonObject =ParseJsonData.getJsonData(item.getString(0))
    9. DwsMember(jsonObject.getIntValue("uid"),jsonObject.getIntValue("ad_id")
    10. , jsonObject.getString("fullname"),jsonObject.getString("iconurl")
    11. ,jsonObject.getString("dt"), jsonObject.getString("dn"))
    12. })
    13. })
    14. val result =df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列
    15. .withColumn("uuid",col("uid")) //添加uuid列如果数据中uuid相同hudi会进行去重
    16. .withColumn("hudipartition", concat_ws("/",col("dt"), col("dn"))) //增加hudi分区列
    17. result.write.format("org.apache.hudi")
    18. // .options(org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs)
    19. .option("hoodie.insert.shuffle.parallelism",12)
    20. .option("hoodie.upsert.shuffle.parallelism", 12)
    21. .option("PRECOMBINE_FIELD_OPT_KEY", "ts") //指定提交时间列
    22. .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列
    23. .option("hoodie.table.name", "testTable")
    24. // .option(DataSourceWriteOptions.DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,"dt") // 发现api方式不起作用分区列
    25. .option("hoodie.datasource.write.partitionpath.field","hudipartition") //分区列
    26. .mode(SaveMode.Append)
    27. .save("/user/atguigu/hudi")
    28. }

    虽然代码操作和新增一样只是修改了插入模式为append,但是hudi会根据uid判断进行更新数据,操作完毕后,生成一份最新的修改后的数据。同时hdfs路径上写入一份数据。

    提交时间发生了变化

    数据条数为94175

    • 增量查询
    1. def incrementalQuery(sparkSession: SparkSession) = {
    2. val beginTime = 20200703130000l
    3. val df =sparkSession.read.format("org.apache.hudi")
    4. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) //指定模式为增量查询
    5. .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime) //设置开始查询的时间戳 不需要设置结束时间戳
    6. .load("/user/atguigu/hudi")
    7. df.show()
    8. println(df.count())
    9. }

    根据haoodie_commit_time,时间进行查询,查询增量修改数据,注意参数begintime是和hadoop_commit_time对比而不是跟ts对比。如果beginitime填了比haoodie_commit_time大则会过滤所有数据。

    指定特定时间查询

    1. def updateData(sparkSession: SparkSession) = {
    2. import org.apache.spark.sql.functions._
    3. import sparkSession.implicits._
    4. val commitTime =System.currentTimeMillis().toString //生成提交时间
    5. val df =sparkSession.read.text("/user/atguigu/ods/member.log")
    6. .mapPartitions(partitions => {
    7. partitions.map(item => {
    8. val jsonObject =ParseJsonData.getJsonData(item.getString(0))
    9. DwsMember(jsonObject.getIntValue("uid"),jsonObject.getIntValue("ad_id")
    10. ,jsonObject.getString("fullname"),jsonObject.getString("iconurl")
    11. , jsonObject.getString("dt"),jsonObject.getString("dn"))
    12. })
    13. }).limit(100)
    14. val result =df.withColumn("ts", lit(commitTime)) //添加ts 时间戳列
    15. .withColumn("uuid",col("uid")) //添加uuid列如果数据中uuid相同hudi会进行去重
    16. .withColumn("hudipartition", concat_ws("/",col("dt"), col("dn"))) //增加hudi分区列
    17. result.write.format("org.apache.hudi")
    18. // .options(org.apache.hudi.QuickstartUtils.getQuickstartWriteConfigs)
    19. .option("hoodie.insert.shuffle.parallelism", 12)
    20. .option("hoodie.upsert.shuffle.parallelism", 12)
    21. .option("PRECOMBINE_FIELD_OPT_KEY","ts") //指定提交时间列
    22. .option("RECORDKEY_FIELD_OPT_KEY", "uuid") //指定uuid唯一标示列
    23. .option("hoodie.table.name", "testTable")
    24. // .option(DataSourceWriteOptions.DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,"dt") // 发现api方式不起作用分区列
    25. .option("hoodie.datasource.write.partitionpath.field","hudipartition") //分区列
    26. .mode(SaveMode.Append)
    27. .save("/user/atguigu/hudi")
    28. }
    1. def pointInTimeQuery(sparkSession: SparkSession) = {
    2. val beginTime = 20200703150000l
    3. val endTime = 20200703160000l
    4. val df =sparkSession.read.format("org.apache.hudi")
    5. .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY,DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) //指定模式为增量查询
    6. .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, beginTime) //设置开始查询的时间戳
    7. .option(DataSourceReadOptions.END_INSTANTTIME_OPT_KEY, endTime)
    8. .load("/user/atguigu/hudi")
    9. df.show()
    10. println(df.count())
    11. }

    演示,update时limit只修改100条数据,然后根据时间戳进行查询,只会查询出进行修改的100条符合时间的数据.

  • 相关阅读:
    Vue3组合式API
    springBoot 过滤器去除请求参数前后空格(附源码)
    【手写数据库toadb】SQL字符串如何被数据库认识? 词法语法分析基础原理,常用工具
    什么是SpringMVC?它有哪些优点?又是如何执行的?
    java序列回显学习
    计算机毕业设计Java城市停车位管理系统(源码+系统+mysql数据库+lw文档)
    KIE - Graph Convolution Network
    啊哈算法--堆排序 (python)
    J2EE从入门到入土07.XML建模
    聊聊Hotspot内存屏障如何禁止指令重排
  • 原文地址:https://blog.csdn.net/zjjcchina/article/details/126230975