• 【Apache Hudi】一种基于增量日志文件数的压缩策略


    前言

    Hudi 的表格式分为 COW(copy on write)MOR(merge on read)COW 表每次的 upsert 操作都会根据不同的索引(bloom indexhbase indexbucket index 等等)查找到数据所在的基础数据文件,然后合并基础数据文件生成新的基础数据文件,有很明显的写放大的问题。为了提高 upsert的性能问题,Hudi 提供了一种新的表类型:MOR 表。该表在每次的 upsert 操作时,并不会实时的和基础数据文件进行合并,而是生成新的增量日志文件,而增量文件和基础文件的合并就叫做压缩(compaction)。所以当一个读操作读取 MOR 表时,此时会读取该表的基础数据文件和所有的增量日志文件,然后进行一个合并操作(比如根据主键聚合取更新时间最新的那条数据),最后将合并后的结果返回给终端。读时合并(merge on read)这也是为何叫 MOR 表的原因。
    在这里插入图片描述

    大家都知道,在 olap 引擎中,大量小文件对于查询的性能影响很大。首先是磁盘的多次寻址问题,其次是 mpp 架构的引擎对于表的读操作通常是根据文件来进行 task数的划分,大量小文件会导致一个 job 生成大量的 task 来读取数据,而每次 task 的启动、销毁也会占用大量时间。最终导致任务执行过慢的问题。所以通常在写入数据之前,我们都会进行小文件的合并,比如 SPARKrepartitioncoalesce。当然 Hudi 也有类似的操作,那就是compaction.

    Hudi之Compaction策略

    MOR 表每次的 upsert 等会新增一个 delta log 文件,而 compaction 就是将这些 delte log 文件和基础文件进行合并的操作。
    在这里插入图片描述

    Hudi 默认提供了多种压缩策略,比如

    • DayBasedCompactionStrategy
      • 按最近分区进行选择性压缩的策略,适用于只有最近的分区会进行才会更新的场景,可以通过 hoodie.compaction.daybased.target.partitions 参数配置
    • BoundedPartitionAwareCompactionStrategy
      • DayBasedCompactionStrategy 类似,只不过该策略会使用当前日期减去hoodie.compaction.daybased.target.partitions 参数的值获取一个 earliestPartitionPathToCompact,然后比较所有分区的日期是否大于 earliestPartitionPathToCompact,大于的话则加入 compaction 计划
    • BoundedIOCompactionStrategy
      • 该策略通过限制总的 read+writeIO 大小来控制每次的压缩计划,可以通过hoodie.compaction.target.io 参数控制
    • LogFileSizeBasedCompactionStrategy
      • 该策略主要是根据 delta log 的大小进行压缩,并且为 Hudi 当前版本(0.12.0)的默认压缩策略。该策略根据用户配置的 hoodie.compaction.logfile.size.threshold 来过滤出符合条件的 fileslice ,然后从大到小排序生成压缩计划。需要注意的是,该策略继承于 BoundedIOCompactionStrategy ,所以该策略也会控制每次的压缩 IO
    • UnBoundedCompactionStrategy
      • 该策略与 BoundedIOCompactionStrategy 相对,不会进行任何IO的控制,容易OOM,不建议使用。
    • UnBoundedPartitionAwareCompactionStrategy
      • 该策略和 BoundedPartitionAwareCompactionStrategy 相对,生成的执行计划与之互补

    虽然 Hudi 帮我们配置了默认的压缩策略(LogFileSizeBasedCompactionStrategy),但实际上我们可以根据不同的业务场景选择不同的压缩策略,此时可以获得最好的压缩性能。顺便提示一下,Hudi 的压缩策略可以通过 hoodie.compaction.strategy 配置。

    基于文件数的压缩策略

    在我这边的业务场景中,很早之前我就自定义了自己的压缩策略。具体压缩逻辑如下:

    public class LogFileNumBasedCompactionStrategy extends BoundedIOCompactionStrategy
        implements Comparator<HoodieCompactionOperation> {
    
      @Override
      public List<HoodieCompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig, List<HoodieCompactionOperation> operations, List<HoodieCompactionPlan> pendingCompactionPlans) {
        Long numThreshold = writeConfig.getCompactionLogFileNumThreshold();
        List<HoodieCompactionOperation> filterOperator = operations.stream()
            .filter(e -> e.getDeltaFilePaths().size() >= numThreshold)
            .sorted(this).collect(Collectors.toList());
        return super.orderAndFilter(writeConfig, filterOperator, pendingCompactionPlans);
      }
    
      @Override
      public int compare(HoodieCompactionOperation hco1, HoodieCompactionOperation hco2) {
        return hco2.getDeltaFilePaths().size() - hco1.getDeltaFilePaths().size();
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    该压缩策略首先会获取用户配置的 hoodie.compaction.logfile.num.threshold 最小增量文件数,凡是大于等于该值的 fileslice 都会被选择,然后根据文件数的大小进行从大到小排序。并且该策略继承于 BoundedIOCompactionStrategy,所以该策略同时也限制了压缩的最大 IO
    需要注意是的,考虑到很多新手用户在启动 Hudi Job 一定时间内可能找不到压缩触发而疑惑,所以我配置了hoodie.compaction.logfile.num.threshold 默认值为 0,使用该策略时,可以酌情增加该值的大小(过大会有小文件问题,影响读取的性能),比如配置为 3,一个既不频繁触发compaction,又能接受的小文件大小。
    该策略我也已经贡献到了Hudi 社区,估计在 0.12.1 版本中可以使用,想提前使用的,可以跟踪下这个 PRhttps://github.com/apache/hudi/pull/6670,在自己项目里新建一个压缩类就可以了。

  • 相关阅读:
    Vue History模式的Nginx配置
    【HTML】元婴篇
    Vue2_人力资源管理系统项目笔记
    C#-反射
    LeetCode每日一题:1462. 课程表 IV(2023.9.12 C++)
    【译】A unit of profiling makes the allocations go away
    阿里EasyExcel动态头模板下载,以及下拉框设置
    Spring(01)
    matplotlib与django如何集成? matplotlib生成的图可以嵌入到网页吗?
    机器视觉选型-什么时候用远心镜头
  • 原文地址:https://blog.csdn.net/su20145104009/article/details/126943510