diff --git a/Cargo.lock b/Cargo.lock index 2703f5de5a088..2bf94b33b58f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3028,6 +3028,7 @@ version = "0.40.0" dependencies = [ "ahash", "bitflags 2.5.0", + "crossbeam-channel", "futures", "glob", "once_cell", @@ -3148,6 +3149,7 @@ dependencies = [ "chrono", "chrono-tz", "ciborium", + "crossbeam-channel", "either", "futures", "hashbrown", diff --git a/Cargo.toml b/Cargo.toml index 8ef575594314d..4fb5083528083 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -79,6 +79,7 @@ strum_macros = "0.26" thiserror = "1" tokio = "1.26" tokio-util = "0.7.8" +tokio-stream = "0.1.15" unicode-reverse = "1.0.8" url = "2.4" version_check = "0.9.4" diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 70344374f42cc..bad6ccb45c77c 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -616,6 +616,19 @@ impl PhysicalIoExpr for PhysicalIoHelper { fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> { self.expr.as_stats_evaluator() } + + fn columns(&self) -> Vec { + let mut arena: Arena = Arena::new(); + to_aexpr(self.expr.as_expression().unwrap().clone(), &mut arena); + let mut columns = vec![]; + for _ in 0..arena.len() { + let node = arena.pop().unwrap(); + if let AExpr::Column(s) = node { + columns.push(s.as_ref().to_string()) + } + } + columns + } } pub fn phys_expr_to_io_expr(expr: Arc) -> Arc { diff --git a/crates/polars-expr/src/state/execution_state.rs b/crates/polars-expr/src/state/execution_state.rs index 90798ef6b8ee8..f0b8bf351068b 100644 --- a/crates/polars-expr/src/state/execution_state.rs +++ b/crates/polars-expr/src/state/execution_state.rs @@ -266,4 +266,4 @@ impl Clone for ExecutionState { stop: self.stop.clone(), } } -} +} \ No newline at end of file diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs index 32ca3a92a8a17..e8cc0db0edde7 100644 --- a/crates/polars-io/src/parquet/read/async_impl.rs +++ b/crates/polars-io/src/parquet/read/async_impl.rs @@ -14,17 +14,16 @@ use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; use super::mmap::ColumnStore; -use super::predicates::read_this_row_group; use super::read_impl::compute_row_group_range; use crate::cloud::{build_object_store, CloudLocation, CloudOptions, PolarsObjectStore}; use crate::parquet::metadata::FileMetaDataRef; use crate::pl_async::get_runtime; -use crate::predicates::PhysicalIoExpr; type DownloadedRowGroup = Vec<(u64, Bytes)>; type QueuePayload = (usize, DownloadedRowGroup); type QueueSend = Arc>>; +#[derive(Clone)] pub struct ParquetObjectStore { store: PolarsObjectStore, path: ObjectPath, @@ -266,10 +265,9 @@ pub struct FetchRowGroupsFromObjectStore { impl FetchRowGroupsFromObjectStore { pub fn new( - reader: ParquetObjectStore, + reader: Arc, schema: ArrowSchemaRef, projection: Option<&[usize]>, - predicate: Option>, row_groups: &[RowGroupMetaData], limit: usize, ) -> PolarsResult { @@ -283,28 +281,7 @@ impl FetchRowGroupsFromObjectStore { let row_groups_end = compute_row_group_range(0, row_groups.len(), limit, row_groups); let row_groups = &row_groups[0..row_groups_end]; - let mut prefetched: PlHashMap = PlHashMap::new(); - - let mut row_groups = if let Some(pred) = predicate.as_deref() { - row_groups - .iter() - .enumerate() - .filter(|(i, rg)| { - let should_be_read = - matches!(read_this_row_group(Some(pred), rg, &schema), Ok(true)); - - // Already add the row groups that will 
be skipped to the prefetched data. - if !should_be_read { - prefetched.insert(*i, Default::default()); - } - should_be_read - }) - .map(|(i, rg)| (i, rg.clone())) - .collect::>() - } else { - row_groups.iter().cloned().enumerate().collect() - }; - let reader = Arc::new(reader); + let mut row_groups = row_groups.iter().cloned().enumerate().collect::>(); let msg_limit = get_rg_prefetch_size(); if verbose() { diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index 0226e0679ba21..c2bb823f9898e 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -7,7 +7,7 @@ use arrow::datatypes::ArrowSchemaRef; use polars_core::prelude::*; use polars_core::utils::{accumulate_dataframes_vertical, split_df}; use polars_core::POOL; -use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData}; +use polars_parquet::read::{self, ArrayIter, PhysicalType, RowGroupMetaData}; use rayon::prelude::*; #[cfg(feature = "cloud")] @@ -49,7 +49,7 @@ fn assert_dtypes(data_type: &ArrowDataType) { } } -fn column_idx_to_series( +pub fn column_idx_to_series( column_i: usize, md: &RowGroupMetaData, remaining_rows: usize, @@ -175,7 +175,7 @@ fn rg_to_dfs( row_group_start: usize, row_group_end: usize, remaining_rows: &mut usize, - file_metadata: &FileMetaData, + row_group_metadata: &[RowGroupMetaData], schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, row_index: Option, @@ -191,7 +191,7 @@ fn rg_to_dfs( row_group_start, row_group_end, remaining_rows, - file_metadata, + row_group_metadata, schema, predicate, row_index, @@ -207,7 +207,7 @@ fn rg_to_dfs( row_group_end, previous_row_count, remaining_rows, - file_metadata, + row_group_metadata, schema, predicate, row_index, @@ -226,7 +226,7 @@ fn rg_to_dfs_optionally_par_over_columns( row_group_start: usize, row_group_end: usize, remaining_rows: &mut usize, - file_metadata: &FileMetaData, + row_group_metadata: &[RowGroupMetaData], schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, row_index: Option, @@ -237,13 +237,14 @@ fn rg_to_dfs_optionally_par_over_columns( ) -> PolarsResult> { let mut dfs = Vec::with_capacity(row_group_end - row_group_start); - for rg_idx in row_group_start..row_group_end { - let md = &file_metadata.row_groups[rg_idx]; + for md in row_group_metadata + .iter() + .take(row_group_end) + .skip(row_group_start) + { let current_row_count = md.num_rows() as IdxSize; - if use_statistics - && !read_this_row_group(predicate, &file_metadata.row_groups[rg_idx], schema)? - { + if use_statistics && !read_this_row_group(predicate, md, schema)? 
{ *previous_row_count += current_row_count; continue; } @@ -315,7 +316,7 @@ fn rg_to_dfs_par_over_rg( row_group_end: usize, previous_row_count: &mut IdxSize, remaining_rows: &mut usize, - file_metadata: &FileMetaData, + row_group_metadata: &[RowGroupMetaData], schema: &ArrowSchemaRef, predicate: Option<&dyn PhysicalIoExpr>, row_index: Option, @@ -324,8 +325,7 @@ fn rg_to_dfs_par_over_rg( hive_partition_columns: Option<&[Series]>, ) -> PolarsResult> { // compute the limits per row group and the row count offsets - let row_groups = file_metadata - .row_groups + let row_groups = row_group_metadata .iter() .enumerate() .skip(row_group_start) @@ -347,11 +347,7 @@ fn rg_to_dfs_par_over_rg( .map(|(rg_idx, md, projection_height, row_count_start)| { if projection_height == 0 || use_statistics - && !read_this_row_group( - predicate, - &file_metadata.row_groups[rg_idx], - schema, - )? + && !read_this_row_group(predicate, &row_group_metadata[rg_idx], schema)? { return Ok(None); } @@ -463,7 +459,7 @@ pub fn read_parquet( 0, n_row_groups, &mut limit, - &file_metadata, + &file_metadata.row_groups, reader_schema, predicate, row_index.clone(), @@ -563,7 +559,7 @@ pub struct BatchedParquetReader { limit: usize, projection: Arc<[usize]>, schema: ArrowSchemaRef, - metadata: FileMetaDataRef, + row_group_metadata: Vec, predicate: Option>, row_index: Option, rows_read: IdxSize, @@ -582,7 +578,7 @@ impl BatchedParquetReader { #[allow(clippy::too_many_arguments)] pub fn new( row_group_fetcher: RowGroupFetcher, - metadata: FileMetaDataRef, + row_group_metadata: Vec, schema: ArrowSchemaRef, limit: usize, projection: Option>, @@ -593,7 +589,7 @@ impl BatchedParquetReader { hive_partition_columns: Option>, mut parallel: ParallelStrategy, ) -> PolarsResult { - let n_row_groups = metadata.row_groups.len(); + let n_row_groups = row_group_metadata.len(); let projection = projection .map(Arc::from) .unwrap_or_else(|| (0usize..schema.len()).collect::>()); @@ -618,7 +614,7 @@ impl BatchedParquetReader { limit, projection, schema, - metadata, + row_group_metadata, row_index, rows_read: 0, predicate, @@ -637,6 +633,10 @@ impl BatchedParquetReader { self.limit == 0 } + pub fn num_row_groups(&self) -> usize { + self.row_group_metadata.len() + } + pub fn schema(&self) -> &ArrowSchemaRef { &self.schema } @@ -669,7 +669,7 @@ impl BatchedParquetReader { row_group_start, row_group_start + n, self.limit, - &self.metadata.row_groups, + &self.row_group_metadata, ); let store = self @@ -684,7 +684,7 @@ impl BatchedParquetReader { row_group_start, row_group_end, &mut self.limit, - &self.metadata, + &self.row_group_metadata, &self.schema, self.predicate.as_deref(), self.row_index.clone(), @@ -708,7 +708,7 @@ impl BatchedParquetReader { let row_index = self.row_index.clone(); let predicate = self.predicate.clone(); let schema = self.schema.clone(); - let metadata = self.metadata.clone(); + let metadata = self.row_group_metadata.clone(); let parallel = self.parallel; let projection = self.projection.clone(); let use_statistics = self.use_statistics; diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 97d96634be54f..b5635dbf36e54 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -2,6 +2,7 @@ use std::io::{Read, Seek}; use std::sync::Arc; use arrow::datatypes::ArrowSchemaRef; +use polars_core::config::verbose; use polars_core::prelude::*; #[cfg(feature = "cloud")] use polars_core::utils::accumulate_dataframes_vertical_unchecked; 
@@ -11,15 +12,19 @@ use polars_parquet::read; use super::async_impl::FetchRowGroupsFromObjectStore; #[cfg(feature = "cloud")] use super::async_impl::ParquetObjectStore; +use super::mmap::ColumnStore; +use super::predicates::read_this_row_group; pub use super::read_impl::BatchedParquetReader; -use super::read_impl::{read_parquet, FetchRowGroupsFromMmapReader}; +use super::read_impl::{ + column_idx_to_series, materialize_hive_partitions, read_parquet, FetchRowGroupsFromMmapReader, +}; #[cfg(feature = "cloud")] use super::utils::materialize_empty_df; #[cfg(feature = "cloud")] use crate::cloud::CloudOptions; use crate::mmap::MmapBytesReader; use crate::parquet::metadata::FileMetaDataRef; -use crate::predicates::PhysicalIoExpr; +use crate::predicates::{apply_predicate, PhysicalIoExpr}; use crate::prelude::*; use crate::RowIndex; @@ -137,7 +142,7 @@ impl ParquetReader { let row_group_fetcher = FetchRowGroupsFromMmapReader::new(Box::new(self.reader))?.into(); BatchedParquetReader::new( row_group_fetcher, - metadata, + metadata.row_groups.clone(), schema, self.n_rows.unwrap_or(usize::MAX), self.projection, @@ -258,6 +263,27 @@ impl ParquetAsyncReader { self.reader.num_rows().await } + pub async fn num_rows_with_predicate(&mut self) -> PolarsResult { + let metadata = self.reader.get_metadata().await?.clone(); + let schema = self.schema().await?; + let reader = Arc::new(self.reader.clone()); + + let row_sizes = prune_row_groups( + reader.clone(), + schema.clone(), + metadata, + usize::MAX, + self.predicate.clone(), + self.hive_partition_columns.as_deref(), + ) + .await? + .iter() + .map(|(row_size, _md)| *row_size) + .collect::>(); + + Ok(row_sizes.iter().sum()) + } + pub fn with_n_rows(mut self, n_rows: Option) -> Self { self.n_rows = n_rows; self @@ -302,25 +328,31 @@ impl ParquetAsyncReader { pub async fn batched(mut self, chunk_size: usize) -> PolarsResult { let metadata = self.reader.get_metadata().await?.clone(); - let schema = match self.schema { - Some(schema) => schema, - None => self.schema().await?, - }; + let schema = self.schema().await?; + let mut limit = self.n_rows.unwrap_or(usize::MAX); + + let reader = Arc::new(self.reader); + + let mut row_groups = metadata.row_groups.clone(); + if limit != 0 { + limit = usize::MAX; + } else { + row_groups = vec![]; + } // row group fetched deals with projection let row_group_fetcher = FetchRowGroupsFromObjectStore::new( - self.reader, + reader.clone(), schema.clone(), self.projection.as_deref(), - self.predicate.clone(), - &metadata.row_groups, - self.n_rows.unwrap_or(usize::MAX), + &row_groups, + limit, )? 
         .into();
         BatchedParquetReader::new(
             row_group_fetcher,
-            metadata,
+            row_groups,
             schema,
-            self.n_rows.unwrap_or(usize::MAX),
+            limit,
             self.projection,
             self.predicate.clone(),
             self.row_index,
@@ -337,7 +369,6 @@ impl ParquetAsyncReader {
 
     pub async fn finish(mut self) -> PolarsResult<DataFrame> {
         let rechunk = self.rechunk;
-        let metadata = self.get_metadata().await?.clone();
         let reader_schema = self.schema().await?;
         let row_index = self.row_index.clone();
         let hive_partition_columns = self.hive_partition_columns.clone();
@@ -345,7 +376,15 @@ impl ParquetAsyncReader {
 
         // batched reader deals with slice pushdown
         let reader = self.batched(usize::MAX).await?;
-        let n_batches = metadata.row_groups.len();
+        let n_batches = reader.num_row_groups();
+        if n_batches == 0 {
+            return Ok(materialize_empty_df(
+                projection.as_deref(),
+                reader_schema.as_ref(),
+                hive_partition_columns.as_deref(),
+                row_index.as_ref(),
+            ));
+        }
         let mut iter = reader.iter(n_batches);
 
         let mut chunks = Vec::with_capacity(n_batches);
@@ -368,3 +407,83 @@ impl ParquetAsyncReader {
         Ok(df)
     }
 }
+
+#[cfg(feature = "cloud")]
+async fn prune_row_groups(
+    reader: Arc<ParquetObjectStore>,
+    schema: Arc<ArrowSchema>,
+    metadata: Arc<FileMetaData>,
+    limit: usize,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    hive_partition_columns: Option<&[Series]>,
+) -> PolarsResult<Vec<(usize, RowGroupMetaData)>> {
+    let predicate_columns = predicate.clone().unwrap().columns();
+    let predicate_projection = materialize_projection(
+        Some(&predicate_columns),
+        &Schema::from(schema.clone()),
+        hive_partition_columns,
+        false,
+    );
+
+    let mut predicate_row_group_fetcher = FetchRowGroupsFromObjectStore::new(
+        reader.clone(),
+        schema.clone(),
+        predicate_projection.clone().as_deref(),
+        &metadata.row_groups,
+        usize::MAX,
+    )?;
+
+    let predicate_store: ColumnStore<'_> = predicate_row_group_fetcher
+        .fetch_row_groups(0..metadata.row_groups.len())
+        .await?;
+
+    let mut remaining_rows = limit;
+
+    let row_groups = metadata.row_groups.clone();
+    let final_row_groups = row_groups
+        .iter()
+        .map(|md| {
+            if !read_this_row_group(predicate.clone().as_deref(), md, &schema).unwrap()
+                || remaining_rows == 0
+            {
+                return (0, md);
+            }
+            let chunk_size = md.num_rows();
+
+            let mut columns: Vec<Series> = vec![];
+
+            for column_i in predicate_projection.as_ref().unwrap() {
+                let column = column_idx_to_series(
+                    *column_i,
+                    md,
+                    usize::MAX,
+                    &schema.clone(),
+                    &predicate_store,
+                    chunk_size,
+                )
+                .unwrap();
+                columns.push(column)
+            }
+
+            let mut df = unsafe { DataFrame::new_no_checks(columns) };
+
+            materialize_hive_partitions(&mut df, hive_partition_columns, md.num_rows());
+            apply_predicate(&mut df, predicate.as_deref(), false).unwrap();
+
+            let row_count = df.height();
+
+            remaining_rows = remaining_rows.saturating_sub(row_count);
+
+            (row_count, md)
+        })
+        .filter(|(row_count, _md)| *row_count != 0)
+        .map(|(row_count, md)| (row_count, md.clone()))
+        .collect::<Vec<_>>();
+    if verbose() {
+        eprintln!(
+            "reduced the number of row groups in pruning by {}",
+            row_groups.len() - final_row_groups.len()
+        )
+    }
+    Ok(final_row_groups)
+}
diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index 2da06f908769c..5b7ab7d04b961 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -13,6 +13,8 @@ pub trait PhysicalIoExpr: Send + Sync {
     fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
         None
    }
+
+    fn columns(&self) -> Vec<String>;
 }
 
 pub trait StatsEvaluator {
diff --git a/crates/polars-lazy/Cargo.toml b/crates/polars-lazy/Cargo.toml
index 054089ff404f5..99649cc5ccc5e 100644
--- a/crates/polars-lazy/Cargo.toml
+++ 
b/crates/polars-lazy/Cargo.toml @@ -21,6 +21,7 @@ polars-plan = { workspace = true } polars-time = { workspace = true, optional = true } polars-utils = { workspace = true } +crossbeam-channel = { workspace = true } ahash = { workspace = true } bitflags = { workspace = true } glob = { version = "0.3" } diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 44155a040995b..52ad42dfd8af1 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -19,6 +19,7 @@ use std::path::PathBuf; use std::sync::{Arc, Mutex}; pub use anonymous_scan::*; +use crossbeam_channel::{bounded, Receiver}; #[cfg(feature = "csv")] pub use csv::*; #[cfg(not(target_arch = "wasm32"))] @@ -31,6 +32,7 @@ pub use ndjson::*; #[cfg(feature = "parquet")] pub use parquet::*; use polars_core::prelude::*; +use polars_core::POOL; use polars_io::RowIndex; use polars_ops::frame::JoinCoalesce; pub use polars_plan::frame::{AllowedOptimizations, OptState}; @@ -793,6 +795,30 @@ impl LazyFrame { ) } + pub fn sink_to_batches(mut self) -> Result, PolarsError> { + self.opt_state.streaming = true; + let morsels_per_sink = POOL.current_num_threads(); + let backpressure = morsels_per_sink * 4; + let (sender, receiver) = bounded(backpressure); + self.logical_plan = DslPlan::Sink { + input: Arc::new(self.logical_plan), + payload: SinkType::Batch { + sender: BatchSender { id: 0, sender }, + }, + }; + + let (mut state, mut physical_plan, is_streaming) = self.prepare_collect(true)?; + polars_ensure!( + is_streaming, + ComputeError: format!("cannot run the whole query in a streaming order") + ); + POOL.spawn(move || { + let _ = physical_plan.execute(&mut state).unwrap(); + }); + + Ok(receiver) + } + #[cfg(any( feature = "ipc", feature = "parquet", diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs index 695cc58c98c0d..6084e02c1c0cf 100644 --- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs +++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs @@ -5,9 +5,6 @@ use polars_core::config; use polars_core::config::{get_file_prefetch_size, verbose}; use polars_core::utils::accumulate_dataframes_vertical; use polars_io::cloud::CloudOptions; -use polars_io::parquet::metadata::FileMetaDataRef; -use polars_io::parquet::read::materialize_empty_df; -use polars_io::utils::is_cloud_url; use polars_io::RowIndex; use super::*; @@ -202,6 +199,13 @@ impl ParquetExec { ); } + // Now read the actual data. + let file_info = &self.file_info; + let file_options = &self.file_options; + let use_statistics = self.options.use_statistics; + let predicate = &self.predicate; + let base_row_index_ref = &base_row_index; + // First initialize the readers and get the metadata concurrently. let iter = paths.iter().enumerate().map(|(i, path)| async move { let first_file = batch_idx == 0 && i == 0; @@ -211,6 +215,7 @@ impl ParquetExec { } else { (None, None) }; + let mut reader = ParquetAsyncReader::from_uri( &path.to_string_lossy(), cloud_options, @@ -231,6 +236,7 @@ impl ParquetExec { } let num_rows = reader.num_rows().await?; + PolarsResult::Ok((num_rows, reader)) }); let readers_and_metadata = futures::future::try_join_all(iter).await?; @@ -244,13 +250,6 @@ impl ParquetExec { let rows_statistics = get_sequential_row_statistics(iter, remaining_rows_to_read); - // Now read the actual data. 
- let file_info = &self.file_info; - let file_options = &self.file_options; - let use_statistics = self.options.use_statistics; - let predicate = &self.predicate; - let base_row_index_ref = &base_row_index; - if verbose { eprintln!("reading of {}/{} file...", processed, self.paths.len()); } diff --git a/crates/polars-lazy/src/physical_plan/planner/lp.rs b/crates/polars-lazy/src/physical_plan/planner/lp.rs index 4e0e169847fe2..3a748cbcc5ff8 100644 --- a/crates/polars-lazy/src/physical_plan/planner/lp.rs +++ b/crates/polars-lazy/src/physical_plan/planner/lp.rs @@ -172,6 +172,9 @@ fn create_physical_plan_impl( SinkType::Cloud { .. } => { polars_bail!(InvalidOperation: "cloud sink not supported in standard engine.") }, + SinkType::Batch { .. } => { + polars_bail!(InvalidOperation: "batch sink not supported in the standard engine") + } }, Union { inputs, options } => { let inputs = inputs diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 6b906126aeeb3..3c5630967fddf 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -28,6 +28,19 @@ impl PhysicalIoExpr for Wrap { fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.0.as_stats_evaluator() } + + fn columns(&self) -> Vec { + let mut arena: Arena = Arena::new(); + to_aexpr(self.0.as_expression().unwrap().clone(), &mut arena); + let mut columns = vec![]; + for _ in 0..arena.len() { + let node = arena.pop().unwrap(); + if let AExpr::Column(s) = node { + columns.push(s.as_ref().to_string()) + } + } + columns + } } impl PhysicalPipedExpr for Wrap { fn evaluate(&self, chunk: &DataChunk, state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index e928e2ba8a082..8eec6cdf5b2eb 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -30,6 +30,10 @@ impl PhysicalIoExpr for Len { fn evaluate_io(&self, _df: &DataFrame) -> PolarsResult { unimplemented!() } + + fn columns(&self) -> Vec { + unimplemented!() + } } impl PhysicalPipedExpr for Len { fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs new file mode 100644 index 0000000000000..a38e44c1ad65a --- /dev/null +++ b/crates/polars-pipe/src/executors/sinks/output/batch_sink.rs @@ -0,0 +1,51 @@ +use std::any::Any; + +use polars_core::prelude::*; +use crossbeam_channel::Sender; + +use crate::operators::{ + chunks_to_df_unchecked, DataChunk, FinalizedSink, PExecutionContext, Sink, SinkResult, +}; + +#[derive(Clone)] +pub struct BatchSink { + sender: Sender +} + +impl BatchSink { + pub fn new(sender: Sender) -> PolarsResult { + Ok(Self { sender }) + } +} + +impl Sink for BatchSink { + fn sink(&mut self, _context: &PExecutionContext, chunk: DataChunk) -> PolarsResult { + let df: DataFrame = chunks_to_df_unchecked(vec![chunk]); + let result = self.sender.send(df); + match result { + Ok(..) => Ok(SinkResult::CanHaveMoreInput), + Err(..) 
=> Ok(SinkResult::Finished) + } + } + + fn combine(&mut self, _other: &mut dyn Sink) { + // Nothing to do + } + + fn split(&self, _thread_no: usize) -> Box { + Box::new(self.clone()) + } + + fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult { + let _ = self.sender.send(Default::default()); + Ok(FinalizedSink::Finished(Default::default())) + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn fmt(&self) -> &str { + "batch_sink" + } +} diff --git a/crates/polars-pipe/src/executors/sinks/output/mod.rs b/crates/polars-pipe/src/executors/sinks/output/mod.rs index 602e525fa8534..36f8893e407d2 100644 --- a/crates/polars-pipe/src/executors/sinks/output/mod.rs +++ b/crates/polars-pipe/src/executors/sinks/output/mod.rs @@ -1,3 +1,4 @@ +mod batch_sink; #[cfg(feature = "csv")] mod csv; #[cfg(any( @@ -22,3 +23,5 @@ pub use ipc::*; pub use json::*; #[cfg(feature = "parquet")] pub use parquet::*; + +pub use batch_sink::*; \ No newline at end of file diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 79e7380c6291a..9be63327b5420 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -33,6 +33,7 @@ pub struct ParquetSource { processed_paths: usize, iter: Range, paths: Arc<[PathBuf]>, + total_files_read: usize, options: ParquetOptions, file_options: FileScanOptions, #[allow(dead_code)] @@ -43,6 +44,7 @@ pub struct ParquetSource { run_async: bool, prefetch_size: usize, predicate: Option>, + rows_left_to_read: usize, } impl ParquetSource { @@ -167,8 +169,11 @@ impl ParquetSource { } #[cfg(feature = "async")] - async fn init_reader_async(&self, index: usize) -> PolarsResult { - let metadata = self.metadata.clone(); + async fn init_reader_async( + &self, + index: usize, + n_rows: usize, + ) -> PolarsResult { let predicate = self.predicate.clone(); let cloud_options = self.cloud_options.clone(); let (path, options, file_options, projection, chunk_size, reader_schema, hive_partitions) = @@ -176,9 +181,9 @@ impl ParquetSource { let batched_reader = { let uri = path.to_string_lossy(); - ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, metadata) + ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, None) .await? - .with_n_rows(file_options.n_rows) + .with_n_rows(Some(n_rows)) .with_row_index(file_options.row_index) .with_projection(projection) .with_predicate(predicate.clone()) @@ -190,6 +195,29 @@ impl ParquetSource { Ok(batched_reader) } + #[cfg(feature = "async")] + async fn num_rows_per_reader(&self, index: usize) -> PolarsResult { + let predicate = self.predicate.clone(); + let cloud_options = self.cloud_options.clone(); + let (path, options, _file_options, projection, _chunk_size, reader_schema, hive_partitions) = + self.prepare_init_reader(index)?; + + let mut reader = { + let uri = path.to_string_lossy(); + ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), reader_schema, None) + .await? 
+ .with_projection(projection) + .with_predicate(predicate.clone()) + .use_statistics(options.use_statistics) + .with_hive_partition_columns(hive_partitions) + }; + if predicate.is_some() { + reader.num_rows_with_predicate().await + } else { + reader.num_rows().await + } + } + #[allow(unused_variables)] #[allow(clippy::too_many_arguments)] pub(crate) fn new( @@ -211,6 +239,7 @@ impl ParquetSource { eprintln!("POLARS PREFETCH_SIZE: {}", prefetch_size) } let run_async = paths.first().map(is_cloud_url).unwrap_or(false) || config::force_async(); + let rows_left_to_read = file_options.n_rows.unwrap_or(usize::MAX); let mut source = ParquetSource { batched_readers: VecDeque::new(), @@ -220,6 +249,7 @@ impl ParquetSource { file_options, iter, paths, + total_files_read: 0, cloud_options, metadata, file_info, @@ -227,6 +257,7 @@ impl ParquetSource { run_async, prefetch_size, predicate, + rows_left_to_read, }; // Already start downloading when we deal with cloud urls. if run_async { @@ -242,8 +273,9 @@ impl ParquetSource { // // It is important we do this for a reasonable batch size, that's why we start this when we // have just 2 readers left. - if self.batched_readers.len() <= 2 && self.file_options.n_rows.is_none() - || self.batched_readers.is_empty() + if self.batched_readers.is_empty() + && self.rows_left_to_read != 0 + && self.total_files_read != self.paths.len() { let range = 0..self.prefetch_size - self.batched_readers.len(); @@ -257,7 +289,34 @@ impl ParquetSource { .zip(&mut self.iter) .map(|(_, index)| index) .collect::>(); - let init_iter = range.into_iter().map(|index| self.init_reader_async(index)); + + let num_rows_to_read = range + .clone() + .into_iter() + .map(|index| self.num_rows_per_reader(index)); + + let num_rows_to_read = polars_io::pl_async::get_runtime().block_on(async { + futures::future::try_join_all(num_rows_to_read).await + })?; + + let num_rows_to_read = num_rows_to_read + .into_iter() + .zip(range) + .map(|(rows_per_reader, index)| { + self.total_files_read += 1; + if self.rows_left_to_read == 0 { + return (index, 0); + } + self.rows_left_to_read = + self.rows_left_to_read.saturating_sub(rows_per_reader); + (index, rows_per_reader) + }) + .filter(|(_index, rows_per_reader)| *rows_per_reader != 0) + .collect::>(); + + let init_iter = num_rows_to_read + .into_iter() + .map(|(index, num_rows)| self.init_reader_async(index, num_rows)); let batched_readers = polars_io::pl_async::get_runtime() .block_on_potential_spawn(async { @@ -283,7 +342,9 @@ impl Source for ParquetSource { self.prefetch_files()?; let Some(mut reader) = self.batched_readers.pop_front() else { - // If there was no new reader, we depleted all of them and are finished. 
+ if self.total_files_read != self.paths.len() && self.rows_left_to_read != 0 { + return self.get_batches(_context); + } return Ok(SourceResult::Finished); }; @@ -292,10 +353,6 @@ impl Source for ParquetSource { Ok(match batches { None => { - if reader.limit_reached() { - return Ok(SourceResult::Finished); - } - // reset the reader self.init_next_reader()?; return self.get_batches(_context); diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 46d9482283b8b..1398eb7dce555 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -132,6 +132,18 @@ where fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.p.as_stats_evaluator() } + fn columns(&self) -> Vec { + let mut arena: Arena = Arena::new(); + to_aexpr(self.p.expression(), &mut arena); + let mut columns = vec![]; + for _ in 0..arena.len() { + let node = arena.pop().unwrap(); + if let AExpr::Column(s) = node { + columns.push(s.as_ref().to_string()) + } + } + columns + } } PolarsResult::Ok(Arc::new(Wrap { p }) as Arc) @@ -174,6 +186,9 @@ where SinkType::Memory => { Box::new(OrderedSink::new(input_schema.into_owned())) as Box }, + SinkType::Batch { sender } => { + Box::new(BatchSink::new(sender.sender.clone())?) as Box + }, #[allow(unused_variables)] SinkType::File { path, file_type, .. diff --git a/crates/polars-plan/Cargo.toml b/crates/polars-plan/Cargo.toml index 92113dc29b04f..231fb5c017709 100644 --- a/crates/polars-plan/Cargo.toml +++ b/crates/polars-plan/Cargo.toml @@ -22,6 +22,7 @@ polars-parquet = { workspace = true, optional = true } polars-time = { workspace = true, optional = true } polars-utils = { workspace = true } +crossbeam-channel = { workspace = true } ahash = { workspace = true } arrow = { workspace = true } bytemuck = { workspace = true } diff --git a/crates/polars-plan/src/logical_plan/alp/dot.rs b/crates/polars-plan/src/logical_plan/alp/dot.rs index a0692b7ef9d6f..7c9257e06530e 100644 --- a/crates/polars-plan/src/logical_plan/alp/dot.rs +++ b/crates/polars-plan/src/logical_plan/alp/dot.rs @@ -263,6 +263,7 @@ impl<'a> IRDotDisplay<'a> { write_label(f, id, |f| { f.write_str(match payload { SinkType::Memory => "SINK (MEMORY)", + SinkType::Batch { .. } => "SINK (BATCH)", SinkType::File { .. } => "SINK (FILE)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (CLOUD)", diff --git a/crates/polars-plan/src/logical_plan/alp/format.rs b/crates/polars-plan/src/logical_plan/alp/format.rs index f7debc9c72237..36625cd6ed38e 100644 --- a/crates/polars-plan/src/logical_plan/alp/format.rs +++ b/crates/polars-plan/src/logical_plan/alp/format.rs @@ -313,6 +313,7 @@ impl<'a> IRDisplay<'a> { Sink { input, payload, .. } => { let name = match payload { SinkType::Memory => "SINK (memory)", + SinkType::Batch { .. }=> "SINK (batch)", SinkType::File { .. } => "SINK (file)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (cloud)", diff --git a/crates/polars-plan/src/logical_plan/alp/schema.rs b/crates/polars-plan/src/logical_plan/alp/schema.rs index 6047fe6d59436..8e54a5c3a0d34 100644 --- a/crates/polars-plan/src/logical_plan/alp/schema.rs +++ b/crates/polars-plan/src/logical_plan/alp/schema.rs @@ -36,6 +36,7 @@ impl IR { ExtContext { .. } => "ext_context", Sink { payload, .. } => match payload { SinkType::Memory => "sink (memory)", + SinkType::Batch { .. } => "sink (batch)", SinkType::File { .. } => "sink (file)", #[cfg(feature = "cloud")] SinkType::Cloud { .. 
} => "sink (cloud)", diff --git a/crates/polars-plan/src/logical_plan/alp/tree_format.rs b/crates/polars-plan/src/logical_plan/alp/tree_format.rs index 7337a6c332012..50aa4de484364 100644 --- a/crates/polars-plan/src/logical_plan/alp/tree_format.rs +++ b/crates/polars-plan/src/logical_plan/alp/tree_format.rs @@ -339,6 +339,7 @@ impl<'a> TreeFmtNode<'a> { match payload { SinkType::Memory => "SINK (memory)", SinkType::File { .. } => "SINK (file)", + SinkType::Batch { .. } => "SINK (batch)", #[cfg(feature = "cloud")] SinkType::Cloud { .. } => "SINK (cloud)", }, diff --git a/crates/polars-plan/src/logical_plan/options.rs b/crates/polars-plan/src/logical_plan/options.rs index 67d1e3a439855..8646ecde5e28c 100644 --- a/crates/polars-plan/src/logical_plan/options.rs +++ b/crates/polars-plan/src/logical_plan/options.rs @@ -1,4 +1,6 @@ -use std::path::PathBuf; +use std::{hash::{Hash, Hasher}, path::PathBuf}; + +use crossbeam_channel::{bounded, Receiver, Sender}; use polars_core::prelude::*; #[cfg(feature = "csv")] @@ -223,10 +225,42 @@ pub struct AnonymousScanOptions { pub fmt_str: &'static str, } +#[derive(Clone, Debug)] +pub struct BatchSender { + pub id: u32, + pub sender: Sender, +} + +impl Default for BatchSender { + fn default() -> Self { + let (sender, receiver) = bounded(1); + Self{ id: 0, sender: sender} + } +} + +impl PartialEq for BatchSender { + fn eq(&self, other: &Self) -> bool { + self.sender.same_channel(&other.sender) + } +} + +impl Eq for BatchSender{} + +impl Hash for BatchSender { + fn hash(&self, state: &mut H) { + self.id.hash(state) + } +} + + #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] #[derive(Clone, Debug, PartialEq, Eq, Hash)] pub enum SinkType { Memory, + Batch { + #[serde(skip)] + sender: BatchSender, + }, File { path: Arc, file_type: FileType, diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py index 190f5589a756a..884f7fc91b2f9 100644 --- a/py-polars/tests/unit/io/test_parquet.py +++ b/py-polars/tests/unit/io/test_parquet.py @@ -5,7 +5,7 @@ import sys from datetime import datetime, time, timezone from decimal import Decimal -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast import fsspec import numpy as np @@ -971,3 +971,19 @@ def test_hybrid_rle() -> None: assert "RLE_DICTIONARY" in column["encodings"] f.seek(0) assert_frame_equal(pl.read_parquet(f), df) + + +def test_skip_full_load_of_rgs_using_predicate( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, capfd: Any +) -> None: + monkeypatch.setenv("POLARS_VERBOSE", "1") + monkeypatch.setenv("POLARS_FORCE_ASYNC", "1") + df = pl.DataFrame( + {"a": pl.arange(0, 10, eager=True), "b": pl.arange(0, 10, eager=True)} + ) + root = tmp_path / "test_rg_skip.parquet" + df.write_parquet(root, use_pyarrow=True, row_group_size=2) + + q = pl.scan_parquet(root, parallel="row_groups") + assert q.filter(pl.col("a").gt(6)).collect().shape == (3, 2) + assert "reduced the number of row groups in pruning by 3" in capfd.readouterr().err