-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-41321: [C++][Parquet] More strict Parquet level checking #41346
Merged
Merged
Changes from 13 commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
31d628a
Update level and batch-read checkings
mapleFU 7df0415
Revert changes in column-reader (since it's different from RecordReader)
mapleFU 4930aae
Refactor handling for error
mapleFU 2cf9591
tweaks
mapleFU db35d8b
Fix test and add more tests
mapleFU 07c2386
Merge branch 'main' into more-checks-for-read-levels
mapleFU 7b37a29
fix comment
mapleFU e36680c
Merge branch 'main' into more-checks-for-read-levels
mapleFU 762eb33
update level checking
mapleFU c082f8d
Update test and err message
mapleFU ca7d804
add more tests
mapleFU f50a393
revert back parquet-testing change
mapleFU 3d58afe
update impl
mapleFU 5e46d61
resolve comment
mapleFU File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -101,6 +101,10 @@ inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) { | |
std::to_string(expected)); | ||
} | ||
} | ||
|
||
constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues = | ||
"Number of decoded rep / def levels do not match num_values in page header"; | ||
|
||
} // namespace | ||
|
||
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {} | ||
|
@@ -907,6 +911,8 @@ class ColumnReaderImplBase { | |
static_cast<int>(data_size)); | ||
} | ||
|
||
// Available values in the current data page, value includes repeated values | ||
// and nulls. | ||
int64_t available_values_current_page() const { | ||
return num_buffered_values_ - num_decoded_values_; | ||
} | ||
|
@@ -933,7 +939,7 @@ class ColumnReaderImplBase { | |
int64_t num_buffered_values_; | ||
|
||
// The number of values from the current data page that have been decoded | ||
// into memory | ||
// into memory or skipped over. | ||
int64_t num_decoded_values_; | ||
|
||
::arrow::MemoryPool* pool_; | ||
|
@@ -1026,28 +1032,36 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>, | |
|
||
// Read definition and repetition levels. Also return the number of definition levels | ||
// and number of values to read. This function is called before reading values. | ||
// | ||
// ReadLevels will throw exception when any num-levels read is not equal to the number | ||
// of the levels can be read. | ||
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, | ||
int64_t* num_def_levels, int64_t* values_to_read) { | ||
batch_size = | ||
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); | ||
int64_t* num_def_levels, int64_t* non_null_values_to_read) { | ||
batch_size = std::min(batch_size, this->available_values_current_page()); | ||
|
||
// If the field is required and non-repeated, there are no definition levels | ||
if (this->max_def_level_ > 0 && def_levels != nullptr) { | ||
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); | ||
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
// TODO(wesm): this tallying of values-to-decode can be performed with better | ||
// cache-efficiency if fused with the level decoding. | ||
*values_to_read += | ||
*non_null_values_to_read += | ||
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_); | ||
} else { | ||
// Required field, read all values | ||
*values_to_read = batch_size; | ||
if (num_def_levels != nullptr) { | ||
*num_def_levels = 0; | ||
} | ||
*non_null_values_to_read = batch_size; | ||
} | ||
|
||
// Not present for non-repeated fields | ||
if (this->max_rep_level_ > 0 && rep_levels != nullptr) { | ||
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); | ||
if (def_levels != nullptr && *num_def_levels != num_rep_levels) { | ||
throw ParquetException("Number of decoded rep / def levels did not match"); | ||
if (batch_size != num_rep_levels) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
} | ||
} | ||
|
@@ -1090,8 +1104,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary( | |
*indices_read = ReadDictionaryIndices(indices_to_read, indices); | ||
int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read); | ||
// Some callers use a batch size of 0 just to get the dictionary. | ||
int64_t expected_values = | ||
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); | ||
int64_t expected_values = std::min(batch_size, this->available_values_current_page()); | ||
if (total_indices == 0 && expected_values > 0) { | ||
std::stringstream ss; | ||
ss << "Read 0 values, expected " << expected_values; | ||
|
@@ -1106,7 +1119,8 @@ template <typename DType> | |
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels, | ||
int16_t* rep_levels, T* values, | ||
int64_t* values_read) { | ||
// HasNext invokes ReadNewPage | ||
// HasNext might invoke ReadNewPage until a data page with | ||
// `available_values_current_page() > 0` is found. | ||
if (!HasNext()) { | ||
*values_read = 0; | ||
return 0; | ||
|
@@ -1115,20 +1129,27 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def | |
// TODO(wesm): keep reading data pages until batch_size is reached, or the | ||
// row group is finished | ||
int64_t num_def_levels = 0; | ||
int64_t values_to_read = 0; | ||
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read); | ||
|
||
*values_read = this->ReadValues(values_to_read, values); | ||
// Number of non-null values to read within `num_def_levels`. | ||
int64_t non_null_values_to_read = 0; | ||
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, | ||
&non_null_values_to_read); | ||
// Should not return more values than available in the current data page. | ||
ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page()); | ||
if (non_null_values_to_read != 0) { | ||
*values_read = this->ReadValues(non_null_values_to_read, values); | ||
} else { | ||
*values_read = 0; | ||
} | ||
// Adjust total_values, since if max_def_level_ == 0, num_def_levels would | ||
// be 0 and `values_read` would adjust to `available_values_current_page()`. | ||
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read); | ||
int64_t expected_values = | ||
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); | ||
int64_t expected_values = std::min(batch_size, this->available_values_current_page()); | ||
if (total_values == 0 && expected_values > 0) { | ||
std::stringstream ss; | ||
ss << "Read 0 values, expected " << expected_values; | ||
ParquetException::EofException(ss.str()); | ||
} | ||
this->ConsumeBufferedValues(total_values); | ||
|
||
return total_values; | ||
} | ||
|
||
|
@@ -1137,29 +1158,33 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced( | |
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, | ||
uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read, | ||
int64_t* values_read, int64_t* null_count_out) { | ||
// HasNext invokes ReadNewPage | ||
// HasNext might invoke ReadNewPage until a data page with | ||
// `available_values_current_page() > 0` is found. | ||
if (!HasNext()) { | ||
*levels_read = 0; | ||
*values_read = 0; | ||
*null_count_out = 0; | ||
return 0; | ||
} | ||
|
||
// Number of non-null values to read | ||
int64_t total_values; | ||
// TODO(wesm): keep reading data pages until batch_size is reached, or the | ||
// row group is finished | ||
batch_size = | ||
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_); | ||
batch_size = std::min(batch_size, this->available_values_current_page()); | ||
|
||
// If the field is required and non-repeated, there are no definition levels | ||
if (this->max_def_level_ > 0) { | ||
int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels); | ||
if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
|
||
// Not present for non-repeated fields | ||
if (this->max_rep_level_ > 0) { | ||
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels); | ||
if (num_def_levels != num_rep_levels) { | ||
throw ParquetException("Number of decoded rep / def levels did not match"); | ||
if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
} | ||
|
||
|
@@ -1401,26 +1426,21 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, | |
int16_t* def_levels = this->def_levels() + levels_written_; | ||
int16_t* rep_levels = this->rep_levels() + levels_written_; | ||
|
||
// Not present for non-repeated fields | ||
int64_t levels_read = 0; | ||
if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) != | ||
batch_size)) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
if (this->max_rep_level_ > 0) { | ||
levels_read = this->ReadDefinitionLevels(batch_size, def_levels); | ||
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { | ||
throw ParquetException("Number of decoded rep / def levels did not match"); | ||
int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels); | ||
if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
} else if (this->max_def_level_ > 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This branch is useless... |
||
levels_read = this->ReadDefinitionLevels(batch_size, def_levels); | ||
} | ||
|
||
// Exhausted column chunk | ||
if (levels_read == 0) { | ||
break; | ||
} | ||
|
||
levels_written_ += levels_read; | ||
levels_written_ += batch_size; | ||
records_read += ReadRecordData(num_records - records_read); | ||
} else { | ||
// No repetition or definition levels | ||
// No repetition and definition levels, we can read values directly | ||
batch_size = std::min(num_records - records_read, batch_size); | ||
records_read += ReadRecordData(batch_size); | ||
} | ||
|
@@ -1574,13 +1594,14 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>, | |
int16_t* def_levels = this->def_levels() + levels_written_; | ||
int16_t* rep_levels = this->rep_levels() + levels_written_; | ||
|
||
int64_t levels_read = 0; | ||
levels_read = this->ReadDefinitionLevels(batch_size, def_levels); | ||
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) { | ||
throw ParquetException("Number of decoded rep / def levels did not match"); | ||
if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) { | ||
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues); | ||
} | ||
|
||
levels_written_ += levels_read; | ||
levels_written_ += batch_size; | ||
int64_t remaining_records = num_records - skipped_records; | ||
// This updates at_record_start_. | ||
skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records); | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this throw? it's not clear if this can be triggered by bad data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, throw is the duty of
ReadLevels
, a internalReadLevel
implementation cannot read more data than input batch-sizeThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the question is: can a corrupt Parquet file trigger this error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never, since:
Here we just do dcheck to check that
ReadLevels
has well done thisThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess if we are concerned with it, it might pay to still check it and throw an exception in case there is in fact a bug, I don't think this code is on the inner loop (it would be nice to have a macro that does this to avoid code bloat.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. About macro I think it's just change three line to two, and may hack for func call...so keep it here