Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Parallel linearize in new streaming engine #17050

Merged
merged 4 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
.yarn/
coverage.lcov
coverage.xml
profile.json
polars/vendor

# OS
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/polars-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pin-project-lite = { workspace = true }
polars-io = { workspace = true, features = ["async"] }
polars-utils = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
recursive = { workspace = true }
slotmap = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ mod graph;
mod morsel;
mod nodes;
mod physical_plan;
mod utils;
11 changes: 11 additions & 0 deletions crates/polars-stream/src/morsel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ impl MorselSeq {
pub fn new(seq: u64) -> Self {
Self(seq.checked_mul(2).unwrap())
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
}

// The morsel sequence id which comes after this morsel.
pub fn successor(self) -> Self {
// We increment by two because in the future we want to use the least
// significant bit to indicate the final morsel with that sequence id.
Self(self.0.checked_add(2).unwrap())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have a comment why we have increments of 2?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what the comment on new is for, the idea is to indicate the 'last morsel with this sequence id' using the final bit, so the sequence 1, 2, 3, 3, 3, 4 can become 2, 4, 6, 6, 7, 8 and as soon as we see 7 we know there won't be any 6s or 7s anymore.

}

pub fn to_u64(self) -> u64 {
self.0
}
}

pub struct Morsel {
Expand Down
11 changes: 9 additions & 2 deletions crates/polars-stream/src/nodes/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use polars_error::PolarsResult;
use polars_error::{polars_err, PolarsResult};
use polars_expr::prelude::PhysicalExpr;
use polars_expr::state::ExecutionState;

Expand Down Expand Up @@ -35,7 +35,14 @@ impl ComputeNode for FilterNode {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
let mask = self.predicate.evaluate(&df, state)?;
df.filter(mask.bool().unwrap())
let mask = mask.bool().map_err(|_| {
polars_err!(
ComputeError: "filter predicate must be of type `Boolean`, got `{}`", mask.dtype()
)
})?;

// We already parallelize, call the sequential filter.
df._filter_seq(mask)
})?;

