• Arrow parquet 之 String Reader


    Switch

    cpp/src/parquet/column_reader.cc

    // Constructs a record reader for one leaf column. Initializes all record/level
    // bookkeeping counters to zero and allocates the working buffers.
    TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool)
          : BASE(descr, pool) {
        leaf_info_ = leaf_info;
        nullable_values_ = leaf_info.HasNullableValues();
        at_record_start_ = true;  // no record is currently being assembled
        records_read_ = 0;
        values_written_ = 0;
        values_capacity_ = 0;
        null_count_ = 0;
        levels_written_ = 0;
        levels_position_ = 0;
        levels_capacity_ = 0;
        // BYTE_ARRAY (e.g. string) columns do not use the flat values_ buffer;
        // only fixed-width physical types accumulate values there.
        uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY);
    
        if (uses_values_) {
          values_ = AllocateBuffer(pool);
        }
        // Validity bitmap plus definition/repetition level buffers are always needed.
        valid_bits_ = AllocateBuffer(pool);
        def_levels_ = AllocateBuffer(pool);
        rep_levels_ = AllocateBuffer(pool);
        Reset();
      }
    

    NextBatch 顺序

    以下逻辑均在 cpp/src/parquet/arrow/reader.cc

      // NOTE(review): the angle-bracket template arguments were stripped when this
      // snippet was extracted from HTML; restored below to match
      // cpp/src/parquet/arrow/reader.cc in the Arrow source tree.
      ::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
          [readers, batch_schema, num_rows,
           this]() mutable -> ::arrow::Result<RecordBatchIterator> {
            ::arrow::ChunkedArrayVector columns(readers.size());

            // don't reserve more rows than necessary
            int64_t batch_size = std::min(properties().batch_size(), num_rows);
            num_rows -= batch_size;

            // Decode the next batch of every column, in parallel when enabled.
            RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
                reader_properties_.use_threads(), static_cast<int>(readers.size()),
                [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));

            // An absent or empty column means the data is exhausted: signal the
            // end of iteration.
            for (const auto& column : columns) {
              if (column == nullptr || column->length() == 0) {
                return ::arrow::IterationTraits<RecordBatchIterator>::End();
              }
            }

            auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
            auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);

            // NB: explicitly preserve table so that table_reader doesn't outlive it
            return ::arrow::MakeFunctionIterator(
                [table, table_reader] { return table_reader->Next(); });
          });
    
      
    

    先通过 metadata 算出了 num_rows

    
      // Sum the row counts of the selected row groups straight from the file
      // metadata — no data pages are read at this point.
      int64_t num_rows = 0;
      for (int row_group : row_groups) {
        num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
      }
    

    然后计算真实的 batch_size（每次取配置的 batch_size 与剩余行数中的较小者，并递减剩余行数）

    // Clamp this batch to the rows that remain, then shrink the remaining budget.
    // don't reserve more rows than necessary
            int64_t batch_size = std::min(properties().batch_size(), num_rows);
            num_rows -= batch_size;
    
     // Reads up to `batch_size` records from this column into `*out` as a
     // ChunkedArray: first decode into the record reader's buffers (LoadBatch),
     // then build the Arrow array (BuildArray), validating every produced chunk.
     ::arrow::Status NextBatch(int64_t batch_size,
                                std::shared_ptr<::arrow::ChunkedArray>* out) final {
        RETURN_NOT_OK(LoadBatch(batch_size));
        RETURN_NOT_OK(BuildArray(batch_size, out));
        // Sanity-check each chunk before handing the result to the caller.
        for (int x = 0; x < (*out)->num_chunks(); x++) {
          RETURN_NOT_OK((*out)->chunk(x)->Validate());
        }
        return Status::OK();
      }
    
    NextRowGroup and TransferColumnData 位置
      // Decodes up to `records_to_read` records into the record reader's buffers,
      // crossing row-group boundaries as needed, then transfers the buffered
      // column data into `out_`. Parquet exceptions are converted to Status by
      // the BEGIN/END catch macros.
      Status LoadBatch(int64_t records_to_read) final {
        BEGIN_PARQUET_CATCH_EXCEPTIONS
        out_ = nullptr;
        record_reader_->Reset();
        // Pre-allocation gives much better performance for flat columns
        record_reader_->Reserve(records_to_read);
        while (records_to_read > 0) {
          if (!record_reader_->HasMoreData()) {
            break;
          }
          int64_t records_read = record_reader_->ReadRecords(records_to_read);
          records_to_read -= records_read;
          // Zero records read means the current row group is exhausted:
          // advance to the next one and keep reading.
          if (records_read == 0) {
            NextRowGroup();
          }
        }
        RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_,
                                         ctx_->pool, &out_));
        return Status::OK();
        END_PARQUET_CATCH_EXCEPTIONS
      }
    
  • 相关阅读:
    Python(四十三)——下载PDF
    pytorch如何将bin格式模型导出pt格式模型?
    【接口自动化测试】第一节.接口自动化测试基础和框架介绍
    〖Python 数据库开发实战 - MySQL篇⑮〗- 数据表结果集的排序与去除重复(去重)
    实现内网穿透的最佳解决方案(无实名认证,完全免费)
    scala 转换、过滤、分组、排序
    关于#html5#的问题:设计一个真正的大数据与科技传播的响应式网站(相关搜索:设计网站)
    华为云王楠楠:分布式云原生全域调度的技术和实践
    C++ 11 新特性 汇总篇
    【人话版】WEB3黑暗森林中的隐私博弈
  • 原文地址:https://blog.csdn.net/zhixingheyi_tian/article/details/126244082