Commit

feat(rust): optimize column load of row groups with predicate (#13608)

bchalk101 authored and Boruch Chalk committed May 31, 2024
1 parent d7b4f72 commit c04ae0f
Showing 25 changed files with 434 additions and 93 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -79,6 +79,7 @@ strum_macros = "0.26"
thiserror = "1"
tokio = "1.26"
tokio-util = "0.7.8"
tokio-stream = "0.1.15"
unicode-reverse = "1.0.8"
url = "2.4"
version_check = "0.9.4"
13 changes: 13 additions & 0 deletions crates/polars-expr/src/expressions/mod.rs
@@ -616,6 +616,19 @@ impl PhysicalIoExpr for PhysicalIoHelper {
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
self.expr.as_stats_evaluator()
}

fn columns(&self) -> Vec<String> {
let mut arena: Arena<AExpr> = Arena::new();
to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena);
let mut columns = vec![];
for _ in 0..arena.len() {
let node = arena.pop().unwrap();
if let AExpr::Column(s) = node {
columns.push(s.as_ref().to_string())
}
}
columns
}
}

pub fn phys_expr_to_io_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalIoExpr> {
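The new `columns()` method reports which columns the predicate expression actually references, so a reader can restrict the columns it loads when evaluating row-group predicates. A minimal standalone sketch of the same idea, using a simplified expression enum rather than polars' own `AExpr` arena (the names below are illustrative, not the crate's API):

```rust
// Simplified stand-in for an expression tree; polars walks an arena of AExpr nodes instead.
enum Expr {
    Column(String),
    BinaryExpr { left: Box<Expr>, right: Box<Expr> },
    Literal(i64),
}

// Walk the tree and collect every column name the predicate touches.
fn live_columns(expr: &Expr, out: &mut Vec<String>) {
    match expr {
        Expr::Column(name) => out.push(name.clone()),
        Expr::BinaryExpr { left, right } => {
            live_columns(left, out);
            live_columns(right, out);
        },
        Expr::Literal(_) => {},
    }
}

fn main() {
    // Roughly `a > 1`: only column "a" is needed to evaluate the predicate.
    let predicate = Expr::BinaryExpr {
        left: Box::new(Expr::Column("a".to_string())),
        right: Box::new(Expr::Literal(1)),
    };
    let mut cols = Vec::new();
    live_columns(&predicate, &mut cols);
    assert_eq!(cols, vec!["a".to_string()]);
}
```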
2 changes: 1 addition & 1 deletion crates/polars-expr/src/state/execution_state.rs
@@ -266,4 +266,4 @@ impl Clone for ExecutionState {
stop: self.stop.clone(),
}
}
}
}
29 changes: 3 additions & 26 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -14,17 +14,16 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;

use super::mmap::ColumnStore;
use super::predicates::read_this_row_group;
use super::read_impl::compute_row_group_range;
use crate::cloud::{build_object_store, CloudLocation, CloudOptions, PolarsObjectStore};
use crate::parquet::metadata::FileMetaDataRef;
use crate::pl_async::get_runtime;
use crate::predicates::PhysicalIoExpr;

type DownloadedRowGroup = Vec<(u64, Bytes)>;
type QueuePayload = (usize, DownloadedRowGroup);
type QueueSend = Arc<Sender<PolarsResult<QueuePayload>>>;

#[derive(Clone)]
pub struct ParquetObjectStore {
store: PolarsObjectStore,
path: ObjectPath,
@@ -266,10 +265,9 @@ pub struct FetchRowGroupsFromObjectStore {

impl FetchRowGroupsFromObjectStore {
pub fn new(
reader: ParquetObjectStore,
reader: Arc<ParquetObjectStore>,
schema: ArrowSchemaRef,
projection: Option<&[usize]>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_groups: &[RowGroupMetaData],
limit: usize,
) -> PolarsResult<Self> {
@@ -283,28 +281,7 @@ impl FetchRowGroupsFromObjectStore {
let row_groups_end = compute_row_group_range(0, row_groups.len(), limit, row_groups);
let row_groups = &row_groups[0..row_groups_end];

let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();

let mut row_groups = if let Some(pred) = predicate.as_deref() {
row_groups
.iter()
.enumerate()
.filter(|(i, rg)| {
let should_be_read =
matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true));

// Already add the row groups that will be skipped to the prefetched data.
if !should_be_read {
prefetched.insert(*i, Default::default());
}
should_be_read
})
.map(|(i, rg)| (i, rg.clone()))
.collect::<Vec<_>>()
} else {
row_groups.iter().cloned().enumerate().collect()
};
let reader = Arc::new(reader);
let mut row_groups = row_groups.iter().cloned().enumerate().collect::<Vec<_>>();
let msg_limit = get_rg_prefetch_size();

if verbose() {
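With the `predicate` parameter removed from `FetchRowGroupsFromObjectStore::new`, deciding which row groups are worth downloading now happens in the caller, before the fetcher is constructed. A rough standalone sketch of that kind of statistics-based pre-filtering (the type and its min/max fields are simplified stand-ins, not the real `RowGroupMetaData` API):

```rust
// Simplified stand-in for per-row-group statistics on one column.
struct RowGroupStats {
    index: usize,
    min: i64,
    max: i64,
}

// Keep only the row groups whose [min, max] range can contain `col == value`.
// Row groups filtered out here never need to be fetched from object storage.
fn row_groups_to_fetch(stats: &[RowGroupStats], value: i64) -> Vec<usize> {
    stats
        .iter()
        .filter(|rg| rg.min <= value && value <= rg.max)
        .map(|rg| rg.index)
        .collect()
}

fn main() {
    let stats = vec![
        RowGroupStats { index: 0, min: 0, max: 5 },
        RowGroupStats { index: 1, min: 10, max: 20 },
    ];
    // For a predicate like `col == 15`, only row group 1 has to be downloaded.
    assert_eq!(row_groups_to_fetch(&stats, 15), vec![1]);
}
```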
54 changes: 27 additions & 27 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -7,7 +7,7 @@ use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData};
use polars_parquet::read::{self, ArrayIter, PhysicalType, RowGroupMetaData};
use rayon::prelude::*;

#[cfg(feature = "cloud")]
@@ -49,7 +49,7 @@ fn assert_dtypes(data_type: &ArrowDataType) {
}
}

fn column_idx_to_series(
pub fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
remaining_rows: usize,
@@ -175,7 +175,7 @@ fn rg_to_dfs(
row_group_start: usize,
row_group_end: usize,
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
row_group_metadata: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -191,7 +191,7 @@
row_group_start,
row_group_end,
remaining_rows,
file_metadata,
row_group_metadata,
schema,
predicate,
row_index,
@@ -207,7 +207,7 @@
row_group_end,
previous_row_count,
remaining_rows,
file_metadata,
row_group_metadata,
schema,
predicate,
row_index,
@@ -226,7 +226,7 @@ fn rg_to_dfs_optionally_par_over_columns(
row_group_start: usize,
row_group_end: usize,
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
row_group_metadata: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -237,13 +237,14 @@
) -> PolarsResult<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(row_group_end - row_group_start);

for rg_idx in row_group_start..row_group_end {
let md = &file_metadata.row_groups[rg_idx];
for md in row_group_metadata
.iter()
.take(row_group_end)
.skip(row_group_start)
{
let current_row_count = md.num_rows() as IdxSize;

if use_statistics
&& !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)?
{
if use_statistics && !read_this_row_group(predicate, md, schema)? {
*previous_row_count += current_row_count;
continue;
}
@@ -315,7 +316,7 @@ fn rg_to_dfs_par_over_rg(
row_group_end: usize,
previous_row_count: &mut IdxSize,
remaining_rows: &mut usize,
file_metadata: &FileMetaData,
row_group_metadata: &[RowGroupMetaData],
schema: &ArrowSchemaRef,
predicate: Option<&dyn PhysicalIoExpr>,
row_index: Option<RowIndex>,
@@ -324,8 +325,7 @@
hive_partition_columns: Option<&[Series]>,
) -> PolarsResult<Vec<DataFrame>> {
// compute the limits per row group and the row count offsets
let row_groups = file_metadata
.row_groups
let row_groups = row_group_metadata
.iter()
.enumerate()
.skip(row_group_start)
@@ -347,11 +347,7 @@
.map(|(rg_idx, md, projection_height, row_count_start)| {
if projection_height == 0
|| use_statistics
&& !read_this_row_group(
predicate,
&file_metadata.row_groups[rg_idx],
schema,
)?
&& !read_this_row_group(predicate, &row_group_metadata[rg_idx], schema)?
{
return Ok(None);
}
@@ -463,7 +459,7 @@ pub fn read_parquet<R: MmapBytesReader>(
0,
n_row_groups,
&mut limit,
&file_metadata,
&file_metadata.row_groups,
reader_schema,
predicate,
row_index.clone(),
@@ -563,7 +559,7 @@ pub struct BatchedParquetReader {
limit: usize,
projection: Arc<[usize]>,
schema: ArrowSchemaRef,
metadata: FileMetaDataRef,
row_group_metadata: Vec<RowGroupMetaData>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
row_index: Option<RowIndex>,
rows_read: IdxSize,
@@ -582,7 +578,7 @@ impl BatchedParquetReader {
#[allow(clippy::too_many_arguments)]
pub fn new(
row_group_fetcher: RowGroupFetcher,
metadata: FileMetaDataRef,
row_group_metadata: Vec<RowGroupMetaData>,
schema: ArrowSchemaRef,
limit: usize,
projection: Option<Vec<usize>>,
@@ -593,7 +589,7 @@ impl BatchedParquetReader {
hive_partition_columns: Option<Vec<Series>>,
mut parallel: ParallelStrategy,
) -> PolarsResult<Self> {
let n_row_groups = metadata.row_groups.len();
let n_row_groups = row_group_metadata.len();
let projection = projection
.map(Arc::from)
.unwrap_or_else(|| (0usize..schema.len()).collect::<Arc<[_]>>());
@@ -618,7 +614,7 @@ impl BatchedParquetReader {
limit,
projection,
schema,
metadata,
row_group_metadata,
row_index,
rows_read: 0,
predicate,
@@ -637,6 +633,10 @@ impl BatchedParquetReader {
self.limit == 0
}

pub fn num_row_groups(&self) -> usize {
self.row_group_metadata.len()
}

pub fn schema(&self) -> &ArrowSchemaRef {
&self.schema
}
@@ -669,7 +669,7 @@ impl BatchedParquetReader {
row_group_start,
row_group_start + n,
self.limit,
&self.metadata.row_groups,
&self.row_group_metadata,
);

let store = self
@@ -684,7 +684,7 @@ impl BatchedParquetReader {
row_group_start,
row_group_end,
&mut self.limit,
&self.metadata,
&self.row_group_metadata,
&self.schema,
self.predicate.as_deref(),
self.row_index.clone(),
@@ -708,7 +708,7 @@ impl BatchedParquetReader {
let row_index = self.row_index.clone();
let predicate = self.predicate.clone();
let schema = self.schema.clone();
let metadata = self.metadata.clone();
let metadata = self.row_group_metadata.clone();
let parallel = self.parallel;
let projection = self.projection.clone();
let use_statistics = self.use_statistics;
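Threading `&[RowGroupMetaData]` through `rg_to_dfs` and storing `row_group_metadata: Vec<RowGroupMetaData>` in `BatchedParquetReader` (instead of the whole `FileMetaData`) means the reading code no longer assumes it sees every row group in the file; a caller that has already applied predicate-based skipping can hand in just the surviving subset, and `num_row_groups()` reports how many that is. A small sketch of that calling pattern, with a stand-in metadata type and a hypothetical `decode_row_groups` in place of the real reader entry points:

```rust
// Stand-in for polars' RowGroupMetaData; only the field the sketch needs.
#[derive(Clone)]
struct RowGroupMeta {
    num_rows: usize,
}

// Hypothetical decode step: it works on whatever slice of row groups it is given,
// whether that is the whole file or a predicate-filtered subset.
fn decode_row_groups(row_groups: &[RowGroupMeta]) -> usize {
    row_groups.iter().map(|rg| rg.num_rows).sum()
}

fn main() {
    let all = vec![
        RowGroupMeta { num_rows: 1_000 },
        RowGroupMeta { num_rows: 2_000 },
        RowGroupMeta { num_rows: 3_000 },
    ];

    // Suppose statistics ruled out the first row group; pass only the rest.
    let surviving = &all[1..];
    assert_eq!(decode_row_groups(surviving), 5_000);
}
```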
(Diffs for the remaining changed files in this commit are not shown here.)