diff --git a/crates/polars-core/src/frame/chunks.rs b/crates/polars-core/src/frame/chunks.rs index 704df0b7d140..16fa8f7c1ff9 100644 --- a/crates/polars-core/src/frame/chunks.rs +++ b/crates/polars-core/src/frame/chunks.rs @@ -23,7 +23,7 @@ impl TryFrom<(RecordBatch, &ArrowSchema)> for DataFrame { impl DataFrame { pub fn split_chunks(&mut self) -> impl Iterator + '_ { - self.align_chunks(); + self.align_chunks_par(); (0..self.n_chunks()).map(move |i| unsafe { let columns = self diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 998b82eddc78..a06c5856a5f1 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -12,7 +12,7 @@ use crate::chunked_array::ops::unique::is_unique_helper; use crate::prelude::*; #[cfg(feature = "row_hash")] use crate::utils::split_df; -use crate::utils::{slice_offsets, try_get_supertype, NoNull}; +use crate::utils::{slice_offsets, try_get_supertype, Container, NoNull}; #[cfg(feature = "dataframe_arithmetic")] mod arithmetic; @@ -526,10 +526,15 @@ impl DataFrame { /// Aggregate all the chunks in the DataFrame to a single chunk in parallel. /// This may lead to more peak memory consumption. pub fn as_single_chunk_par(&mut self) -> &mut Self { - self.as_single_chunk(); - // if self.columns.iter().any(|s| s.n_chunks() > 1) { - // self.columns = self._apply_columns_par(&|s| s.rechunk()); - // } + if self.columns.iter().any(|c| { + if let Column::Series(s) = c { + s.n_chunks() > 1 + } else { + false + } + }) { + self.columns = self._apply_columns_par(&|s| s.rechunk()); + } self } @@ -552,13 +557,13 @@ impl DataFrame { None => false, Some(first_column_chunk_lengths) => { // Fast Path for single Chunk Series - if first_column_chunk_lengths.len() == 1 { - return chunk_lengths.any(|cl| cl.len() != 1); + if first_column_chunk_lengths.size_hint().0 == 1 { + return chunk_lengths.any(|cl| cl.size_hint().0 != 1); } // Always rechunk if we have more chunks than rows. // except when we have an empty df containing a single chunk let height = self.height(); - let n_chunks = first_column_chunk_lengths.len(); + let n_chunks = first_column_chunk_lengths.size_hint().0; if n_chunks > height && !(height == 0 && n_chunks == 1) { return true; } @@ -575,7 +580,7 @@ impl DataFrame { } /// Ensure all the chunks in the [`DataFrame`] are aligned. - pub fn align_chunks(&mut self) -> &mut Self { + pub fn align_chunks_par(&mut self) -> &mut Self { if self.should_rechunk() { self.as_single_chunk_par() } else { @@ -583,6 +588,14 @@ impl DataFrame { } } + pub fn align_chunks(&mut self) -> &mut Self { + if self.should_rechunk() { + self.as_single_chunk() + } else { + self + } + } + /// Get the [`DataFrame`] schema. /// /// # Example @@ -903,7 +916,7 @@ impl DataFrame { /// Concatenate a [`DataFrame`] to this [`DataFrame`] and return as newly allocated [`DataFrame`]. /// - /// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`]. + /// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`]. /// /// # Example /// @@ -949,7 +962,7 @@ impl DataFrame { /// Concatenate a [`DataFrame`] to this [`DataFrame`] /// - /// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`]. + /// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`]. /// /// # Example /// @@ -1012,7 +1025,7 @@ impl DataFrame { /// Concatenate a [`DataFrame`] to this [`DataFrame`] /// - /// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`]. + /// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`]. /// /// # Panics /// Panics if the schema's don't match. @@ -1038,7 +1051,7 @@ impl DataFrame { /// /// Prefer `vstack` over `extend` when you want to append many times before doing a query. For instance /// when you read in multiple files and when to store them in a single `DataFrame`. In the latter case, finish the sequence - /// of `append` operations with a [`rechunk`](Self::align_chunks). + /// of `append` operations with a [`rechunk`](Self::align_chunks_par). pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> { polars_ensure!( self.width() == other.width(), @@ -2512,17 +2525,17 @@ impl DataFrame { /// but we also don't want to rechunk here, as this operation is costly and would benefit the caller /// as well. pub fn iter_chunks(&self, compat_level: CompatLevel, parallel: bool) -> RecordBatchIter { + debug_assert!(!self.should_rechunk(), "expected equal chunks"); // If any of the columns is binview and we don't convert `compat_level` we allow parallelism // as we must allocate arrow strings/binaries. - let parallel = if parallel && compat_level.0 >= 1 { - self.columns.len() > 1 - && self - .columns - .iter() - .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary)) - } else { - false - }; + let must_convert = compat_level.0 == 0; + let parallel = parallel + && must_convert + && self.columns.len() > 1 + && self + .columns + .iter() + .any(|s| matches!(s.dtype(), DataType::String | DataType::Binary)); RecordBatchIter { columns: &self.columns, diff --git a/crates/polars-core/src/utils/mod.rs b/crates/polars-core/src/utils/mod.rs index 08b33b06c3b1..169c30fd0498 100644 --- a/crates/polars-core/src/utils/mod.rs +++ b/crates/polars-core/src/utils/mod.rs @@ -309,7 +309,7 @@ pub fn split_df(df: &mut DataFrame, target: usize, strict: bool) -> Vec PolarsResult<()> { - df.align_chunks(); + df.align_chunks_par(); let fields = df .iter() .map(|s| { diff --git a/crates/polars-io/src/utils/other.rs b/crates/polars-io/src/utils/other.rs index a49f2d429735..023d61fe525b 100644 --- a/crates/polars-io/src/utils/other.rs +++ b/crates/polars-io/src/utils/other.rs @@ -182,7 +182,7 @@ pub(crate) fn chunk_df_for_writing( row_group_size: usize, ) -> PolarsResult> { // ensures all chunks are aligned. - df.align_chunks(); + df.align_chunks_par(); // Accumulate many small chunks to the row group size. // See: #16403 diff --git a/crates/polars-pipe/src/executors/sinks/io.rs b/crates/polars-pipe/src/executors/sinks/io.rs index 34d357ee18a4..7ffdd505444d 100644 --- a/crates/polars-pipe/src/executors/sinks/io.rs +++ b/crates/polars-pipe/src/executors/sinks/io.rs @@ -170,7 +170,7 @@ impl IOThread { if let Some(partitions) = partitions { for (part, mut df) in partitions.into_no_null_iter().zip(iter) { df.shrink_to_fit(); - df.align_chunks(); + df.align_chunks_par(); let mut path = dir2.clone(); path.push(format!("{part}")); @@ -194,7 +194,7 @@ impl IOThread { for mut df in iter { df.shrink_to_fit(); - df.align_chunks(); + df.align_chunks_par(); writer.write_batch(&df).unwrap(); } writer.finish().unwrap(); @@ -252,13 +252,13 @@ impl IOThread { partition_no: IdxSize, mut df: DataFrame, ) { - df.shrink_to_fit(); + df.align_chunks(); let count = self.thread_local_count.fetch_add(1, Ordering::Relaxed); let mut path = self.dir.clone(); path.push(format!("{partition_no}")); let _ = std::fs::create_dir(&path); - // thread local name we start with an underscore to ensure we don't get + // Thread local name we start with an underscore to ensure we don't get // duplicates path.push(format!("_{count}.ipc")); let file = File::create(path).unwrap(); diff --git a/crates/polars-python/src/dataframe/export.rs b/crates/polars-python/src/dataframe/export.rs index 013cb32dfdd9..29ba4edb5371 100644 --- a/crates/polars-python/src/dataframe/export.rs +++ b/crates/polars-python/src/dataframe/export.rs @@ -71,7 +71,7 @@ impl PyDataFrame { #[allow(clippy::wrong_self_convention)] pub fn to_arrow(&mut self, compat_level: PyCompatLevel) -> PyResult> { - self.df.align_chunks(); + self.df.align_chunks_par(); Python::with_gil(|py| { let pyarrow = py.import_bound("pyarrow")?; let names = self.df.get_column_names_str(); @@ -144,7 +144,7 @@ impl PyDataFrame { py: Python<'py>, requested_schema: Option, ) -> PyResult> { - self.df.align_chunks(); + self.df.align_chunks_par(); dataframe_to_stream(&self.df, py) } }