Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-41321: [C++][Parquet] More strict Parquet level checking #41346

Merged
merged 14 commits into from
May 21, 2024
105 changes: 63 additions & 42 deletions cpp/src/parquet/column_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ inline void CheckNumberDecoded(int64_t number_decoded, int64_t expected) {
std::to_string(expected));
}
}

// Message for the ParquetException thrown when the number of repetition or
// definition levels decoded for a batch differs from the number of levels
// that were requested for the current data page.
constexpr std::string_view kErrorRepDefLevelNotMatchesNumValues =
"Number of decoded rep / def levels do not match num_values in page header";

} // namespace

// Default-constructs a decoder with zero values remaining to decode.
LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}
Expand Down Expand Up @@ -907,6 +911,8 @@ class ColumnReaderImplBase {
static_cast<int>(data_size));
}

// Number of values still available in the current data page, computed as
// num_buffered_values_ minus num_decoded_values_. The count includes
// repeated values and nulls.
int64_t available_values_current_page() const {
return num_buffered_values_ - num_decoded_values_;
}
Expand All @@ -933,7 +939,7 @@ class ColumnReaderImplBase {
int64_t num_buffered_values_;

// The number of values from the current data page that have been decoded
// into memory
// into memory or skipped over.
int64_t num_decoded_values_;

::arrow::MemoryPool* pool_;
Expand Down Expand Up @@ -1026,28 +1032,36 @@ class TypedColumnReaderImpl : public TypedColumnReader<DType>,

// Read definition and repetition levels. Also return the number of definition levels
// and number of values to read. This function is called before reading values.
//
// ReadLevels will throw exception when any num-levels read is not equal to the number
// of the levels can be read.
void ReadLevels(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
int64_t* num_def_levels, int64_t* values_to_read) {
batch_size =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
int64_t* num_def_levels, int64_t* non_null_values_to_read) {
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0 && def_levels != nullptr) {
*num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(*num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
// TODO(wesm): this tallying of values-to-decode can be performed with better
// cache-efficiency if fused with the level decoding.
*values_to_read +=
*non_null_values_to_read +=
std::count(def_levels, def_levels + *num_def_levels, this->max_def_level_);
} else {
// Required field, read all values
*values_to_read = batch_size;
if (num_def_levels != nullptr) {
*num_def_levels = 0;
}
*non_null_values_to_read = batch_size;
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0 && rep_levels != nullptr) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (def_levels != nullptr && *num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
if (batch_size != num_rep_levels) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}
}
Expand Down Expand Up @@ -1090,8 +1104,7 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchWithDictionary(
*indices_read = ReadDictionaryIndices(indices_to_read, indices);
int64_t total_indices = std::max<int64_t>(num_def_levels, *indices_read);
// Some callers use a batch size of 0 just to get the dictionary.
int64_t expected_values =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_indices == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
Expand All @@ -1106,7 +1119,8 @@ template <typename DType>
int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def_levels,
int16_t* rep_levels, T* values,
int64_t* values_read) {
// HasNext invokes ReadNewPage
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*values_read = 0;
return 0;
Expand All @@ -1115,20 +1129,27 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatch(int64_t batch_size, int16_t* def
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
int64_t num_def_levels = 0;
int64_t values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels, &values_to_read);

*values_read = this->ReadValues(values_to_read, values);
// Number of non-null values to read within `num_def_levels`.
int64_t non_null_values_to_read = 0;
ReadLevels(batch_size, def_levels, rep_levels, &num_def_levels,
&non_null_values_to_read);
// Should not return more values than available in the current data page.
ARROW_DCHECK_LE(num_def_levels, this->available_values_current_page());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this throw? It's not clear whether this can be triggered by bad data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, throwing is the duty of ReadLevels; an internal ReadLevel implementation cannot read more data than the input batch size.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the question is: can a corrupt Parquet file trigger this error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never, since:

// ReadLevels will throw exception when any num-levels read is not equal to the number
// of the levels can be read.

Here we just use a DCHECK to verify that ReadLevels has done this correctly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess if we are concerned about it, it might pay to still check it and throw an exception in case there is in fact a bug; I don't think this code is in the inner loop. (It would be nice to have a macro that does this, to avoid code bloat.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. As for a macro, I think it would only reduce three lines to two, and it might need a hack around the function call, so I've kept the explicit check here.

if (non_null_values_to_read != 0) {
*values_read = this->ReadValues(non_null_values_to_read, values);
} else {
*values_read = 0;
}
// Adjust total_values, since if max_def_level_ == 0, num_def_levels would
// be 0 and `values_read` would adjust to `available_values_current_page()`.
int64_t total_values = std::max<int64_t>(num_def_levels, *values_read);
int64_t expected_values =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
int64_t expected_values = std::min(batch_size, this->available_values_current_page());
if (total_values == 0 && expected_values > 0) {
std::stringstream ss;
ss << "Read 0 values, expected " << expected_values;
ParquetException::EofException(ss.str());
}
this->ConsumeBufferedValues(total_values);

return total_values;
}

Expand All @@ -1137,29 +1158,33 @@ int64_t TypedColumnReaderImpl<DType>::ReadBatchSpaced(
int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
int64_t* values_read, int64_t* null_count_out) {
// HasNext invokes ReadNewPage
// HasNext might invoke ReadNewPage until a data page with
// `available_values_current_page() > 0` is found.
if (!HasNext()) {
*levels_read = 0;
*values_read = 0;
*null_count_out = 0;
return 0;
}

// Number of non-null values to read
int64_t total_values;
// TODO(wesm): keep reading data pages until batch_size is reached, or the
// row group is finished
batch_size =
std::min(batch_size, this->num_buffered_values_ - this->num_decoded_values_);
batch_size = std::min(batch_size, this->available_values_current_page());

// If the field is required and non-repeated, there are no definition levels
if (this->max_def_level_ > 0) {
int64_t num_def_levels = this->ReadDefinitionLevels(batch_size, def_levels);
if (ARROW_PREDICT_FALSE(num_def_levels != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

// Not present for non-repeated fields
if (this->max_rep_level_ > 0) {
int64_t num_rep_levels = this->ReadRepetitionLevels(batch_size, rep_levels);
if (num_def_levels != num_rep_levels) {
throw ParquetException("Number of decoded rep / def levels did not match");
if (ARROW_PREDICT_FALSE(num_def_levels != num_rep_levels)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
}

Expand Down Expand Up @@ -1401,26 +1426,21 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

// Not present for non-repeated fields
int64_t levels_read = 0;
if (ARROW_PREDICT_FALSE(this->ReadDefinitionLevels(batch_size, def_levels) !=
batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->max_rep_level_ > 0) {
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
int64_t rep_levels_read = this->ReadRepetitionLevels(batch_size, rep_levels);
if (ARROW_PREDICT_FALSE(rep_levels_read != batch_size)) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
} else if (this->max_def_level_ > 0) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This branch is useless...

levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
}

// Exhausted column chunk
if (levels_read == 0) {
break;
}

levels_written_ += levels_read;
levels_written_ += batch_size;
records_read += ReadRecordData(num_records - records_read);
} else {
// No repetition or definition levels
// No repetition and definition levels, we can read values directly
batch_size = std::min(num_records - records_read, batch_size);
records_read += ReadRecordData(batch_size);
}
Expand Down Expand Up @@ -1574,13 +1594,14 @@ class TypedRecordReader : public TypedColumnReaderImpl<DType>,
int16_t* def_levels = this->def_levels() + levels_written_;
int16_t* rep_levels = this->rep_levels() + levels_written_;

int64_t levels_read = 0;
levels_read = this->ReadDefinitionLevels(batch_size, def_levels);
if (this->ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
throw ParquetException("Number of decoded rep / def levels did not match");
if (this->ReadDefinitionLevels(batch_size, def_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}
if (this->ReadRepetitionLevels(batch_size, rep_levels) != batch_size) {
throw ParquetException(kErrorRepDefLevelNotMatchesNumValues);
}

levels_written_ += levels_read;
levels_written_ += batch_size;
int64_t remaining_records = num_records - skipped_records;
// This updates at_record_start_.
skipped_records += DelimitAndSkipRecordsInBuffer(remaining_records);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/parquet/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class PARQUET_EXPORT ColumnReader {
template <typename DType>
class TypedColumnReader : public ColumnReader {
public:
typedef typename DType::c_type T;
using T = typename DType::c_type;

// Read a batch of repetition levels, definition levels, and values from the
// column.
Expand Down
76 changes: 75 additions & 1 deletion cpp/src/parquet/column_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
&descr, values, /*num_values=*/2, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/{},
/*max_rep_level=*/0);
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
Expand All @@ -431,6 +431,80 @@ TEST_F(TestPrimitiveReader, TestReadValuesMissing) {
ParquetException);
}

// GH-41321: When max_def_level > 0 or max_rep_level > 0 and a page
// contains more or fewer levels than the `num_values` in its PageHeader,
// the reader should detect the mismatch and throw an exception.
TEST_F(TestPrimitiveReader, DefRepLevelNotExpected) {
// Helper: builds one PLAIN-encoded boolean data page from the given
// definition/repetition levels with `num_values` passed as the page's value
// count, appends it to pages_, initializes the reader, and checks that
// ReadBatch throws a ParquetException whose message contains the
// "rep / def levels do not match num_values" text.
//
// type:             schema node used to build the ColumnDescriptor.
// input_def_levels: definition levels written into the page.
// input_rep_levels: repetition levels written into the page.
// num_values:       value count passed to MakeDataPage; the callers choose
//                   it to disagree with the number of levels supplied.
auto do_check = [&](const NodePtr& type, const std::vector<int16_t>& input_def_levels,
const std::vector<int16_t>& input_rep_levels, int num_values) {
std::vector<bool> values(num_values, false);
const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);

// The data page falls back to plain encoding
// NOTE(review): `dummy` is not referenced anywhere else in this lambda as
// shown — confirm whether the allocation is still needed.
std::shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
std::shared_ptr<DataPageV1> data_page = MakeDataPage<BooleanType>(
&descr, values, /*num_values=*/num_values, Encoding::PLAIN, /*indices=*/{},
/*indices_size=*/0, /*def_levels=*/input_def_levels, max_def_level_,
/*rep_levels=*/input_rep_levels,
/*max_rep_level=*/max_rep_level_);
pages_.push_back(data_page);
InitReader(&descr);
auto reader = static_cast<BoolReader*>(reader_.get());
ASSERT_TRUE(reader->HasNext());

// Request more values than any test page holds so the reader attempts to
// consume the whole page in one ReadBatch call.
constexpr int batch_size = 10;
std::vector<int16_t> def_levels(batch_size, 0);
std::vector<int16_t> rep_levels(batch_size, 0);
bool values_out[batch_size];
int64_t values_read;
// The read must fail with the level/num_values mismatch message.
EXPECT_THROW_THAT(
[&]() {
reader->ReadBatch(batch_size, def_levels.data(), rep_levels.data(), values_out,
&values_read);
},
ParquetException,
::testing::Property(&ParquetException::what,
::testing::HasSubstr("Number of decoded rep / def levels do "
"not match num_values in page header")));
};
// storing def-levels less than value in page-header
{
max_def_level_ = 1;
max_rep_level_ = 0;
NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
std::vector<int16_t> input_def_levels(1, 1);
std::vector<int16_t> input_rep_levels{};
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/3);
}
// storing def-levels more than value in page-header
{
max_def_level_ = 1;
max_rep_level_ = 0;
NodePtr type = schema::Boolean("a", Repetition::OPTIONAL);
std::vector<int16_t> input_def_levels(2, 1);
std::vector<int16_t> input_rep_levels{};
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
}
// storing rep-levels less than value in page-header
{
max_def_level_ = 0;
max_rep_level_ = 1;
NodePtr type = schema::Boolean("a", Repetition::REPEATED);
std::vector<int16_t> input_def_levels{};
std::vector<int16_t> input_rep_levels(3, 0);
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/4);
}
// storing rep-levels more than value in page-header
{
max_def_level_ = 0;
max_rep_level_ = 1;
NodePtr type = schema::Boolean("a", Repetition::REPEATED);
std::vector<int16_t> input_def_levels{};
std::vector<int16_t> input_rep_levels(2, 1);
do_check(type, input_def_levels, input_rep_levels, /*num_values=*/1);
}
pitrou marked this conversation as resolved.
Show resolved Hide resolved
}

// Repetition level byte length reported in Page but Max Repetition level
// is zero for the column.
TEST_F(TestPrimitiveReader, TestRepetitionLvlBytesWithMaxRepetitionZero) {
Expand Down
Loading