if morsel.df().is_empty() {
Expand Down
67 changes: 13 additions & 54 deletions crates/polars-stream/src/nodes/in_memory_sink.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,30 @@
use std::cmp::Ordering;
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

use parking_lot::Mutex;
use polars_core::frame::DataFrame;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::utils::rayon::iter::{IntoParallelIterator, ParallelIterator};
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_utils::priority::Priority;
use polars_utils::sync::SyncPtr;

use super::ComputeNode;
use crate::async_executor::{JoinHandle, TaskScope};
use crate::async_primitives::pipe::{Receiver, Sender};
use crate::morsel::Morsel;

struct KMergeMorsel(Morsel, usize);

impl Eq for KMergeMorsel {}

impl Ord for KMergeMorsel {
fn cmp(&self, other: &Self) -> Ordering {
// Intentionally reverse order, BinaryHeap is a max-heap but we want the
// smallest sequence number.
other.0.seq().cmp(&self.0.seq())
}
}

impl PartialOrd for KMergeMorsel {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl PartialEq for KMergeMorsel {
fn eq(&self, other: &Self) -> bool {
self.0.seq() == other.0.seq()
}
}
use crate::utils::in_memory_linearize::linearize;

#[derive(Default)]
pub struct InMemorySink {
per_pipe_morsels: Mutex<Vec<VecDeque<Morsel>>>,
morsels_per_pipe: Mutex<Vec<Vec<Morsel>>>,
}

impl ComputeNode for InMemorySink {
fn initialize(&mut self, _num_pipelines: usize) {
self.per_pipe_morsels.get_mut().clear();
self.morsels_per_pipe.get_mut().clear();
}

fn spawn<'env, 's>(
Expand All @@ -58,42 +39,20 @@ impl ComputeNode for InMemorySink {
let [mut recv] = <[_; 1]>::try_from(recv).ok().unwrap();

scope.spawn_task(true, async move {
let mut morsels = VecDeque::new();
let mut morsels = Vec::new();
while let Ok(mut morsel) = recv.recv().await {
morsel.take_consume_token();
morsels.push_back(morsel);
morsels.push(morsel);
}

self.per_pipe_morsels.lock().push(morsels);
self.morsels_per_pipe.lock().push(morsels);
Ok(())
})
}

fn finalize(&mut self) -> PolarsResult<Option<DataFrame>> {
// Do a K-way merge on the morsels based on sequence id.
let mut per_pipe_morsels = core::mem::take(&mut *self.per_pipe_morsels.get_mut());
let mut dataframes = Vec::with_capacity(per_pipe_morsels.iter().map(|p| p.len()).sum());

let mut kmerge = BinaryHeap::new();
for (pipe_idx, pipe) in per_pipe_morsels.iter_mut().enumerate() {
if let Some(morsel) = pipe.pop_front() {
kmerge.push(KMergeMorsel(morsel, pipe_idx));
}
}

while let Some(KMergeMorsel(morsel, pipe_idx)) = kmerge.pop() {
let seq = morsel.seq();
dataframes.push(morsel.into_df());
while let Some(new_morsel) = per_pipe_morsels[pipe_idx].pop_front() {
if new_morsel.seq() == seq {
dataframes.push(new_morsel.into_df());
} else {
kmerge.push(KMergeMorsel(new_morsel, pipe_idx));
break;
}
}
}

let mut morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
let dataframes = linearize(morsels_per_pipe);
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
}
}
124 changes: 124 additions & 0 deletions crates/polars-stream/src/utils/in_memory_linearize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;

use polars_core::frame::DataFrame;
use polars_core::POOL;
use polars_utils::priority::Priority;
use polars_utils::sync::SyncPtr;

use crate::morsel::Morsel;

/// Target number of morsels handled per thread during linearization; determines how many threads are spawned.
const MORSELS_PER_THREAD: usize = 256;

/// Given a Vec<Morsel> for each pipe, it will output a vec of the contained dataframes.
/// If the morsels are ordered by their sequence ids within each vec, and no
/// sequence ID occurs in multiple vecs, the output will follow the same order globally.
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
pub fn linearize(mut morsels_per_pipe: Vec<Vec<Morsel>>) -> Vec<DataFrame> {
let num_morsels: usize = morsels_per_pipe.iter().map(|p| p.len()).sum();
if num_morsels == 0 {
return vec![];
}

// One thread per MORSELS_PER_THREAD morsels (rounded up), but never more
// than `POOL.current_num_threads()`.
let n_threads = num_morsels
.div_ceil(MORSELS_PER_THREAD)
.min(POOL.current_num_threads()) as u64;

// Partitioning based on sequence number.
// The unwrap cannot fail: we returned early above when there are no morsels.
let max_seq = morsels_per_pipe
.iter()
.flat_map(|p| p.iter().map(|m| m.seq().to_u64()))
.max()
.unwrap();
// Width of the sequence-number range handled by each thread, rounded up so
// that n_threads * seqs_per_thread covers every value in 0..=max_seq.
let seqs_per_thread = (max_seq + 1).div_ceil(n_threads);

let morsels_per_p = &morsels_per_pipe;
// The output buffer is allocated with full capacity but length 0; the
// spawned tasks write into it through a raw pointer and the length is only
// set once all of them are done.
let mut dataframes: Vec<DataFrame> = Vec::with_capacity(num_morsels);
let mut dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) };
rayon::scope(|s| {
// Number of morsels assigned to all partitions so far; this is the output
// index at which the next partition starts writing, so partitions write
// to disjoint ranges of the output buffer.
let mut out_offset = 0;
let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()];
for t in 0..n_threads {
// This thread will handle all morsels with sequence id
// [t * seqs_per_thread, (t + 1) * seqs_per_thread).
// Compute per pipe the slice that covers this range, re-using
// the stop indices from the previous thread as our starting indices.
let this_thread_out_offset = out_offset;
let partition_max_seq = (t + 1) * seqs_per_thread;
let cur_idx_per_pipe = stop_idx_per_pipe;
stop_idx_per_pipe = Vec::with_capacity(morsels_per_p.len());
for p in 0..morsels_per_p.len() {
// Binary search for the first morsel that belongs to a later
// partition; this relies on the pipe being sorted by sequence id.
let stop_idx =
morsels_per_p[p].partition_point(|m| m.seq().to_u64() < partition_max_seq);
assert!(stop_idx >= cur_idx_per_pipe[p]);
out_offset += stop_idx - cur_idx_per_pipe[p];
stop_idx_per_pipe.push(stop_idx);
}

{
// The task takes ownership of a copy of the stop indices, because the
// original is needed as the start indices of the next partition.
let stop_idx_per_pipe = stop_idx_per_pipe.clone();
s.spawn(move |_| unsafe {
fill_partition(
morsels_per_p,
cur_idx_per_pipe,
&stop_idx_per_pipe,
dataframes_ptr.get().add(this_thread_out_offset),
)
});
}
}
});

// SAFETY: all partitions were handled, so dataframes is fully filled and
// morsels_per_pipe fully consumed.
// The DataFrames were bitwise-copied out of the morsels, so the source vecs
// are truncated without running destructors to avoid dropping them twice.
// NOTE(review): set_len(0) also skips the destructors of every other field
// of `Morsel` - confirm none of them own resources that must be released.
unsafe {
for morsels in morsels_per_pipe.iter_mut() {
morsels.set_len(0);
}
dataframes.set_len(num_morsels);
}
dataframes
}

