diff --git a/.gitignore b/.gitignore
index e8cf99feefb5..4e34b437a91f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,6 +8,7 @@
 .yarn/
 coverage.lcov
 coverage.xml
+profile.json
 polars/vendor
 
 # OS
diff --git a/Cargo.lock b/Cargo.lock
index f16e6517d5fb..b41cde9223bb 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3395,6 +3395,7 @@ dependencies = [
  "polars-plan",
  "polars-utils",
  "rand",
+ "rayon",
  "recursive",
  "slotmap",
  "version_check",
diff --git a/crates/polars-stream/Cargo.toml b/crates/polars-stream/Cargo.toml
index 71c2560567e8..0146a0f3437c 100644
--- a/crates/polars-stream/Cargo.toml
+++ b/crates/polars-stream/Cargo.toml
@@ -17,6 +17,7 @@
 pin-project-lite = { workspace = true }
 polars-io = { workspace = true, features = ["async"] }
 polars-utils = { workspace = true }
 rand = { workspace = true }
+rayon = { workspace = true }
 recursive = { workspace = true }
 slotmap = { workspace = true }
diff --git a/crates/polars-stream/src/lib.rs b/crates/polars-stream/src/lib.rs
index 66111cc356b9..263935da38ce 100644
--- a/crates/polars-stream/src/lib.rs
+++ b/crates/polars-stream/src/lib.rs
@@ -13,3 +13,4 @@ mod graph;
 mod morsel;
 mod nodes;
 mod physical_plan;
+mod utils;
diff --git a/crates/polars-stream/src/morsel.rs b/crates/polars-stream/src/morsel.rs
index f351e54558cc..6e0570128894 100644
--- a/crates/polars-stream/src/morsel.rs
+++ b/crates/polars-stream/src/morsel.rs
@@ -23,6 +23,17 @@ impl MorselSeq {
     pub fn new(seq: u64) -> Self {
         Self(seq.checked_mul(2).unwrap())
     }
+
+    // The morsel sequence id which comes after this morsel.
+    pub fn successor(self) -> Self {
+        // We increment by two because in the future we want to use the least
+        // significant bit to indicate the final morsel with that sequence id.
+        Self(self.0.checked_add(2).unwrap())
+    }
+
+    pub fn to_u64(self) -> u64 {
+        self.0
+    }
 }
 
 pub struct Morsel {
diff --git a/crates/polars-stream/src/nodes/filter.rs b/crates/polars-stream/src/nodes/filter.rs
index 78e96f1027fe..6e8e0a5a10ea 100644
--- a/crates/polars-stream/src/nodes/filter.rs
+++ b/crates/polars-stream/src/nodes/filter.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use polars_error::PolarsResult;
+use polars_error::{polars_err, PolarsResult};
 use polars_expr::prelude::PhysicalExpr;
 use polars_expr::state::ExecutionState;
 
@@ -35,7 +35,14 @@ impl ComputeNode for FilterNode {
             while let Ok(morsel) = recv.recv().await {
                 let morsel = morsel.try_map(|df| {
                     let mask = self.predicate.evaluate(&df, state)?;
-                    df.filter(mask.bool().unwrap())
+                    let mask = mask.bool().map_err(|_| {
+                        polars_err!(
+                            ComputeError: "filter predicate must be of type `Boolean`, got `{}`", mask.dtype()
+                        )
+                    })?;
+
+                    // We already parallelize, call the sequential filter.
+                    df._filter_seq(mask)
                 })?;
 
                 if morsel.df().is_empty() {
diff --git a/crates/polars-stream/src/nodes/in_memory_sink.rs b/crates/polars-stream/src/nodes/in_memory_sink.rs
index eb5dce413d0c..80919e01bbc5 100644
--- a/crates/polars-stream/src/nodes/in_memory_sink.rs
+++ b/crates/polars-stream/src/nodes/in_memory_sink.rs
@@ -1,49 +1,30 @@
-use std::cmp::Ordering;
+use std::cmp::Reverse;
 use std::collections::{BinaryHeap, VecDeque};
 
 use parking_lot::Mutex;
 use polars_core::frame::DataFrame;
 use polars_core::utils::accumulate_dataframes_vertical_unchecked;
+use polars_core::utils::rayon::iter::{IntoParallelIterator, ParallelIterator};
+use polars_core::POOL;
 use polars_error::PolarsResult;
 use polars_expr::state::ExecutionState;
+use polars_utils::priority::Priority;
+use polars_utils::sync::SyncPtr;
 
 use super::ComputeNode;
 use crate::async_executor::{JoinHandle, TaskScope};
 use crate::async_primitives::pipe::{Receiver, Sender};
 use crate::morsel::Morsel;
-
-struct KMergeMorsel(Morsel, usize);
-
-impl Eq for KMergeMorsel {}
-
-impl Ord for KMergeMorsel {
-    fn cmp(&self, other: &Self) -> Ordering {
-        // Intentionally reverse order, BinaryHeap is a max-heap but we want the
-        // smallest sequence number.
-        other.0.seq().cmp(&self.0.seq())
-    }
-}
-
-impl PartialOrd for KMergeMorsel {
-    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
-        Some(self.cmp(other))
-    }
-}
-
-impl PartialEq for KMergeMorsel {
-    fn eq(&self, other: &Self) -> bool {
-        self.0.seq() == other.0.seq()
-    }
-}
+use crate::utils::in_memory_linearize::linearize;
 
 #[derive(Default)]
 pub struct InMemorySink {
-    per_pipe_morsels: Mutex<Vec<VecDeque<Morsel>>>,
+    morsels_per_pipe: Mutex<Vec<Vec<Morsel>>>,
 }
 
 impl ComputeNode for InMemorySink {
     fn initialize(&mut self, _num_pipelines: usize) {
-        self.per_pipe_morsels.get_mut().clear();
+        self.morsels_per_pipe.get_mut().clear();
     }
 
     fn spawn<'env, 's>(
@@ -58,42 +39,20 @@ impl ComputeNode for InMemorySink {
         let [mut recv] = <[_; 1]>::try_from(recv).ok().unwrap();
 
         scope.spawn_task(true, async move {
-            let mut morsels = VecDeque::new();
+            let mut morsels = Vec::new();
             while let Ok(mut morsel) = recv.recv().await {
                 morsel.take_consume_token();
-                morsels.push_back(morsel);
+                morsels.push(morsel);
             }
 
-            self.per_pipe_morsels.lock().push(morsels);
+            self.morsels_per_pipe.lock().push(morsels);
             Ok(())
         })
     }
 
     fn finalize(&mut self) -> PolarsResult<Option<DataFrame>> {
-        // Do a K-way merge on the morsels based on sequence id.
-        let mut per_pipe_morsels = core::mem::take(&mut *self.per_pipe_morsels.get_mut());
-        let mut dataframes = Vec::with_capacity(per_pipe_morsels.iter().map(|p| p.len()).sum());
-
-        let mut kmerge = BinaryHeap::new();
-        for (pipe_idx, pipe) in per_pipe_morsels.iter_mut().enumerate() {
-            if let Some(morsel) = pipe.pop_front() {
-                kmerge.push(KMergeMorsel(morsel, pipe_idx));
-            }
-        }
-
-        while let Some(KMergeMorsel(morsel, pipe_idx)) = kmerge.pop() {
-            let seq = morsel.seq();
-            dataframes.push(morsel.into_df());
-            while let Some(new_morsel) = per_pipe_morsels[pipe_idx].pop_front() {
-                if new_morsel.seq() == seq {
-                    dataframes.push(new_morsel.into_df());
-                } else {
-                    kmerge.push(KMergeMorsel(new_morsel, pipe_idx));
-                    break;
-                }
-            }
-        }
-
+        let mut morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
+        let dataframes = linearize(morsels_per_pipe);
         Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
     }
 }
diff --git a/crates/polars-stream/src/utils/in_memory_linearize.rs b/crates/polars-stream/src/utils/in_memory_linearize.rs
new file mode 100644
index 000000000000..24aa9f596a74
--- /dev/null
+++ b/crates/polars-stream/src/utils/in_memory_linearize.rs
@@ -0,0 +1,124 @@
+use std::cmp::Reverse;
+use std::collections::BinaryHeap;
+
+use polars_core::frame::DataFrame;
+use polars_core::POOL;
+use polars_utils::priority::Priority;
+use polars_utils::sync::SyncPtr;
+
+use crate::morsel::Morsel;
+
+/// Number of morsels we need before we consider spawning an extra thread during linearization.
+const MORSELS_PER_THREAD: usize = 256;
+
+/// Given a Vec<Morsel> for each pipe, it will output a Vec of the contained dataframes.
+/// If the morsels are ordered by their sequence ids within each Vec, and no
+/// sequence id occurs in multiple Vecs, the output will follow the same order globally.
+pub fn linearize(mut morsels_per_pipe: Vec<Vec<Morsel>>) -> Vec<DataFrame> {
+    let num_morsels: usize = morsels_per_pipe.iter().map(|p| p.len()).sum();
+    if num_morsels == 0 {
+        return vec![];
+    }
+
+    let n_threads = num_morsels
+        .div_ceil(MORSELS_PER_THREAD)
+        .min(POOL.current_num_threads()) as u64;
+
+    // Partitioning based on sequence number.
+    let max_seq = morsels_per_pipe
+        .iter()
+        .flat_map(|p| p.iter().map(|m| m.seq().to_u64()))
+        .max()
+        .unwrap();
+    let seqs_per_thread = (max_seq + 1).div_ceil(n_threads);
+
+    let morsels_per_p = &morsels_per_pipe;
+    let mut dataframes: Vec<DataFrame> = Vec::with_capacity(num_morsels);
+    let mut dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) };
+    rayon::scope(|s| {
+        let mut out_offset = 0;
+        let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()];
+        for t in 0..n_threads {
+            // This thread will handle all morsels with sequence id
+            // [t * seqs_per_thread, (t + 1) * seqs_per_thread).
+            // Compute per pipe the slice that covers this range, re-using
+            // the stop indices from the previous thread as our starting indices.
+            let this_thread_out_offset = out_offset;
+            let partition_max_seq = (t + 1) * seqs_per_thread;
+            let cur_idx_per_pipe = stop_idx_per_pipe;
+            stop_idx_per_pipe = Vec::with_capacity(morsels_per_p.len());
+            for p in 0..morsels_per_p.len() {
+                let stop_idx =
+                    morsels_per_p[p].partition_point(|m| m.seq().to_u64() < partition_max_seq);
+                assert!(stop_idx >= cur_idx_per_pipe[p]);
+                out_offset += stop_idx - cur_idx_per_pipe[p];
+                stop_idx_per_pipe.push(stop_idx);
+            }
+
+            {
+                let stop_idx_per_pipe = stop_idx_per_pipe.clone();
+                s.spawn(move |_| unsafe {
+                    fill_partition(
+                        morsels_per_p,
+                        cur_idx_per_pipe,
+                        &stop_idx_per_pipe,
+                        dataframes_ptr.get().add(this_thread_out_offset),
+                    )
+                });
+            }
+        }
+    });
+
+    // SAFETY: all partitions were handled, so dataframes is fully filled and
+    // morsels_per_pipe fully consumed.
+    unsafe {
+        for morsels in morsels_per_pipe.iter_mut() {
+            morsels.set_len(0);
+        }
+        dataframes.set_len(num_morsels);
+    }
+    dataframes
+}
+
+unsafe fn fill_partition(
+    morsels_per_pipe: &[Vec<Morsel>],
+    mut cur_idx_per_pipe: Vec<usize>,
+    stop_idx_per_pipe: &[usize],
+    mut out_ptr: *mut DataFrame,
+) {
+    // K-way merge, initialize priority queue with one element per pipe.
+    let mut kmerge = BinaryHeap::with_capacity(morsels_per_pipe.len());
+    for (p, morsels) in morsels_per_pipe.iter().enumerate() {
+        if cur_idx_per_pipe[p] != stop_idx_per_pipe[p] {
+            let seq = morsels[cur_idx_per_pipe[p]].seq();
+            kmerge.push(Priority(Reverse(seq), p));
+        }
+    }
+
+    // While the merge queue isn't empty, keep copying elements into the output.
+    unsafe {
+        while let Some(Priority(Reverse(mut seq), p)) = kmerge.pop() {
+            // Write the next morsel from this pipe to the output.
+            let morsels = &morsels_per_pipe[p];
+            let cur_idx = &mut cur_idx_per_pipe[p];
+            core::ptr::copy_nonoverlapping(morsels[*cur_idx].df(), out_ptr, 1);
+            out_ptr = out_ptr.add(1);
+            *cur_idx += 1;
+
+            // Handle the next element from this pipe.
+            while *cur_idx != stop_idx_per_pipe[p] {
+                let new_seq = morsels[*cur_idx].seq();
+                if new_seq <= seq.successor() {
+                    // New sequence number is the same, or a direct successor; can output immediately.
+                    core::ptr::copy_nonoverlapping(morsels[*cur_idx].df(), out_ptr, 1);
+                    out_ptr = out_ptr.add(1);
+                    *cur_idx += 1;
+                    seq = new_seq;
+                } else {
+                    kmerge.push(Priority(Reverse(new_seq), p));
+                    break;
+                }
+            }
+        }
+    }
+}
diff --git a/crates/polars-stream/src/utils/mod.rs b/crates/polars-stream/src/utils/mod.rs
new file mode 100644
index 000000000000..fbb297b7e6da
--- /dev/null
+++ b/crates/polars-stream/src/utils/mod.rs
@@ -0,0 +1 @@
+pub mod in_memory_linearize;
diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs
index fdbaf35cade6..9ae809f77458 100644
--- a/crates/polars-utils/src/lib.rs
+++ b/crates/polars-utils/src/lib.rs
@@ -15,6 +15,7 @@ pub mod hashing;
 pub mod idx_vec;
 pub mod mem;
 pub mod min_max;
+pub mod priority;
 pub mod slice;
 pub mod sort;
 pub mod sync;
diff --git a/crates/polars-utils/src/priority.rs b/crates/polars-utils/src/priority.rs
new file mode 100644
index 000000000000..a249a25dcb32
--- /dev/null
+++ b/crates/polars-utils/src/priority.rs
@@ -0,0 +1,25 @@
+use std::cmp::Ordering;
+
+/// A pair which is ordered exclusively by the first element.
+#[derive(Copy, Clone, Debug)]
+pub struct Priority<P, T>(pub P, pub T);
+
+impl<P: Ord, T> Ord for Priority<P, T> {
+    fn cmp(&self, other: &Self) -> Ordering {
+        self.0.cmp(&other.0)
+    }
+}
+
+impl<P: Ord, T> PartialOrd for Priority<P, T> {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl<P: Ord, T> PartialEq for Priority<P, T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.0 == other.0
+    }
+}
+
+impl<P: Ord, T> Eq for Priority<P, T> {}
diff --git a/crates/polars-utils/src/sync.rs b/crates/polars-utils/src/sync.rs
index e4257ac17b82..895b1290be31 100644
--- a/crates/polars-utils/src/sync.rs
+++ b/crates/polars-utils/src/sync.rs
@@ -1,6 +1,6 @@
 /// Utility that allows use to send pointers to another thread.
 /// This is better than going through `usize` as MIRI can follow these.
-#[derive(Copy, Clone, Debug)]
+#[derive(Debug)]
 #[repr(transparent)]
 pub struct SyncPtr<T>(*mut T);
 
@@ -37,6 +37,12 @@ impl<T> SyncPtr<T> {
     }
 }
 
+impl<T> Copy for SyncPtr<T> {}
+impl<T> Clone for SyncPtr<T> {
+    fn clone(&self) -> SyncPtr<T> {
+        *self
+    }
+}
 unsafe impl<T> Sync for SyncPtr<T> {}
 unsafe impl<T> Send for SyncPtr<T> {}
 
diff --git a/py-polars/Cargo.toml b/py-polars/Cargo.toml
index 90cf6bea635d..364971ae9288 100644
--- a/py-polars/Cargo.toml
+++ b/py-polars/Cargo.toml
@@ -240,6 +240,7 @@ all = [
   "sql",
   "binary_encoding",
   "ffi_plugin",
+  # "new_streaming",
 ]
 
 # we cannot conditionally activate simd
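
The sketch below is not part of the patch; it is a hedged, simplified illustration of the idea behind the new `Priority` wrapper and the k-way merge performed in `fill_partition`. It assumes `polars-utils` is available as a dependency and uses plain `(u64, &str)` pairs in place of morsels; the real code additionally copies `DataFrame`s through raw pointers, uses the `successor()` fast path, and splits the sequence-id range across rayon threads.

use std::cmp::Reverse;
use std::collections::BinaryHeap;

use polars_utils::priority::Priority;

/// Merge per-pipe streams that are individually sorted by sequence id into one
/// globally ordered stream, the same way `fill_partition` merges morsels.
fn merge_by_seq<'a>(pipes: &[Vec<(u64, &'a str)>]) -> Vec<(u64, &'a str)> {
    // BinaryHeap is a max-heap; `Reverse` flips the comparison so the smallest
    // sequence id is popped first, and `Priority` orders only by that key, so
    // the payload (here the pipe index) needs no `Ord` implementation of its own.
    let mut heap = BinaryHeap::new();
    let mut cursor = vec![0usize; pipes.len()];
    for (p, pipe) in pipes.iter().enumerate() {
        if let Some((seq, _)) = pipe.first() {
            heap.push(Priority(Reverse(*seq), p));
        }
    }

    let mut out = Vec::new();
    while let Some(Priority(Reverse(_seq), p)) = heap.pop() {
        // Emit the next item of the pipe owning the smallest sequence id,
        // then re-insert that pipe's following item (if any) into the heap.
        out.push(pipes[p][cursor[p]]);
        cursor[p] += 1;
        if let Some((next_seq, _)) = pipes[p].get(cursor[p]) {
            heap.push(Priority(Reverse(*next_seq), p));
        }
    }
    out
}

fn main() {
    let pipes = vec![vec![(0, "a"), (3, "d")], vec![(1, "b"), (2, "c")]];
    // Prints the items in global sequence order: 0, 1, 2, 3.
    for (seq, value) in merge_by_seq(&pipes) {
        println!("{seq}: {value}");
    }
}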