Iceberg表每一次write都会产生一个新的snapshot,同时也会产生一个新的version版本。所以对于流式写入,会产生大量的snapshot。因此需要将老的snapshot标记为过期。这样新生成的vN.metadata.json文件,就不会包含过期的snapshot
当一个data file没有被snapshot引用时,就会被删除
下面是一个Java/Scala API示例,对一天前的snapshots标记为过期
import org.apache.iceberg.Table
val timestampToExpire:Long = System.currentTimeMillis() - (1000L * 60 * 60 * 24) // 一天
val table:Table = null
table.expireSnapshots()
.expireOlderThan(timestampToExpire)
// .commit()
如果想并行进行处理,可以使用Spark API,示例程序如下:
val table:Table = ...
SparkActions
.get()
.expireSnapshots(table)
.expireOlderThan(timestampToExpire)
.execute()
Iceberg表每一次write都会产生一个新的snapshot,同时也会产生一个新的version版本。所以对于流式写入,会产生大量的snapshot。
为了保证原子性,每一个对表产生改变的操作commit后,都会产生一份新的metadata files,这包括一个vN.metadata.json + 一个manifest list(例如:snap-242516093407225541-1-4bbf0565-406b-428a-aad7-e32993df0fef.avro) + 一个或多个manifest file(例如:4bbf0565-406b-428a-aad7-e32993df0fef-mN.avro)
所以对于流式写入,会产生大量的metadata files。所以需要给表设置属性write.metadata.delete-after-commit.enabled=true
和write.metadata.previous-versions-max
。参数具体含义参考Iceberg数据湖的Table、Catalog、Hadoop配置Configuration
这样当每次对表产生改变的操作commit后,都会自动删除老的metadata files,只保留指定版本数量的metadata files
产生孤立文件的原因
删除孤立文件的方法
Java/Scala API暂时未发现执行方法。使用Spark API进行并行处理,示例程序如下:
val table:Table = ...
SparkActions
.get()
.deleteOrphanFiles(table)
.execute()
注意事项
可能表下面有许多data和metadata files需要被删除,会花费大量的时间,所以不要频繁进行孤立文件的删除
有时一个write操作可能会花费很长的时间,比如一天。在这期间就会产生很多被认为是孤立的文件。所以孤立文件的保留时间要大于write的时间,默认是3天
在一些文件系统,一份文件的路径相关属性发生改变,就会被误认为是孤立文件而被删除。例如在HDFS中,将一个data file的权限进行修改,该data file可能不能和其他metadata file进行关联,就被误认为是孤立文件而被删除
data files的数量越多,则需要储存更多的metadata到manifest files;而且读取数据时,打开一个data file也需要成本。所以需要将小的data files,合并成大的data files。Iceberg利用files metadata table检查data files的文件大小,并进行data files的合并
Java/Scala API暂时未发现执行方法。使用Spark API进行并行处理,示例程序如下:
val table:Table = ...
SparkActions
.get()
.rewriteDataFiles(table)
.filter(Expressions.equal("birthday", "2022-03-01"))
.option("target-file-size-bytes", Long.toString(500 * 1024 * 1024)) // 500 MB
.execute()
metadata files的数量越多,则需要储存更多的metadata到manifest list;而且查询时,打开一个metadata file也需要成本。所以需要将小的metadata files,合并成大的metadata files
下面是通过Java/Scala API,进行Rewrite manifests的一个示例
import org.apache.iceberg.Table
val table:Table = null
table.rewriteManifests()
.rewriteIf(file => file.length() < 10L * 1024 * 1024) // 10MB
// .commit()
如果使用Spark API进行并行处理,示例程序如下:
val table:Table = ...
SparkActions
.get()
.rewriteManifests(table)
.rewriteIf(file -> file.length() < 10 * 1024 * 1024) // 10 MB
.execute()