• Databend 源码阅读: Storage 概况和 Read Partitions


    作者:zhyass | Databend Labs 成员,数据库研发工程师

    ❤️ 友情提示:代码演进较快,请注意文档的时效性哦!

    引言

    Databend 将存储引擎抽象成一个名为 Table 的接口,源码位于 query/catalog/src/table.rs

    Table 接口定义了 readappendalteroptimizetruncate 以及 recluster 等方法,负责数据的读写和变更。解释器(interpreter)通过调用 Table trait 的方法生成物理执行的 pipeline

    通过实现 Table 接口的方法,可以定义 Databend 的存储引擎,不同的实现对应不同的引擎。

    Storage 主要关注 Table 接口的具体实现,涉及表的元信息,索引信息的管理,以及与底层 IO 的交互。

    目录

    包名作用
    common/cache定义与管理缓存,包括磁盘缓存和内存缓存。类型包含表 meta 缓存、查询结果缓存、表数据缓存等。
    common/index定义与使用索引,目前支持 bloom filter index、page index、range index。
    common/locks管理与使用锁,支持表级别的锁。
    common/pruner分区剪裁算法,包括 internal column pruner、limiter pruner、page pruner、topn pruner、range pruner。
    common/table_meta表 meta 的数据结构定义。
    hivehive 表的交互
    icebergiceberg 交互
    information_schema、system系统表定义
    memory、null、random用于开发和测试的引擎
    view视图相关
    stagestage 数据源的读取
    parquet把 parquet 文件作为数据源
    fusefuse 引擎模块
    fuse/src/iotable meta、index、block 的读写 IO 交互
    fuse/src/pruningfuse 分区裁剪
    fuse/src/statisticscolumn statistics 和 cluster statistics 等统计信息
    fuse/src/table_functionstable function 实现
    fuse/src/operationfuse 引擎对 table trait 方法的具体实现。并包含了如 ReadSource、CommitSink 等 processor 算子的定义

    Read Partitions

    以下以 fuse 引擎中 read partitions 的实现流程为例,简要分析 Storage 相关源码。

    Partitions 的定义位于 query/catalog/src/plan/partition.rs

    1. pub struct Partitions {
    2. // partitions 的分发类型。
    3. pub kind: PartitionsShuffleKind,
    4. // 一组实现了 PartInfo 接口的 partition,
    5. pub partitions: Vec,
    6. // partitions 是否为 lazy。
    7. pub is_lazy: bool,
    8. }

    Table 接口中的 read_partitions 通过分析查询中的过滤条件,剪裁掉不需要的分区,返回可能满足条件的 Partitions。

    1. #[async_trait::async_trait]
    2. impl Table for FuseTable {
    3. #[minitrace::trace]
    4. #[async_backtrace::framed]
    5. async fn read_partitions(
    6. &self,
    7. ctx: Arc<dyn TableContext>,
    8. push_downs: Option<PushDownInfo>,
    9. dry_run: bool,
    10. ) -> Result<(PartStatistics, Partitions)> {
    11. self.do_read_partitions(ctx, push_downs, dry_run).await
    12. }
    13. }

    Fuse 引擎会以 segment 为单位构建 lazy 类型的 FuseLazyPartInfo。通过这种方式,prune_snapshot_blocks 可以下推到 pipeline 初始化阶段执行,特别是在分布式集群模式下,可以有效提高剪裁执行效率。

    1. pub struct FuseLazyPartInfo {
    2. // segment 在 snapshot 中的索引位置。
    3. pub segment_index: usize,
    4. pub segment_location: Location,
    5. }

    分区剪裁流程的实现位于 query/storages/fuse/src/pruning/fuse_pruner.rs 文件中,具体流程如下:

    1. 基于 push_downs 条件构造各类剪裁器(pruner),并实例化 FusePruner
    2. 调用 FusePruner 中的 pruning 方法,创建 max_concurrency 个分批剪裁任务。每个批次包括多个 segment 位置,首先根据 internal_column_pruner 筛选出无需的 segments,再读取 SegmentInfo,并根据 segment 级别的 MinMax 索引进行范围剪裁。
    3. 读取过滤后的 SegmentInfo 中的 BlockMetas,并按照 internal_column_prunerlimit_prunerrange_prunerbloom_prunerpage_pruner 等算法的顺序,剔除无需的 blocks。
    4. 执行 TopNPrunner 进行过滤,从而得到最终剪裁后的 block_metas
    1. pub struct FusePruner {
    2. max_concurrency: usize,
    3. pub table_schema: TableSchemaRef,
    4. pub pruning_ctx: Arc<PruningContext>,
    5. pub push_down: Option<PushDownInfo>,
    6. pub inverse_range_index: Option<RangeIndex>,
    7. pub deleted_segments: Vec<DeletedSegmentInfo>,
    8. }
    9. pub struct PruningContext {
    10. pub limit_pruner: Arc<dyn Limiter + Send + Sync>,
    11. pub range_pruner: Arc<dyn RangePruner + Send + Sync>,
    12. pub bloom_pruner: Option<Arc<dyn BloomPruner + Send + Sync>>,
    13. pub page_pruner: Arc<dyn PagePruner + Send + Sync>,
    14. pub internal_column_pruner: Option<Arc<InternalColumnPruner>>,
    15. // Other Fields ...
    16. }
    17. impl FusePruner {
    18. pub async fn pruning(
    19. &mut self,
    20. mut segment_locs: Vec<SegmentLocation>,
    21. delete_pruning: bool,
    22. ) -> Result<Vec<(BlockMetaIndex, Arc<BlockMeta>)>> {
    23. ...
    24. }
    25. }

    剪裁结束后,以 Block 为单位构造 FusePartInfo,生成 partitions,接着调用 set_partitions 方法将 partitions 注入 QueryContext 的分区队列中。在执行任务时,可以通过 get_partition 方法从队列中取出。

    1. pub struct FusePartInfo {
    2. pub location: String,
    3. pub create_on: Option<DateTime<Utc>>,
    4. pub nums_rows: usize,
    5. pub columns_meta: HashMap<ColumnId, ColumnMeta>,
    6. pub compression: Compression,
    7. pub sort_min_max: Option<(Scalar, Scalar)>,
    8. pub block_meta_index: Option<BlockMetaIndex>,
    9. }

    Conclusion

    Databend 的存储引擎设计采用了抽象接口的方式,具有高度的可扩展性,可以很方便地支持多种不同的存储引擎。Storage 模块的主要职责是实现 Table 接口的方法,其中 Fuse 引擎部分尤为关键。

    通过对数据的并行处理,以及数据剪裁等手段,可以有效地提高数据的处理效率。鉴于篇幅限制,本文仅对读取分区的流程进行了简单阐述,更深入的解析将在后续的文章中逐步展开。

    关于 Databend

    Databend 是一款开源、弹性、低成本,基于对象存储也可以做实时分析的新式数仓。期待您的关注,一起探索云原生数仓解决方案,打造新一代开源 Data Cloud。

    👨‍💻‍ Databend Cloud:databend.cn

    📖 Databend 文档:databend.rs/

    💻 Wechat:Databend

    ✨ GitHub:github.com/datafuselab…

  • 相关阅读:
    用VSCODE启动Java项目
    Godot4实现游戏的多语言版本
    被DDOS了怎么办 要如何应对
    TensorRT-Plugin编写
    【愚公系列】2022年11月 .NET CORE工具案例-.NET 7中的WebTransport通信
    VMware虚拟机中ubuntu网络连接不上
    LabVIEW应用开发——控件的使用(三)
    机器学习中的数学原理——最小二乘法
    对中台的探索与思考
    化工集团数字化转型新契机双重预防机制数字化建设
  • 原文地址:https://blog.csdn.net/Databend/article/details/134447750