• 数据湖(十四):Spark与Iceberg整合查询操作


    文章目录

    Spark与Iceberg整合查询操作

    一、DataFrame API加载Iceberg中的数据

    二、查询表快照

    三、查询表历史 

    四、​​​​​​​​​​​​​​查询表data files

    五、​​​​​​​​​​​​​​查询Manifests

    六、​​​​​​​​​​​​​​查询指定快照数据

    七、根据时间戳查询数据

    八、​​​​​​​​​​​​​​回滚快照

    九、​​​​​​​​​​​​​​合并Iceberg表的数据文件

    十、删除历史快照


    Spark与Iceberg整合查询操作

    一、DataFrame API加载Iceberg中的数据

    Spark操作Iceberg不仅可以使用SQL方式查询Iceberg中的数据,还可以使用DataFrame方式加载Iceberg表中的数据,可以通过spark.table(Iceberg表名)或者spark.read.format("iceberg").load("iceberg data path")来加载对应Iceberg表中的数据,操作如下:

    1. val spark: SparkSession = SparkSession.builder().master("local").appName("test")
    2. //指定hadoop catalog,catalog名称为hadoop_prod
    3. .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
    4. .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
    5. .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
    6. .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    7. .getOrCreate()
    8. //1.创建Iceberg表,并插入数据
    9. spark.sql(
    10. """
    11. |create table hadoop_prod.mydb.mytest (id int,name string,age int) using iceberg
    12. """.stripMargin)
    13. spark.sql(
    14. """
    15. |insert into hadoop_prod.mydb.mytest values (1,"zs",18),(2,"ls",19),(3,"ww",20)
    16. """.stripMargin)
    17. //1.SQL 方式读取Iceberg中的数据
    18. spark.sql("select * from hadoop_prod.mydb.mytest").show()
    19. /**
    20. * 2.使用Spark查询Iceberg中的表除了使用sql 方式之外,还可以使用DataFrame方式,建议使用SQL方式
    21. */
    22. //第一种方式使用DataFrame方式查询Iceberg表数据
    23. val frame1: DataFrame = spark.table("hadoop_prod.mydb.mytest")
    24. frame1.show()
    25. //第二种方式使用DataFrame加载 Iceberg表数据
    26. val frame2: DataFrame = spark.read.format("iceberg").load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
    27. frame2.show()

    二、​​​​​​​查询表快照

    每次向Iceberg表中commit数据都会生成对应的一个快照,我们可以通过查询“${catalog名称}.${库名}.${Iceberg表}.snapshots”来查询对应Iceberg表中拥有的所有快照,操作如下:

    1. //向表 hadoop_prod.mydb.mytest 中再次插入以下数据
    2. spark.sql(
    3. """
    4. |insert into hadoop_prod.mydb.mytest values (4,"ml",18),(5,"tq",19),(6,"gb",20)
    5. """.stripMargin)
    6. //3.查看Iceberg表快照信息
    7. spark.sql(
    8. """
    9. |select * from hadoop_prod.mydb.mytest.snapshots
    10. """.stripMargin).show(false)

    结果如下:

    三、查询表历史 

    对Iceberg表查询表历史就是查询Iceberg表快照信息内容,与查询表快照类似,通过“${catalog名称}.${库名}.${Iceberg表}.history”命令进行查询,操作如下:

    1. //4.查询表历史,实际上就是表快照的部分内容
    2. spark.sql(
    3. """
    4. |select * from hadoop_prod.mydb.mytest.history
    5. """.stripMargin).show(false)

     结果如下:

    四、​​​​​​​​​​​​​​查询表data files

    我们可以通过”${catalog名称}.${库名}.${Iceberg表}.files”命令来查询Iceberg表对应的data files 信息,操作如下:

    1. //5.查看表对应的data files
    2. spark.sql(
    3. """
    4. |select * from hadoop_prod.mydb.mytest.files
    5. """.stripMargin).show(false)

    结果如下:

    五、​​​​​​​​​​​​​​查询Manifests

    我们可以通过“${catalog名称}.${库名}.${Iceberg表}.manifests”来查询表对应的manifests信息,具体操作如下:

    1. //6.查看表对应的 Manifests
    2. spark.sql(
    3. """
    4. |select * from hadoop_prod.mydb.mytest.manifests
    5. """.stripMargin).show(false)

    结果如下:

    六、​​​​​​​​​​​​​​查询指定快照数据

    查询Iceberg表数据还可以指定snapshot-id来查询指定快照的数据,这种方式可以使用DataFrame Api方式来查询,Spark3.x版本之后也可以通过SQL 方式来查询,操作如下:

    1. //7.查询指定快照数据,快照ID可以通过读取json元数据文件获取
    2. spark.read
    3. .option("snapshot-id",3368002881426159310L)
    4. .format("iceberg")
    5. .load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
    6. .show()

    结果如下:

     Spark3.x 版本之后,SQL指定快照语法为:

    CALL ${Catalog 名称}.system.set_current_snapshot("${库名.表名}",快照ID)

    操作如下:

    1. //SQL 方式指定查询快照ID 数据
    2. spark.sql(
    3. """
    4. |call hadoop_prod.system.set_current_snapshot('mydb.mytest',3368002881426159310)
    5. """.stripMargin)
    6. spark.sql(
    7. """
    8. |select * from hadoop_prod.mydb.mytest
    9. """.stripMargin).show()

     结果如下:

    七、根据时间戳查询数据

    Spark读取Iceberg表可以指定“as-of-timestamp”参数,通过指定一个毫秒时间参数查询Iceberg表中数据,iceberg会根据元数据找出timestamp-ms <= as-of-timestamp 对应的 snapshot-id ,也只能通过DataFrame Api把数据查询出来,Spark3.x版本之后支持SQL指定时间戳查询数据。具体操作如下:

    1. //8.根据时间戳查询数据,时间戳指定成毫秒,iceberg会根据元数据找出timestamp-ms <= as-of-timestamp 对应的 snapshot-id ,把数据查询出来
    2. spark.read.option("as-of-timestamp","1640066148000")
    3. .format("iceberg")
    4. .load("hdfs://mycluster/sparkoperateiceberg/mydb/mytest")
    5. .show()

     结果如下:

     Spark3.x 版本之后,SQL根据时间戳查询最近快照语法为:

    CALL ${Catalog 名称}.system.rollback_to_timestamp("${库名.表名}",TIMESTAMP '日期数据')

    操作如下:

    1. //省略重新创建表mytest,两次插入数据
    2. //SQL 方式查询指定 时间戳 快照数据
    3. spark.sql(
    4. """
    5. |CALL hadoop_prod.system.rollback_to_timestamp('mydb.mytest', TIMESTAMP '2021-12-23 16:56:40.000')
    6. """.stripMargin)
    7. spark.sql(
    8. """
    9. |select * from hadoop_prod.mydb.mytest
    10. """.stripMargin).show()

    结果如下:

    八、​​​​​​​​​​​​​​回滚快照

    在Iceberg中可以回滚快照,可以借助于Java 代码实现,Spark DataFrame Api 不能回滚快照,在Spark3.x版本之后,支持SQL回滚快照。回滚快照之后,Iceberg对应的表中会生成新的Snapshot-id,重新查询,回滚生效,具体操作如下:

    1. //9.回滚到某个快照,rollbackTo(snapshot-id),指定的是固定的某个快照ID,回滚之后,会生成新的Snapshot-id, 重新查询生效。
    2. val conf = new Configuration()
    3. val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
    4. catalog.setConf(conf)
    5. val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
    6. table.manageSnapshots().rollbackTo(3368002881426159310L).commit()

    注意:回滚快照之后,在对应的Iceberg表中会生成新的Snapshot-id,再次查询后,会看到数据是回滚快照之后的数据。

    1. //查询表 hadoop_prod.mydb.mytest 数据,已经是历史数据
    2. spark.sql(
    3. """
    4. |select * from hadoop_prod.mydb.mytest
    5. """.stripMargin).show(100)

    结果如下:

    Spark3.x 版本之后,SQL回滚快照语法为:

    CALL ${Catalog 名称}.system.rollback_to_snapshot("${库名.表名}",快照ID)

    操作如下:

    1. //省略重新创建表mytest,两次插入数据
    2. //SQL方式回滚快照ID,操作如下:
    3. spark.sql(
    4. """
    5. |Call hadoop_prod.system.rollback_to_snapshot("mydb.mytest",5440886662709904549)
    6. """.stripMargin)
    7. //查询表 hadoop_prod.mydb.mytest 数据,已经是历史数据
    8. spark.sql(
    9. """
    10. |select * from hadoop_prod.mydb.mytest
    11. """.stripMargin).show(100)

     结果如下:

    九、​​​​​​​​​​​​​​合并Iceberg表的数据文件

    针对Iceberg表每次commit都会生成一个parquet数据文件,有可能一张Iceberg表对应的数据文件非常多,那么我们通过Java Api 方式对Iceberg表可以进行数据文件合并,数据文件合并之后,会生成新的Snapshot且原有数据并不会被删除,如果要删除对应的数据文件需要通过“Expire Snapshots来实现”,具体操作如下:

    1. //10.合并Iceberg表的数据文件
    2. // 1) 首先向表 mytest 中插入一批数据,将数据写入到表mytest中
    3. import spark.implicits._
    4. val df: DataFrame = spark.read.textFile("D:\\2018IDEA_space\\Iceberg-Spark-Flink\\SparkIcebergOperate\\data\\nameinfo")
    5. .map(line => {
    6. val arr: Array[String] = line.split(",")
    7. (arr(0).toInt, arr(1), arr(2).toInt)
    8. }).toDF("id","name","age")
    9. df.writeTo("hadoop_prod.mydb.mytest").append()

     经过以上插入数据,我们可以看到Iceberg表元数据目录如下:

     数据目录如下:

    1. //2) 合并小文件数据,Iceberg合并小文件时并不会删除被合并的文件,Compact是将小文件合并成大文件并创建新的Snapshot。如果要删除文件需要通过Expire Snapshots来实现,targetSizeInBytes 指定合并后的每个文件大小
    2. val conf = new Configuration()
    3. val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
    4. val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
    5. Actions.forTable(table).rewriteDataFiles().targetSizeInBytes(1024)//1kb,指定生成合并之后文件大小
    6. .execute()

    合并小文件后,Iceberg对应表元数据目录如下:

    数据目录如下:

    十、删除历史快照

    目前我们可以通过Java Api 删除历史快照,可以通过指定时间戳,当前时间戳之前的所有快照都会被删除(如果指定时间比最后一个快照时间还大,会保留最新快照数据),可以通过查看最新元数据json文件来查找要指定的时间。例如,表mytest 最新的json元数据文件信息如下:

    这里删除时间为“1640070000000”之前的所有快照信息,在删除快照时,数据data目录中过期的数据parquet文件也会被删除(例如:快照回滚后不再需要的文件),代码操作如下:

    1. //11.删除历史快照,历史快照是通过ExpireSnapshot来实现的,设置需要删除多久的历史快照
    2. val conf = new Configuration()
    3. val catalog = new HadoopCatalog(conf,"hdfs://mycluster/sparkoperateiceberg")
    4. val table: Table = catalog.loadTable(TableIdentifier.of("mydb","mytest"))
    5. table.expireSnapshots().expireOlderThan(1640070000000L).commit()

     以上代码执行完成之后,可以看到只剩下最后一个快照信息:

     数据目录如下:

    注意:删除对应快照数据时,Iceberg表对应的Parquet格式数据也会被删除,到底哪些parquet文件数据被删除决定于最后的“snap-xx.avro”中对应的manifest list数据对应的parquet数据,如下图所示:

    随着不断删除snapshot,在Iceberg表不再有manifest文件对应的parquet文件也会被删除。

    除了以上这种使用Java Api方式来删除表旧快照外,在Spark3.x版本之后,我们还可以使用SQL方式来删除快照方式,SQL删除快照语法为:

    删除早于某个时间的快照,但保留最近N个快照

    CALL ${Catalog 名称}.system.expire_snapshots("${库名.表名}",TIMESTAMP '年-月-日 时-分-秒.000',N)

    注意:以上使用SQL方式采用上述方式进行操作时,SparkSQL执行会卡住,最后报错广播变量广播问题(没有找到好的解决方式,目测是个bug问题)

    每次Commit生成对应的Snapshot之外,还会有一份元数据文件“Vx-metadata.json”文件产生,我们可以在创建Iceberg表时执行对应的属性决定Iceberg表保留几个元数据文件,属性如下:

    Property

    Description

    write.metadata.delete-after-commit.enabled

    每次表提交后是否删除旧的元数据文件

    write.metadata.previous-version-max

    要保留旧的元数据文件数量

    例如,在Spark中创建表 test ,指定以上两个属性,建表语句如下:

    1. CREATE TABLE ${CataLog名称}.${库名}.${表名} (
    2. id bigint,
    3. name string
    4. ) using iceberg
    5. PARTITIONED BY (
    6. loc string
    7. ) TBLPROPERTIES (
    8. 'write.metadata.delete-after-commit.enabled'= true,
    9. 'write.metadata.previous-version-max' = 3
    10. )

    • 📢博客主页:https://lansonli.blog.csdn.net
    • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
    • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
    • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
  • 相关阅读:
    让数据使用自由而安全,安华金和“三驾马车”驱动数据安全治理
    redis教程
    关键的服务器硬件组件及其基本功能
    开赛在即 | 赛宁网安技术支撑第七届“蓝帽杯”决赛
    应急响应靶机训练-Linux1
    详解利用高斯混合模型拆解多模态分布 + 精美可视化
    前端例程20220802:玻璃背光按钮
    Proxmox VE 近期有ARM 版本发布么?
    react项目配置(类组件、函数式组件)
    BAT大厂面试的100道考题【算法、源码、架构、中间件、设计模式、网络、项目】,过60分的不到10%
  • 原文地址:https://blog.csdn.net/xiaoweite1/article/details/125617283