Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: compile error due to merging stale PRs #646

Merged
merged 1 commit into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl ArrowReader {
try_join!(parquet_file.metadata(), parquet_file.reader())?;
let parquet_file_reader = ArrowFileReader::new(parquet_metadata, parquet_reader);

let should_load_page_index = row_selection_enabled && task.predicate().is_some();
let should_load_page_index = row_selection_enabled && task.predicate.is_some();

// Start creating the record batch stream, which wraps the parquet file reader
let mut record_batch_stream_builder = ParquetRecordBatchStreamBuilder::new_with_options(
Expand Down Expand Up @@ -245,7 +245,7 @@ impl ArrowReader {
record_batch_stream_builder.metadata(),
&selected_row_groups,
&field_id_map,
task.schema(),
&task.schema,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conflicts with #609

)?;

record_batch_stream_builder =
Expand Down
44 changes: 35 additions & 9 deletions crates/iceberg/src/expr/visitors/page_index_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use ordered_float::OrderedFloat;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::page_index::index::Index;
use parquet::format::PageLocation;
use parquet::file::page_index::offset_index::OffsetIndexMetaData;

use crate::expr::visitors::bound_predicate_visitor::{visit, BoundPredicateVisitor};
use crate::expr::{BoundPredicate, BoundReference};
use crate::spec::{Datum, PrimitiveLiteral, PrimitiveType, Schema};
use crate::{Error, ErrorKind, Result};

type OffsetIndex = Vec<Vec<PageLocation>>;
type OffsetIndex = Vec<OffsetIndexMetaData>;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conflicts with #626


const IN_PREDICATE_LIMIT: usize = 200;

Expand Down Expand Up @@ -206,13 +206,14 @@ impl<'a> PageIndexEvaluator<'a> {
}

/// returns a list of row counts per page
fn calc_row_counts(&self, offset_index: &[PageLocation]) -> Vec<usize> {
fn calc_row_counts(&self, offset_index: &OffsetIndexMetaData) -> Vec<usize> {
let mut remaining_rows = self.row_group_metadata.num_rows() as usize;
let mut row_counts = Vec::with_capacity(self.offset_index.len());

for (idx, page_location) in offset_index.iter().enumerate() {
let row_count = if idx < offset_index.len() - 1 {
let row_count = (offset_index[idx + 1].first_row_index
let page_locations = offset_index.page_locations();
for (idx, page_location) in page_locations.iter().enumerate() {
let row_count = if idx < page_locations.len() - 1 {
let row_count = (page_locations[idx + 1].first_row_index
- page_location.first_row_index) as usize;
remaining_rows -= row_count;
row_count
Expand Down Expand Up @@ -868,6 +869,7 @@ mod tests {
use parquet::data_type::ByteArray;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::page_index::index::{Index, NativeIndex, PageIndex};
use parquet::file::page_index::offset_index::OffsetIndexMetaData;
use parquet::file::statistics::Statistics;
use parquet::format::{BoundaryOrder, PageLocation};
use parquet::schema::types::{
Expand Down Expand Up @@ -1417,28 +1419,36 @@ mod tests {
Ok(row_group_metadata?)
}

fn create_page_index() -> Result<(Vec<Index>, Vec<Vec<PageLocation>>)> {
fn create_page_index() -> Result<(Vec<Index>, Vec<OffsetIndexMetaData>)> {
let idx_float = Index::FLOAT(NativeIndex::<f32> {
indexes: vec![
PageIndex {
min: None,
max: None,
null_count: Some(1024),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some(0.0),
max: Some(10.0),
null_count: Some(0),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some(10.0),
max: Some(20.0),
null_count: Some(1),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: None,
repetition_level_histogram: None,
definition_level_histogram: None,
},
],
boundary_order: BoundaryOrder(0), // UNORDERED
Expand All @@ -1450,26 +1460,36 @@ mod tests {
min: Some("AA".into()),
max: Some("DD".into()),
null_count: Some(0),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some("DE".into()),
max: Some("DE".into()),
null_count: Some(0),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: Some("DF".into()),
max: Some("UJ".into()),
null_count: Some(1),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: Some(48),
repetition_level_histogram: None,
definition_level_histogram: None,
},
PageIndex {
min: None,
max: None,
null_count: None,
repetition_level_histogram: None,
definition_level_histogram: None,
},
],
boundary_order: BoundaryOrder(0), // UNORDERED
Expand All @@ -1491,8 +1511,14 @@ mod tests {
];

Ok((vec![idx_float, idx_string], vec![
page_locs_float,
page_locs_string,
OffsetIndexMetaData {
page_locations: page_locs_float,
unencoded_byte_array_data_bytes: None,
},
OffsetIndexMetaData {
page_locations: page_locs_string,
unencoded_byte_array_data_bytes: None,
},
]))
}
}
Loading