UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小 => CDC 导入
INSERT:跳过 index,写入效率更高 => Log Deduplication
BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)
1)Copy On Write (COW)
(1)先对 records 按照 record key 去重
(2)首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
(3)对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice)
(4)对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSlice
2)Merge On Read (MOR)
(1)先对 records 按照 record key 去重(可选)
(2)首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)
(3)如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果 log file 可建索引,尝试 append 小的 log file,如果没有就新写一个 FileGroup + FileSlice + base file
(4)如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小的小文件,会 merge base file,生成新的 file slice)
(5)log file 大小达到阈值会 roll over 一个新的
1)Copy On Write
(1)先对 records 按照 record key 去重(可选)
(2)不会创建 Index
(3)如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file
2)Merge On Read
(1)先对 records 按照 record key 去重(可选)
(2)不会创建 Index
(3)如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file
在同一分区中创建新的文件组集。现有的文件组被标记为 “删除”。根据新记录的数量创建新的文件组
COW:

MOR:

优点
(1)COW和MOR在执行方面非常相似。不干扰MOR的compaction。
(2)减少parquet文件大小。
(3)不需要更新关键路径中的外部索引。索引实现可以检查文件组是否无效(类似于在HBaseIndex中检查commit是否无效的方式)。
(4)可以扩展清理策略,在一定的时间窗口后删除旧文件组。
缺点
用来生成 HoodieKey(record key + partition path),目前支持以下策略:
1)逻辑删:将 value 字段全部标记为 null
2)物理删:
(1)通过 OPERATION_OPT_KEY 删除所有的输入记录
(2)配置 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 删除所有的输入记录
(3)在输入记录添加字段:_hoodie_is_deleted
通过对写流程的梳理可以了解到 Apache Hudi 相对于其他数据湖方案的核心优势:
(1)写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。
(2)对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。
读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件
当前的 Spark data source 可以指定消费的起始和结束 commit 时间,读取 commit 增量的数据集。但是内部的实现不够高效:拉取每个 commit 的全部目标文件再按照系统字段 hoodie_commit_time apply 过滤条件。
0.8.0 版本的 HUDI Flink writer 支持实时的增量订阅,可用于同步 CDC 数据,日常的数据同步 ETL pipeline。Flink 的 streaming 读做到了真正的流式读取,source 定期监控新增的改动文件,将读取任务下派给读 task。
(1)没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file
(2)有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file
Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。