Commit

perf: improve join performance through radix partitioned join (pola-r…
orlp authored Nov 7, 2023
1 parent dc10eb9 commit b11d3a6
Showing 7 changed files with 147 additions and 49 deletions.
1 change: 1 addition & 0 deletions crates/polars-core/src/utils/mod.rs
@@ -29,6 +29,7 @@ impl<T> Deref for Wrap<T> {
     }
 }
 
+#[inline(always)]
 pub fn _set_partition_size() -> usize {
     POOL.current_num_threads()
 }
13 changes: 3 additions & 10 deletions crates/polars-ops/src/frame/join/hash_join/mod.rs
@@ -42,18 +42,11 @@ pub fn default_join_ids() -> ChunkJoinOptIds {
 macro_rules! det_hash_prone_order {
     ($self:expr, $other:expr) => {{
         // The shortest relation will be used to create a hash table.
-        let left_first = $self.len() > $other.len();
-        let a;
-        let b;
-        if left_first {
-            a = $self;
-            b = $other;
+        if $self.len() > $other.len() {
+            ($self, $other, false)
         } else {
-            b = $self;
-            a = $other;
+            ($other, $self, true)
         }
-
-        (a, b, !left_first)
     }};
 }
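
Aside (not part of the commit): the macro above applies the usual hash-join rule that the shorter relation builds the hash table, and the returned boolean records whether the inputs were swapped so the caller can flip the emitted index pairs back. A minimal sketch of the same rule as a plain function over hypothetical slice inputs (not Polars API):

// Sketch only: the "shortest side builds the hash table" rule as a plain function.
// Returns (probe, build, swapped); `swapped` lets the caller undo the swap downstream.
fn hash_prone_order<'a, T>(left: &'a [T], right: &'a [T]) -> (&'a [T], &'a [T], bool) {
    if left.len() > right.len() {
        (left, right, false)
    } else {
        (right, left, true)
    }
}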

160 changes: 126 additions & 34 deletions crates/polars-ops/src/frame/join/hash_join/single_keys.rs
@@ -1,53 +1,145 @@
+use polars_utils::sync::SyncPtr;
+
 use super::*;
 
+// FIXME: we should compute the number of threads / partition size we'll use.
+// let avail_threads = POOL.current_num_threads();
+// let n_threads = (num_keys / MIN_ELEMS_PER_THREAD).clamp(1, avail_threads);
+// Use a small element per thread threshold for debugging/testing purposes.
+const MIN_ELEMS_PER_THREAD: usize = if cfg!(debug_assertions) { 1 } else { 128 };
+
 pub(crate) fn build_tables<T, I>(keys: Vec<I>) -> Vec<PlHashMap<T, Vec<IdxSize>>>
 where
     T: Send + Hash + Eq + Sync + Copy + AsU64,
     I: IntoIterator<Item = T> + Send + Sync + Clone,
     // <I as IntoIterator>::IntoIter: TrustedLen,
 {
-    let n_partitions = _set_partition_size();
+    // FIXME: change interface to split the input here, instead of taking
+    // pre-split input iterators.
+    let n_partitions = keys.len();
+    let n_threads = n_partitions;
+    let num_keys_est: usize = keys
+        .iter()
+        .map(|k| k.clone().into_iter().size_hint().0)
+        .sum();
+
+    // Don't bother parallelizing anything for small inputs.
+    if num_keys_est < 2 * MIN_ELEMS_PER_THREAD {
+        let mut hm: PlHashMap<T, Vec<IdxSize>> = PlHashMap::new();
+        let mut offset = 0;
+        for it in keys {
+            for k in it {
+                hm.entry(k).or_default().push(offset);
+                offset += 1;
+            }
+        }
+        return vec![hm];
+    }
 
-    // We will create a hashtable in every thread.
-    // We use the hash to partition the keys to the matching hashtable.
-    // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
     POOL.install(|| {
-        (0..n_partitions)
-            .into_par_iter()
-            .map(|partition_no| {
-                let partition_no = partition_no as u64;
+        // Compute the number of elements in each partition for each portion.
+        let per_thread_partition_sizes: Vec<Vec<usize>> = keys
+            .par_iter()
+            .map(|key_portion| {
+                let mut partition_sizes = vec![0; n_partitions];
+                for key in key_portion.clone() {
+                    let h = key.as_u64();
+                    let p = hash_to_partition(h, n_partitions);
+                    unsafe {
+                        *partition_sizes.get_unchecked_mut(p) += 1;
+                    }
+                }
+                partition_sizes
+            })
+            .collect();
 
-                let mut hash_tbl: PlHashMap<T, Vec<IdxSize>> =
-                    PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);
+        // Compute output offsets with a cumulative sum.
+        let mut per_thread_partition_offsets = vec![0; n_partitions * n_threads + 1];
+        let mut partition_offsets = vec![0; n_partitions + 1];
+        let mut cum_offset = 0;
+        for p in 0..n_partitions {
+            partition_offsets[p] = cum_offset;
+            for t in 0..n_threads {
+                per_thread_partition_offsets[t * n_partitions + p] = cum_offset;
+                cum_offset += per_thread_partition_sizes[t][p];
+            }
+        }
+        let num_keys = cum_offset;
+        per_thread_partition_offsets[n_threads * n_partitions] = num_keys;
+        partition_offsets[n_partitions] = num_keys;
 
-                let n_partitions = n_partitions as u64;
-                let mut offset = 0;
-                for keys in &keys {
-                    let keys = keys.clone().into_iter();
-                    let len = keys.size_hint().1.unwrap() as IdxSize;
+        // FIXME: we wouldn't need this if we changed our interface to split the
+        // input in this function, instead of taking a vec of iterators.
+        let mut per_thread_input_offsets = vec![0; n_partitions];
+        cum_offset = 0;
+        for t in 0..n_threads {
+            per_thread_input_offsets[t] = cum_offset;
+            for p in 0..n_partitions {
+                cum_offset += per_thread_partition_sizes[t][p];
+            }
+        }
 
-                    let mut cnt = 0;
-                    keys.for_each(|k| {
-                        let idx = cnt + offset;
-                        cnt += 1;
+        // Scatter values into partitions.
+        let mut scatter_keys: Vec<T> = Vec::with_capacity(num_keys);
+        let mut scatter_idxs: Vec<IdxSize> = Vec::with_capacity(num_keys);
+        let scatter_keys_ptr = unsafe { SyncPtr::new(scatter_keys.as_mut_ptr()) };
+        let scatter_idxs_ptr = unsafe { SyncPtr::new(scatter_idxs.as_mut_ptr()) };
+        keys.into_par_iter()
+            .enumerate()
+            .for_each(|(t, key_portion)| {
+                let mut partition_offsets =
+                    per_thread_partition_offsets[t * n_partitions..(t + 1) * n_partitions].to_vec();
+                for (i, key) in key_portion.into_iter().enumerate() {
+                    unsafe {
+                        let p = hash_to_partition(key.as_u64(), n_partitions);
+                        let off = partition_offsets.get_unchecked_mut(p);
+                        *scatter_keys_ptr.get().add(*off) = key;
+                        *scatter_idxs_ptr.get().add(*off) =
+                            (per_thread_input_offsets[t] + i) as IdxSize;
+                        *off += 1;
+                    }
+                }
+            });
+        unsafe {
+            scatter_keys.set_len(num_keys);
+            scatter_idxs.set_len(num_keys);
+        }
 
-                        if this_partition(k.as_u64(), partition_no, n_partitions) {
-                            let entry = hash_tbl.entry(k);
+        // Build tables.
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|p| {
+                // Resizing the hash map is very, very expensive. That's why we
+                // adopt a hybrid strategy: we assume an initially small hash
+                // map, which would satisfy a highly skewed relation. If this
+                // fills up we immediately reserve enough for a full cardinality
+                // data set.
+                let partition_range = partition_offsets[p]..partition_offsets[p + 1];
+                let full_size = partition_range.len();
+                let mut conservative_size = _HASHMAP_INIT_SIZE.max(full_size / 64);
+                let mut hm: PlHashMap<T, Vec<IdxSize>> =
+                    PlHashMap::with_capacity(conservative_size);
 
-                            match entry {
-                                Entry::Vacant(entry) => {
-                                    entry.insert(vec![idx]);
-                                },
-                                Entry::Occupied(mut entry) => {
-                                    let v = entry.get_mut();
-                                    v.push(idx);
-                                },
-                            }
-                        }
-                    });
-                    offset += len;
+                unsafe {
+                    for i in partition_range {
+                        if hm.len() == conservative_size {
+                            hm.reserve(full_size - conservative_size);
+                            conservative_size = 0; // Hack to ensure we never hit this branch again.
+                        }
+
+                        let key = *scatter_keys.get_unchecked(i);
+                        let idx = *scatter_idxs.get_unchecked(i);
+                        match hm.entry(key) {
+                            Entry::Occupied(mut o) => {
+                                o.get_mut().push(idx as IdxSize);
+                            },
+                            Entry::Vacant(v) => {
+                                v.insert(vec![idx as IdxSize]);
+                            },
+                        };
+                    }
                 }
-                hash_tbl
+
+                hm
             })
             .collect()
     })
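
Aside (not part of the diff): the new build_tables follows a classic radix-partitioning recipe: count how many keys fall into each partition, turn the counts into output offsets with a prefix sum, scatter each key and its row index into one contiguous buffer, then build one hash table per contiguous partition slice. A single-threaded sketch of that recipe, with a plain modulo standing in for Polars' hash_to_partition helper (an assumption made for brevity):

use std::collections::HashMap;

// Sketch only: a single-threaded version of the radix-partitioned build.
// The committed code additionally splits `keys` across threads, keeps
// per-thread offset tables, and scatters through raw pointers (SyncPtr).
fn build_tables_sketch(keys: &[u64], n_partitions: usize) -> Vec<HashMap<u64, Vec<usize>>> {
    // Stand-in for hash_to_partition: any stable hash-to-partition mapping works.
    let part_of = |k: u64| (k as usize) % n_partitions;

    // 1. Count how many keys land in each partition.
    let mut sizes = vec![0usize; n_partitions];
    for &k in keys {
        sizes[part_of(k)] += 1;
    }

    // 2. Prefix-sum the counts into per-partition output offsets.
    let mut offsets = vec![0usize; n_partitions + 1];
    for p in 0..n_partitions {
        offsets[p + 1] = offsets[p] + sizes[p];
    }

    // 3. Scatter (key, row index) pairs so each partition is a contiguous slice.
    let mut scatter = vec![(0u64, 0usize); keys.len()];
    let mut write_pos = offsets.clone();
    for (row, &k) in keys.iter().enumerate() {
        let p = part_of(k);
        scatter[write_pos[p]] = (k, row);
        write_pos[p] += 1;
    }

    // 4. Build one key -> row-indices table per contiguous partition slice.
    (0..n_partitions)
        .map(|p| {
            let mut hm: HashMap<u64, Vec<usize>> = HashMap::new();
            for &(k, row) in &scatter[offsets[p]..offsets[p + 1]] {
                hm.entry(k).or_default().push(row);
            }
            hm
        })
        .collect()
}

The committed code also pre-sizes each per-partition map conservatively (_HASHMAP_INIT_SIZE.max(full_size / 64)) and only reserves the full partition size if that small map actually fills up, so heavily skewed key distributions avoid one large allocation per partition.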
@@ -40,7 +40,7 @@ pub(super) fn hash_join_tuples_inner<T, I>(
     validate: JoinValidation,
 ) -> PolarsResult<(Vec<IdxSize>, Vec<IdxSize>)>
 where
-    I: IntoIterator<Item = T> + Send + Sync + Copy,
+    I: IntoIterator<Item = T> + Send + Sync + Clone,
     // <I as IntoIterator>::IntoIter: TrustedLen,
     T: Send + Hash + Eq + Sync + Copy + AsU64,
 {
@@ -49,7 +49,7 @@ where
     let hash_tbls = if validate.needs_checks() {
         let expected_size = build
             .iter()
-            .map(|v| v.into_iter().size_hint().1.unwrap())
+            .map(|v| v.clone().into_iter().size_hint().1.unwrap())
             .sum();
         let hash_tbls = build_tables(build);
         let build_size = hash_tbls.iter().map(|m| m.len()).sum();
2 changes: 0 additions & 2 deletions crates/polars-ops/src/frame/join/merge_sorted.rs
@@ -175,8 +175,6 @@ where
         for a in &mut a_iter {
             current_a = a;
             if a <= current_b {
-                // safety
-                // we pre-allocated enough
                 out.push(A_INDICATOR);
                 continue;
             }
14 changes: 14 additions & 0 deletions crates/polars-utils/src/functions.rs
@@ -1,4 +1,18 @@
 use std::hash::{BuildHasher, Hash};
+use std::ops::Range;
 
+// The ith portion of a range split in k (as equal as possible) parts.
+#[inline(always)]
+pub fn range_portion(i: usize, k: usize, r: Range<usize>) -> Range<usize> {
+    // Each portion having size n / k leaves n % k elements unaccounted for.
+    // Make the first n % k portions have 1 extra element.
+    let n = r.len();
+    let base_size = n / k;
+    let num_one_larger = n % k;
+    let num_before = base_size * i + i.min(num_one_larger);
+    let our_size = base_size + (i < num_one_larger) as usize;
+    r.start + num_before..r.start + num_before + our_size
+}
+
 // Faster than collecting from a flattened iterator.
 pub fn flatten<T: Clone, R: AsRef<[T]>>(bufs: &[R], len: Option<usize>) -> Vec<T> {
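
Aside (not part of the diff): range_portion splits a range into k nearly equal chunks and gives the first n % k chunks one extra element, so splitting 0..10 into 4 portions yields 0..3, 3..6, 6..8 and 8..10. A tiny sketch that reproduces the rule for that example:

// Sketch only: reproduces range_portion's splitting rule for n = 10, k = 4.
fn main() {
    let (n, k) = (10usize, 4usize);
    for i in 0..k {
        let base_size = n / k;                // 2
        let num_one_larger = n % k;           // the first 2 portions get 1 extra element
        let start = base_size * i + i.min(num_one_larger);
        let len = base_size + (i < num_one_larger) as usize;
        println!("portion {i}: {}..{}", start, start + len);
    }
    // Prints: portion 0: 0..3, portion 1: 3..6, portion 2: 6..8, portion 3: 8..10
}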
2 changes: 1 addition & 1 deletion crates/polars-utils/src/lib.rs
@@ -6,7 +6,7 @@ pub mod cache;
 pub mod cell;
 pub mod contention_pool;
 mod error;
-mod functions;
+pub mod functions;
 pub mod mem;
 pub mod slice;
 pub mod sort;