• CMU15445 (Fall 2020) 数据库系统 Project#3 - Query Execution 详解


    前言

    经过前两个实验的铺垫,终于到了执行 SQL 语句的时候了。这篇博客将会介绍 SQL 执行计划实验的实现过程,下面进入正题。

    总体架构

    一条 SQL 查询的处理流程如下为:

    1. SQL 被 Parser 解析为抽象语法树 AST
    2. Binber 将 AST转换为 Bustub 可以理解的更高级的 AST
    3. Tree rewriter 将语法树转换为逻辑执行计划
    4. Optimizer 对逻辑计划进行优化,生成最终要执行的物理执行计划
    5. 执行引擎执行物理执行计划,返回查询结果

    物理执行计划定义了具体的执行方式,比如逻辑计划中的 Join 可以被替换为 Nest loop join、 Hash join 或者 Index join。由于 Fall 2020 版本的代码没有 Parser 和 Optimizer,所以测试用例中都是手动构造的物理执行计划。

    系统目录

    目录结构

    数据库会维护一个内部目录,以跟踪有关数据库的元数据。目录中可以存放数据表的信息、索引信息和统计数据。Bustub 中使用 Catalog 类表示系统目录,内部存放 table_oid_tTableMetadata 的映射表以及 index_oid_tIndexInfo 的映射表。

    TableMetadata 描述了一张表的信息,包括表名、Schema、表 id 和表的指针。代码如下所示:

    复制struct TableMetadata {
      TableMetadata(Schema schema, std::string name, std::unique_ptr &&table, table_oid_t oid)
          : schema_(std::move(schema)), name_(std::move(name)), table_(std::move(table)), oid_(oid) {}
      Schema schema_;
      std::string name_;
      std::unique_ptr table_;
      table_oid_t oid_;
    };
    

    TableHeap 代表了一张表,实现了 tuple 的增删改查操作。它的内部存放了第一个表页 TablePage 的 id,由于每个 TablePage 都会存放前一个和下一个表页的 id,这样就将表组织为双向链表,可以通过 TableIterator 进行迭代。

    TablePage 使用分槽页结构(slotted page),tuple 从后往前插入,每个 tuple 由一个 RID 标识。

    复制class RID {
     public:
      RID() = default;
    
      /**
       * Creates a new Record Identifier for the given page identifier and slot number.
       */
      RID(page_id_t page_id, uint32_t slot_num) : page_id_(page_id), slot_num_(slot_num) {}
    
      explicit RID(int64_t rid) : page_id_(static_cast(rid >> 32)), slot_num_(static_cast(rid)) {}
    
      inline int64_t Get() const { return (static_cast<int64_t>(page_id_)) << 32 | slot_num_; }
    
      inline page_id_t GetPageId() const { return page_id_; }
    
      inline uint32_t GetSlotNum() const { return slot_num_; }
    
      bool operator==(const RID &other) const { return page_id_ == other.page_id_ && slot_num_ == other.slot_num_; }
    
     private:
      page_id_t page_id_{INVALID_PAGE_ID};
      uint32_t slot_num_{0};  // logical offset from 0, 1...
    };
    

    表管理

    Catalog 中有三个与表相关的方法:CreateTableGetTable(const std::string &table_name)GetTable(table_oid_t table_oid),第一个方法用于创建一个新的表,后面两个方法用于获取表元数据:

    (bpm_, lock_manager_, log_manager_, txn);
      tables_[tid] = std::make_unique(schema, table_name, std::move(table_heap), tid);
      names_[table_name] = tid;
    
      return tables_[tid].get();
    }
    
    /** @return table metadata by name */
    TableMetadata *GetTable(const std::string &table_name) {
      auto it = names_.find(table_name);
      if (it == names_.end()) {
        throw std::out_of_range("Table is not found");
      }
    
      return tables_[it->second].get();
    }
    
    /** @return table metadata by oid */
    TableMetadata *GetTable(table_oid_t table_oid) {
      auto it = tables_.find(table_oid);
      if (it == tables_.end()) {
        throw std::out_of_range("Table is not found");
      }
    
      return it->second.get();
    }
    ">复制/**
    * Create a new table and return its metadata.
    * @param txn the transaction in which the table is being created
    * @param table_name the name of the new table
    * @param schema the schema of the new table
    * @return a pointer to the metadata of the new table
    */
    TableMetadata *CreateTable(Transaction *txn, const std::string &table_name, const Schema &schema) {
      BUSTUB_ASSERT(names_.count(table_name) == 0, "Table names should be unique!");
      auto tid = next_table_oid_++;
    
      auto table_heap = std::make_unique(bpm_, lock_manager_, log_manager_, txn);
      tables_[tid] = std::make_unique(schema, table_name, std::move(table_heap), tid);
      names_[table_name] = tid;
    
      return tables_[tid].get();
    }
    
    /** @return table metadata by name */
    TableMetadata *GetTable(const std::string &table_name) {
      auto it = names_.find(table_name);
      if (it == names_.end()) {
        throw std::out_of_range("Table is not found");
      }
    
      return tables_[it->second].get();
    }
    
    /** @return table metadata by oid */
    TableMetadata *GetTable(table_oid_t table_oid) {
      auto it = tables_.find(table_oid);
      if (it == tables_.end()) {
        throw std::out_of_range("Table is not found");
      }
    
      return it->second.get();
    }
    

    索引管理

    创建索引

    Catalog 使用 CreateIndex() 方法创建索引,创建的时候需要将表中的数据转换为键值对插入索引中:

    (meta, bpm_);
    
      // 初始化索引
      auto table = GetTable(table_name)->table_.get();
      for (auto it = table->Begin(txn); it != table->End(); ++it) {
        index->InsertEntry(it->KeyFromTuple(schema, key_schema, key_attrs), it->GetRid(), txn);
      }
    
      indexes_[id] = std::make_unique(key_schema, index_name, std::move(index), id, table_name, keysize);
      index_names_[table_name][index_name] = id;
      return indexes_[id].get();
    }
    ">复制/**
      * Create a new index, populate existing data of the table and return its metadata.
      * @param txn the transaction in which the table is being created
      * @param index_name the name of the new index
      * @param table_name the name of the table
      * @param schema the schema of the table
      * @param key_schema the schema of the key
      * @param key_attrs key attributes
      * @param keysize size of the key
      * @return a pointer to the metadata of the new table
      */
    template <class KeyType, class ValueType, class KeyComparator>
    IndexInfo *CreateIndex(Transaction *txn, const std::string &index_name, const std::string &table_name,
                            const Schema &schema, const Schema &key_schema, const std::vector<uint32_t> &key_attrs,
                            size_t keysize) {
      BUSTUB_ASSERT(index_names_.count(index_name) == 0, "Index names should be unique!");
      auto id = next_index_oid_++;
    
      auto meta = new IndexMetadata(index_name, table_name, &schema, key_attrs);
      auto index = std::make_unique(meta, bpm_);
    
      // 初始化索引
      auto table = GetTable(table_name)->table_.get();
      for (auto it = table->Begin(txn); it != table->End(); ++it) {
        index->InsertEntry(it->KeyFromTuple(schema, key_schema, key_attrs), it->GetRid(), txn);
      }
    
      indexes_[id] = std::make_unique(key_schema, index_name, std::move(index), id, table_name, keysize);
      index_names_[table_name][index_name] = id;
      return indexes_[id].get();
    }
    

    查询索引

    数据库中有多个表,一个表可以拥有多个索引,但是每个索引对应一个全局唯一的 index_oid_t

    second.find(index_name);
      if (iit == it->second.end()) {
        throw std::out_of_range("Index is not found");
      }
    
      return indexes_[iit->second].get();
    }
    
    IndexInfo *GetIndex(index_oid_t index_oid) {
      auto it = indexes_.find(index_oid);
      if (it == indexes_.end()) {
        throw std::out_of_range("Index is not found");
      }
    
      return it->second.get();
    }
    
    std::vector GetTableIndexes(const std::string &table_name) {
      auto it = index_names_.find(table_name);
      if (it == index_names_.end()) {
        return {};
      };
    
      std::vector indexes;
      for (auto &[name, id] : it->second) {
        indexes.push_back(GetIndex(id));
      }
    
      return indexes;
    }
    ">复制IndexInfo *GetIndex(const std::string &index_name, const std::string &table_name) {
      auto it = index_names_.find(table_name);
      if (it == index_names_.end()) {
        throw std::out_of_range("Table is not found");
      }
    
      auto iit = it->second.find(index_name);
      if (iit == it->second.end()) {
        throw std::out_of_range("Index is not found");
      }
    
      return indexes_[iit->second].get();
    }
    
    IndexInfo *GetIndex(index_oid_t index_oid) {
      auto it = indexes_.find(index_oid);
      if (it == indexes_.end()) {
        throw std::out_of_range("Index is not found");
      }
    
      return it->second.get();
    }
    
    std::vector GetTableIndexes(const std::string &table_name) {
      auto it = index_names_.find(table_name);
      if (it == index_names_.end()) {
        return {};
      };
    
      std::vector indexes;
      for (auto &[name, id] : it->second) {
        indexes.push_back(GetIndex(id));
      }
    
      return indexes;
    }
    

    执行器

    如下图的右下角所示,执行计划由一系列算子组合而成,每个算子可以拥有自己的子算子,数据从子算子流向父算子,最终从根节点输出执行结果。执行计划有三种执行模型:

    • 迭代模型:每个算子都会实现 Next() 方法,父算子调用子算子的 Next() 方法获取一条记录,外部通过不断调用根节点的 Next() 方法直至没有更多数据输出。这种方法的优点就是一次只产生一条 Tuple,内存占用小

    • 物化模型:每个算子一次性返回所有记录

    • 向量模型:迭代模型和物化模型的折中版本,一次返回一批数据

    本次实验使用迭代模型,伪代码如下图所示:

    Bustub 使用执行引擎 ExecutionEngine 执行物理计划,这个类的代码很简洁,只有一个 Execute() 方法。可以看到这个方法会先将执行计划转换为对应的执行器 executor,使用 Init() 初始化后循环调用 executorNext() 方法获取查询结果:

    复制class ExecutionEngine {
     public:
      ExecutionEngine(BufferPoolManager *bpm, TransactionManager *txn_mgr, Catalog *catalog)
          : bpm_(bpm), txn_mgr_(txn_mgr), catalog_(catalog) {}
    
      DISALLOW_COPY_AND_MOVE(ExecutionEngine);
    
      bool Execute(const AbstractPlanNode *plan, std::vector *result_set, Transaction *txn,
                   ExecutorContext *exec_ctx) {
        // construct executor
        auto executor = ExecutorFactory::CreateExecutor(exec_ctx, plan);
    
        // prepare
        executor->Init();
    
        // execute
        try {
          Tuple tuple;
          RID rid;
          while (executor->Next(&tuple, &rid)) {
            if (result_set != nullptr) {
              result_set->push_back(tuple);
            }
          }
        } catch (Exception &e) {
          // TODO(student): handle exceptions
        }
    
        return true;
      }
    
     private:
      [[maybe_unused]] BufferPoolManager *bpm_;
      [[maybe_unused]] TransactionManager *txn_mgr_;
      [[maybe_unused]] Catalog *catalog_;
    };
    

    全表扫描

    SeqScanExecutor 用于进行全表扫描操作,内部带有 SeqScanPlan 执行计划:

    复制/**
     * SeqScanExecutor executes a sequential scan over a table.
     */
    class SeqScanExecutor : public AbstractExecutor {
     public:
      /**
       * Creates a new sequential scan executor.
       * @param exec_ctx the executor context
       * @param plan the sequential scan plan to be executed
       */
      SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan);
    
      void Init() override;
    
      bool Next(Tuple *tuple, RID *rid) override;
    
      const Schema *GetOutputSchema() override { return plan_->OutputSchema(); }
    
     private:
      /** The sequential scan plan node to be executed. */
      const SeqScanPlanNode *plan_;
      TableMetadata *table_metadata_;
      TableIterator it_;
    };
    

    SeqScanPlan 声明如下,Schema *output 指明了输出列,table_oid 代表被扫描的表,而 AbstractExpression *predicate 代表谓词算子:

    复制/**
     * SeqScanPlanNode identifies a table that should be scanned with an optional predicate.
     */
    class SeqScanPlanNode : public AbstractPlanNode {
     public:
      /**
       * Creates a new sequential scan plan node.
       * @param output the output format of this scan plan node
       * @param predicate the predicate to scan with, tuples are returned if predicate(tuple) = true or predicate = nullptr
       * @param table_oid the identifier of table to be scanned
       */
      SeqScanPlanNode(const Schema *output, const AbstractExpression *predicate, table_oid_t table_oid)
          : AbstractPlanNode(output, {}), predicate_{predicate}, table_oid_(table_oid) {}
    
      PlanType GetType() const override { return PlanType::SeqScan; }
    
      /** @return the predicate to test tuples against; tuples should only be returned if they evaluate to true */
      const AbstractExpression *GetPredicate() const { return predicate_; }
    
      /** @return the identifier of the table that should be scanned */
      table_oid_t GetTableOid() const { return table_oid_; }
    
     private:
      /** The predicate that all returned tuples must satisfy. */
      const AbstractExpression *predicate_;
      /** The table whose tuples should be scanned. */
      table_oid_t table_oid_;
    };
    

    举个栗子,SELECT name, age FROM t_student WHERE age > 16age > 16 部分就是 predicate ,实际数据类型为 ComparisonExpression ,而 predicate 又由 ColumnValueExpression(代表 age 列的值) 和 ConstantValueExpression(代表 16)组成。

    要实现全表扫描只需在 Next 函数中判断迭代器所指的 tuple 是否满足查询条件并递增迭代器,如果满足条件就返回该 tuple,不满足就接着迭代。

    复制SeqScanExecutor::SeqScanExecutor(ExecutorContext *exec_ctx, const SeqScanPlanNode *plan)
        : AbstractExecutor(exec_ctx),
    	  plan_(plan), table_metadata_(exec_ctx->GetCatalog()->GetTable(plan->GetTableOid())) {}
    
    void SeqScanExecutor::Init() { it_ = table_metadata_->table_->Begin(exec_ctx_->GetTransaction()); }
    
    bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
      auto predicate = plan_->GetPredicate();
    
      while (it_ != table_metadata_->table_->End()) {
        *tuple = *it_++;
        *rid = tuple->GetRid();
    
        if (!predicate || predicate->Evaluate(tuple, &table_metadata_->schema_).GetAs<bool>()) {
          // 只保留输出列
          std::vector values;
          for (auto &col : GetOutputSchema()->GetColumns()) {
            values.push_back(col.GetExpr()->Evaluate(tuple, &table_metadata_->schema_));
          }
    
          *tuple = {values, GetOutputSchema()};
          return true;
        }
      }
    
      return false;
    }
    

    测试用例中通过下述代码手动构造出 SELECT colA, colB FROM test_1 WHERE colA < 500 的全表扫描执行计划并执行:

    schema_;
    auto *colA = MakeColumnValueExpression(schema, 0, "colA");
    auto *colB = MakeColumnValueExpression(schema, 0, "colB");
    auto *const500 = MakeConstantValueExpression(ValueFactory::GetIntegerValue(500));
    auto *predicate = MakeComparisonExpression(colA, const500, ComparisonType::LessThan);
    auto *out_schema = MakeOutputSchema({{"colA", colA}, {"colB", colB}});
    SeqScanPlanNode plan{out_schema, predicate, table_info->oid_};
    
    // Execute
    std::vector result_set;
    GetExecutionEngine()->Execute(&plan, &result_set, GetTxn(), GetExecutorContext());
    ">复制// Construct query plan
    TableMetadata *table_info = GetExecutorContext()->GetCatalog()->GetTable("test_1");
    Schema &schema = table_info->schema_;
    auto *colA = MakeColumnValueExpression(schema, 0, "colA");
    auto *colB = MakeColumnValueExpression(schema, 0, "colB");
    auto *const500 = MakeConstantValueExpression(ValueFactory::GetIntegerValue(500));
    auto *predicate = MakeComparisonExpression(colA, const500, ComparisonType::LessThan);
    auto *out_schema = MakeOutputSchema({{"colA", colA}, {"colB", colB}});
    SeqScanPlanNode plan{out_schema, predicate, table_info->oid_};
    
    // Execute
    std::vector result_set;
    GetExecutionEngine()->Execute(&plan, &result_set, GetTxn(), GetExecutorContext());
    

    索引扫描

    上一节中实现了 B+ 树索引,使用索引可以减小查询范围,大大加快查询速度。由于 IndexScanExecutor 不是模板类,所以这里使用的 KeyTypeGenericKey<8>KeyComparatorGenericComparator<8>

    复制#define B_PLUS_TREE_INDEX_ITERATOR_TYPE IndexIterator, RID, GenericComparator<8>>
    #define B_PLUS_TREE_INDEX_TYPE BPlusTreeIndex, RID, GenericComparator<8>>
    
    class IndexScanExecutor : public AbstractExecutor {
     public:
      /**
       * Creates a new index scan executor.
       * @param exec_ctx the executor context
       * @param plan the index scan plan to be executed
       */
      IndexScanExecutor(ExecutorContext *exec_ctx, const IndexScanPlanNode *plan);
    
      const Schema *GetOutputSchema() override { return plan_->OutputSchema(); };
    
      void Init() override;
    
      bool Next(Tuple *tuple, RID *rid) override;
    
     private:
      /** The index scan plan node to be executed. */
      const IndexScanPlanNode *plan_;
      IndexInfo *index_info_;
      B_PLUS_TREE_INDEX_TYPE *index_;
      TableMetadata *table_metadata_;
      B_PLUS_TREE_INDEX_ITERATOR_TYPE it_;
    };
    

    索引扫描的代码和全表扫描几乎一样,只是迭代器换成了 B+ 树的迭代器:

    复制IndexScanExecutor::IndexScanExecutor(ExecutorContext *exec_ctx, const IndexScanPlanNode *plan)
        : AbstractExecutor(exec_ctx),
          plan_(plan),
          index_info_(exec_ctx->GetCatalog()->GetIndex(plan->GetIndexOid())),
          index_(dynamic_cast(index_info_->index_.get())),
          table_metadata_(exec_ctx->GetCatalog()->GetTable(index_info_->table_name_)) {}
    
    void IndexScanExecutor::Init() { it_ = index_->GetBeginIterator(); }
    
    bool IndexScanExecutor::Next(Tuple *tuple, RID *rid) {
      auto predicate = plan_->GetPredicate();
    
      while (it_ != index_->GetEndIterator()) {
        *rid = (*it_).second;
        table_metadata_->table_->GetTuple(*rid, tuple, exec_ctx_->GetTransaction());
        ++it_;
    
        if (!predicate || predicate->Evaluate(tuple, &table_metadata_->schema_).GetAs<bool>()) {
          // 只保留输出列
          std::vector values;
          for (auto &col : GetOutputSchema()->GetColumns()) {
            values.push_back(col.GetExpr()->Evaluate(tuple, &table_metadata_->schema_));
          }
    
          *tuple = {values, GetOutputSchema()};
          return true;
        }
      }
    
      return false;
    }
    

    插入

    插入操作分为两种:

    • raw inserts:插入数据直接来自插入执行器本身,比如 INSERT INTO tbl_user VALUES (1, 15), (2, 16)
    • not-raw inserts:插入的数据来自子执行器,比如 INSERT INTO tbl_user1 SELECT * FROM tbl_user2

    可以使用插入计划的 IsRawInsert() 判断插入操作的类型,这个函数根据子查询器列表是否为空进行判断:

    复制/** @return true if we embed insert values directly into the plan, false if we have a child plan providing tuples */
    bool IsRawInsert() const { return GetChildren().empty(); }
    

    如果是 raw inserts,我们直接根据插入执行器中的数据构造 tuple 并插入表中,否则调用子执行器的 Next 函数获取数据并插入表中。因为表中可能建了索引,所以插入数据之后需要更新索引:

    复制class InsertExecutor : public AbstractExecutor {
     public:
      /**
       * Creates a new insert executor.
       * @param exec_ctx the executor context
       * @param plan the insert plan to be executed
       * @param child_executor the child executor to obtain insert values from, can be nullptr
       */
      InsertExecutor(ExecutorContext *exec_ctx, const InsertPlanNode *plan,
                     std::unique_ptr &&child_executor);
    
      const Schema *GetOutputSchema() override { return plan_->OutputSchema(); };
    
      void Init() override;
    
      // Note that Insert does not make use of the tuple pointer being passed in.
      // We return false if the insert failed for any reason, and return true if all inserts succeeded.
      bool Next([[maybe_unused]] Tuple *tuple, RID *rid) override;
    
      void InsertTuple(Tuple *tuple, RID *rid);
    
     private:
      /** The insert plan node to be executed. */
      const InsertPlanNode *plan_;
      std::unique_ptr child_executor_;
      TableMetadata *table_metadata_;
      std::vector index_infos_;
      uint32_t index_{0};
    };
    
    
    InsertExecutor::InsertExecutor(ExecutorContext *exec_ctx, const InsertPlanNode *plan,
                                   std::unique_ptr &&child_executor)
        : AbstractExecutor(exec_ctx),
          plan_(plan),
          child_executor_(std::move(child_executor)),
          table_metadata_(exec_ctx->GetCatalog()->GetTable(plan->TableOid())),
          index_infos_(exec_ctx->GetCatalog()->GetTableIndexes(table_metadata_->name_)) {}
    
    void InsertExecutor::Init() {
      if (!plan_->IsRawInsert()) {
        child_executor_->Init();
      }
    }
    
    bool InsertExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
      if (plan_->IsRawInsert()) {
        if (index_ >= plan_->RawValues().size()) {
          return false;
        }
    
        *tuple = {plan_->RawValuesAt(index_++), &table_metadata_->schema_};
        InsertTuple(tuple, rid);
        return true;
      } else {
        auto has_data = child_executor_->Next(tuple, rid);
        if (has_data) {
          InsertTuple(tuple, rid);
        }
        return has_data;
      }
    }
    
    void InsertExecutor::InsertTuple(Tuple *tuple, RID *rid) {
      // 更新数据表
      table_metadata_->table_->InsertTuple(*tuple, rid, exec_ctx_->GetTransaction());
    
      // 更新索引
      for (auto &index_info : index_infos_) {
        index_info->index_->InsertEntry(
            tuple->KeyFromTuple(table_metadata_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
            exec_ctx_->GetTransaction());
      }
    }
    

    更新

    UpdateExecutor 从子执行器获取需要更新的 tuple,并调用 GenerateUpdatedTuple 生成更新之后的 tuple,同样也要更新索引。

    复制class UpdateExecutor : public AbstractExecutor {
      friend class UpdatePlanNode;
    
     public:
      UpdateExecutor(ExecutorContext *exec_ctx, const UpdatePlanNode *plan,
                     std::unique_ptr &&child_executor);
    
      const Schema *GetOutputSchema() override { return plan_->OutputSchema(); };
    
      void Init() override;
    
      bool Next([[maybe_unused]] Tuple *tuple, RID *rid) override;
    
      /* Given an old tuple, creates a new updated tuple based on the updateinfo given in the plan */
      Tuple GenerateUpdatedTuple(const Tuple &old_tup);
    
     private:
      const UpdatePlanNode *plan_;
      const TableMetadata *table_info_;
      std::unique_ptr child_executor_;
      std::vector index_infos_;
    };
    
    
    bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
      if (!child_executor_->Next(tuple, rid)) {
        return false;
      }
    
      // 更新数据表
      auto new_tuple = GenerateUpdatedTuple(*tuple);
      table_info_->table_->UpdateTuple(new_tuple, *rid, exec_ctx_->GetTransaction());
    
      // 更新索引
      for (auto &index_info : index_infos_) {
        // 删除旧的 tuple
        index_info->index_->DeleteEntry(
            tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
            exec_ctx_->GetTransaction());
    
        // 插入新的 tuple
        index_info->index_->InsertEntry(
            new_tuple.KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
            exec_ctx_->GetTransaction());
      }
    
      return true;
    }
    

    删除

    DeleteExecutor 的数据来自于子执行器,删除之后需要更新索引。

    复制DeleteExecutor::DeleteExecutor(ExecutorContext *exec_ctx, const DeletePlanNode *plan,
                                   std::unique_ptr &&child_executor)
        : AbstractExecutor(exec_ctx),
          plan_(plan),
          child_executor_(std::move(child_executor)),
          table_metadata_(exec_ctx->GetCatalog()->GetTable(plan->TableOid())),
          index_infos_(exec_ctx->GetCatalog()->GetTableIndexes(table_metadata_->name_)) {}
    
    void DeleteExecutor::Init() { child_executor_->Init(); }
    
    bool DeleteExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
      if (!child_executor_->Next(tuple, rid)) {
        return false;
      }
    
      table_metadata_->table_->MarkDelete(*rid, exec_ctx_->GetTransaction());
    
      // 更新索引
      for (auto &index_info : index_infos_) {
        index_info->index_->DeleteEntry(
            tuple->KeyFromTuple(table_metadata_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
            exec_ctx_->GetTransaction());
      }
    
      return true;
    }
    

    嵌套循环连接

    要实现连接操作,最简单粗暴的方法就是开个二重循环,外层循环是小表(指的是数据页较少),内层循环是大表,小表驱动大表。但是这种连接方法效率非常低,因为完全无法利用到缓存池(分块变成四重循环之后效果会好一些):

    假设一次磁盘 IO 的时间是 0.1ms,那么大表驱动小表耗时 1.3 小时,小表驱动大表耗时 1.1 小时,可见速度慢的感人。

    循环嵌套连接执行器 NestLoopJoinExecutor 的声明如下,可以看到数据成员包括 left_executor_right_executor,前者代表外表执行器,后者代表内表的执行器:

    复制class NestedLoopJoinExecutor : public AbstractExecutor {
     public:
      /**
       * Creates a new NestedLoop join executor.
       * @param exec_ctx the executor context
       * @param plan the NestedLoop join plan to be executed
       * @param left_executor the child executor that produces tuple for the left side of join
       * @param right_executor the child executor that produces tuple for the right side of join
       *
       */
      NestedLoopJoinExecutor(ExecutorContext *exec_ctx, const NestedLoopJoinPlanNode *plan,
                             std::unique_ptr &&left_executor,
                             std::unique_ptr &&right_executor);
    
      const Schema *GetOutputSchema() override { return plan_->OutputSchema(); };
    
      void Init() override;
    
      bool Next(Tuple *tuple, RID *rid) override;
    
     private:
      /** The NestedLoop plan node to be executed. */
      const NestedLoopJoinPlanNode *plan_;
      std::unique_ptr left_executor_;
      std::unique_ptr right_executor_;
      Tuple left_tuple_;
      bool is_done_;
    };
    

    由于一次只能返回一个 tuple,所以需要先保存外表的一个 tuple,然后循环调用内表执行器的 Next() 方法直至匹配,当内表遍历完一遍之后需要更新外表的 tuple。这个部分的代码写的比较奇怪,如果有 python 的 yield 关键字可能会好写很多:

    复制void NestedLoopJoinExecutor::Init() {
      left_executor_->Init();
      right_executor_->Init();
    
      RID left_rid;
      is_done_ = !left_executor_->Next(&left_tuple_, &left_rid);
    }
    
    bool NestedLoopJoinExecutor::Next(Tuple *tuple, RID *rid) {
      Tuple right_tuple;
      RID right_rid, left_rid;
      auto predicate = plan_->Predicate();
      auto left_schema = left_executor_->GetOutputSchema();
      auto right_schema = right_executor_->GetOutputSchema();
    
      while (!is_done_) {
        while (right_executor_->Next(&right_tuple, &right_rid)) {
          if (!predicate || predicate->EvaluateJoin(&left_tuple_, left_schema, &right_tuple, right_schema).GetAs<bool>()) {
            // 拼接 tuple
            std::vector values;
            for (auto &col : GetOutputSchema()->GetColumns()) {
              values.push_back(col.GetExpr()->EvaluateJoin(&left_tuple_, left_schema, &right_tuple, right_schema));
            }
    
            *tuple = {values, GetOutputSchema()};
            return true;
          }
        }
    
        is_done_ = !left_executor_->Next(&left_tuple_, &left_rid);
        right_executor_->Init();
      }
    
      return false;
    }
    

    索引循环连接

    索引循环连接可以减少内表的扫描范围和磁盘 IO 次数,大大提升连接效率。假设走一次索引的 IO 次数为常数 Cn" role="presentation">Cn,那么总共只需 M+mC" role="presentation">M+mC 次 IO:

    嵌套循环执行器 NestIndexJoinExecutor 的声明如下,child_executor_ 是外表的执行器,内表的数据由索引提供,所以不需要内表的执行器:

    复制class NestIndexJoinExecutor : public AbstractExecutor {
     public:
      NestIndexJoinExecutor(ExecutorContext *exec_ctx, const NestedIndexJoinPlanNode *plan,
                            std::unique_ptr &&child_executor);
    
      const Schema *GetOutputSchema() override { return plan_->OutputSchema(); }
    
      void Init() override;
    
      bool Next(Tuple *tuple, RID *rid) override;
    
     private:
      /** The nested index join plan node. */
      const NestedIndexJoinPlanNode *plan_;
      std::unique_ptr child_executor_;
      TableMetadata *inner_table_info_;
      IndexInfo *index_info_;
      Tuple left_tuple_;
      std::vector inner_result_;
    };
    

    在索引上寻找匹配值时需要将 left_tuple_ 转换为内表索引的 key

    复制bool NestIndexJoinExecutor::Next(Tuple *tuple, RID *rid) {
      Tuple right_tuple;
      RID left_rid, right_rid;
    
      auto left_schema = plan_->OuterTableSchema();
      auto right_schema = plan_->InnerTableSchema();
    
      while (true) {
        if (!inner_result_.empty()) {
          right_rid = inner_result_.back();
          inner_result_.pop_back();
          inner_table_info_->table_->GetTuple(right_rid, &right_tuple, exec_ctx_->GetTransaction());
    
          // 拼接 tuple
          std::vector values;
          for (auto &col : GetOutputSchema()->GetColumns()) {
            values.push_back(col.GetExpr()->EvaluateJoin(&left_tuple_, left_schema, &right_tuple, right_schema));
          }
    
          *tuple = {values, GetOutputSchema()};
          return true;
        }
    
        if (!child_executor_->Next(&left_tuple_, &left_rid)) {
          return false;
        }
    
        // 在内表的索引上寻找匹配值列表
        auto value = plan_->Predicate()->GetChildAt(0)->EvaluateJoin(&left_tuple_, left_schema, &right_tuple, right_schema);
        auto inner_key = Tuple({value}, index_info_->index_->GetKeySchema());
        index_info_->index_->ScanKey(inner_key, &inner_result_, exec_ctx_->GetTransaction());
      }
    
      return false;
    }
    

    聚合

    由于 Fall2020 没有要求实现哈希索引,所以聚合执行器 AggregationExecutor 内部维护的是直接放在内存中的哈希表 SimpleAggregationHashTable 以及哈希表迭代器 aht_iterator_。将键值对插入哈希表的时候会立刻更新哈希表中保存的聚合结果,最终的查询结果也从该哈希表获取:

    复制void AggregationExecutor::Init() {
      child_->Init();
    
      // 构造哈希表
      Tuple tuple;
      RID rid;
      while (child_->Next(&tuple, &rid)) {
        aht_.InsertCombine(MakeKey(&tuple), MakeVal(&tuple));
      }
    
      aht_iterator_ = aht_.Begin();
    }
    
    bool AggregationExecutor::Next(Tuple *tuple, RID *rid) {
      auto having = plan_->GetHaving();
    
      while (aht_iterator_ != aht_.End()) {
        auto group_bys = aht_iterator_.Key().group_bys_;
        auto aggregates = aht_iterator_.Val().aggregates_;
        ++aht_iterator_;
    
        if (!having || having->EvaluateAggregate(group_bys, aggregates).GetAs<bool>()) {
          std::vector values;
          for (auto &col : GetOutputSchema()->GetColumns()) {
            values.push_back(col.GetExpr()->EvaluateAggregate(group_bys, aggregates));
          }
    
          *tuple = {values, GetOutputSchema()};
          return true;
        }
      }
      
      return false;
    }
    

    测试

    在终端输入:

    复制cd build
    cmake ..
    make 
    
    make executor_test
    make grading_executor_test	# 从 grade scope 扒下来的测试代码
    
    ./test/executor_test
    ./test/grading_executor_test
    

    测试结果如下,成功通过了所有测试用例:

    后记

    通过这次实验,可以加深对目录、查询计划、迭代模型和 tuple 页布局的理解,算是收获满满的一次实验了,以上~~

  • 相关阅读:
    python经典百题之打印楼梯
    Numpy入门[19]——结构化数组与记录数组
    都说“存算分离”好,分布式数据库为何还要“进一步分离”?
    设计模式详解_外观模式
    VMware vCenter Server 6.7 安装
    部分准备金银行已经过时
    Dapper迁移SqlSugar问题汇总
    【hadoop】部署hadoop的伪分布模式
    8年经验面试官详解 Java 面试秘诀
    Python中的迭代器、生成器和装饰器
  • 原文地址:https://www.cnblogs.com/zhiyiYo/p/17487153.html