Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: Ensure aligned chunks in OOC sort #19118

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-core/src/frame/chunks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl TryFrom<(RecordBatch, &ArrowSchema)> for DataFrame {

impl DataFrame {
pub fn split_chunks(&mut self) -> impl Iterator<Item = DataFrame> + '_ {
self.align_chunks();
self.align_chunks_par();

(0..self.n_chunks()).map(move |i| unsafe {
let columns = self
Expand Down
57 changes: 35 additions & 22 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::chunked_array::ops::unique::is_unique_helper;
use crate::prelude::*;
#[cfg(feature = "row_hash")]
use crate::utils::split_df;
use crate::utils::{slice_offsets, try_get_supertype, NoNull};
use crate::utils::{slice_offsets, try_get_supertype, Container, NoNull};

#[cfg(feature = "dataframe_arithmetic")]
mod arithmetic;
Expand Down Expand Up @@ -526,10 +526,15 @@ impl DataFrame {
/// Aggregate all the chunks in the DataFrame to a single chunk in parallel.
/// This may lead to more peak memory consumption.
pub fn as_single_chunk_par(&mut self) -> &mut Self {
self.as_single_chunk();
// if self.columns.iter().any(|s| s.n_chunks() > 1) {
// self.columns = self._apply_columns_par(&|s| s.rechunk());
// }
if self.columns.iter().any(|c| {
if let Column::Series(s) = c {
s.n_chunks() > 1
} else {
false
}
}) {
self.columns = self._apply_columns_par(&|s| s.rechunk());
}
self
}

Expand All @@ -552,13 +557,13 @@ impl DataFrame {
None => false,
Some(first_column_chunk_lengths) => {
// Fast Path for single Chunk Series
if first_column_chunk_lengths.len() == 1 {
return chunk_lengths.any(|cl| cl.len() != 1);
if first_column_chunk_lengths.size_hint().0 == 1 {
return chunk_lengths.any(|cl| cl.size_hint().0 != 1);
}
// Always rechunk if we have more chunks than rows.
// except when we have an empty df containing a single chunk
let height = self.height();
let n_chunks = first_column_chunk_lengths.len();
let n_chunks = first_column_chunk_lengths.size_hint().0;
if n_chunks > height && !(height == 0 && n_chunks == 1) {
return true;
}
Expand All @@ -575,14 +580,22 @@ impl DataFrame {
}

/// Ensure all the chunks in the [`DataFrame`] are aligned.
pub fn align_chunks(&mut self) -> &mut Self {
pub fn align_chunks_par(&mut self) -> &mut Self {
if self.should_rechunk() {
self.as_single_chunk_par()
} else {
self
}
}

pub fn align_chunks(&mut self) -> &mut Self {
if self.should_rechunk() {
self.as_single_chunk()
} else {
self
}
}

/// Get the [`DataFrame`] schema.
///
/// # Example
Expand Down Expand Up @@ -903,7 +916,7 @@ impl DataFrame {

/// Concatenate a [`DataFrame`] to this [`DataFrame`] and return as newly allocated [`DataFrame`].
///
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`].
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`].
///
/// # Example
///
Expand Down Expand Up @@ -949,7 +962,7 @@ impl DataFrame {

/// Concatenate a [`DataFrame`] to this [`DataFrame`]
///
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`].
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`].
///
/// # Example
///
Expand Down Expand Up @@ -1012,7 +1025,7 @@ impl DataFrame {

/// Concatenate a [`DataFrame`] to this [`DataFrame`]
///
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks`].
/// If many `vstack` operations are done, it is recommended to call [`DataFrame::align_chunks_par`].
///
/// # Panics
/// Panics if the schema's don't match.
Expand All @@ -1038,7 +1051,7 @@ impl DataFrame {
///
/// Prefer `vstack` over `extend` when you want to append many times before doing a query. For instance
/// when you read in multiple files and when to store them in a single `DataFrame`. In the latter case, finish the sequence
/// of `append` operations with a [`rechunk`](Self::align_chunks).
/// of `append` operations with a [`rechunk`](Self::align_chunks_par).
pub fn extend(&mut self, other: &DataFrame) -> PolarsResult<()> {
polars_ensure!(
self.width() == other.width(),
Expand Down Expand Up @@ -2512,17 +2525,17 @@ impl DataFrame {
/// but we also don't want to rechunk here, as this operation is costly and would benefit the caller
/// as well.
pub fn iter_chunks(&self, compat_level: CompatLevel, parallel: bool) -> RecordBatchIter {
debug_assert!(!self.should_rechunk(), "expected equal chunks");
// If any of the columns is binview and we don't convert `compat_level` we allow parallelism
// as we must allocate arrow strings/binaries.
let parallel = if parallel && compat_level.0 >= 1 {
self.columns.len() > 1
&& self
.columns
.iter()
.any(|s| matches!(s.dtype(), DataType::String | DataType::Binary))
} else {
false
};
let must_convert = compat_level.0 == 0;
let parallel = parallel
&& must_convert
&& self.columns.len() > 1
&& self
.columns
.iter()
.any(|s| matches!(s.dtype(), DataType::String | DataType::Binary));

RecordBatchIter {
columns: &self.columns,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub fn split_df(df: &mut DataFrame, target: usize, strict: bool) -> Vec<DataFram
return vec![df.clone()];
}
// make sure that chunks are aligned.
df.align_chunks();
df.align_chunks_par();
split_df_as_ref(df, target, strict)
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/ipc/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ where
compression: self.compression.map(|c| c.into()),
},
)?;
df.align_chunks();
df.align_chunks_par();
let iter = df.iter_chunks(self.compat_level, true);

for batch in iter {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ where
}

fn finish(&mut self, df: &mut DataFrame) -> PolarsResult<()> {
df.align_chunks();
df.align_chunks_par();
let fields = df
.iter()
.map(|s| {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/utils/other.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub(crate) fn chunk_df_for_writing(
row_group_size: usize,
) -> PolarsResult<Cow<DataFrame>> {
// ensures all chunks are aligned.
df.align_chunks();
df.align_chunks_par();

// Accumulate many small chunks to the row group size.
// See: #16403
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-pipe/src/executors/sinks/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl IOThread {
if let Some(partitions) = partitions {
for (part, mut df) in partitions.into_no_null_iter().zip(iter) {
df.shrink_to_fit();
df.align_chunks();
df.align_chunks_par();
let mut path = dir2.clone();
path.push(format!("{part}"));

Expand All @@ -194,7 +194,7 @@ impl IOThread {

for mut df in iter {
df.shrink_to_fit();
df.align_chunks();
df.align_chunks_par();
writer.write_batch(&df).unwrap();
}
writer.finish().unwrap();
Expand Down Expand Up @@ -252,13 +252,13 @@ impl IOThread {
partition_no: IdxSize,
mut df: DataFrame,
) {
df.shrink_to_fit();
df.align_chunks();
let count = self.thread_local_count.fetch_add(1, Ordering::Relaxed);
let mut path = self.dir.clone();
path.push(format!("{partition_no}"));

let _ = std::fs::create_dir(&path);
// thread local name we start with an underscore to ensure we don't get
// Thread local name we start with an underscore to ensure we don't get
// duplicates
path.push(format!("_{count}.ipc"));
let file = File::create(path).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-python/src/dataframe/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl PyDataFrame {

#[allow(clippy::wrong_self_convention)]
pub fn to_arrow(&mut self, compat_level: PyCompatLevel) -> PyResult<Vec<PyObject>> {
self.df.align_chunks();
self.df.align_chunks_par();
Python::with_gil(|py| {
let pyarrow = py.import_bound("pyarrow")?;
let names = self.df.get_column_names_str();
Expand Down Expand Up @@ -144,7 +144,7 @@ impl PyDataFrame {
py: Python<'py>,
requested_schema: Option<PyObject>,
) -> PyResult<Bound<'py, PyCapsule>> {
self.df.align_chunks();
self.df.align_chunks_par();
dataframe_to_stream(&self.df, py)
}
}
Loading