Skip to content

Commit

Permalink
perf: parallel linearize in new streaming engine (#17050)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Jun 24, 2024
1 parent f7ff2ef commit c8166a8
Show file tree
Hide file tree
Showing 13 changed files with 196 additions and 57 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
.yarn/
coverage.lcov
coverage.xml
profile.json
polars/vendor

# OS
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/polars-stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pin-project-lite = { workspace = true }
polars-io = { workspace = true, features = ["async"] }
polars-utils = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
recursive = { workspace = true }
slotmap = { workspace = true }

Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ mod graph;
mod morsel;
mod nodes;
mod physical_plan;
mod utils;
11 changes: 11 additions & 0 deletions crates/polars-stream/src/morsel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ impl MorselSeq {
pub fn new(seq: u64) -> Self {
Self(seq.checked_mul(2).unwrap())
}

// The morsel sequence id which comes after this morsel.
pub fn successor(self) -> Self {
// We increment by two because in the future we want to use the least
// significant bit to indicate the final morsel with that sequence id.
Self(self.0.checked_add(2).unwrap())
}

pub fn to_u64(self) -> u64 {
self.0
}
}

pub struct Morsel {
Expand Down
11 changes: 9 additions & 2 deletions crates/polars-stream/src/nodes/filter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use polars_error::PolarsResult;
use polars_error::{polars_err, PolarsResult};
use polars_expr::prelude::PhysicalExpr;
use polars_expr::state::ExecutionState;

Expand Down Expand Up @@ -35,7 +35,14 @@ impl ComputeNode for FilterNode {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
let mask = self.predicate.evaluate(&df, state)?;
df.filter(mask.bool().unwrap())
let mask = mask.bool().map_err(|_| {
polars_err!(
ComputeError: "filter predicate must be of type `Boolean`, got `{}`", mask.dtype()
)
})?;

// We already parallelize, call the sequential filter.
df._filter_seq(mask)
})?;

if morsel.df().is_empty() {
Expand Down
67 changes: 13 additions & 54 deletions crates/polars-stream/src/nodes/in_memory_sink.rs
Original file line number Diff line number Diff line change
@@ -1,49 +1,30 @@
use std::cmp::Ordering;
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};

use parking_lot::Mutex;
use polars_core::frame::DataFrame;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::utils::rayon::iter::{IntoParallelIterator, ParallelIterator};
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_utils::priority::Priority;
use polars_utils::sync::SyncPtr;

use super::ComputeNode;
use crate::async_executor::{JoinHandle, TaskScope};
use crate::async_primitives::pipe::{Receiver, Sender};
use crate::morsel::Morsel;

struct KMergeMorsel(Morsel, usize);

impl Eq for KMergeMorsel {}

impl Ord for KMergeMorsel {
fn cmp(&self, other: &Self) -> Ordering {
// Intentionally reverse order, BinaryHeap is a max-heap but we want the
// smallest sequence number.
other.0.seq().cmp(&self.0.seq())
}
}

impl PartialOrd for KMergeMorsel {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl PartialEq for KMergeMorsel {
fn eq(&self, other: &Self) -> bool {
self.0.seq() == other.0.seq()
}
}
use crate::utils::in_memory_linearize::linearize;

#[derive(Default)]
pub struct InMemorySink {
per_pipe_morsels: Mutex<Vec<VecDeque<Morsel>>>,
morsels_per_pipe: Mutex<Vec<Vec<Morsel>>>,
}

impl ComputeNode for InMemorySink {
fn initialize(&mut self, _num_pipelines: usize) {
self.per_pipe_morsels.get_mut().clear();
self.morsels_per_pipe.get_mut().clear();
}

fn spawn<'env, 's>(
Expand All @@ -58,42 +39,20 @@ impl ComputeNode for InMemorySink {
let [mut recv] = <[_; 1]>::try_from(recv).ok().unwrap();

scope.spawn_task(true, async move {
let mut morsels = VecDeque::new();
let mut morsels = Vec::new();
while let Ok(mut morsel) = recv.recv().await {
morsel.take_consume_token();
morsels.push_back(morsel);
morsels.push(morsel);
}

self.per_pipe_morsels.lock().push(morsels);
self.morsels_per_pipe.lock().push(morsels);
Ok(())
})
}

fn finalize(&mut self) -> PolarsResult<Option<DataFrame>> {
// Do a K-way merge on the morsels based on sequence id.
let mut per_pipe_morsels = core::mem::take(&mut *self.per_pipe_morsels.get_mut());
let mut dataframes = Vec::with_capacity(per_pipe_morsels.iter().map(|p| p.len()).sum());

let mut kmerge = BinaryHeap::new();
for (pipe_idx, pipe) in per_pipe_morsels.iter_mut().enumerate() {
if let Some(morsel) = pipe.pop_front() {
kmerge.push(KMergeMorsel(morsel, pipe_idx));
}
}

while let Some(KMergeMorsel(morsel, pipe_idx)) = kmerge.pop() {
let seq = morsel.seq();
dataframes.push(morsel.into_df());
while let Some(new_morsel) = per_pipe_morsels[pipe_idx].pop_front() {
if new_morsel.seq() == seq {
dataframes.push(new_morsel.into_df());
} else {
kmerge.push(KMergeMorsel(new_morsel, pipe_idx));
break;
}
}
}

let mut morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
let dataframes = linearize(morsels_per_pipe);
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
}
}
124 changes: 124 additions & 0 deletions crates/polars-stream/src/utils/in_memory_linearize.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::cmp::Reverse;
use std::collections::BinaryHeap;

use polars_core::frame::DataFrame;
use polars_core::POOL;
use polars_utils::priority::Priority;
use polars_utils::sync::SyncPtr;

use crate::morsel::Morsel;

/// Amount of morsels we need to consider spawning a thread during linearization.
const MORSELS_PER_THREAD: usize = 256;

/// Given a Vec<Morsel> for each pipe, it will output a vec of the contained dataframes.
/// If the morsels are ordered by their sequence ids within each vec, and no
/// sequence ID occurs in multiple vecs, the output will follow the same order globally.
pub fn linearize(mut morsels_per_pipe: Vec<Vec<Morsel>>) -> Vec<DataFrame> {
let num_morsels: usize = morsels_per_pipe.iter().map(|p| p.len()).sum();
if num_morsels == 0 {
return vec![];
}

let n_threads = num_morsels
.div_ceil(MORSELS_PER_THREAD)
.min(POOL.current_num_threads()) as u64;

// Partitioning based on sequence number.
let max_seq = morsels_per_pipe
.iter()
.flat_map(|p| p.iter().map(|m| m.seq().to_u64()))
.max()
.unwrap();
let seqs_per_thread = (max_seq + 1).div_ceil(n_threads);

let morsels_per_p = &morsels_per_pipe;
let mut dataframes: Vec<DataFrame> = Vec::with_capacity(num_morsels);
let mut dataframes_ptr = unsafe { SyncPtr::new(dataframes.as_mut_ptr()) };
rayon::scope(|s| {
let mut out_offset = 0;
let mut stop_idx_per_pipe = vec![0; morsels_per_p.len()];
for t in 0..n_threads {
// This thread will handle all morsels with sequence id
// [t * seqs_per_thread, (t + 1) * seqs_per_threads).
// Compute per pipe the slice that covers this range, re-using
// the stop indices from the previous thread as our starting indices.
let this_thread_out_offset = out_offset;
let partition_max_seq = (t + 1) * seqs_per_thread;
let cur_idx_per_pipe = stop_idx_per_pipe;
stop_idx_per_pipe = Vec::with_capacity(morsels_per_p.len());
for p in 0..morsels_per_p.len() {
let stop_idx =
morsels_per_p[p].partition_point(|m| m.seq().to_u64() < partition_max_seq);
assert!(stop_idx >= cur_idx_per_pipe[p]);
out_offset += stop_idx - cur_idx_per_pipe[p];
stop_idx_per_pipe.push(stop_idx);
}

{
let stop_idx_per_pipe = stop_idx_per_pipe.clone();
s.spawn(move |_| unsafe {
fill_partition(
morsels_per_p,
cur_idx_per_pipe,
&stop_idx_per_pipe,
dataframes_ptr.get().add(this_thread_out_offset),
)
});
}
}
});

// SAFETY: all partitions were handled, so dataframes is full filled and
// morsels_per_pipe fully consumed.
unsafe {
for morsels in morsels_per_pipe.iter_mut() {
morsels.set_len(0);
}
dataframes.set_len(num_morsels);
}
dataframes
}

unsafe fn fill_partition(
morsels_per_pipe: &[Vec<Morsel>],
mut cur_idx_per_pipe: Vec<usize>,
stop_idx_per_pipe: &[usize],
mut out_ptr: *mut DataFrame,
) {
// K-way merge, initialize priority queue with one element per pipe.
let mut kmerge = BinaryHeap::with_capacity(morsels_per_pipe.len());
for (p, morsels) in morsels_per_pipe.iter().enumerate() {
if cur_idx_per_pipe[p] != stop_idx_per_pipe[p] {
let seq = morsels[cur_idx_per_pipe[p]].seq();
kmerge.push(Priority(Reverse(seq), p));
}
}

// While the merge queue isn't empty, keep copying elements into the output.
unsafe {
while let Some(Priority(Reverse(mut seq), p)) = kmerge.pop() {
// Write the next morsel from this pipe to the output.
let morsels = &morsels_per_pipe[p];
let cur_idx = &mut cur_idx_per_pipe[p];
core::ptr::copy_nonoverlapping(morsels[*cur_idx].df(), out_ptr, 1);
out_ptr = out_ptr.add(1);
*cur_idx += 1;

// Handle next element from this pipe.
while *cur_idx != stop_idx_per_pipe[p] {
let new_seq = morsels[*cur_idx].seq();
if new_seq <= seq.successor() {
// New sequence number is the same, or a direct successor, can output immediately.
core::ptr::copy_nonoverlapping(morsels[*cur_idx].df(), out_ptr, 1);
out_ptr = out_ptr.add(1);
*cur_idx += 1;
seq = new_seq;
} else {
kmerge.push(Priority(Reverse(new_seq), p));
break;
}
}
}
}
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod in_memory_linearize;
1 change: 1 addition & 0 deletions crates/polars-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub mod hashing;
pub mod idx_vec;
pub mod mem;
pub mod min_max;
pub mod priority;
pub mod slice;
pub mod sort;
pub mod sync;
Expand Down
25 changes: 25 additions & 0 deletions crates/polars-utils/src/priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::cmp::Ordering;

/// A pair which is ordered exclusively by the first element.
#[derive(Copy, Clone, Debug)]
pub struct Priority<P, T>(pub P, pub T);

impl<P: Ord + Eq, T> Ord for Priority<P, T> {
fn cmp(&self, other: &Self) -> Ordering {
self.0.cmp(&other.0)
}
}

impl<P: Ord + Eq, T> PartialOrd for Priority<P, T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<P: Eq, T> PartialEq for Priority<P, T> {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}

impl<P: Eq, T> Eq for Priority<P, T> {}
8 changes: 7 additions & 1 deletion crates/polars-utils/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/// Utility that allows use to send pointers to another thread.
/// This is better than going through `usize` as MIRI can follow these.
#[derive(Copy, Clone, Debug)]
#[derive(Debug)]
#[repr(transparent)]
pub struct SyncPtr<T>(*mut T);

Expand Down Expand Up @@ -37,6 +37,12 @@ impl<T> SyncPtr<T> {
}
}

impl<T> Copy for SyncPtr<T> {}
impl<T> Clone for SyncPtr<T> {
fn clone(&self) -> SyncPtr<T> {
*self
}
}
unsafe impl<T> Sync for SyncPtr<T> {}
unsafe impl<T> Send for SyncPtr<T> {}

Expand Down
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ all = [
"sql",
"binary_encoding",
"ffi_plugin",
# "new_streaming",
]

# we cannot conditionally activate simd
Expand Down

0 comments on commit c8166a8

Please sign in to comment.