• CMU 15-445 Project #3 - Query Execution (Task #1, Task #2)



    1. Project link


    2. Preparation

    The executors in Project #3 are tested with sqllogictest. The following three commands build and run it:

    atreus@AtreusMBP 9:18 bustub % cd build
    atreus@AtreusMBP 9:18 build % make -j$(nproc) sqllogictest
    atreus@AtreusMBP 9:19 build % ./bin/bustub-sqllogictest ../test/sql/p3.06-simple-agg.slt --verbose
    

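    You can also run and debug from CLion with a run configuration like the one shown below. Adjust the slt file path in the program arguments and the working directory to match your own project layout. It helps to create a dedicated slt file just for debugging, so that you only edit that file's contents each time instead of changing the run configuration.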

    [Image: CLion run/debug configuration]

    For the remaining setup, see the preparation section of CMU 15-445 Project #0 - C++ Primer.


    3. SQL statement execution flow

    [Image: BusTub architecture diagram]

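    According to the BusTub architecture diagram, when a SQL statement arrives, the query processing layer first uses the Parser to turn it into an abstract syntax tree (AST). The Binder then walks this tree and maps table names, column names, and so on to actual objects in the database, and the Planner produces an initial query plan. A query plan is a tree in which data flows from the leaf nodes up to their parents. Finally, the Optimizer rewrites the plan into its final form and hands it to the executors in the query execution layer. Some of those executors are what we have to implement.

    Take explain select * from test_1; below as an example. The binder binds the table name test_1 to the physical table test_1 with oid 20, and binds * to all columns of that table. The planner then generates a query plan with two plan nodes, Projection and SeqScan: data is processed by SeqScan first and then passed to Projection. The optimizer finally removes the projection, because the query selects every column and no projection is needed. In the end, the query execution layer runs the executor for SeqScan to perform the actual query.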

    bustub> explain select * from test_1;
    === BINDER ===
    BoundSelect {
      table=BoundBaseTableRef { table=test_1, oid=20 },
      columns=[test_1.colA, test_1.colB, test_1.colC, test_1.colD],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
      SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    === OPTIMIZER ===
    SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
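
The projection removal seen in the OPTIMIZER output can be illustrated with a small rewrite rule. This is only a sketch under stated assumptions: PlanNode, IsIdentityProjection, EliminateIdentityProjection, and OptimizedRootType are hypothetical names for a toy plan tree, not BusTub's optimizer API.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified plan node; BusTub's real plan nodes are richer.
struct PlanNode {
    std::string type;                 // "Projection", "SeqScan", ...
    std::vector<std::size_t> exprs;   // Projection only: child column index of each output column
    std::size_t column_count = 0;     // number of output columns
    std::unique_ptr<PlanNode> child;  // single child, or nullptr for a leaf
};

// True when the projection outputs every child column in its original order.
bool IsIdentityProjection(const PlanNode &node) {
    if (node.type != "Projection" || node.child == nullptr) {
        return false;
    }
    if (node.exprs.size() != node.child->column_count) {
        return false;
    }
    for (std::size_t i = 0; i < node.exprs.size(); i++) {
        if (node.exprs[i] != i) {
            return false;
        }
    }
    return true;
}

// Bottom-up rewrite: optimize the child first, then drop this node if it is an identity projection.
std::unique_ptr<PlanNode> EliminateIdentityProjection(std::unique_ptr<PlanNode> node) {
    assert(node != nullptr);
    if (node->child != nullptr) {
        node->child = EliminateIdentityProjection(std::move(node->child));
    }
    if (IsIdentityProjection(*node)) {
        return std::move(node->child);
    }
    return node;
}

// Builds the planner output for `select * from test_1` and returns the optimized root's type.
std::string OptimizedRootType() {
    auto scan = std::make_unique<PlanNode>();
    scan->type = "SeqScan";
    scan->column_count = 4;

    auto projection = std::make_unique<PlanNode>();
    projection->type = "Projection";
    projection->exprs = {0, 1, 2, 3};
    projection->column_count = 4;
    projection->child = std::move(scan);

    return EliminateIdentityProjection(std::move(projection))->type;
}
```

With exprs=[#0.0, #0.1, #0.2, #0.3] over a four-column child, the projection is an identity mapping, so the rewritten root is the SeqScan node itself.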
    

    4. BusTub table structure

    [Image: BusTub table structure]
    The image above is from https://blog.eleven.wiki/posts/cmu15-445-project3-query-execution/, whose author gives an intuitive illustration of the table structure in BusTub.

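    We start from AbstractExecutor, the abstract base class of all executors. It implements a tuple-at-a-time Volcano iterator model and mainly consists of two member functions, Init() and Next(). Tuple-at-a-time means that each call to Next() fetches one tuple from the current executor. The Volcano model means that data is passed from child executors up to their parents, and the output of the root node is the final result.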
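
To make the Init()/Next() contract concrete, here is a minimal sketch of a two-level executor tree. The names Executor, ScanExecutor, EvenFilterExecutor, Execute, and Row are hypothetical stand-ins chosen for this illustration; BusTub's real interface passes Tuple and RID out-parameters instead of a single int.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for BusTub's Tuple: a single integer column.
using Row = int;

// Simplified executor interface: Init() resets state, Next() yields one row per call.
class Executor {
public:
    virtual ~Executor() = default;
    virtual void Init() = 0;
    virtual bool Next(Row *row) = 0;
};

// Leaf executor: scans an in-memory "table".
class ScanExecutor : public Executor {
public:
    explicit ScanExecutor(std::vector<Row> rows) : rows_(std::move(rows)) {}
    void Init() override { pos_ = 0; }
    bool Next(Row *row) override {
        if (pos_ == rows_.size()) {
            return false;
        }
        *row = rows_[pos_++];
        return true;
    }

private:
    std::vector<Row> rows_;
    std::size_t pos_ = 0;
};

// Parent executor: pulls from its child and keeps only even rows.
class EvenFilterExecutor : public Executor {
public:
    explicit EvenFilterExecutor(std::unique_ptr<Executor> child) : child_(std::move(child)) {}
    void Init() override { child_->Init(); }
    bool Next(Row *row) override {
        while (child_->Next(row)) {
            if (*row % 2 == 0) {
                return true;
            }
        }
        return false;
    }

private:
    std::unique_ptr<Executor> child_;
};

// Drives the root executor: Init() once, then Next() until it returns false.
std::vector<Row> Execute(Executor *root) {
    assert(root != nullptr);
    std::vector<Row> result;
    root->Init();
    Row row = 0;
    while (root->Next(&row)) {
        result.push_back(row);
    }
    return result;
}
```

Only the root is driven directly. Each parent pulls rows from its child on demand, so no intermediate result set is materialized between the scan and the filter.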

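    AbstractExecutor has a single member variable, the executor context ExecutorContext, which holds the current Transaction, the BufferPoolManager, the TransactionManager, the LockManager, and the Catalog. Concrete executors usually also keep an implementation of the plan node AbstractPlanNode that describes the plan to be executed. This is used later on.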

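    The part of the executor context most directly related to table management is the Catalog (see the image above). BusTub's catalog is non-persistent and designed mainly for the executors in the execution engine. It handles table creation, table lookup, index creation, and index lookup. The catalog mainly contains a map from table identifier to table information TableInfo, std::unordered_map<table_oid_t, std::unique_ptr<TableInfo>>, and a map from table name to table identifier, std::unordered_map<std::string, table_oid_t>.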
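
The two-map design can be sketched as follows. MiniCatalog and this trimmed-down TableInfo are hypothetical simplifications for illustration. The real Catalog also manages indexes, and its TableInfo carries the schema and the table heap.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

using table_oid_t = std::uint32_t;

// Hypothetical, trimmed-down TableInfo; the real one also holds the schema and the TableHeap.
struct TableInfo {
    std::string name_;
    table_oid_t oid_;
};

// Minimal catalog sketch: name -> oid, and oid -> table metadata.
class MiniCatalog {
public:
    // Registers a table; returns nullptr if the name is already taken.
    TableInfo *CreateTable(const std::string &name) {
        if (table_names_.count(name) != 0) {
            return nullptr;
        }
        table_oid_t oid = next_table_oid_++;
        auto info = std::make_unique<TableInfo>(TableInfo{name, oid});
        TableInfo *raw = info.get();
        tables_.emplace(oid, std::move(info));
        table_names_.emplace(name, oid);
        return raw;
    }

    // Lookup by oid; returns nullptr when the table does not exist.
    TableInfo *GetTable(table_oid_t oid) const {
        auto it = tables_.find(oid);
        return it == tables_.end() ? nullptr : it->second.get();
    }

    // Lookup by name goes through the name -> oid map first.
    TableInfo *GetTable(const std::string &name) const {
        auto it = table_names_.find(name);
        return it == table_names_.end() ? nullptr : GetTable(it->second);
    }

private:
    std::unordered_map<table_oid_t, std::unique_ptr<TableInfo>> tables_;
    std::unordered_map<std::string, table_oid_t> table_names_;
    table_oid_t next_table_oid_ = 0;
};
```

Keeping the name map separate means metadata is stored once, keyed by oid, and a lookup by name is simply one extra hash probe.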

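    TableInfo stores metadata about a table, the most important piece being the TableHeap. TableHeap represents the physical table on disk as a doubly linked list of pages. From the TableHeap you can obtain an iterator to the first Tuple (each tuple corresponds to one row of the table) in the first TablePage, which is how a table is traversed.

    A physical page is 4096 bytes, and its internal layout is roughly as follows: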

    +-------------------+-------------------+ -----------------
    |     PageID(4)     |      LSN(4)       |
    +-------------------+-------------------+
    |   PrevPageId(4)   |   NextPageId(4)   |
    +-------------------+-------------------+
    |FreeSpacePointer(4)|   TupleCount(4)   |
    +-------------------+-------------------+
    | Tuple_1 offset(4) |  Tuple_1 size(4)  |      HEADER
    +-------------------+-------------------+
    | Tuple_2 offset(4) |  Tuple_2 size(4)  |
    +-------------------+-------------------+
    | Tuple_3 offset(4) |  Tuple_3 size(4)  |
    +-------------------+-------------------+ -----------------
    |                  ...                  |
    +---------------------------------------+    FREE SPACE
    |                  ...                  |
    +---------------------------------------+ -----------------
    |                Tuple_3                |
    +-------------------------+-------------+  INSERTED TUPLES
    |         Tuple_2         |   Tuple_1   |
    +-------------------------+-------------+ -----------------
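
The arithmetic implied by the diagram can be checked with a small bookkeeping model. This is a sketch that assumes exactly the layout drawn above (six 4-byte header fields, one 8-byte slot per tuple, tuples growing from the end of the page). PageLayout and its members are hypothetical names, and no tuple bytes are actually stored.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

constexpr std::uint32_t kPageSize = 4096;       // page size stated in the text
constexpr std::uint32_t kFixedHeaderSize = 24;  // six 4-byte fields in the diagram
constexpr std::uint32_t kSlotSize = 8;          // 4-byte offset + 4-byte size per tuple

// Hypothetical bookkeeping-only model of a table page; it tracks offsets, not tuple bytes.
struct PageLayout {
    struct Slot {
        std::uint32_t offset;
        std::uint32_t size;
    };

    std::uint32_t free_space_pointer = kPageSize;  // tuples grow from the page end toward the header
    std::vector<Slot> slots;

    // The header grows by one slot for every stored tuple.
    std::uint32_t HeaderSize() const {
        return kFixedHeaderSize + kSlotSize * static_cast<std::uint32_t>(slots.size());
    }

    std::uint32_t FreeSpace() const { return free_space_pointer - HeaderSize(); }

    // Reserves room for a tuple plus its slot; returns false if the page is full.
    bool Insert(std::uint32_t tuple_size) {
        if (FreeSpace() < tuple_size + kSlotSize) {
            return false;
        }
        free_space_pointer -= tuple_size;
        slots.push_back({free_space_pointer, tuple_size});
        assert(free_space_pointer >= HeaderSize());
        return true;
    }
};
```

An empty page has 4096 - 24 = 4072 free bytes. Inserting a 100-byte tuple places it at offset 3996 and leaves 3996 - 32 = 3964 free bytes, because the header grew by one slot.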
    

    5. Task #1 - Access Method Executors

    5.1 Sequential scan executor

    bustub> explain select * from test_1;
    === BINDER ===
    BoundSelect {
      table=BoundBaseTableRef { table=test_1, oid=20 },
      columns=[test_1.colA, test_1.colB, test_1.colC, test_1.colD],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
      SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    === OPTIMIZER ===
    SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    
    /**
     * The SeqScanExecutor executor executes a sequential table scan.
     */
    class SeqScanExecutor : public AbstractExecutor {
    public:
        /**
         * Construct a new SeqScanExecutor instance.
         * @param exec_ctx The executor context
         * @param plan The sequential scan plan to be executed
         */
        SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan)
                : AbstractExecutor(exec_ctx), plan_(plan), table_iter_(nullptr, RID(), nullptr) {}
    
        /** Initialize the sequential scan */
        void Init() override {
            TableInfo *table_info = exec_ctx_->GetCatalog()->GetTable(plan_->GetTableOid());
    
            table_heap_ = table_info->table_.get();
            table_iter_ = table_heap_->Begin(exec_ctx_->GetTransaction());
        }
    
        /**
         * Yield the next tuple from the sequential scan.
         * @param[out] tuple The next tuple produced by the scan
         * @param[out] rid The next tuple RID produced by the scan
         * @return `true` if a tuple was produced, `false` if there are no more tuples
         */
        auto Next(Tuple *tuple, RID *rid) -> bool override {
            // Return false once the scan has reached the end
            if (table_iter_ == table_heap_->End()) {
                return false;
            }
    
            // Fill in the tuple and its rid
            *tuple = *table_iter_;
            *rid = tuple->GetRid();
            table_iter_++;
            return true;
        }
    
        /** @return The output schema for the sequential scan */
        auto GetOutputSchema() const -> const Schema & override {
            return plan_->OutputSchema();
        }
    
    private:
        /** The sequential scan plan node to be executed */
        const SeqScanPlanNode *plan_;
        /** Table heap of the table being scanned */
        TableHeap *table_heap_;
        /** Iterator over the table being scanned */
        TableIterator table_iter_;
    };
    

    5.2 Insert executor

    bustub> explain insert into test_1 values (1000, 1, 1, 1);
    === BINDER ===
    BoundInsert {
      table=BoundBaseTableRef { table=test_1, oid=20 },
      select=  BoundSelect {
        table=BoundExpressionListRef { identifier=__values#0, values=[[1000, 1, 1, 1]] },
        columns=[__values#0.0, __values#0.1, __values#0.2, __values#0.3],
        groupBy=[],
        having=,
        where=,
        limit=,
        offset=,
        order_by=[],
        is_distinct=false,
        ctes=,
      }
    }
    === PLANNER ===
    Insert { table_oid=20 } | (__bustub_internal.insert_rows:INTEGER)
      Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
        Values { rows=1 } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
    === OPTIMIZER ===
    Insert { table_oid=20 } | (__bustub_internal.insert_rows:INTEGER)
      Values { rows=1 } | (__values#0.0:INTEGER, __values#0.1:INTEGER, __values#0.2:INTEGER, __values#0.3:INTEGER)
    
    /**
     * InsertExecutor executes an insert on a table.
     * Inserted values are always pulled from a child executor.
     */
    class InsertExecutor : public AbstractExecutor {
    public:
        /**
         * Construct a new InsertExecutor instance.
         * @param exec_ctx The executor context
         * @param plan The insert plan to be executed
         * @param child_executor The child executor from which inserted tuples are pulled
         */
        InsertExecutor(ExecutorContext *exec_ctx, const InsertPlanNode *plan,
                       std::unique_ptr<AbstractExecutor> &&child_executor)
                : AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}
    
        /** Initialize the insert */
        void Init() override {
            child_executor_->Init();
        }
    
        /**
         * Yield the number of rows inserted into the table.
         * @param[out] tuple The integer tuple indicating the number of rows inserted into the table
         * @param[out] rid The next tuple RID produced by the insert (ignore, not used)
         * @return `true` if a tuple was produced, `false` if there are no more tuples
         *
         * NOTE: InsertExecutor::Next() does not use the `rid` out-parameter.
         * NOTE: InsertExecutor::Next() returns true with number of inserted rows produced only once.
         */
        auto Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool override {
            // Return false once the insert has been performed
            if (has_inserted_) {
                return false;
            }
            has_inserted_ = true;
    
            // Look up the target table and its list of indexes
            TableInfo *table_info = exec_ctx_->GetCatalog()->GetTable(plan_->TableOid());
            std::vector<IndexInfo *> index_info = exec_ctx_->GetCatalog()->GetTableIndexes(table_info->name_);
    
            // Pull tuples one by one from the child executor (Values), insert them into the table, and update every index
            int insert_count = 0;
            while (child_executor_->Next(tuple, rid)) {
                table_info->table_->InsertTuple(*tuple, rid, exec_ctx_->GetTransaction());
                for (const auto &index: index_info) {
                    // Build the index key from the data tuple according to the index schema, then insert it into the index
                    Tuple key_tuple = tuple->KeyFromTuple(child_executor_->GetOutputSchema(), index->key_schema_,
                                                          index->index_->GetMetadata()->GetKeyAttrs());
                    index->index_->InsertEntry(key_tuple, *rid, exec_ctx_->GetTransaction());
                }
                insert_count++;
            }
    
            // This tuple no longer holds a data row; it carries the number of rows affected by the insert
            std::vector<Value> result{Value(INTEGER, insert_count)};
            *tuple = Tuple(result, &plan_->OutputSchema());
    
            return true;
        }
    
        /** @return The output schema for the insert */
        auto GetOutputSchema() const -> const Schema & override {
            return plan_->OutputSchema();
        };
    
    private:
        /** The insert plan node to be executed */
        const InsertPlanNode *plan_;
        /** Child executor; for an insert this is usually the Values executor */
        std::unique_ptr<AbstractExecutor> child_executor_;
        /** Whether the insert has already been performed */
        bool has_inserted_ = false;
    };
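
The index maintenance inside the insert loop boils down to extracting the key columns from each row and storing a key-to-RID entry in every index. The sketch below illustrates that idea with hypothetical toy types (Row, Rid, ToyIndex, ToyTable, KeyFromRow). It mirrors the role of Tuple::KeyFromTuple() and InsertEntry() in the listing above but does not reproduce their signatures.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

using Row = std::vector<int>;  // hypothetical stand-in for Tuple
using Rid = std::size_t;       // hypothetical stand-in for RID: position in the heap

// Extracts the indexed columns from a full row, in the spirit of Tuple::KeyFromTuple().
Row KeyFromRow(const Row &row, const std::vector<std::size_t> &key_attrs) {
    Row key;
    key.reserve(key_attrs.size());
    for (std::size_t attr : key_attrs) {
        assert(attr < row.size());
        key.push_back(row[attr]);
    }
    return key;
}

struct ToyIndex {
    std::vector<std::size_t> key_attrs;  // which columns form the key
    std::multimap<Row, Rid> entries;     // key -> RID
};

struct ToyTable {
    std::vector<Row> heap;
    std::vector<ToyIndex> indexes;

    // Appends each row to the heap and mirrors it into every index; returns the affected row count.
    int InsertRows(const std::vector<Row> &rows) {
        int insert_count = 0;
        for (const Row &row : rows) {
            heap.push_back(row);
            Rid rid = heap.size() - 1;
            for (ToyIndex &index : indexes) {
                index.entries.emplace(KeyFromRow(row, index.key_attrs), rid);
            }
            insert_count++;
        }
        return insert_count;
    }
};
```

The returned count plays the role of the single integer tuple that the executor emits once after all rows have been inserted.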
    

    5.3 Delete executor

    bustub> explain delete from test_1 where colA = 1000;
    === BINDER ===
    Delete { table=BoundBaseTableRef { table=test_1, oid=20 }, expr=(test_1.colA=1000) }
    === PLANNER ===
    Delete { table_oid=20 } | (__bustub_internal.delete_rows:INTEGER)
      Filter { predicate=(#0.0=1000) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
        SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    === OPTIMIZER ===
    Delete { table_oid=20 } | (__bustub_internal.delete_rows:INTEGER)
      SeqScan { table=test_1, filter=(#0.0=1000) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    
    /**
     * DeletedExecutor executes a delete on a table.
     * Deleted values are always pulled from a child.
     */
    class DeleteExecutor : public AbstractExecutor {
    public:
        /**
         * Construct a new DeleteExecutor instance.
         * @param exec_ctx The executor context
         * @param plan The delete plan to be executed
         * @param child_executor The child executor that feeds the delete
         */
        DeleteExecutor(ExecutorContext *exec_ctx, const DeletePlanNode *plan,
                       std::unique_ptr<AbstractExecutor> &&child_executor)
                : AbstractExecutor(exec_ctx), plan_(plan), child_executor_(std::move(child_executor)) {}
    
        /** Initialize the delete */
        void Init() override {
            child_executor_->Init();
        }
    
        /**
         * Yield the number of rows deleted from the table.
         * @param[out] tuple The integer tuple indicating the number of rows deleted from the table
         * @param[out] rid The next tuple RID produced by the delete (ignore, not used)
         * @return `true` if a tuple was produced, `false` if there are no more tuples
         *
         * NOTE: DeleteExecutor::Next() does not use the `rid` out-parameter.
         * NOTE: DeleteExecutor::Next() returns true with the number of deleted rows produced only once.
         */
        auto Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool override {
            // Return false once the delete has been performed
            if (has_deleted_) {
                return false;
            }
            has_deleted_ = true;
    
            // Look up the target table and its list of indexes
            TableInfo *table_info = exec_ctx_->GetCatalog()->GetTable(plan_->TableOid());
            std::vector<IndexInfo *> index_info = exec_ctx_->GetCatalog()->GetTableIndexes(table_info->name_);
    
            // Pull tuples one by one from the child executor (here SeqScan), mark them deleted, and update every index
            int delete_count = 0;
            while (child_executor_->Next(tuple, rid)) {
                table_info->table_->MarkDelete(*rid, exec_ctx_->GetTransaction());
                for (const auto &index: index_info) {
                    // Build the index key from the data tuple according to the index schema, then delete it from the index
                    Tuple key_tuple = tuple->KeyFromTuple(child_executor_->GetOutputSchema(), index->key_schema_,
                                                          index->index_->GetMetadata()->GetKeyAttrs());
                    index->index_->DeleteEntry(key_tuple, *rid, exec_ctx_->GetTransaction());
                }
                delete_count++;
            }
    
            // This tuple no longer holds a data row; it carries the number of rows affected by the delete
            std::vector<Value> result{Value(INTEGER, delete_count)};
            *tuple = Tuple(result, &plan_->OutputSchema());
    
            return true;
        }
    
        /** @return The output schema for the delete */
        auto GetOutputSchema() const -> const Schema & override {
            return plan_->OutputSchema();
        };
    
    private:
        /** The delete plan node to be executed */
        const DeletePlanNode *plan_;
        /** The child executor from which RIDs for deleted tuples are pulled */
        std::unique_ptr<AbstractExecutor> child_executor_;
        /** Whether the delete has already been performed */
        bool has_deleted_ = false;
    };
    

    5.4 Index scan executor

    bustub> create index index_colA on test_1(colA);
    Index created with id = 0
    bustub> explain select * from test_1 order by colA;
    === BINDER ===
    BoundSelect {
      table=BoundBaseTableRef { table=test_1, oid=20 },
      columns=[test_1.colA, test_1.colB, test_1.colC, test_1.colD],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[BoundOrderBy { type=Default, expr=test_1.colA }],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Sort { order_bys=[(Default, #0.0)] } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
      Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
        SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    === OPTIMIZER ===
    IndexScan { index_oid=0 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    
    /**
     * IndexScanExecutor executes an index scan over a table.
     */
    class IndexScanExecutor : public AbstractExecutor {
    public:
        /**
         * Creates a new index scan executor.
         * @param exec_ctx the executor context
         * @param plan the index scan plan to be executed
         */
        IndexScanExecutor(ExecutorContext *exec_ctx, const IndexScanPlanNode *plan)
                : AbstractExecutor(exec_ctx), plan_(plan) {}
    
        void Init() override {
            IndexInfo *index_info = exec_ctx_->GetCatalog()->GetIndex(plan_->GetIndexOid());
    
            tree_ = dynamic_cast<BPlusTreeIndexForOneIntegerColumn *>(index_info->index_.get());
            table_iter_ = tree_->GetBeginIterator();
            table_heap_ = exec_ctx_->GetCatalog()->GetTable(index_info->table_name_)->table_.get();
        }
    
        auto Next(Tuple *tuple, RID *rid) -> bool override {
            // Return false once the scan has reached the end
            if (table_iter_ == tree_->GetEndIterator()) {
                return false;
            }
    
            // Read the tuple's rid from the index entry, then fetch the tuple from the table heap
            *rid = (*table_iter_).second;
            table_heap_->GetTuple(*rid, tuple, exec_ctx_->GetTransaction());
            ++table_iter_;
            return true;
        }
    
        auto GetOutputSchema() const -> const Schema & override {
            return plan_->OutputSchema();
        }
    
    private:
        /** The index scan plan node to be executed. */
        const IndexScanPlanNode *plan_;
        /** B+ tree index of the table being scanned */
        BPlusTreeIndexForOneIntegerColumn *tree_;
        /** Iterator over the B+ tree index */
        BPlusTreeIndexIteratorForOneIntegerColumn table_iter_;
        /** Table heap of the table being scanned */
        TableHeap *table_heap_;
    };
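
The reason the optimizer can replace Sort over SeqScan with a single IndexScan is that walking an ordered index already yields RIDs in key order. A minimal sketch of that idea follows, with std::map standing in for the B+ tree. The names Row, Rid, IndexScan, and BuildIndexOnFirstColumn are hypothetical, and unique keys are assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

using Row = std::vector<int>;  // hypothetical stand-in for Tuple
using Rid = std::size_t;       // hypothetical stand-in for RID: position in the heap

// Walks the ordered index from begin to end and fetches each row from the heap by RID.
// std::map stands in for the B+ tree here; both iterate keys in ascending order.
std::vector<Row> IndexScan(const std::map<int, Rid> &index, const std::vector<Row> &heap) {
    std::vector<Row> result;
    result.reserve(index.size());
    for (const auto &entry : index) {
        assert(entry.second < heap.size());
        result.push_back(heap[entry.second]);
    }
    return result;
}

// Builds a single-column index over column 0 of an unsorted heap (assumes unique keys).
std::map<int, Rid> BuildIndexOnFirstColumn(const std::vector<Row> &heap) {
    std::map<int, Rid> index;
    for (Rid rid = 0; rid < heap.size(); rid++) {
        index.emplace(heap[rid][0], rid);
    }
    return index;
}
```

The heap itself stays in insertion order. Only the order in which RIDs are visited changes, which is why the executor needs both the index iterator and the table heap.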
    

    6. Task #2 - Aggregation & Join Executors

    6.1 Aggregation executor

    bustub> explain select sum(colA) from test_1;
    === BINDER ===
    BoundSelect {
      table=BoundBaseTableRef { table=test_1, oid=20 },
      columns=[sum([test_1.colA])],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Projection { exprs=[#0.0] } | (:INTEGER)
      Agg { types=[sum], aggregates=[#0.0], group_by=[] } | (agg#0:INTEGER)
        SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    === OPTIMIZER ===
    Agg { types=[sum], aggregates=[#0.0], group_by=[] } | (:INTEGER)
      SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
    
    id  name    position  camp
    0   Ezreal  ADC       Piltover
    1   Vi      JUN       Piltover
    2   Jayce   TOP       Piltover
    3   Zed     MID       Ionia
    4   Xayah   ADC       Ionia
    5   Thresh  SUP       Shadow Isles

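    We use the grouped aggregation query select count(name) from table group by camp; to briefly walk through how the aggregation executor works:

    • Init() first pulls rows one at a time from the child executor and builds an aggregate key and an aggregate value from each row. The aggregate key identifies which group the row belongs to. Here rows are grouped by camp, so the key takes one of three values, Piltover, Ionia, or Shadow Isles, and all rows fall into three groups. If there is no group by clause, every row lands in the same group, whose key is an AggregateKey with group_bys_.size() = 0. The aggregate value is the value of the column being aggregated. Here that column is name, so the aggregate values produced from the six tuples are their name attributes.
    • For each row it pulls, Init() also calls InsertCombine() to fold the row's aggregate value into the matching group. The combining rule is determined by the implementation of CombineAggregateValues().
    • After Init() finishes, the six rows above have been reduced to three groups, [{"Piltover": 3}, {"Ionia": 2}, {"Shadow Isles": 1}], which correspond to three key-value pairs in the aggregation hash table.
    • Finally, Next() uses the hash table iterator to fetch each group's key and value in turn and returns them to the parent executor.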
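
The steps above can be sketched with a plain hash map. This is a simplified illustration rather than the BusTub implementation: Champion, CountNameByCamp, and ExampleRows are hypothetical names, the map key plays the role of AggregateKey, and the running count plays the role of AggregateValue. Unlike the hash table in the listing, this sketch starts each count at 0 rather than at a typed null.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical row type for the example table; only the columns used by the query are kept.
struct Champion {
    std::optional<std::string> name;  // std::nullopt models a NULL name
    std::string camp;
};

// select count(name) from table group by camp;
std::unordered_map<std::string, int> CountNameByCamp(const std::vector<Champion> &rows) {
    std::unordered_map<std::string, int> groups;
    for (const Champion &row : rows) {
        int &count = groups[row.camp];  // creates the group with count 0 on first sight
        if (row.name.has_value()) {     // count(column) skips NULL inputs
            count++;
        }
    }
    return groups;
}

// The six rows of the example table.
std::vector<Champion> ExampleRows() {
    return {
        {"Ezreal", "Piltover"}, {"Vi", "Piltover"}, {"Jayce", "Piltover"},
        {"Zed", "Ionia"},       {"Xayah", "Ionia"}, {"Thresh", "Shadow Isles"},
    };
}
```

Calling CountNameByCamp(ExampleRows()) yields three entries: Piltover with 3, Ionia with 2, and Shadow Isles with 1. Iterating over that map corresponds to what Next() does with the hash table iterator.
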
    /**
     * A simplified hash table that has all the necessary functionality for aggregations.
     */
    class SimpleAggregationHashTable {
    public:
        /**
         * Construct a new SimpleAggregationHashTable instance.
         * @param agg_exprs the aggregation expressions
         * @param agg_types the types of aggregations
         */
        SimpleAggregationHashTable(const std::vector<AbstractExpressionRef> &agg_exprs,
                                   const std::vector<AggregationType> &agg_types)
                : agg_exprs_{agg_exprs}, agg_types_{agg_types} {}
    
        /** @return The initial aggregate value for this aggregation executor */
        auto GenerateInitialAggregateValue() -> AggregateValue {
            std::vector<Value> values{};
            for (const auto &agg_type: agg_types_) {
                switch (agg_type) {
                    case AggregationType::CountStarAggregate:
                        // Count star starts at zero.
                        values.emplace_back(ValueFactory::GetIntegerValue(0));
                        break;
                    case AggregationType::CountAggregate:
                    case AggregationType::SumAggregate:
                    case AggregationType::MinAggregate:
                    case AggregationType::MaxAggregate:
                        // Others starts at null.
                        values.emplace_back(ValueFactory::GetNullValueByType(TypeId::INTEGER));
                        break;
                }
            }
            return {values};
        }
    
        /**
         * Combines the input into the aggregation result.
         * @param[out] result The output aggregate value
         * @param input The input value
         */
        void CombineAggregateValues(AggregateValue *result, const AggregateValue &input) {
            for (uint32_t i = 0; i < agg_exprs_.size(); i++) {
                auto &result_value = result->aggregates_[i];
                auto &input_value = input.aggregates_[i];
                switch (agg_types_[i]) {
                    case AggregationType::CountStarAggregate:
                        // count(*) adds 1 for every row
                        result_value = result_value.Add(ValueFactory::GetIntegerValue(1));
                        break;
                    case AggregationType::CountAggregate:
                        // count(column) adds 1 only for non-null inputs; bustub does not support count(distinct column)
                        if (!input_value.IsNull()) {
                            // A null result_value is first initialized to 0 through ValueFactory
                            if (result_value.IsNull()) {
                                result_value = ValueFactory::GetIntegerValue(0);
                            }
                            result_value = result_value.Add(ValueFactory::GetIntegerValue(1));
                        }
                        break;
                    case AggregationType::SumAggregate:
                        // sum(column) only processes non-null inputs
                        if (!input_value.IsNull()) {
                            // A null result_value is initialized to input_value; otherwise the input is added to it
                            if (result_value.IsNull()) {
                                result_value = input_value;
                            } else {
                                result_value = result_value.Add(input_value);
                            }
                        }
                        break;
                    case AggregationType::MinAggregate:
                        // min(column) only processes non-null inputs
                        if (!input_value.IsNull()) {
                            // A null result_value is initialized to input_value; otherwise keep the smaller value
                            if (result_value.IsNull()) {
                                result_value = input_value;
                            } else {
                                result_value = result_value.Min(input_value);
                            }
                        }
                        break;
                    case AggregationType::MaxAggregate:
                        // max(column) only processes non-null inputs
                        if (!input_value.IsNull()) {
                            // A null result_value is initialized to input_value; otherwise keep the larger value
                            if (result_value.IsNull()) {
                                result_value = input_value;
                            } else {
                                result_value = result_value.Max(input_value);
                            }
                        }
                        break;
                    default:
                        throw Exception("unknown AggregationType");
                }
            }
        }
    
        /**
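         * Checks whether the input was empty. If so, produces the default aggregate value: 0 for count(*), and a
         * typed null such as integer_null for the other aggregate functions. This extra check is needed because
         * aggregating over an empty table still returns one row holding the default aggregate value.
         *
         * @param[out] value The initial aggregate value generated for the aggregate operations.
         * @return `true` if the table is empty and an initial aggregate value must be returned, `false` otherwise.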
         */
        auto CheckIsNullTable(AggregateValue *value) -> bool {
            // Already checked, or the table is not empty
            if (is_checked_ || !ht_.empty()) {
                return false;
            }
    
            is_checked_ = true;
            // GenerateInitialAggregateValue() already covers every aggregate function, so one call is enough
            *value = GenerateInitialAggregateValue();
            return true;
        }
    
        /**
         * Inserts a value into the hash table and then combines it with the current aggregation.
         * @param agg_key the key to be inserted
         * @param agg_val the value to be inserted
         */
        void InsertCombine(const AggregateKey &agg_key, const AggregateValue &agg_val) {
            if (ht_.count(agg_key) == 0) {
                ht_.insert({agg_key, GenerateInitialAggregateValue()});
            }
            CombineAggregateValues(&ht_[agg_key], agg_val);
        }
    
        /**
         * Clear the hash table
         */
        void Clear() { ht_.clear(); }
    
        /** An iterator over the aggregation hash table */
        class Iterator {
        public:
            /** Creates an iterator for the aggregate map. */
            explicit Iterator(std::unordered_map<AggregateKey, AggregateValue>::const_iterator iter) : iter_{iter} {}
    
            /** @return The key of the iterator */
            auto Key() -> const AggregateKey & { return iter_->first; }
    
            /** @return The value of the iterator */
            auto Val() -> const AggregateValue & { return iter_->second; }
    
            /** @return The iterator before it is incremented */
            auto operator++() -> Iterator & {
                ++iter_;
                return *this;
            }
    
            /** @return `true` if both iterators are identical */
            auto operator==(const Iterator &other) -> bool { return this->iter_ == other.iter_; }
    
            /** @return `true` if both iterators are different */
            auto operator!=(const Iterator &other) -> bool { return this->iter_ != other.iter_; }
    
        private:
            /** Aggregates map */
            std::unordered_map<AggregateKey, AggregateValue>::const_iterator iter_;
        };
    
        /** @return Iterator to the start of the hash table */
        auto Begin() -> Iterator { return Iterator{ht_.cbegin()}; }
    
        /** @return Iterator to the end of the hash table */
        auto End() -> Iterator { return Iterator{ht_.cend()}; }
    
    private:
        /** The hash table is just a map from aggregate keys to aggregate values */
        std::unordered_map<AggregateKey, AggregateValue> ht_{};
        /** The aggregate expressions that we have */
        const std::vector<AbstractExpressionRef> &agg_exprs_;
        /** The types of aggregations that we have */
        const std::vector<AggregationType> &agg_types_;
        /** Whether the empty-table check has already run */
        bool is_checked_ = false;
    };
    
    /**
     * AggregationExecutor executes an aggregation operation (e.g. COUNT, SUM, MIN, MAX)
     * over the tuples produced by a child executor.
     */
    class AggregationExecutor : public AbstractExecutor {
    public:
        /**
         * Construct a new AggregationExecutor instance.
         * @param exec_ctx The executor context
         * @param plan The insert plan to be executed
         * @param child_executor The child executor from which inserted tuples are pulled (may be `nullptr`)
         */
        AggregationExecutor(ExecutorContext *exec_ctx, const AggregationPlanNode *plan,
                            std::unique_ptr<AbstractExecutor> &&child)
                : AbstractExecutor(exec_ctx),
                  plan_(plan),
                  child_(std::move(child)),
                  aht_(plan->GetAggregates(), plan->GetAggregateTypes()),
                  aht_iterator_(aht_.Begin()) {}
    
        /** Initialize the aggregation */
        void Init() override {
            child_->Init();
    
            Tuple tuple;
            RID rid;
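            // Assign each tuple to its aggregation group and update that group's aggregate values
            while (child_->Next(&tuple, &rid)) {
                // Build the aggregate key of this tuple: the values of the GROUP BY columns, which identify its group
                AggregateKey agg_key = this->MakeAggregateKey(&tuple);
                // Build the aggregate value of this tuple: the values of the columns being aggregated
                AggregateValue agg_value = this->MakeAggregateValue(&tuple);
                // Merge the tuple's values into the running aggregates of its group; each aggregate function combines differently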
                aht_.InsertCombine(agg_key, agg_value);
            }
            aht_iterator_ = aht_.Begin();
        }
    
        /**
         * Yield the next tuple from the aggregation.
         * @param[out] tuple The next tuple produced by the aggregation
         * @param[out] rid The next tuple RID produced by the aggregation
         * @return `true` if a tuple was produced, `false` if there are no more tuples
         */
        auto Next(Tuple *tuple, RID *rid) -> bool override {
            if (aht_iterator_ == aht_.End()) {
                // With a GROUP BY clause, every group has already been emitted and no extra row is needed
                if (!plan_->GetGroupBys().empty()) {
                    return false;
                }
    
                AggregateValue result_value;
                // Aggregating over an empty table: return a single tuple holding the initial aggregate values
                if (aht_.CheckIsNullTable(&result_value)) {
                    *tuple = Tuple(result_value.aggregates_, &plan_->OutputSchema());
                    *rid = tuple->GetRid();
                    return true;
                }
    
                return false;
            }
    
            // Fetch the aggregate key and aggregate value of the group the iterator currently points to
            AggregateKey agg_key = aht_iterator_.Key();
            AggregateValue agg_value = aht_iterator_.Val();
            std::vector<Value> values;
            values.reserve(agg_key.group_bys_.size() + agg_value.aggregates_.size());
    
            // Append the group-by key values to the output row
            for (const auto &group_by_key: agg_key.group_bys_) {
                values.emplace_back(group_by_key);
            }
            // Append the aggregate values of the group to the output row
            for (const auto &group_by_value: agg_value.aggregates_) {
                values.emplace_back(group_by_value);
            }
    
            *tuple = Tuple(values, &plan_->OutputSchema());
            *rid = tuple->GetRid();
            ++aht_iterator_;
            return true;
        }
    
        /** @return The output schema for the aggregation */
        auto GetOutputSchema() const -> const Schema & override { return plan_->OutputSchema(); };
    
        /** Do not use or remove this function, otherwise you will get zero points. */
        auto GetChildExecutor() const -> const AbstractExecutor * { return child_.get(); }
    
    private:
        /** @return The tuple as an AggregateKey */
        auto MakeAggregateKey(const Tuple *tuple) -> AggregateKey {
            std::vector<Value> keys;
            for (const auto &expr: plan_->GetGroupBys()) {
                keys.emplace_back(expr->Evaluate(tuple, child_->GetOutputSchema()));
            }
            return {keys};
        }
    
        /** @return The tuple as an AggregateValue */
        auto MakeAggregateValue(const Tuple *tuple) -> AggregateValue {
            std::vector<Value> vals;
            for (const auto &expr: plan_->GetAggregates()) {
                vals.emplace_back(expr->Evaluate(tuple, child_->GetOutputSchema()));
            }
            return {vals};
        }
    
    private:
        /** The aggregation plan node */
        const AggregationPlanNode *plan_;
        /** The child executor that produces tuples over which the aggregation is computed */
        std::unique_ptr<AbstractExecutor> child_;
        /** Simple aggregation hash table */
        SimpleAggregationHashTable aht_;
        /** Simple aggregation hash table iterator */
        SimpleAggregationHashTable::Iterator aht_iterator_;
    };
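
    The Init()/Next() pair above is a hash aggregation: Init() drains the child and folds every tuple into the accumulator of its group, and Next() then walks the table one group at a time. The standalone sketch below shows the same idea with plain standard-library types. `Row` and `CountByCamp` are hypothetical names used only for illustration, and `std::map` is chosen here (instead of a hash map) purely to get a deterministic iteration order; none of this is BusTub code.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical row type standing in for a BusTub Tuple: (name, camp).
using Row = std::pair<std::string, std::string>;

// Build phase of a count(name) ... group by camp aggregation.
// Mirrors the role of InsertCombine(): find (or create) the accumulator
// of the row's group, then combine the row into it.
std::map<std::string, int> CountByCamp(const std::vector<Row> &rows) {
  std::map<std::string, int> groups;
  for (const auto &row : rows) {
    // operator[] value-initializes a missing group to 0 before the increment.
    groups[row.second] += 1;
  }
  return groups;
}
```

    Calling `CountByCamp` on the five rows of the `legends` table used in the test case of this section would produce one entry per camp; iterating over the returned map corresponds to the repeated Next() calls of the executor.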
    

    A test case that demonstrates the execution flow of grouped aggregation and of aggregation over an empty table:

    statement ok
    create table legends(name varchar(128), camp varchar(128));
    
    statement ok
    insert into legends values ('Ezreal', 'Piltover'), ('Vi', 'Piltover'), ('Jayce', 'Piltover'), ('Zed', 'Ionia'), ('Xayah', 'Ionia');
    
    statement ok
    select count(name) from legends group by camp;
    
    statement ok
    delete from legends;
    
    query
    select count(*) from legends;
    ----
    0
    
    query
    select count(camp) from legends;
    ----
    integer_null
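
    The last two queries depend on the value each aggregate starts from when no tuple ever reaches it: in the expected output above, count(*) over the emptied table yields 0 while count(camp) yields integer_null. A minimal sketch of that rule follows, with `std::nullopt` standing in for a NULL integer. `AggKind`, `InitialValue` and `CombineCount` are hypothetical names, and treating every aggregate other than count(*) as starting from NULL is an assumption generalized from the two cases shown in the test.

```cpp
#include <optional>

// Hypothetical aggregate kinds, named here only for illustration.
enum class AggKind { CountStar, Count, Sum, Min, Max };

// Value an aggregate reports when it has seen no input rows.
// std::nullopt plays the role of a NULL integer.
std::optional<int> InitialValue(AggKind kind) {
  if (kind == AggKind::CountStar) {
    return 0;  // count(*) over an empty input is 0
  }
  return std::nullopt;  // assumption: every other aggregate starts as NULL
}

// Fold one non-NULL input value into a running count(column) accumulator:
// the first value turns NULL into 1, later values increment the count.
std::optional<int> CombineCount(std::optional<int> acc) {
  return acc.has_value() ? *acc + 1 : 1;
}
```

    With this rule, the empty-table branch of Next() only has to emit the initial values once, which is what the is_checked_ flag in the hash table guards.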
    

    6.2 Nested Loop Join Executor

    bustub> EXPLAIN SELECT * FROM test_1 LEFT OUTER JOIN test_2 ON test_1.colA = test_2.colA;
    === BINDER ===
    BoundSelect {
      table=BoundJoin { type=Left, left=BoundBaseTableRef { table=test_1, oid=20 }, right=BoundBaseTableRef { table=test_2, oid=21 }, condition=(test_1.colA=test_2.colA) },
      columns=[test_1.colA, test_1.colB, test_1.colC, test_1.colD, test_2.colA, test_2.colB, test_2.colC],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Projection { exprs=[#0.0, #0.1, #0.2, #0.3, #0.4, #0.5, #0.6] } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER, test_2.colA:INTEGER, test_2.colB:INTEGER, test_2.colC:INTEGER)
      NestedLoopJoin { type=Left, predicate=(#0.0=#1.0) } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER, test_2.colA:INTEGER, test_2.colB:INTEGER, test_2.colC:INTEGER)
        SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
        SeqScan { table=test_2 } | (test_2.colA:INTEGER, test_2.colB:INTEGER, test_2.colC:INTEGER)
    === OPTIMIZER ===
    HashJoin { type=Left, left_key=#0.0, right_key=#0.0 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER, test_2.colA:INTEGER, test_2.colB:INTEGER, test_2.colC:INTEGER)
      SeqScan { table=test_1 } | (test_1.colA:INTEGER, test_1.colB:INTEGER, test_1.colC:INTEGER, test_1.colD:INTEGER)
      SeqScan { table=test_2 } | (test_2.colA:INTEGER, test_2.colB:INTEGER, test_2.colC:INTEGER)
    

    Join operations in databases

    /**
     * NestedLoopJoinExecutor executes a nested-loop JOIN on two tables.
     */
    class NestedLoopJoinExecutor : public AbstractExecutor {
    public:
        /**
         * Construct a new NestedLoopJoinExecutor instance.
         * @param exec_ctx The executor context
         * @param plan The NestedLoop join plan to be executed
         * @param left_executor The child executor that produces tuple for the left side of join
         * @param right_executor The child executor that produces tuple for the right side of join
         */
        NestedLoopJoinExecutor(ExecutorContext *exec_ctx, const NestedLoopJoinPlanNode *plan,
                               std::unique_ptr<AbstractExecutor> &&left_executor,
                               std::unique_ptr<AbstractExecutor> &&right_executor)
                : AbstractExecutor(exec_ctx),
                  plan_(plan),
                  left_executor_(std::move(left_executor)),
                  right_executor_(std::move(right_executor)) {}
    
        /** Initialize the join */
        void Init() override {
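            left_executor_->Init();
            right_executor_->Init();
            // Drop results left over from an earlier Init() so that re-initialization starts clean
            results_ = std::queue<Tuple>();
    
            // Fetch all tuples of both tables from left_executor_ and right_executor_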
            Tuple tuple;
            RID rid;
            std::vector<Tuple> left_tuples;
            std::vector<Tuple> right_tuples;
            while (left_executor_->Next(&tuple, &rid)) {
                left_tuples.emplace_back(tuple);
            }
            while (right_executor_->Next(&tuple, &rid)) {
                right_tuples.emplace_back(tuple);
            }
    
            // Join every tuple in left_tuples with every tuple in right_tuples
            Schema left_schema = left_executor_->GetOutputSchema();
            Schema right_schema = right_executor_->GetOutputSchema();
            for (auto &left_tuple: left_tuples) {
                // Whether left_tuple still needs a NULL-padded row for the left outer join
                bool need_left_join = true;
    
                for (auto &right_tuple: right_tuples) {
                    // Evaluate the join predicate on the pair; a NULL or false result means the two tuples do not match
                    Value join_result = plan_->predicate_->EvaluateJoin(&left_tuple, left_schema, &right_tuple, right_schema);
                    if (!join_result.IsNull() && join_result.GetAs<bool>()) {
                        // left_tuple has a match, so it needs no NULL-padded left-join row
                        need_left_join = false;
    
                        // Append the values of left_tuple followed by those of right_tuple to the result row
                        std::vector<Value> values;
                        for (size_t i = 0; i < left_schema.GetColumnCount(); i++) {
                            values.emplace_back(left_tuple.GetValue(&left_schema, i));
                        }
                        for (size_t i = 0; i < right_schema.GetColumnCount(); i++) {
                            values.emplace_back(right_tuple.GetValue(&right_schema, i));
                        }
                        results_.emplace(values, &plan_->OutputSchema());
                    }
                }
    
                // Left outer join for an unmatched left_tuple: pad every right-side column with a NULL of that column's type
                if (need_left_join && plan_->GetJoinType() == JoinType::LEFT) {
                    std::vector<Value> values;
                    for (size_t i = 0; i < left_schema.GetColumnCount(); i++) {
                        values.emplace_back(left_tuple.GetValue(&left_schema, i));
                    }
                    for (size_t i = 0; i < right_schema.GetColumnCount(); i++) {
                        values.emplace_back(ValueFactory::GetNullValueByType(right_schema.GetColumn(i).GetType()));
                    }
                    results_.emplace(values, &plan_->OutputSchema());
                }
            }
        }
    
        /**
         * Yield the next tuple from the join.
         * @param[out] tuple The next tuple produced by the join
         * @param[out] rid The next tuple RID produced, not used by nested loop join.
         * @return `true` if a tuple was produced, `false` if there are no more tuples.
         */
        auto Next(Tuple *tuple, RID *rid) -> bool override {
            // Hand out the joined tuples one at a time from the result queue
            if (results_.empty()) {
                return false;
            }
            *tuple = results_.front();
            results_.pop();
            *rid = tuple->GetRid();
            return true;
        }
    
        /** @return The output schema for the join */
        auto GetOutputSchema() const -> const Schema & override { return plan_->OutputSchema(); };
    
    private:
        /** The NestedLoopJoin plan node to be executed. */
        const NestedLoopJoinPlanNode *plan_;
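        /** Left child executor; for a nested loop join this is usually a scan executor over the left table */
        std::unique_ptr<AbstractExecutor> left_executor_;
        /** Right child executor; for a nested loop join this is usually a scan executor over the right table */
        std::unique_ptr<AbstractExecutor> right_executor_;
        /** Materialized join results */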
        std::queue<Tuple> results_;
    };
    
    A corresponding test case:

    statement ok
    create table provinces
    (
        province_name varchar(255),
        country_name  varchar(255),
        capital_name  varchar(255)
    );
    
    statement ok
    insert into provinces
    values ('Guangdong', 'China', 'Guangzhou'),
           ('Sichuan', 'China', 'Chengdu'),
           ('Jiangsu', 'China', 'Nanjing'),
           ('California', 'USA', 'Sacramento'),
           ('Hawaii', 'USA', 'Honolulu'),
           ('Texas', 'USA', 'Houston');
    
    statement ok
    create table capital
    (
        capital_name varchar(255),
        population   int
    );
    
    statement ok
    insert into capital
    values ('Guangzhou', 15000000),
           ('Nanjing', 8000000),
           ('Sacramento', 500000),
           ('Honolulu', 380000),
           ('Tokyo', 14000000),
           ('London', 9000000);
    
    statement ok
    create index capital_name_index on capital(capital_name);
    
    query rowsort
    select *
    from provinces
    join capital on provinces.capital_name = capital.capital_name;
    ----
    Guangdong China Guangzhou Guangzhou 15000000
    Jiangsu China Nanjing Nanjing 8000000
    California USA Sacramento Sacramento 500000
    Hawaii USA Honolulu Honolulu 380000
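
    Stripped of the BusTub types, the executor in this section is a double loop with a per-left-row "matched" flag. The sketch below joins two in-memory tables on the capital name and pads unmatched left rows, i.e. it shows the left outer join variant; an inner join like the one in the test case would simply skip the padding step. `LeftRow`, `RightRow`, `JoinedRow` and `NestedLoopLeftJoin` are hypothetical names, and `std::optional` stands in for a NULL value.

```cpp
#include <optional>
#include <string>
#include <utility>
#include <vector>

using LeftRow = std::pair<std::string, std::string>;  // (province_name, capital_name)
using RightRow = std::pair<std::string, int>;         // (capital_name, population)

struct JoinedRow {
  std::string province;
  std::string capital;
  std::optional<int> population;  // std::nullopt models the NULL padding
};

std::vector<JoinedRow> NestedLoopLeftJoin(const std::vector<LeftRow> &left,
                                          const std::vector<RightRow> &right) {
  std::vector<JoinedRow> out;
  for (const auto &l : left) {
    bool matched = false;  // becomes true once any right row satisfies the predicate
    for (const auto &r : right) {
      if (l.second == r.first) {  // join predicate: capital names are equal
        matched = true;
        out.push_back({l.first, l.second, r.second});
      }
    }
    if (!matched) {
      // Unmatched left row: keep it and pad the right side with NULL.
      out.push_back({l.first, l.second, std::nullopt});
    }
  }
  return out;
}
```

    The cost is one pass over the right input per left row, which is why the optimizer output shown earlier in this section prefers a different join operator for equality predicates.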
    

    6.3 Nested Index Join Executor

    bustub> CREATE TABLE t1(v1 int, v2 int);
    Table created with id = 22
    bustub> CREATE TABLE t2(v3 int, v4 int);
    Table created with id = 23
    bustub> CREATE INDEX t2v3 on t2(v3);
    Index created with id = 0
    bustub> EXPLAIN SELECT * FROM t2 INNER JOIN t1 ON v1 = v3;
    === BINDER ===
    BoundSelect {
      table=BoundJoin { type=Inner, left=BoundBaseTableRef { table=t2, oid=23 }, right=BoundBaseTableRef { table=t1, oid=22 }, condition=(t1.v1=t2.v3) },
      columns=[t2.v3, t2.v4, t1.v1, t1.v2],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (t2.v3:INTEGER, t2.v4:INTEGER, t1.v1:INTEGER, t1.v2:INTEGER)
      NestedLoopJoin { type=Inner, predicate=(#1.0=#0.0) } | (t2.v3:INTEGER, t2.v4:INTEGER, t1.v1:INTEGER, t1.v2:INTEGER)
        SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)
        SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
    === OPTIMIZER ===
    HashJoin { type=Inner, left_key=#0.0, right_key=#0.0 } | (t2.v3:INTEGER, t2.v4:INTEGER, t1.v1:INTEGER, t1.v2:INTEGER)
      SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)
      SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
    bustub> EXPLAIN SELECT * FROM t1 INNER JOIN t2 ON v1 = v3;
    === BINDER ===
    BoundSelect {
      table=BoundJoin { type=Inner, left=BoundBaseTableRef { table=t1, oid=22 }, right=BoundBaseTableRef { table=t2, oid=23 }, condition=(t1.v1=t2.v3) },
      columns=[t1.v1, t1.v2, t2.v3, t2.v4],
      groupBy=[],
      having=,
      where=,
      limit=,
      offset=,
      order_by=[],
      is_distinct=false,
      ctes=,
    }
    === PLANNER ===
    Projection { exprs=[#0.0, #0.1, #0.2, #0.3] } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
      NestedLoopJoin { type=Inner, predicate=(#0.0=#1.0) } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
        SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
        SeqScan { table=t2 } | (t2.v3:INTEGER, t2.v4:INTEGER)
    === OPTIMIZER ===
    NestedIndexJoin { type=Inner, key_predicate=#0.0, index=t2v3, index_table=t2 } | (t1.v1:INTEGER, t1.v2:INTEGER, t2.v3:INTEGER, t2.v4:INTEGER)
      SeqScan { table=t1 } | (t1.v1:INTEGER, t1.v2:INTEGER)
    
    The executor is implemented as follows:

    /**
     * IndexJoinExecutor executes index join operations.
     */
    class NestIndexJoinExecutor : public AbstractExecutor {
    public:
        /**
         * Creates a new nested index join executor.
         * @param exec_ctx the context that the nested index join should be performed in
         * @param plan the nested index join plan node
         * @param child_executor the outer table
         */
        NestIndexJoinExecutor(ExecutorContext *exec_ctx, const NestedIndexJoinPlanNode *plan,
                              std::unique_ptr<AbstractExecutor> &&child_executor)
                : AbstractExecutor(exec_ctx), plan_(plan), left_executor_(std::move(child_executor)) {}
    
        auto GetOutputSchema() const -> const Schema & override {
            return plan_->OutputSchema();
        }
    
        void Init() override {
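            left_executor_->Init();
            // Drop results left over from an earlier Init() so that re-initialization starts clean
            results_ = std::queue<Tuple>();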
    
            Schema left_schema = left_executor_->GetOutputSchema();
    
            // Look up the index on the right (inner) table
            IndexInfo *right_index_info = exec_ctx_->GetCatalog()->GetIndex(plan_->GetIndexOid());
            Index *right_index = right_index_info->index_.get();
    
            TableInfo *right_table_info = exec_ctx_->GetCatalog()->GetTable(plan_->GetInnerTableOid());
            Schema right_schema = right_table_info->schema_;
            TableHeap *right_table = right_table_info->table_.get();
    
            Tuple left_tuple;
            RID left_rid;
            while (left_executor_->Next(&left_tuple, &left_rid)) {
                // Whether left_tuple still needs a NULL-padded row for the left outer join
                bool need_left_join = true;
    
                // Evaluate the join key on the left tuple and probe right_index for all matching RIDs
                Value left_key_value = plan_->KeyPredicate()->Evaluate(&left_tuple, left_schema);
                Tuple left_key_tuple = Tuple(std::vector<Value>{left_key_value}, &right_index_info->key_schema_);
                std::vector<RID> result_rids;
                right_index->ScanKey(left_key_tuple, &result_rids, nullptr);
    
                // Process every matching RID in the right table
                for (auto &right_rid: result_rids) {
                    // The current left tuple has a match, so it needs no NULL-padded row
                    need_left_join = false;
    
                    // Append the values of the left tuple followed by those of the right tuple to the result row
                    Tuple right_tuple;
                    right_table->GetTuple(right_rid, &right_tuple, exec_ctx_->GetTransaction());
                    std::vector<Value> values;
                    for (size_t i = 0; i < left_schema.GetColumnCount(); i++) {
                        values.emplace_back(left_tuple.GetValue(&left_schema, i));
                    }
                    for (size_t i = 0; i < right_schema.GetColumnCount(); i++) {
                        values.emplace_back(right_tuple.GetValue(&right_schema, i));
                    }
                    results_.emplace(values, &plan_->OutputSchema());
                }
    
                // Left outer join for an unmatched left tuple: pad the right-side columns with NULLs
                if (need_left_join && plan_->join_type_ == JoinType::LEFT) {
                    std::vector<Value> values;
                    for (size_t i = 0; i < left_schema.GetColumnCount(); i++) {
                        values.emplace_back(left_tuple.GetValue(&left_schema, i));
                    }
                    for (size_t i = 0; i < right_schema.GetColumnCount(); i++) {
                        values.emplace_back(ValueFactory::GetNullValueByType(right_schema.GetColumn(i).GetType()));
                    }
                    results_.emplace(values, &plan_->OutputSchema());
                }
            }
        }
    
        auto Next(Tuple *tuple, RID *rid) -> bool override {
            // Hand out the joined tuples one at a time from the result queue
            if (results_.empty()) {
                return false;
            }
            *tuple = results_.front();
            results_.pop();
            *rid = tuple->GetRid();
            return true;
        }
    
    private:
        /** The nested index join plan node. */
        const NestedIndexJoinPlanNode *plan_;
        /** Left child executor; for an index join this is usually a scan executor over the left (outer) table */
        std::unique_ptr<AbstractExecutor> left_executor_;
        /** Materialized join results */
        std::queue<Tuple> results_;
    };
    
    A corresponding test case:

    statement ok
    create table provinces
    (
        province_name varchar(255),
        country_name  varchar(255),
        capital_id    int
    );
    
    statement ok
    insert into provinces
    values ('Guangdong', 'China', 1),
           ('Sichuan', 'China', 2),
           ('Jiangsu', 'China', 3),
           ('California', 'USA', 4),
           ('Hawaii', 'USA', 5),
           ('Texas', 'USA', 6);
    
    statement ok
    create table capital
    (
        capital_id int,
        population int
    );
    
    statement ok
    insert into capital
    values (1, 15000000),
           (3, 8000000),
           (4, 500000),
           (5, 380000),
           (7, 14000000),
           (8, 9000000);
    
    statement ok
    create index capital_id_index on capital(capital_id);
    
    query rowsort
    select *
    from provinces
    join capital on provinces.capital_id = capital.capital_id;
    ----
    Guangdong China 1 1 15000000
    Jiangsu China 3 3 8000000
    California USA 4 4 500000
    Hawaii USA 5 5 380000
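
    The index join replaces the inner loop over the right table with one index probe per left row. Below is a rough standalone sketch in which a `std::unordered_multimap` from key to row position plays the role of the index on `capital(capital_id)`, and the stored positions play the role of RIDs. `BuildIndex`, `IndexInnerJoin` and the row types are hypothetical and only illustrate the access pattern; they are not BusTub's index API.

```cpp
#include <cstddef>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

using Province = std::pair<std::string, int>;  // (province_name, capital_id)
using Capital = std::pair<int, int>;           // (capital_id, population)
// Stand-in for the index: join key -> position of the row in the table ("RID").
using Index = std::unordered_multimap<int, std::size_t>;

Index BuildIndex(const std::vector<Capital> &capitals) {
  Index index;
  for (std::size_t pos = 0; pos < capitals.size(); ++pos) {
    index.emplace(capitals[pos].first, pos);
  }
  return index;
}

// Inner join: probe the index with each left key, then fetch the matching
// right rows by position instead of scanning the whole right table.
std::vector<std::tuple<std::string, int, int>> IndexInnerJoin(
    const std::vector<Province> &provinces, const std::vector<Capital> &capitals,
    const Index &index) {
  std::vector<std::tuple<std::string, int, int>> out;
  for (const auto &province : provinces) {
    auto [first, last] = index.equal_range(province.second);
    for (auto it = first; it != last; ++it) {
      const Capital &capital = capitals[it->second];
      out.emplace_back(province.first, capital.first, capital.second);
    }
  }
  return out;
}
```

    Only the left input is scanned in full, which matches the optimized plan shown earlier in this section: the NestedIndexJoin node has a single SeqScan child for the outer table and reaches the inner table through the index.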
    

    7. Evaluation Results

    [Figure: screenshot of the evaluation results]


    References:

    https://blog.eleven.wiki/posts/cmu15-445-project3-query-execution/
    https://blog.csdn.net/AntiO2/article/details/131147264
    https://www.bilibili.com/video/BV1184y137rG/

  • Original article: https://blog.csdn.net/qq_43686863/article/details/132711982