[fix](parquet) fix dict page offset zero incompatible behaviour (apache#41506)

## Proposed changes
Reason: https://issues.apache.org/jira/browse/ARROW-5322
Java readers (parquet-mr) use "dictionaryPageOffset = 0" to decide whether a
dictionary page exists, whereas the C++ readers use "has_dictionaryPageOffset"
(the _isset bit in the thrift message) for the same purpose, resulting in
incompatible behaviour.
Therefore, we should consider that a dictionary page exists only when both
`__isset.dictionary_page_offset` is true and `dictionary_page_offset` is
greater than 0.
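
For illustration, here is a minimal, self-contained sketch of the combined check. `ColumnMetaDataLike`, `has_dict_page`, and `chunk_start_offset` are hypothetical stand-ins for the thrift-generated metadata and the logic this patch adds to `ColumnChunkReader::_has_dict_page()` and `ScalarColumnReader::init()`:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the thrift-generated ColumnMetaData; only the
// fields relevant to this fix are mirrored here.
struct ColumnMetaDataLike {
    struct {
        bool dictionary_page_offset = false; // thrift _isset bit
    } __isset;
    int64_t dictionary_page_offset = 0;
    int64_t data_page_offset = 0;
};

// The fixed predicate: a dictionary page exists only when the offset is
// both set and positive, matching how parquet-mr interprets the field.
bool has_dict_page(const ColumnMetaDataLike& meta) {
    return meta.__isset.dictionary_page_offset && meta.dictionary_page_offset > 0;
}

// The chunk scan then starts at the dictionary page when present,
// otherwise at the first data page.
int64_t chunk_start_offset(const ColumnMetaDataLike& meta) {
    return has_dict_page(meta) ? meta.dictionary_page_offset : meta.data_page_offset;
}

int main() {
    // The ARROW-5322 case: the _isset bit is true but the offset is 0.
    // The old check would treat offset 0 as a dictionary page and fail
    // when deserializing the page header at offset 0.
    ColumnMetaDataLike meta;
    meta.__isset.dictionary_page_offset = true;
    meta.dictionary_page_offset = 0;
    meta.data_page_offset = 4;
    assert(!has_dict_page(meta));
    assert(chunk_start_offset(meta) == 4);
    return 0;
}
```

Checking the offset value alongside the _isset bit means files written under either convention are read the same way.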
suxiaogang223 authored and eldenmoon committed Oct 10, 2024
1 parent 426b34b commit d3c5750
Showing 6 changed files with 13 additions and 22 deletions.
11 changes: 7 additions & 4 deletions be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -59,16 +59,15 @@ ColumnChunkReader::ColumnChunkReader(io::BufferedStreamReader* reader,
          _io_ctx(io_ctx) {}

 Status ColumnChunkReader::init() {
-    size_t start_offset = _metadata.__isset.dictionary_page_offset
-                                  ? _metadata.dictionary_page_offset
-                                  : _metadata.data_page_offset;
+    size_t start_offset =
+            _has_dict_page() ? _metadata.dictionary_page_offset : _metadata.data_page_offset;
     size_t chunk_size = _metadata.total_compressed_size;
     // create page reader
     _page_reader = create_page_reader(_stream_reader, _io_ctx, start_offset, chunk_size,
                                       _metadata.num_values, _offset_index);
     // get the block compression codec
     RETURN_IF_ERROR(get_block_compression_codec(_metadata.codec, &_block_compress_codec));
-    if (_metadata.__isset.dictionary_page_offset) {
+    if (_has_dict_page()) {
         // seek to the dictionary page
         _page_reader->seek_to_page(_metadata.dictionary_page_offset);
         // Parse dictionary data when reading
@@ -82,6 +81,10 @@ Status ColumnChunkReader::init() {
     return Status::OK();
 }

+bool ColumnChunkReader::_has_dict_page() const {
+    return _metadata.__isset.dictionary_page_offset && _metadata.dictionary_page_offset > 0;
+}
+
 Status ColumnChunkReader::next_page() {
     if (_state == HEADER_PARSED) {
         return Status::OK();
@@ -191,6 +191,7 @@ class ColumnChunkReader {
 private:
     enum ColumnChunkReaderState { NOT_INIT, INITIALIZED, HEADER_PARSED, DATA_LOADED, PAGE_SKIPPED };

+    bool _has_dict_page() const;
     Status _decode_dict_page();
     void _reserve_decompress_buf(size_t size);
     int32_t _get_type_length();
7 changes: 4 additions & 3 deletions be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -179,9 +179,10 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
 Status ScalarColumnReader::init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size) {
     _field_schema = field;
     auto& chunk_meta = _chunk_meta.meta_data;
-    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
-                                  ? chunk_meta.dictionary_page_offset
-                                  : chunk_meta.data_page_offset;
+    int64_t chunk_start =
+            chunk_meta.__isset.dictionary_page_offset && chunk_meta.dictionary_page_offset > 0
+                    ? chunk_meta.dictionary_page_offset
+                    : chunk_meta.data_page_offset;
     size_t chunk_len = chunk_meta.total_compressed_size;
     size_t prefetch_buffer_size = std::min(chunk_len, max_buf_size);
     if (typeid_cast<io::MergeRangeFileReader*>(file.get())) {
Binary file not shown.
@@ -223,13 +223,10 @@ suite("test_hdfs_parquet_group0","external,hive,tvf,external_docker") {


     uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/dict-page-offset-zero.parquet"
-    test {
-        sql """ select * from HDFS(
+    order_qt_test_27 """ select * from HDFS(
                 "uri" = "${uri}",
                 "hadoop.username" = "${hdfsUserName}",
                 "format" = "parquet") limit 10; """
-        exception "Failed to deserialize parquet page header. offset: 0, header size: 40, end offset: 40, real header size: 40"
-    }


uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group0/repeated_no_annotation.parquet"
@@ -218,17 +218,6 @@ suite("test_hdfs_parquet_group5","external,hive,tvf,external_docker") {
"hadoop.username" = "${hdfsUserName}",
"format" = "parquet") limit 10; """


uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group5/dict-page-offset-zero.parquet"
test {
sql """ select * from HDFS(
"uri" = "${uri}",
"hadoop.username" = "${hdfsUserName}",
"format" = "parquet") limit 10; """
exception "Failed to deserialize parquet page header. offset: 0, header size: 40, end offset: 40, real header size: 40"
}


uri = "${defaultFS}" + "/user/doris/tvf_data/test_hdfs_parquet/group5/native_parquet_reader.parquet"
order_qt_test_28 """ select * from HDFS(
"uri" = "${uri}",
