diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index 2da06f908769c..e3cd636f5b944 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -44,7 +44,7 @@ pub fn apply_predicate(
 /// - Null count
 /// - Minimum value
 /// - Maximum value
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ColumnStats {
     field: Field,
@@ -91,6 +91,10 @@ impl ColumnStats {
         }
     }
 
+    pub fn field_name(&self) -> &SmartString {
+        self.field.name()
+    }
+
     /// Returns the [`DataType`] of the column.
     pub fn dtype(&self) -> &DataType {
         self.field.data_type()
@@ -195,7 +199,7 @@ fn use_min_max(dtype: &DataType) -> bool {
 
 /// A collection of column stats with a known schema.
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct BatchStats {
     schema: SchemaRef,
     stats: Vec<ColumnStats>,
@@ -238,4 +242,12 @@ impl BatchStats {
     pub fn num_rows(&self) -> Option<usize> {
         self.num_rows
     }
+
+    pub fn with_schema(&mut self, schema: SchemaRef) {
+        self.schema = schema;
+    }
+
+    pub fn take_indices(&mut self, indices: &[usize]) {
+        self.stats = indices.iter().map(|&i| self.stats[i].clone()).collect();
+    }
 }
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index 69a4e2348fcbe..9bab31b8c4405 100644
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -7,7 +7,6 @@ use polars_core::config::{get_file_prefetch_size, verbose};
 use polars_core::utils::accumulate_dataframes_vertical;
 use polars_io::cloud::CloudOptions;
 use polars_io::parquet::metadata::FileMetaDataRef;
-use polars_io::parquet::read::materialize_empty_df;
 use polars_io::utils::is_cloud_url;
 use polars_io::RowIndex;
 
@@ -16,7 +15,7 @@ use super::*;
 pub struct ParquetExec {
     paths: Arc<[PathBuf]>,
     file_info: FileInfo,
-    hive_parts: Option<Vec<Arc<HivePartitions>>>,
+    hive_parts: Option<Arc<[HivePartitions]>>,
     predicate: Option<Arc<dyn PhysicalExpr>>,
     options: ParquetOptions,
     #[allow(dead_code)]
@@ -31,7 +30,7 @@ impl ParquetExec {
     pub(crate) fn new(
         paths: Arc<[PathBuf]>,
         file_info: FileInfo,
-        hive_parts: Option<Vec<Arc<HivePartitions>>>,
+        hive_parts: Option<Arc<[HivePartitions]>>,
         predicate: Option<Arc<dyn PhysicalExpr>>,
         options: ParquetOptions,
         cloud_options: Option<CloudOptions>,
@@ -324,34 +323,7 @@ impl ParquetExec {
             .and_then(|_| self.predicate.take())
             .map(phys_expr_to_io_expr);
 
-        let is_cloud = match self.paths.first() {
-            Some(p) => is_cloud_url(p.as_path()),
-            None => {
-                let hive_partitions = self
-                    .hive_parts
-                    .as_ref()
-                    .filter(|x| !x.is_empty())
-                    .map(|x| x.first().unwrap().materialize_partition_columns());
-                let (projection, _) = prepare_scan_args(
-                    None,
-                    &mut self.file_options.with_columns,
-                    &mut self.file_info.schema,
-                    self.file_options.row_index.is_some(),
-                    hive_partitions.as_deref(),
-                );
-                return Ok(materialize_empty_df(
-                    projection.as_deref(),
-                    self.file_info
-                        .reader_schema
-                        .as_ref()
-                        .unwrap()
-                        .as_ref()
-                        .unwrap_left(),
-                    hive_partitions.as_deref(),
-                    self.file_options.row_index.as_ref(),
-                ));
-            },
-        };
+        let is_cloud = is_cloud_url(self.paths.first().unwrap());
 
         let force_async = config::force_async();
 
         let out = if is_cloud || force_async {
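Note on the `BatchStats` additions above: `take_indices` gathers the column stats for a projected subset of columns, which is why `ColumnStats` now derives `Clone`. Below is a minimal standalone sketch of that gather pattern; the `Stat` type and free-standing `take_indices` function are toy stand-ins, not the real polars API.

// Toy sketch of the index-gather pattern behind `BatchStats::take_indices`.
// `Stat` stands in for `ColumnStats`; the real type derives `Clone`
// precisely so this per-index clone is possible.
#[derive(Debug, Clone)]
struct Stat {
    name: String,
}

fn take_indices(stats: &mut Vec<Stat>, indices: &[usize]) {
    // Keep only the entries named by `indices`, in the order given.
    *stats = indices.iter().map(|&i| stats[i].clone()).collect();
}

fn main() {
    let mut stats = vec![
        Stat { name: "a".into() },
        Stat { name: "b".into() },
        Stat { name: "c".into() },
    ];
    take_indices(&mut stats, &[2, 0]);
    assert_eq!(stats[0].name, "c");
    assert_eq!(stats[1].name, "a");
    println!("{stats:?}");
}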
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index c4222d07c206b..781a387499812 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -40,7 +40,7 @@ pub struct ParquetSource {
     cloud_options: Option<CloudOptions>,
     metadata: Option<FileMetaDataRef>,
     file_info: FileInfo,
-    hive_parts: Option<Vec<Arc<HivePartitions>>>,
+    hive_parts: Option<Arc<[HivePartitions]>>,
    verbose: bool,
     run_async: bool,
     prefetch_size: usize,
@@ -192,7 +192,7 @@ impl ParquetSource {
         metadata: Option<FileMetaDataRef>,
         file_options: FileScanOptions,
         file_info: FileInfo,
-        hive_parts: Option<Vec<Arc<HivePartitions>>>,
+        hive_parts: Option<Arc<[HivePartitions]>>,
         verbose: bool,
         predicate: Option<Arc<dyn PhysicalExpr>>,
     ) -> PolarsResult<Self> {
diff --git a/crates/polars-plan/src/plans/hive.rs b/crates/polars-plan/src/plans/hive.rs
index 505e404fea360..e0c2b0282d712 100644
--- a/crates/polars-plan/src/plans/hive.rs
+++ b/crates/polars-plan/src/plans/hive.rs
@@ -9,7 +9,7 @@ use polars_io::utils::{BOOLEAN_RE, FLOAT_RE, INTEGER_RE};
 use serde::{Deserialize, Serialize};
 
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct HivePartitions {
     /// Single value Series that can be used to run the predicate against.
     /// They are to be broadcasted if the predicates don't filter them out.
@@ -17,6 +17,32 @@ pub struct HivePartitions {
 }
 
 impl HivePartitions {
+    pub fn get_projection_schema_and_indices<T: AsRef<str>>(
+        &self,
+        names: &[T],
+    ) -> (SchemaRef, Vec<usize>) {
+        let names = names.iter().map(T::as_ref).collect::<PlHashSet<&str>>();
+        let mut out_schema = Schema::with_capacity(self.stats.schema().len());
+        let mut out_indices = Vec::with_capacity(self.stats.column_stats().len());
+
+        for (i, cs) in self.stats.column_stats().iter().enumerate() {
+            let name = cs.field_name();
+            if names.contains(name.as_str()) {
+                out_indices.push(i);
+                out_schema
+                    .insert_at_index(out_schema.len(), name.clone(), cs.dtype().clone())
+                    .unwrap();
+            }
+        }
+
+        (out_schema.into(), out_indices)
+    }
+
+    pub fn apply_projection(&mut self, new_schema: SchemaRef, column_indices: &[usize]) {
+        self.stats.with_schema(new_schema);
+        self.stats.take_indices(column_indices);
+    }
+
     pub fn get_statistics(&self) -> &BatchStats {
         &self.stats
     }
@@ -40,7 +66,7 @@ pub fn hive_partitions_from_paths(
     paths: &[PathBuf],
     hive_start_idx: usize,
     schema: Option<SchemaRef>,
-) -> PolarsResult<Option<Vec<Arc<HivePartitions>>>> {
+) -> PolarsResult<Option<Arc<[HivePartitions]>>> {
     let Some(path) = paths.first() else {
         return Ok(None);
     };
@@ -131,10 +157,10 @@ pub fn hive_partitions_from_paths(
 
         let stats = BatchStats::new(hive_schema.clone(), column_stats, None);
 
-        hive_partitions.push(Arc::new(HivePartitions { stats }));
+        hive_partitions.push(HivePartitions { stats });
     }
 
-    Ok(Some(hive_partitions))
+    Ok(Some(Arc::from(hive_partitions)))
 }
 
 /// Determine the path separator for identifying Hive partitions.
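Note on the new `HivePartitions::get_projection_schema_and_indices`: it walks the stored partition schema in order and keeps only the projected names, so the output follows partition-schema order, not `with_columns` order. A standalone sketch of that logic follows; `(&str, &str)` pairs stand in for `Field` entries and std's `HashSet` stands in for polars' `PlHashSet` (a hasher-tuned HashSet).

use std::collections::HashSet;

// Sketch: filter an ordered schema by a set of projected names, returning
// both the kept (name, dtype) pairs and their original indices so the
// matching stats can be gathered with the `take_indices` pattern above.
fn projection_schema_and_indices(
    schema: &[(&str, &str)], // toy (name, dtype) entries
    names: &[&str],
) -> (Vec<(String, String)>, Vec<usize>) {
    let names: HashSet<&str> = names.iter().copied().collect();
    let mut out_schema = Vec::new();
    let mut out_indices = Vec::new();
    for (i, (name, dtype)) in schema.iter().enumerate() {
        if names.contains(name) {
            out_indices.push(i);
            out_schema.push((name.to_string(), dtype.to_string()));
        }
    }
    (out_schema, out_indices)
}

fn main() {
    let schema = [("year", "i32"), ("month", "i32"), ("day", "i32")];
    let (proj, idx) = projection_schema_and_indices(&schema, &["day", "year"]);
    // Partition-schema order wins, not projection order.
    assert_eq!(idx, vec![0, 2]);
    assert_eq!(proj[0].0, "year");
    println!("{proj:?} {idx:?}");
}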
diff --git a/crates/polars-plan/src/plans/ir/mod.rs b/crates/polars-plan/src/plans/ir/mod.rs
index 815e292e0b174..e4e09954255ed 100644
--- a/crates/polars-plan/src/plans/ir/mod.rs
+++ b/crates/polars-plan/src/plans/ir/mod.rs
@@ -51,7 +51,7 @@ pub enum IR {
     Scan {
         paths: Arc<[PathBuf]>,
         file_info: FileInfo,
-        hive_parts: Option<Vec<Arc<HivePartitions>>>,
+        hive_parts: Option<Arc<[HivePartitions]>>,
         predicate: Option<ExprIR>,
         /// schema of the projected file
        output_schema: Option<SchemaRef>,
diff --git a/crates/polars-plan/src/plans/mod.rs b/crates/polars-plan/src/plans/mod.rs
index ca9acc44cf532..c92f3b8bc48b7 100644
--- a/crates/polars-plan/src/plans/mod.rs
+++ b/crates/polars-plan/src/plans/mod.rs
@@ -81,7 +81,7 @@ pub enum DslPlan {
         paths: Arc<[PathBuf]>,
         // Option as this is mostly materialized on the IR phase.
         file_info: Option<FileInfo>,
-        hive_parts: Option<Vec<Arc<HivePartitions>>>,
+        hive_parts: Option<Arc<[HivePartitions]>>,
         predicate: Option<Expr>,
         file_options: FileScanOptions,
         scan_type: FileScan,
diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
index 6d07ff86ddf75..42a505a92b800 100644
--- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
+++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
@@ -378,7 +378,7 @@ impl<'a> PredicatePushDown<'a> {
                         }
                         scan_type.remove_metadata();
                     }
-                    if paths.is_empty() {
+                    if new_paths.is_empty() {
                         let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
                         let df = DataFrame::from(schema.as_ref());
 
@@ -390,7 +390,7 @@ impl<'a> PredicatePushDown<'a> {
                         });
                     } else {
                         paths = Arc::from(new_paths);
-                        scan_hive_parts = Some(new_hive_parts);
+                        scan_hive_parts = Some(Arc::from(new_hive_parts));
                     }
                 }
             }
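Note on the container change threaded through the plan nodes above (`Option<Vec<Arc<HivePartitions>>>` becomes `Option<Arc<[HivePartitions]>>`): the whole partition list now lives in one shared allocation, so cloning a plan node is a single refcount bump instead of a per-element `Arc` clone. A minimal sketch with a toy `HivePartition` type, not the real one:

use std::sync::Arc;

// Sketch: one shared allocation for the whole partition list instead of
// one `Arc` per element.
#[derive(Debug, Clone)]
struct HivePartition(&'static str); // stand-in for the real HivePartitions

fn main() {
    let parts: Vec<HivePartition> = vec![HivePartition("a=1"), HivePartition("a=2")];

    // `Arc::from` moves the Vec's contents into one refcounted slice,
    // mirroring what `hive_partitions_from_paths` now returns.
    let shared: Arc<[HivePartition]> = Arc::from(parts);
    let for_other_plan_node = Arc::clone(&shared); // O(1), no per-element clone

    assert_eq!(shared.len(), 2);
    assert_eq!(shared[0].0, "a=1");
    assert_eq!(Arc::strong_count(&shared), 2);
    drop(for_other_plan_node);
    println!("{shared:?}");
}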
diff --git a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
index 288ebe8768e3b..b7d8e7822676f 100644
--- a/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
+++ b/crates/polars-plan/src/plans/optimizer/projection_pushdown/mod.rs
@@ -395,7 +395,7 @@ impl ProjectionPushDown {
             Scan {
                 paths,
                 file_info,
-                hive_parts,
+                mut hive_parts,
                 scan_type,
                 predicate,
                 mut file_options,
@@ -422,9 +422,7 @@ impl ProjectionPushDown {
                         file_options.row_index.as_ref(),
                     );
 
-                    output_schema = if file_options.with_columns.is_none() {
-                        None
-                    } else {
+                    output_schema = if let Some(ref with_columns) = file_options.with_columns {
                         let mut schema = update_scan_schema(
                             &acc_projections,
                             expr_arena,
@@ -434,7 +432,28 @@ impl ProjectionPushDown {
                         // Hive partitions are created AFTER the projection, so the output
                         // schema is incorrect. Here we ensure the columns that are projected and hive
                         // parts are added at the proper place in the schema, which is at the end.
-                        if let Some(ref hive_parts) = hive_parts {
+                        hive_parts = if let Some(hive_parts) = hive_parts {
+                            let (new_schema, projected_indices) = hive_parts[0]
+                                .get_projection_schema_and_indices(with_columns.as_ref());
+
+                            Some(
+                                hive_parts
+                                    .into_iter()
+                                    .cloned()
+                                    .map(|mut hp| {
+                                        hp.apply_projection(
+                                            new_schema.clone(),
+                                            projected_indices.as_ref(),
+                                        );
+                                        hp
+                                    })
+                                    .collect::<Arc<[_]>>(),
+                            )
+                        } else {
+                            hive_parts
+                        };
+
+                        if let Some(ref mut hive_parts) = hive_parts {
                             let partition_schema = hive_parts.first().unwrap().schema();
 
                             for (name, _) in partition_schema.iter() {
@@ -444,6 +463,8 @@ impl ProjectionPushDown {
                             }
                         }
                         Some(Arc::new(schema))
+                    } else {
+                        None
                     };
                 }
diff --git a/py-polars/tests/unit/io/test_hive.py b/py-polars/tests/unit/io/test_hive.py
index af870de093ea4..703c5d0c71501 100644
--- a/py-polars/tests/unit/io/test_hive.py
+++ b/py-polars/tests/unit/io/test_hive.py
@@ -12,9 +12,6 @@
 from polars.testing import assert_frame_equal, assert_series_equal
 
 
-@pytest.mark.skip(
-    reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
-)
 @pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_predicate_pushdown(
@@ -102,9 +99,6 @@ def test_hive_partitioned_predicate_pushdown_skips_correct_number_of_files(
     assert result.to_dict(as_series=False) == expected
 
 
-@pytest.mark.skip(
-    reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
-)
 @pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) -> None:
@@ -139,9 +133,6 @@ def test_hive_partitioned_slice_pushdown(io_files_path: Path, tmp_path: Path) ->
     ]
 
 
-@pytest.mark.skip(
-    reason="Broken by pyarrow 15 release: https://github.com/pola-rs/polars/issues/13892"
-)
 @pytest.mark.xdist_group("streaming")
 @pytest.mark.write_disk()
 def test_hive_partitioned_projection_pushdown(
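Note on the projection-pushdown rewrite above: because the partition list is shared behind `Arc<[_]>`, it cannot be projected in place; each element is cloned, projected, and re-collected into a fresh shared slice. A sketch with toy `Part`/`apply_projection` stand-ins follows; it uses `.iter().cloned()`, which is equivalent to the diff's `.into_iter().cloned()` (both resolve to a by-reference slice iterator on `Arc<[T]>`).

use std::sync::Arc;

// Sketch of the rebuild step: clone each shared partition, apply the
// projection indices computed once from the first element, and collect
// into a new shared slice.
#[derive(Debug, Clone)]
struct Part {
    cols: Vec<&'static str>,
}

impl Part {
    fn apply_projection(&mut self, keep: &[usize]) {
        self.cols = keep.iter().map(|&i| self.cols[i]).collect();
    }
}

fn main() {
    let parts: Arc<[Part]> = Arc::from(vec![
        Part { cols: vec!["year", "month", "day"] },
        Part { cols: vec!["year", "month", "day"] },
    ]);
    let keep = [0, 2]; // indices computed once, from parts[0]

    let projected: Arc<[Part]> = parts
        .iter()
        .cloned()
        .map(|mut p| {
            p.apply_projection(&keep);
            p
        })
        .collect();

    assert_eq!(projected[0].cols, ["year", "day"]);
    println!("{projected:?}");
}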