/// K-way merges one partition of morsels into the output buffer.
///
/// For every pipe `p`, the morsels at indices
/// `cur_idx_per_pipe[p]..stop_idx_per_pipe[p]` are visited and their
/// DataFrames are written consecutively starting at `out_ptr`, smallest
/// sequence id first. The resulting order is only globally sorted if each
/// pipe's range is sorted by sequence id and no sequence id occurs in more
/// than one pipe.
///
/// # Safety
/// - `out_ptr` must be valid for writes of as many `DataFrame`s as the sum
///   over all pipes of `stop_idx_per_pipe[p] - cur_idx_per_pipe[p]`.
/// - For every pipe, `cur_idx_per_pipe[p] <= stop_idx_per_pipe[p]` must hold
///   and the stop index must not exceed the number of morsels in that pipe.
/// - Every DataFrame written is a bitwise copy of one still stored in
///   `morsels_per_pipe`; the caller must make sure each of them ends up being
///   dropped only once.
unsafe fn fill_partition(
morsels_per_pipe: &[Vec<Morsel>],
mut cur_idx_per_pipe: Vec<usize>,
stop_idx_per_pipe: &[usize],
mut out_ptr: *mut DataFrame,
) {
// K-way merge, initialize priority queue with one element per pipe.
// BinaryHeap is a max-heap, so the sequence id is wrapped in Reverse to pop
// the smallest one first. Pipes with an empty range are left out.
let mut kmerge = BinaryHeap::with_capacity(morsels_per_pipe.len());
for (p, morsels) in morsels_per_pipe.iter().enumerate() {
if cur_idx_per_pipe[p] != stop_idx_per_pipe[p] {
let seq = morsels[cur_idx_per_pipe[p]].seq();
kmerge.push(Priority(Reverse(seq), p));
}
}

// While the merge queue isn't empty, keep copying elements into the output.
unsafe {
while let Some(Priority(Reverse(mut seq), p)) = kmerge.pop() {
// Write the next morsel from this pipe to the output.
let morsels = &morsels_per_pipe[p];
let cur_idx = &mut cur_idx_per_pipe[p];
core::ptr::copy_nonoverlapping(morsels[*cur_idx].df(), out_ptr, 1);
out_ptr = out_ptr.add(1);
*cur_idx += 1;

// Handle next element from this pipe.
// Staying on the same pipe while its sequence ids are consecutive
// avoids a heap push/pop for every morsel.
while *cur_idx != stop_idx_per_pipe[p] {
let new_seq = morsels[*cur_idx].seq();
if new_seq <= seq.successor() {
// New sequence number is the same, or a direct successor, can output immediately.
core::ptr::copy_nonoverlapping(morsels[*cur_idx].df(), out_ptr, 1);
out_ptr = out_ptr.add(1);
*cur_idx += 1;
seq = new_seq;
} else {
// There is a gap in this pipe's sequence ids, so another pipe may
// hold the ids in between: re-queue this pipe and pick the next
// smallest sequence id from the heap.
kmerge.push(Priority(Reverse(new_seq), p));
break;
}
}
}
}
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod in_memory_linearize;
1 change: 1 addition & 0 deletions crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod hashing;
pub mod idx_vec;
pub mod mem;
pub mod min_max;
pub mod priority;
pub mod slice;
pub mod sort;
pub mod sync;
Expand Down
25 changes: 25 additions & 0 deletions crates/polars-utils/src/priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::cmp::Ordering;

/// A `(priority, payload)` pair whose comparisons look only at the priority.
///
/// The payload `T` never takes part in equality or ordering, so it does not
/// have to implement any comparison trait itself. Two values with equal
/// priorities but different payloads compare as equal.
#[derive(Copy, Clone, Debug)]
pub struct Priority<P, T>(pub P, pub T);

impl<P, T> PartialEq for Priority<P, T>
where
    P: Eq,
{
    /// Two pairs are equal exactly when their priorities are equal.
    fn eq(&self, other: &Self) -> bool {
        let (Self(lhs, _), Self(rhs, _)) = (self, other);
        lhs == rhs
    }
}

impl<P, T> Eq for Priority<P, T> where P: Eq {}

impl<P, T> PartialOrd for Priority<P, T>
where
    P: Ord + Eq,
{
    /// Always `Some`, delegating to the total order defined by `Ord`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl<P, T> Ord for Priority<P, T>
where
    P: Ord + Eq,
{
    /// Orders two pairs by their priorities, ignoring the payloads.
    fn cmp(&self, other: &Self) -> Ordering {
        let (Self(lhs, _), Self(rhs, _)) = (self, other);
        Ord::cmp(lhs, rhs)
    }
}
8 changes: 7 additions & 1 deletion crates/polars-utils/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// Utility that allows us to send pointers to another thread.
/// This is better than going through `usize` as MIRI can follow these.
#[derive(Copy, Clone, Debug)]
#[derive(Debug)]
#[repr(transparent)]
pub struct SyncPtr<T>(*mut T);

Expand Down Expand Up @@ -37,6 +37,12 @@ impl<T> SyncPtr<T> {
}
}

// `Clone` and `Copy` are written by hand instead of derived: a derive would
// add `T: Clone` / `T: Copy` bounds, whereas these impls hold for every `T`
// because only the pointer itself is copied.
impl<T> Clone for SyncPtr<T> {
    fn clone(&self) -> Self {
        *self
    }
}
impl<T> Copy for SyncPtr<T> {}

// These impls are unconditional in `T`; they are what allows the wrapped raw
// pointer to be moved to and shared with other threads.
// NOTE(review): nothing here synchronizes access through the pointer - users
// of `SyncPtr` presumably have to guarantee that themselves; confirm.
unsafe impl<T> Send for SyncPtr<T> {}
unsafe impl<T> Sync for SyncPtr<T> {}

Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ all = [
"sql",
"binary_encoding",
"ffi_plugin",
# "new_streaming",
ritchie46 marked this conversation as resolved.
Show resolved Hide resolved
]

# we cannot conditionally activate simd
Expand Down
Loading