Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed May 8, 2024
1 parent 4930aae commit 2cf9591
Showing 1 changed file with 15 additions and 14 deletions.
29 changes: 15 additions & 14 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1036,13 +1036,19 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,

// Read definition and repetition levels. Also return the number of definition levels
// and number of values to read. This function is called before reading values.
//
// ReadLevels will throw exception when any num-levels read is not equal to the number
// of the levels can be read.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
int64_t* num_def_levels, int64_t* non_null_values_to_read) {
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
*non_null_values_to_read +=
Expand All @@ -1055,7 +1061,7 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,
// Not present for non-repeated fields
if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (def_levels != nullptr && *num_def_levels != num_rep_levels) {
if (batch_size != num_rep_levels) {
throw ParquetException(kErrorRepDefLevelInEqual);
}
}
Expand Down Expand Up @@ -1131,13 +1137,8 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def
// Should not return more values than available in the current data page.
ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page());
// Check levels read matches `available_values_current_page() > 0`
if (num_def_levels == 0 &&
std::min(batch_size, this->available_values_current_page()) > 0) {
std::stringstream ss;
ss << "Read 0 definition levels, expected "
<< std::min(batch_size, this->available_values_current_page());
ParquetException::EofException(ss.str());
}
ARROW_DCHECK(num_def_levels == 0 ||
std::min(batch_size, this->available_values_current_page()) > 0);
if (non_null_values_to_read != 0) {
*values_read = this->ReadValues(non_null_values_to_read, values);
if (*values_read != non_null_values_to_read) {
Expand Down Expand Up @@ -1422,15 +1423,12 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

// Not present for non-repeated fields
int64_t levels_read = 0;
int64_t levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->max_rep_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels);
if (rep_levels_read != levels_read) {
throw ParquetException(kErrorRepDefLevelInEqual);
}
} else if (this->max_def_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
}

if (ARROW_PREDICT_FALSE(batch_size != levels_read)) {
Expand Down Expand Up @@ -1601,6 +1599,9 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException(kErrorRepDefLevelInEqual);
}
if (ARROW_PREDICT_FALSE(batch_size != levels_read)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

levels_written_ += levels_read;
int64_t remaining_records = num_records - skipped_records;
Expand Down

0 comments on commit 2cf9591

Please sign in to comment.