Commit

fix: Ensure aligned chunks in OOC sort (#19118)
ritchie46 authored Oct 7, 2024
1 parent 1a9b224 commit d31bbf4
Showing 8 changed files with 46 additions and 33 deletions.
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/chunks.rs
@@ -23,7 +23,7 @@ impl TryFrom<(RecordBatch, &ArrowSchema)> for DataFrame {

impl DataFrame {
pub fn split_chunks(&mut self) -> impl Iterator<Item = DataFrame> + '_ {
self.align_chunks();
self.align_chunks_par();

(0..self.n_chunks()).map(move |i| unsafe {
let columns = self
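A minimal usage sketch of the method touched above, assuming the `polars` facade re-exports these `DataFrame` methods; column names and values are illustrative only:

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let mut df = df!("a" => &[1i64, 2, 3], "b" => &["x", "y", "z"])?;
    // vstack_mut appends the other frame's chunks instead of copying data,
    // so each column now holds two chunks of lengths [3, 2].
    df.vstack_mut(&df!("a" => &[4i64, 5], "b" => &["u", "v"])?)?;

    // split_chunks now aligns the chunks (in parallel) first, then yields one
    // DataFrame per aligned chunk across all columns.
    for chunk_df in df.split_chunks() {
        println!("chunk with {} rows", chunk_df.height());
    }
    Ok(())
}
```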
57 changes: 35 additions & 22 deletions crates/polars-core/src/frame/mod.rs
@@ -12,7 +12,7 @@ use crate::chunked_array::ops::unique::is_unique_helper;
use crate::prelude::*;
#[cfg(feature = "row_hash")]
use crate::utils::split_df;
use crate::utils::{slice_offsets, try_get_supertype, NoNull};
use crate::utils::{slice_offsets, try_get_supertype, Container, NoNull};

#[cfg(feature = "dataframe_arithmetic")]
mod arithmetic;
@@ -526,10 +526,15 @@ impl DataFrame {
/// Aggregate all the chunks in the DataFrame to a single chunk in parallel.
/// This may lead to more peak memory consumption.
pub fn as_single_chunk_par(&mut self) -> &mut Self {
self.as_single_chunk();
// if self.columns.iter().any(|s| s.n_chunks() > 1) {
// self.columns = self._apply_columns_par(&|s| s.rechunk());
// }
if self.columns.iter().any(|c| {
if let Column::Series(s) = c {
s.n_chunks() > 1
} else {
false
}
}) {
self.columns = self._apply_columns_par(&|s| s.rechunk());
}
self
}

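For context, a sketch of what the reinstated parallel rechunk means for a caller. This assumes the `polars` facade; the assertions follow the behaviour in the hunk above (only materialized `Series` columns with more than one chunk are rechunked, distributed over columns via `_apply_columns_par`):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let mut df = df!("a" => &[1i64, 2, 3])?;
    df.vstack_mut(&df!("a" => &[4i64, 5, 6])?)?;
    assert_eq!(df.n_chunks(), 2);

    // Collapses every multi-chunk Series column into a single chunk; the work
    // is spread across columns in parallel.
    df.as_single_chunk_par();
    assert_eq!(df.n_chunks(), 1);
    Ok(())
}
```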
@@ -552,13 +557,13 @@
None => false,
Some(first_column_chunk_lengths) => {
// Fast Path for single Chunk Series
if first_column_chunk_lengths.len() == 1 {
return chunk_lengths.any(|cl| cl.len() != 1);
if first_column_chunk_lengths.size_hint().0 == 1 {
return chunk_lengths.any(|cl| cl.size_hint().0 != 1);
}
// Always rechunk if we have more chunks than rows.
// except when we have an empty df containing a single chunk
let height = self.height();
let n_chunks = first_column_chunk_lengths.len();
let n_chunks = first_column_chunk_lengths.size_hint().0;
if n_chunks > height && !(height == 0 && n_chunks == 1) {
return true;
}
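The `.len()` calls above are replaced with `.size_hint().0`; for a well-behaved `ExactSizeIterator` the lower bound of `size_hint()` equals `len()`, so the fast path keeps its meaning while (presumably) no longer requiring that trait bound on the chunk-length iterators. A standalone illustration of the equivalence, not polars code:

```rust
fn main() {
    let chunk_lengths = [3usize, 3, 2];
    let iter = chunk_lengths.iter();

    // For an ExactSizeIterator the lower size_hint() bound and len() agree,
    // so `.size_hint().0` can stand in for `.len()` on iterator types that
    // only promise `Iterator`.
    assert_eq!(iter.len(), iter.size_hint().0);
    assert_eq!(iter.size_hint().0, 3);
}
```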
@@ -575,14 +580,22 @@
}

/// Ensure all the chunks in the [`DataFrame`] are aligned.
pub fn align_chunks(&mut self) -> &mut Self {
pub fn align_chunks_par(&mut self) -> &mut Self {
if self.should_rechunk() {
self.as_single_chunk_par()
} else {
self
}
}

pub fn align_chunks(&mut self) -> &mut Self {
if self.should_rechunk() {
self.as_single_chunk()
} else {
self
}
}

/// Get the [`DataFrame`] schema.
///
/// # Example
@@ -903,7 +916,7 @@ impl DataFrame {

/// Concatenate a [`DataFrame`] to this [`DataFrame`] and return as newly allocated [`DataFrame`].
///
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`].
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`].
///
/// # Example
///
@@ -949,7 +962,7 @@ impl DataFrame {

/// Concatenate a [`DataFrame`] to this [`DataFrame`]
///
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`].
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`].
///
/// # Example
///
@@ -1012,7 +1025,7 @@ impl DataFrame {

/// Concatenate a [`DataFrame`] to this [`DataFrame`]
///
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`].
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`].
///
/// # Panics
/// Panics if the schema's don't match.
@@ -1038,7 +1051,7 @@ impl DataFrame {
///
/// Prefer `vstack` over `extend` when you want to append many times before doing a query. For instance
/// when you read in multiple files and when to store them in a single `DataFrame`. In the latter case, finish the sequence
/// of `append` operations with a [`rechunk`](Self::align_chunks).
/// of `append` operations with a [`rechunk`](Self::align_chunks_par).
pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> {
polars_ensure!(
self.width() == other.width(),
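The doc links in the hunks above now point to the parallel variant. A sketch of the recommended pattern they describe, assuming the `polars` facade; note that `align_chunks_par` only rechunks when `should_rechunk()` reports misaligned (or excessive) chunks, whereas `as_single_chunk_par` always collapses multi-chunk columns:

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let mut acc = df!("a" => &[0i64])?;
    // Many cheap appends: each vstack_mut just adds chunks, no data is copied.
    for i in 1..100i64 {
        acc.vstack_mut(&df!("a" => &[i])?)?;
    }
    println!("before: {} chunk(s)", acc.n_chunks());

    // Finish the append sequence with a rechunk before querying; both variants
    // now run the rechunk column-parallel.
    acc.align_chunks_par();
    acc.as_single_chunk_par();
    println!("after:  {} chunk(s)", acc.n_chunks());
    Ok(())
}
```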
@@ -2512,17 +2525,17 @@ impl DataFrame {
/// but we also don't want to rechunk here, as this operation is costly and would benefit the caller
/// as well.
pub fn iter_chunks(&self, compat_level: CompatLevel, parallel: bool) -> RecordBatchIter {
debug_assert!(!self.should_rechunk(), "expected equal chunks");
// If any of the columns is binview and we don't convert `compat_level` we allow parallelism
// as we must allocate arrow strings/binaries.
let parallel = if parallel && compat_level.0 >= 1 {
self.columns.len() > 1
&& self
.columns
.iter()
.any(|s| matches!(s.dtype(), DataType::String | DataType::Binary))
} else {
false
};
let must_convert = compat_level.0 == 0;
let parallel = parallel
&& must_convert
&& self.columns.len() > 1
&& self
.columns
.iter()
.any(|s| matches!(s.dtype(), DataType::String | DataType::Binary));

RecordBatchIter {
columns: &self.columns,
2 changes: 1 addition & 1 deletion crates/polars-core/src/utils/mod.rs
@@ -309,7 +309,7 @@ pub fn split_df(df: &mut DataFrame, target: usize, strict: bool) -> Vec<DataFrame> {
return vec![df.clone()];
}
// make sure that chunks are aligned.
df.align_chunks();
df.align_chunks_par();
split_df_as_ref(df, target, strict)
}

2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/write.rs
@@ -103,7 +103,7 @@ where
compression: self.compression.map(|c| c.into()),
},
)?;
df.align_chunks();
df.align_chunks_par();
let iter = df.iter_chunks(self.compat_level, true);

for batch in iter {
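A usage sketch for the IPC write path touched here, assuming the `polars` crate with the `ipc` feature enabled; the output path is illustrative. Since the writer now aligns chunks in parallel itself, callers don't need to rechunk beforehand:

```rust
use std::fs::File;

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let mut df = df!("a" => &[1i64, 2, 3], "s" => &["x", "y", "z"])?;
    df.vstack_mut(&df!("a" => &[4i64], "s" => &["w"])?)?;

    // The writer aligns chunks (now via align_chunks_par) before iterating
    // record batches.
    let file = File::create("example.ipc")?;
    IpcWriter::new(file).finish(&mut df)?;
    Ok(())
}
```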
2 changes: 1 addition & 1 deletion crates/polars-io/src/json/mod.rs
@@ -143,7 +143,7 @@ where
}

fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
df.align_chunks();
df.align_chunks_par();
let fields = df
.iter()
.map(|s| {
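Similarly for the JSON write path (assuming this `finish` belongs to the `JsonWriter` serializer and the `json` feature is enabled; serialization to an in-memory buffer is just for illustration):

```rust
use std::io::Cursor;

use polars::prelude::*;

fn main() -> PolarsResult<()> {
    let mut df = df!("a" => &[1i64, 2], "b" => &["x", "y"])?;

    // finish() now rechunks the frame in parallel before serializing it.
    let mut buf = Cursor::new(Vec::<u8>::new());
    JsonWriter::new(&mut buf)
        .with_json_format(JsonFormat::JsonLines)
        .finish(&mut df)?;

    print!("{}", String::from_utf8_lossy(&buf.into_inner()));
    Ok(())
}
```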
2 changes: 1 addition & 1 deletion crates/polars-io/src/utils/other.rs
@@ -182,7 +182,7 @@ pub(crate) fn chunk_df_for_writing(
row_group_size: usize,
) -> PolarsResult<Cow<DataFrame>> {
// ensures all chunks are aligned.
df.align_chunks();
df.align_chunks_par();

// Accumulate many small chunks to the row group size.
// See: #16403
8 changes: 4 additions & 4 deletions crates/polars-pipe/src/executors/sinks/io.rs
@@ -170,7 +170,7 @@ impl IOThread {
if let Some(partitions) = partitions {
for (part, mut df) in partitions.into_no_null_iter().zip(iter) {
df.shrink_to_fit();
df.align_chunks();
df.align_chunks_par();
let mut path = dir2.clone();
path.push(format!("{part}"));

@@ -194,7 +194,7 @@

for mut df in iter {
df.shrink_to_fit();
df.align_chunks();
df.align_chunks_par();
writer.write_batch(&df).unwrap();
}
writer.finish().unwrap();
@@ -252,13 +252,13 @@ impl IOThread {
partition_no: IdxSize,
mut df: DataFrame,
) {
df.shrink_to_fit();
df.align_chunks();
let count = self.thread_local_count.fetch_add(1, Ordering::Relaxed);
let mut path = self.dir.clone();
path.push(format!("{partition_no}"));

let _ = std::fs::create_dir(&path);
// thread local name we start with an underscore to ensure we don't get
// Thread local name we start with an underscore to ensure we don't get
// duplicates
path.push(format!("_{count}.ipc"));
let file = File::create(path).unwrap();
4 changes: 2 additions & 2 deletions crates/polars-python/src/dataframe/export.rs
@@ -71,7 +71,7 @@ impl PyDataFrame {

#[allow(clippy::wrong_self_convention)]
pub fn to_arrow(&mut self, compat_level: PyCompatLevel) -> PyResult<Vec<PyObject>> {
self.df.align_chunks();
self.df.align_chunks_par();
Python::with_gil(|py| {
let pyarrow = py.import_bound("pyarrow")?;
let names = self.df.get_column_names_str();
@@ -144,7 +144,7 @@ impl PyDataFrame {
py: Python<'py>,
requested_schema: Option<PyObject>,
) -> PyResult<Bound<'py, PyCapsule>> {
self.df.align_chunks();
self.df.align_chunks_par();
dataframe_to_stream(&self.df, py)
}
}
