feat(rust): optimize column load of row groups with predicate (#13608)
bchalk101 authored and Boruch Chalk committed May 16, 2024
1 parent 11fe9d8 commit c77cf72
Showing 11 changed files with 295 additions and 63 deletions.
crates/polars-io/src/parquet/read/async_impl.rs (4 changes: 2 additions & 2 deletions)
@@ -25,6 +25,7 @@
 type DownloadedRowGroup = Vec<(u64, Bytes)>;
 type QueuePayload = (usize, DownloadedRowGroup);
 type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;
 
+#[derive(Clone)]
 pub struct ParquetObjectStore {
     store: PolarsObjectStore,
     path: ObjectPath,
@@ -266,7 +267,7 @@ pub struct FetchRowGroupsFromObjectStore {
 
 impl FetchRowGroupsFromObjectStore {
     pub fn new(
-        reader: ParquetObjectStore,
+        reader: Arc<ParquetObjectStore>,
         schema: ArrowSchemaRef,
         projection: Option<&[usize]>,
         predicate: Option<Arc<dyn PhysicalIoExpr>>,
@@ -304,7 +305,6 @@ impl FetchRowGroupsFromObjectStore {
         } else {
             row_groups.iter().cloned().enumerate().collect()
         };
-        let reader = Arc::new(reader);
         let msg_limit = get_rg_prefetch_size();
 
         if verbose() {
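The three hunks above move the `Arc` boundary out of the constructor: `ParquetObjectStore` now derives `Clone`, and `FetchRowGroupsFromObjectStore::new` receives an already shared `Arc<ParquetObjectStore>` instead of wrapping the reader itself. A minimal standalone sketch of the sharing pattern this enables (the types below are stand-ins, not the polars API):

```rust
use std::sync::Arc;

// Stand-in for ParquetObjectStore; the real type lives in polars-io.
#[derive(Clone)]
struct ObjectStoreHandle {
    path: String,
}

// Stand-in for FetchRowGroupsFromObjectStore.
struct Fetcher {
    reader: Arc<ObjectStoreHandle>,
}

impl Fetcher {
    // After this commit the constructor takes the Arc directly,
    // so one shared store can back many fetchers.
    fn new(reader: Arc<ObjectStoreHandle>) -> Self {
        Fetcher { reader }
    }
}

fn main() {
    let store = Arc::new(ObjectStoreHandle {
        path: "s3://bucket/file.parquet".into(),
    });
    // Cloning an Arc only bumps a refcount; no second store is opened.
    let a = Fetcher::new(Arc::clone(&store));
    let b = Fetcher::new(store);
    assert_eq!(a.reader.path, b.reader.path);
}
```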
crates/polars-io/src/parquet/read/read_impl.rs (54 changes: 27 additions & 27 deletions)
@@ -8,7 +8,7 @@
 use polars_core::prelude::*;
 use polars_core::utils::{accumulate_dataframes_vertical, split_df};
 use polars_core::POOL;
 use polars_parquet::read;
-use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
+use polars_parquet::read::{ArrayIter, RowGroupMetaData};
 use rayon::prelude::*;
 
 #[cfg(feature = "cloud")]
@@ -49,7 +49,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
     }
 }
 
-fn column_idx_to_series(
+pub fn column_idx_to_series(
     column_i: usize,
     md: &RowGroupMetaData,
     remaining_rows: usize,
@@ -132,7 +132,7 @@ fn rg_to_dfs(
     row_group_start: usize,
     row_group_end: usize,
     remaining_rows: &mut usize,
-    file_metadata: &FileMetaData,
+    row_group_metadata: &[RowGroupMetaData],
     schema: &ArrowSchemaRef,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
@@ -148,7 +148,7 @@
             row_group_start,
             row_group_end,
             remaining_rows,
-            file_metadata,
+            row_group_metadata,
             schema,
             predicate,
             row_index,
@@ -164,7 +164,7 @@
             row_group_end,
             previous_row_count,
             remaining_rows,
-            file_metadata,
+            row_group_metadata,
             schema,
             predicate,
             row_index,
@@ -183,7 +183,7 @@ fn rg_to_dfs_optionally_par_over_columns(
     row_group_start: usize,
     row_group_end: usize,
     remaining_rows: &mut usize,
-    file_metadata: &FileMetaData,
+    row_group_metadata: &[RowGroupMetaData],
     schema: &ArrowSchemaRef,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
@@ -194,13 +194,14 @@
 ) -> PolarsResult<Vec<DataFrame>> {
     let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
 
-    for rg_idx in row_group_start..row_group_end {
-        let md = &file_metadata.row_groups[rg_idx];
+    for md in row_group_metadata
+        .iter()
+        .take(row_group_end)
+        .skip(row_group_start)
+    {
         let current_row_count = md.num_rows() as IdxSize;
 
-        if use_statistics
-            && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
-        {
+        if use_statistics && !read_this_row_group(predicate, md, schema)? {
             *previous_row_count += current_row_count;
             continue;
         }
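In the rewritten loop, `.take(row_group_end)` is applied before `.skip(row_group_start)`, so the iterator yields exactly the half-open range `row_group_start..row_group_end`: the same row groups as the old index-based loop, without indexing back into the file metadata. A quick standalone check of that equivalence (illustrative only, plain strings in place of `RowGroupMetaData`):

```rust
fn main() {
    let metadata = vec!["rg0", "rg1", "rg2", "rg3", "rg4"];
    let (start, end) = (1, 4);

    // The pattern used in the hunk above.
    let via_iter: Vec<_> = metadata.iter().take(end).skip(start).collect();
    // The old loop walked indices start..end.
    let via_slice: Vec<_> = metadata[start..end].iter().collect();

    // Order matters: take() counts from the front of the iterator,
    // then skip() drops the leading `start` items.
    assert_eq!(via_iter, via_slice); // ["rg1", "rg2", "rg3"]
}
```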
@@ -272,7 +273,7 @@ fn rg_to_dfs_par_over_rg(
     row_group_end: usize,
     previous_row_count: &mut IdxSize,
     remaining_rows: &mut usize,
-    file_metadata: &FileMetaData,
+    row_group_metadata: &[RowGroupMetaData],
     schema: &ArrowSchemaRef,
     predicate: Option<&dyn PhysicalIoExpr>,
     row_index: Option<RowIndex>,
@@ -281,8 +282,7 @@
     hive_partition_columns: Option<&[Series]>,
 ) -> PolarsResult<Vec<DataFrame>> {
     // compute the limits per row group and the row count offsets
-    let row_groups = file_metadata
-        .row_groups
+    let row_groups = row_group_metadata
         .iter()
         .enumerate()
         .skip(row_group_start)
@@ -304,11 +304,7 @@
         .map(|(rg_idx, md, projection_height, row_count_start)| {
             if projection_height == 0
                 || use_statistics
-                    && !read_this_row_group(
-                        predicate,
-                        &file_metadata.row_groups[rg_idx],
-                        schema,
-                    )?
+                    && !read_this_row_group(predicate, &row_group_metadata[rg_idx], schema)?
             {
                 return Ok(None);
             }
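This hunk removes the last direct use of `file_metadata` inside the row-group helpers; from here on they operate on a `&[RowGroupMetaData]` slice. A minimal sketch, with hypothetical stand-in types, of why the slice signature is the more flexible one (a `Vec` borrows as a slice, which is exactly the coercion `read_parquet` uses in the next hunk):

```rust
// Stand-ins for RowGroupMetaData and FileMetaData, for illustration.
struct RowGroupMeta {
    num_rows: usize,
}
struct FileMeta {
    row_groups: Vec<RowGroupMeta>,
}

// Taking a slice decouples the helper from FileMeta entirely.
fn total_rows(row_groups: &[RowGroupMeta]) -> usize {
    row_groups.iter().map(|rg| rg.num_rows).sum()
}

fn main() {
    let meta = FileMeta {
        row_groups: vec![RowGroupMeta { num_rows: 100 }, RowGroupMeta { num_rows: 50 }],
    };
    // Existing callers pass the whole file's row groups...
    assert_eq!(total_rows(&meta.row_groups), 150);
    // ...while new callers can pass any pre-filtered subset.
    assert_eq!(total_rows(&meta.row_groups[1..]), 50);
}
```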
@@ -420,7 +416,7 @@ pub fn read_parquet<R: MmapBytesReader>(
         0,
         n_row_groups,
         &mut limit,
-        &file_metadata,
+        &file_metadata.row_groups,
         reader_schema,
         predicate,
         row_index.clone(),
@@ -520,7 +516,7 @@ pub struct BatchedParquetReader {
     limit: usize,
     projection: Arc<[usize]>,
     schema: ArrowSchemaRef,
-    metadata: FileMetaDataRef,
+    row_group_metadata: Vec<RowGroupMetaData>,
     predicate: Option<Arc<dyn PhysicalIoExpr>>,
     row_index: Option<RowIndex>,
     rows_read: IdxSize,
@@ -539,7 +535,7 @@ impl BatchedParquetReader {
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         row_group_fetcher: RowGroupFetcher,
-        metadata: FileMetaDataRef,
+        row_group_metadata: Vec<RowGroupMetaData>,
         schema: ArrowSchemaRef,
         limit: usize,
         projection: Option<Vec<usize>>,
@@ -550,7 +546,7 @@
         hive_partition_columns: Option<Vec<Series>>,
         mut parallel: ParallelStrategy,
     ) -> PolarsResult<Self> {
-        let n_row_groups = metadata.row_groups.len();
+        let n_row_groups = row_group_metadata.len();
         let projection = projection
             .map(Arc::from)
             .unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
@@ -575,7 +571,7 @@
             limit,
             projection,
             schema,
-            metadata,
+            row_group_metadata,
             row_index,
             rows_read: 0,
             predicate,
@@ -594,6 +590,10 @@
         self.limit == 0
     }
 
+    pub fn num_row_groups(&self) -> usize {
+        self.row_group_metadata.len()
+    }
+
     pub fn schema(&self) -> &ArrowSchemaRef {
         &self.schema
     }
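`num_row_groups` is new public API; since the reader now owns its `Vec<RowGroupMetaData>`, the count is a plain `len()`. A hypothetical consumer (not from this commit) might use it to size a fetch loop:

```rust
// Illustrative helper: number of round trips needed to cover all
// row groups when fetching `rg_per_fetch` of them at a time.
fn fetches_needed(num_row_groups: usize, rg_per_fetch: usize) -> usize {
    num_row_groups.div_ceil(rg_per_fetch)
}

fn main() {
    assert_eq!(fetches_needed(10, 4), 3); // 4 + 4 + 2
}
```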
@@ -626,7 +626,7 @@ impl BatchedParquetReader {
                 row_group_start,
                 row_group_start + n,
                 self.limit,
-                &self.metadata.row_groups,
+                &self.row_group_metadata,
             );
 
             let store = self
@@ -641,7 +641,7 @@
                 row_group_start,
                 row_group_end,
                 &mut self.limit,
-                &self.metadata,
+                &self.row_group_metadata,
                 &self.schema,
                 self.predicate.as_deref(),
                 self.row_index.clone(),
@@ -665,7 +665,7 @@
             let row_index = self.row_index.clone();
             let predicate = self.predicate.clone();
             let schema = self.schema.clone();
-            let metadata = self.metadata.clone();
+            let metadata = self.row_group_metadata.clone();
             let parallel = self.parallel;
             let projection = self.projection.clone();
             let use_statistics = self.use_statistics;
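Taken together, the `BatchedParquetReader` changes replace the shared `FileMetaDataRef` with an owned `Vec<RowGroupMetaData>`. That is what lets a caller evaluate predicate statistics up front and hand the reader only the row groups that can match, which is the optimization named in the commit title. A standalone sketch of that calling pattern, with hypothetical types in place of the polars ones:

```rust
// Hypothetical stand-in carrying per-column min/max statistics.
struct RowGroupMeta {
    num_rows: usize,
    min: i64,
    max: i64,
}

// Keep only row groups whose [min, max] range can satisfy a predicate
// like `col > threshold`; pruned groups are never fetched, so their
// column chunks are never downloaded or decoded.
fn prune(row_groups: Vec<RowGroupMeta>, threshold: i64) -> Vec<RowGroupMeta> {
    row_groups.into_iter().filter(|rg| rg.max > threshold).collect()
}

fn main() {
    let all = vec![
        RowGroupMeta { num_rows: 1000, min: 0, max: 10 },
        RowGroupMeta { num_rows: 1000, min: 11, max: 20 },
    ];
    // Only the second row group survives `col > 15`; a reader built
    // from this Vec does a fraction of the I/O.
    let surviving = prune(all, 15);
    assert_eq!(surviving.len(), 1);
    let rows_left: usize = surviving.iter().map(|rg| rg.num_rows).sum();
    assert_eq!(rows_left, 1000);
}
```

One trade-off worth noting: `self.row_group_metadata.clone()` in the last hunk now deep-copies a `Vec`, where the old `Arc` clone was a refcount bump; presumably acceptable because row-group metadata is small relative to the data it describes.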
