feat: make ooc sort configurable (pola-rs#15084)
ritchie46 authored Mar 15, 2024
1 parent 4f069f5 commit 5d449cc
Showing 1 changed file with 16 additions and 5 deletions.
21 changes: 16 additions & 5 deletions crates/polars-pipe/src/executors/sinks/sort/ooc.rs
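
The commit wires two environment variables into the out-of-core sort: POLARS_OOC_SORT_PAR_PARTITION (its mere presence turns on multithreaded partitioning) and POLARS_OOC_SORT_SPILL_SIZE (an integer byte count that overrides the previous hard-coded 1 << 26 spill threshold). A minimal usage sketch in Rust, assuming the variables are set before the streaming sort reaches sort_ooc; the sort call itself is omitted:

use std::env;

fn main() {
    // Presence alone enables parallel partitioning (the diff only checks is_ok()),
    // so any value works.
    env::set_var("POLARS_OOC_SORT_PAR_PARTITION", "1");

    // Override the spill threshold; the default stays 1 << 26 bytes (64 MiB).
    // Here: 128 MiB.
    env::set_var("POLARS_OOC_SORT_SPILL_SIZE", (1usize << 27).to_string());

    // ... run the out-of-core sort afterwards; sort_ooc reads both variables on entry.
}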
@@ -135,10 +135,14 @@ pub(super) fn sort_ooc(
     ooc_start: Instant,
 ) -> PolarsResult<FinalizedSink> {
     let now = Instant::now();
+    let multithreaded_partition = std::env::var("POLARS_OOC_SORT_PAR_PARTITION").is_ok();
+    let spill_size = std::env::var("POLARS_OOC_SORT_SPILL_SIZE")
+        .map(|v| v.parse::<usize>().expect("integer"))
+        .unwrap_or(1 << 26);
     let samples = samples.to_physical_repr().into_owned();
     let spill_size = std::cmp::min(
         memtrack.get_available_latest() / (samples.len() * 3),
-        1 << 26,
+        spill_size,
     );

     // we collect as I am not sure that if we write to the same directory the
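
A standalone sketch of the spill-size resolution introduced above, assuming the same semantics: the env override falls back to 1 << 26 and is still capped by the memory-tracker estimate spread over the sample count. available_memory and n_samples stand in for memtrack.get_available_latest() and samples.len(); resolve_spill_size is a hypothetical helper, not part of the Polars API.

use std::env;

// Hypothetical helper mirroring the hunk above.
fn resolve_spill_size(available_memory: usize, n_samples: usize) -> usize {
    // User override, defaulting to the old hard-coded 1 << 26 (64 MiB).
    let spill_size = env::var("POLARS_OOC_SORT_SPILL_SIZE")
        .map(|v| v.parse::<usize>().expect("integer"))
        .unwrap_or(1 << 26);
    // The memory-based bound is unchanged; only the fixed cap became configurable.
    std::cmp::min(available_memory / (n_samples * 3), spill_size)
}

fn main() {
    // E.g. 1 GiB reported available and 64 sort samples:
    // 1 GiB / (64 * 3) is roughly 5.3 MiB, which is below the 64 MiB default cap.
    println!("{}", resolve_spill_size(1 << 30, 64));
}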
@@ -167,7 +171,8 @@ pub(super) fn sort_ooc(
             let assigned_parts = det_partitions(sort_col, &samples, descending);

             // partition the dataframe into proper buckets
-            let (iter, unique_assigned_parts) = partition_df(df, &assigned_parts)?;
+            let (iter, unique_assigned_parts) =
+                partition_df(df, &assigned_parts, multithreaded_partition)?;
             for (part, df) in unique_assigned_parts.into_no_null_iter().zip(iter) {
                 if let Some(df) = partitions_spiller.push(part as usize, df) {
                     io_thread.dump_partition_local(part, df)
@@ -227,16 +232,22 @@ fn det_partitions(s: &Series, partitions: &Series, descending: bool) -> IdxCa {
     search_sorted(partitions, &s, SearchSortedSide::Any, descending).unwrap()
 }

-fn partition_df(df: DataFrame, partitions: &IdxCa) -> PolarsResult<(DfIter, IdxCa)> {
-    let groups = partitions.group_tuples(true, false)?;
+fn partition_df(
+    df: DataFrame,
+    partitions: &IdxCa,
+    multithreaded: bool,
+) -> PolarsResult<(DfIter, IdxCa)> {
+    let groups = partitions.group_tuples(multithreaded, false)?;
     let partitions = unsafe { partitions.clone().into_series().agg_first(&groups) };
     let partitions = partitions.idx().unwrap().clone();

     let out = match groups {
         GroupsProxy::Idx(idx) => {
             let iter = idx.into_iter().map(move |(_, group)| {
                 // groups are in bounds and sorted
-                unsafe { df._take_unchecked_slice_sorted(&group, true, IsSorted::Ascending) }
+                unsafe {
+                    df._take_unchecked_slice_sorted(&group, multithreaded, IsSorted::Ascending)
+                }
             });
             Box::new(iter) as DfIter
         },
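
For context on the helper being changed: partition_df groups rows by their assigned partition and hands each bucket back as its own DataFrame; the new multithreaded flag is simply forwarded to group_tuples and to the unchecked gather. A toy, std-only sketch of that bucket-by-partition pattern, where the BTreeMap of row indices stands in for GroupsProxy::Idx and the gathered frames, and the flag is only carried along:

use std::collections::BTreeMap;

// Toy stand-in for partition_df: bucket row indices by assigned partition.
// In Polars the grouping is group_tuples(multithreaded, false) and the gather
// is _take_unchecked_slice_sorted; here the flag is not used, just forwarded.
fn partition_rows(assigned_parts: &[u32], _multithreaded: bool) -> BTreeMap<u32, Vec<usize>> {
    let mut buckets: BTreeMap<u32, Vec<usize>> = BTreeMap::new();
    for (row, part) in assigned_parts.iter().enumerate() {
        // Row indices inside one bucket come out ascending, which is why the
        // real code can pass IsSorted::Ascending to the unchecked take.
        buckets.entry(*part).or_default().push(row);
    }
    buckets
}

fn main() {
    let assigned = [2u32, 0, 1, 2, 0];
    for (part, rows) in partition_rows(&assigned, false) {
        println!("partition {part}: rows {rows:?}");
    }
}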