cpp/src/parquet/column_reader.cc
TypedRecordReader(const ColumnDescriptor* descr, LevelInfo leaf_info, MemoryPool* pool)
    : BASE(descr, pool) {
  leaf_info_ = leaf_info;
  nullable_values_ = leaf_info.HasNullableValues();
  at_record_start_ = true;
  records_read_ = 0;
  values_written_ = 0;
  values_capacity_ = 0;
  null_count_ = 0;
  levels_written_ = 0;
  levels_position_ = 0;
  levels_capacity_ = 0;
  uses_values_ = !(descr->physical_type() == Type::BYTE_ARRAY);

  if (uses_values_) {
    values_ = AllocateBuffer(pool);
  }
  valid_bits_ = AllocateBuffer(pool);
  def_levels_ = AllocateBuffer(pool);
  rep_levels_ = AllocateBuffer(pool);
  Reset();
}
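Note that uses_values_ is false for BYTE_ARRAY columns: their bytes are accumulated in chunked binary builders rather than in the flat values_ buffer, which is why values_ is only allocated for the other physical types. For orientation, here is a minimal sketch of how such a record reader is obtained and driven, assuming the internal::RecordReader::Make factory and the SetPageReader/ReadRecords interface (the helper name DrainColumnChunk and the batch size are illustrative, not from the snippet above):

#include <memory>
#include "parquet/column_reader.h"
#include "parquet/level_conversion.h"

// Illustrative helper: drive a RecordReader over one column chunk.
// descr and leaf_info come from the file schema; pages from the row-group reader.
int64_t DrainColumnChunk(const parquet::ColumnDescriptor* descr,
                         parquet::internal::LevelInfo leaf_info,
                         std::unique_ptr<parquet::PageReader> pages) {
  // Make() dispatches on descr->physical_type() to a TypedRecordReader.
  auto reader = parquet::internal::RecordReader::Make(
      descr, leaf_info, ::arrow::default_memory_pool());
  reader->SetPageReader(std::move(pages));
  int64_t total = 0;
  // ReadRecords() buffers values and def/rep levels; it returns 0 once the
  // chunk is exhausted.
  while (int64_t n = reader->ReadRecords(/*num_records=*/1024)) {
    total += n;
    reader->Reset();  // release buffered data (a real consumer transfers it first)
  }
  return total;
}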
The logic below all lives in cpp/src/parquet/arrow/reader.cc.
using ::arrow::RecordBatchIterator;  // Iterator<std::shared_ptr<RecordBatch>>

::arrow::Iterator<RecordBatchIterator> batches = ::arrow::MakeFunctionIterator(
    [readers, batch_schema, num_rows,
     this]() mutable -> ::arrow::Result<RecordBatchIterator> {
      ::arrow::ChunkedArrayVector columns(readers.size());

      // don't reserve more rows than necessary
      int64_t batch_size = std::min(properties().batch_size(), num_rows);
      num_rows -= batch_size;

      RETURN_NOT_OK(::arrow::internal::OptionalParallelFor(
          reader_properties_.use_threads(), static_cast<int>(readers.size()),
          [&](int i) { return readers[i]->NextBatch(batch_size, &columns[i]); }));

      for (const auto& column : columns) {
        if (column == nullptr || column->length() == 0) {
          return ::arrow::IterationTraits<RecordBatchIterator>::End();
        }
      }

      auto table = ::arrow::Table::Make(batch_schema, std::move(columns));
      auto table_reader = std::make_shared<::arrow::TableBatchReader>(*table);

      // NB: explicitly preserve table so that table_reader doesn't outlive it
      return ::arrow::MakeFunctionIterator(
          [table, table_reader] { return table_reader->Next(); });
    });
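Each call to this outer lambda therefore yields an iterator over one table's record batches. In the surrounding source these nested iterators are then flattened into a single batch stream, roughly as follows (RowGroupRecordBatchReader is an internal wrapper type; the exact out-parameter and helper names vary across Arrow versions):

// Collapse the Iterator<Iterator<RecordBatch>> into one flat stream that the
// returned RecordBatchReader wraps.
*out = std::make_unique<RowGroupRecordBatchReader>(
    ::arrow::MakeFlattenIterator(std::move(batches)), batch_schema);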
num_rows is first computed from the row-group metadata:
int64_t num_rows = 0;
for (int row_group : row_groups) {
  num_rows += parquet_reader()->metadata()->RowGroup(row_group)->num_rows();
}
Then the effective batch_size for the current iteration is derived from it, and the remaining row count is decremented:
// don't reserve more rows than necessary
int64_t batch_size = std::min(properties().batch_size(), num_rows);
num_rows -= batch_size;
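For example, with num_rows = 2500 across the selected row groups and properties().batch_size() = 1000, successive invocations of the lambda read 1000, 1000, and 500 rows; on the fourth call batch_size is 0, every column comes back empty, and the iterator returns End(). The per-column NextBatch(batch_size, &columns[i]) invoked above looks like this: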
::arrow::Status NextBatch(int64_t batch_size,
                          std::shared_ptr<::arrow::ChunkedArray>* out) final {
  RETURN_NOT_OK(LoadBatch(batch_size));
  RETURN_NOT_OK(BuildArray(batch_size, out));
  for (int x = 0; x < (*out)->num_chunks(); x++) {
    RETURN_NOT_OK((*out)->chunk(x)->Validate());
  }
  return Status::OK();
}
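NextBatch produces a ChunkedArray rather than a single Array because one batch may not fit in a single array: for BYTE_ARRAY columns in particular, the accumulated binary data can overflow the 2 GiB offset range of one Arrow binary array, so the transfer step may emit several chunks, each of which is validated above. The actual page-level reading happens in LoadBatch: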
Status LoadBatch(int64_t records_to_read) final {
  BEGIN_PARQUET_CATCH_EXCEPTIONS
  out_ = nullptr;
  record_reader_->Reset();
  // Pre-allocation gives much better performance for flat columns
  record_reader_->Reserve(records_to_read);
  while (records_to_read > 0) {
    if (!record_reader_->HasMoreData()) {
      break;
    }
    int64_t records_read = record_reader_->ReadRecords(records_to_read);
    records_to_read -= records_read;
    if (records_read == 0) {
      NextRowGroup();
    }
  }
  RETURN_NOT_OK(TransferColumnData(record_reader_.get(), field_->type(), descr_,
                                   ctx_->pool, &out_));
  return Status::OK();
  END_PARQUET_CATCH_EXCEPTIONS
}
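To see these pieces end to end, here is a minimal caller sketch using the public parquet::arrow::OpenFile and GetRecordBatchReader entry points (the GetRecordBatchReader out-parameter type, unique_ptr vs shared_ptr, varies across Arrow versions; error handling is reduced to the standard macros):

#include <memory>
#include <string>
#include "arrow/io/file.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "parquet/arrow/reader.h"

::arrow::Status ReadAllBatches(const std::string& path) {
  ARROW_ASSIGN_OR_RAISE(auto input, ::arrow::io::ReadableFile::Open(path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(
      input, ::arrow::default_memory_pool(), &reader));
  std::shared_ptr<::arrow::RecordBatchReader> batch_reader;
  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader({0}, &batch_reader));  // row group 0
  std::shared_ptr<::arrow::RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(batch_reader->ReadNext(&batch));
    if (batch == nullptr) break;  // stream exhausted
    // ... consume batch ...
  }
  return ::arrow::Status::OK();
}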