From b11d3a6a57a38284295f20849ce1862db40b317d Mon Sep 17 00:00:00 2001
From: Orson Peters
Date: Tue, 7 Nov 2023 19:39:42 +0100
Subject: [PATCH] perf: improve join performance through radix partitioned join (#12270)

---
 crates/polars-core/src/utils/mod.rs           |   1 +
 .../src/frame/join/hash_join/mod.rs           |  13 +-
 .../src/frame/join/hash_join/single_keys.rs   | 160 ++++++++++++++----
 .../frame/join/hash_join/single_keys_inner.rs |   4 +-
 .../polars-ops/src/frame/join/merge_sorted.rs |   2 -
 crates/polars-utils/src/functions.rs          |  14 ++
 crates/polars-utils/src/lib.rs                |   2 +-
 7 files changed, 147 insertions(+), 49 deletions(-)

diff --git a/crates/polars-core/src/utils/mod.rs b/crates/polars-core/src/utils/mod.rs
index 97368d41b650..8965fedee828 100644
--- a/crates/polars-core/src/utils/mod.rs
+++ b/crates/polars-core/src/utils/mod.rs
@@ -29,6 +29,7 @@ impl<T> Deref for Wrap<T> {
     }
 }
 
+#[inline(always)]
 pub fn _set_partition_size() -> usize {
     POOL.current_num_threads()
 }
diff --git a/crates/polars-ops/src/frame/join/hash_join/mod.rs b/crates/polars-ops/src/frame/join/hash_join/mod.rs
index b6f4df2cba6e..a1fa3ca5ee12 100644
--- a/crates/polars-ops/src/frame/join/hash_join/mod.rs
+++ b/crates/polars-ops/src/frame/join/hash_join/mod.rs
@@ -42,18 +42,11 @@ pub fn default_join_ids() -> ChunkJoinOptIds {
 macro_rules! det_hash_prone_order {
     ($self:expr, $other:expr) => {{
         // The shortest relation will be used to create a hash table.
-        let left_first = $self.len() > $other.len();
-        let a;
-        let b;
-        if left_first {
-            a = $self;
-            b = $other;
+        if $self.len() > $other.len() {
+            ($self, $other, false)
         } else {
-            b = $self;
-            a = $other;
+            ($other, $self, true)
         }
-
-        (a, b, !left_first)
     }};
 }
 
diff --git a/crates/polars-ops/src/frame/join/hash_join/single_keys.rs b/crates/polars-ops/src/frame/join/hash_join/single_keys.rs
index 7fac84d9c758..e0ff9f819c6c 100644
--- a/crates/polars-ops/src/frame/join/hash_join/single_keys.rs
+++ b/crates/polars-ops/src/frame/join/hash_join/single_keys.rs
@@ -1,53 +1,145 @@
+use polars_utils::sync::SyncPtr;
+
 use super::*;
 
+// FIXME: we should compute the number of threads / partition size we'll use.
+// let avail_threads = POOL.current_num_threads();
+// let n_threads = (num_keys / MIN_ELEMS_PER_THREAD).clamp(1, avail_threads);
+// Use a small element per thread threshold for debugging/testing purposes.
+const MIN_ELEMS_PER_THREAD: usize = if cfg!(debug_assertions) { 1 } else { 128 };
+
 pub(crate) fn build_tables<T, I>(keys: Vec<I>) -> Vec<PlHashMap<T, Vec<IdxSize>>>
 where
     T: Send + Hash + Eq + Sync + Copy + AsU64,
     I: IntoIterator<Item = T> + Send + Sync + Clone,
-    // <I as IntoIterator>::IntoIter: TrustedLen,
 {
-    let n_partitions = _set_partition_size();
+    // FIXME: change interface to split the input here, instead of taking
+    // pre-split input iterators.
+    let n_partitions = keys.len();
+    let n_threads = n_partitions;
+    let num_keys_est: usize = keys
+        .iter()
+        .map(|k| k.clone().into_iter().size_hint().0)
+        .sum();
+
+    // Don't bother parallelizing anything for small inputs.
+    if num_keys_est < 2 * MIN_ELEMS_PER_THREAD {
+        let mut hm: PlHashMap<T, Vec<IdxSize>> = PlHashMap::new();
+        let mut offset = 0;
+        for it in keys {
+            for k in it {
+                hm.entry(k).or_default().push(offset);
+                offset += 1;
+            }
+        }
+        return vec![hm];
+    }
 
-    // We will create a hashtable in every thread.
-    // We use the hash to partition the keys to the matching hashtable.
-    // Every thread traverses all keys/hashes and ignores the ones that doesn't fall in that partition.
     POOL.install(|| {
-        (0..n_partitions)
-            .into_par_iter()
-            .map(|partition_no| {
-                let partition_no = partition_no as u64;
+        // Compute the number of elements in each partition for each portion.
+        let per_thread_partition_sizes: Vec<Vec<usize>> = keys
+            .par_iter()
+            .map(|key_portion| {
+                let mut partition_sizes = vec![0; n_partitions];
+                for key in key_portion.clone() {
+                    let h = key.as_u64();
+                    let p = hash_to_partition(h, n_partitions);
+                    unsafe {
+                        *partition_sizes.get_unchecked_mut(p) += 1;
+                    }
+                }
+                partition_sizes
+            })
+            .collect();
 
-                let mut hash_tbl: PlHashMap<T, Vec<IdxSize>> =
-                    PlHashMap::with_capacity(_HASHMAP_INIT_SIZE);
+        // Compute output offsets with a cumulative sum.
+        let mut per_thread_partition_offsets = vec![0; n_partitions * n_threads + 1];
+        let mut partition_offsets = vec![0; n_partitions + 1];
+        let mut cum_offset = 0;
+        for p in 0..n_partitions {
+            partition_offsets[p] = cum_offset;
+            for t in 0..n_threads {
+                per_thread_partition_offsets[t * n_partitions + p] = cum_offset;
+                cum_offset += per_thread_partition_sizes[t][p];
+            }
+        }
+        let num_keys = cum_offset;
+        per_thread_partition_offsets[n_threads * n_partitions] = num_keys;
+        partition_offsets[n_partitions] = num_keys;
 
-                let n_partitions = n_partitions as u64;
-                let mut offset = 0;
-                for keys in &keys {
-                    let keys = keys.clone().into_iter();
-                    let len = keys.size_hint().1.unwrap() as IdxSize;
+        // FIXME: we wouldn't need this if we changed our interface to split the
+        // input in this function, instead of taking a vec of iterators.
+        let mut per_thread_input_offsets = vec![0; n_partitions];
+        cum_offset = 0;
+        for t in 0..n_threads {
+            per_thread_input_offsets[t] = cum_offset;
+            for p in 0..n_partitions {
+                cum_offset += per_thread_partition_sizes[t][p];
+            }
+        }
 
-                    let mut cnt = 0;
-                    keys.for_each(|k| {
-                        let idx = cnt + offset;
-                        cnt += 1;
+        // Scatter values into partitions.
+        let mut scatter_keys: Vec<T> = Vec::with_capacity(num_keys);
+        let mut scatter_idxs: Vec<IdxSize> = Vec::with_capacity(num_keys);
+        let scatter_keys_ptr = unsafe { SyncPtr::new(scatter_keys.as_mut_ptr()) };
+        let scatter_idxs_ptr = unsafe { SyncPtr::new(scatter_idxs.as_mut_ptr()) };
+        keys.into_par_iter()
+            .enumerate()
+            .for_each(|(t, key_portion)| {
+                let mut partition_offsets =
+                    per_thread_partition_offsets[t * n_partitions..(t + 1) * n_partitions].to_vec();
+                for (i, key) in key_portion.into_iter().enumerate() {
+                    unsafe {
+                        let p = hash_to_partition(key.as_u64(), n_partitions);
+                        let off = partition_offsets.get_unchecked_mut(p);
+                        *scatter_keys_ptr.get().add(*off) = key;
+                        *scatter_idxs_ptr.get().add(*off) =
+                            (per_thread_input_offsets[t] + i) as IdxSize;
+                        *off += 1;
+                    }
+                }
+            });
+        unsafe {
+            scatter_keys.set_len(num_keys);
+            scatter_idxs.set_len(num_keys);
+        }
 
-                        if this_partition(k.as_u64(), partition_no, n_partitions) {
-                            let entry = hash_tbl.entry(k);
+        // Build tables.
+        (0..n_partitions)
+            .into_par_iter()
+            .map(|p| {
+                // Resizing the hash map is very, very expensive. That's why we
+                // adopt a hybrid strategy: we assume an initially small hash
+                // map, which would satisfy a highly skewed relation. If this
+                // fills up we immediately reserve enough for a full cardinality
+                // data set.
+                let partition_range = partition_offsets[p]..partition_offsets[p + 1];
+                let full_size = partition_range.len();
+                let mut conservative_size = _HASHMAP_INIT_SIZE.max(full_size / 64);
+                let mut hm: PlHashMap<T, Vec<IdxSize>> =
+                    PlHashMap::with_capacity(conservative_size);
 
-                            match entry {
-                                Entry::Vacant(entry) => {
-                                    entry.insert(vec![idx]);
-                                },
-                                Entry::Occupied(mut entry) => {
-                                    let v = entry.get_mut();
-                                    v.push(idx);
-                                },
-                            }
+                unsafe {
+                    for i in partition_range {
+                        if hm.len() == conservative_size {
+                            hm.reserve(full_size - conservative_size);
+                            conservative_size = 0; // Hack to ensure we never hit this branch again.
                         }
-                    });
-                    offset += len;
+
+                        let key = *scatter_keys.get_unchecked(i);
+                        let idx = *scatter_idxs.get_unchecked(i);
+                        match hm.entry(key) {
+                            Entry::Occupied(mut o) => {
+                                o.get_mut().push(idx as IdxSize);
+                            },
+                            Entry::Vacant(v) => {
+                                v.insert(vec![idx as IdxSize]);
+                            },
+                        };
+                    }
                 }
-                hash_tbl
+
+                hm
             })
             .collect()
     })
diff --git a/crates/polars-ops/src/frame/join/hash_join/single_keys_inner.rs b/crates/polars-ops/src/frame/join/hash_join/single_keys_inner.rs
index 1af9386e9031..8f2bc1c4c49d 100644
--- a/crates/polars-ops/src/frame/join/hash_join/single_keys_inner.rs
+++ b/crates/polars-ops/src/frame/join/hash_join/single_keys_inner.rs
@@ -40,7 +40,7 @@ pub(super) fn hash_join_tuples_inner<T, I>(
     validate: JoinValidation,
 ) -> PolarsResult<(Vec<IdxSize>, Vec<IdxSize>)>
 where
-    I: IntoIterator<Item = T> + Send + Sync + Copy,
+    I: IntoIterator<Item = T> + Send + Sync + Clone,
     // <I as IntoIterator>::IntoIter: TrustedLen,
     T: Send + Hash + Eq + Sync + Copy + AsU64,
 {
@@ -49,7 +49,7 @@ where
     let hash_tbls = if validate.needs_checks() {
         let expected_size = build
             .iter()
-            .map(|v| v.into_iter().size_hint().1.unwrap())
+            .map(|v| v.clone().into_iter().size_hint().1.unwrap())
             .sum();
         let hash_tbls = build_tables(build);
         let build_size = hash_tbls.iter().map(|m| m.len()).sum();
diff --git a/crates/polars-ops/src/frame/join/merge_sorted.rs b/crates/polars-ops/src/frame/join/merge_sorted.rs
index d01def8f5e4c..2a9a12d4477a 100644
--- a/crates/polars-ops/src/frame/join/merge_sorted.rs
+++ b/crates/polars-ops/src/frame/join/merge_sorted.rs
@@ -175,8 +175,6 @@ where
     for a in &mut a_iter {
         current_a = a;
         if a <= current_b {
-            // safety
-            // we pre-allocated enough
             out.push(A_INDICATOR);
             continue;
         }
diff --git a/crates/polars-utils/src/functions.rs b/crates/polars-utils/src/functions.rs
index 54a3b241375f..84a0bae2973e 100644
--- a/crates/polars-utils/src/functions.rs
+++ b/crates/polars-utils/src/functions.rs
@@ -1,4 +1,18 @@
 use std::hash::{BuildHasher, Hash};
+use std::ops::Range;
+
+// The ith portion of a range split in k (as equal as possible) parts.
+#[inline(always)]
+pub fn range_portion(i: usize, k: usize, r: Range<usize>) -> Range<usize> {
+    // Each portion having size n / k leaves n % k elements unaccounted for.
+    // Make the first n % k portions have 1 extra element.
+    let n = r.len();
+    let base_size = n / k;
+    let num_one_larger = n % k;
+    let num_before = base_size * i + i.min(num_one_larger);
+    let our_size = base_size + (i < num_one_larger) as usize;
+    r.start + num_before..r.start + num_before + our_size
+}
 
 // Faster than collecting from a flattened iterator.
 pub fn flatten<T: Clone, R: AsRef<[T]>>(bufs: &[R], len: Option<usize>) -> Vec<T> {
diff --git a/crates/polars-utils/src/lib.rs b/crates/polars-utils/src/lib.rs
index bd37b126969e..36c85519f42d 100644
--- a/crates/polars-utils/src/lib.rs
+++ b/crates/polars-utils/src/lib.rs
@@ -6,7 +6,7 @@ pub mod cache;
 pub mod cell;
 pub mod contention_pool;
 mod error;
-mod functions;
+pub mod functions;
 pub mod mem;
 pub mod slice;
 pub mod sort;
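
The range_portion helper added to polars-utils splits a range into k nearly equal parts, giving the first n % k parts one extra element. A minimal standalone sketch of that splitting rule, with a hypothetical main that prints the portions of 0..10 split three ways (0..4, 4..7, 7..10):

use std::ops::Range;

// Same splitting rule as the patch's range_portion: the first n % k portions
// receive one extra element, so all portions differ in size by at most one.
fn range_portion(i: usize, k: usize, r: Range<usize>) -> Range<usize> {
    let n = r.len();
    let base_size = n / k;
    let num_one_larger = n % k;
    let num_before = base_size * i + i.min(num_one_larger);
    let our_size = base_size + (i < num_one_larger) as usize;
    r.start + num_before..r.start + num_before + our_size
}

fn main() {
    // Ten keys split over three threads: 0..4, 4..7, 7..10.
    for i in 0..3 {
        println!("portion {i}: {:?}", range_portion(i, 3, 0..10));
    }
    // The portions always tile the input range exactly.
    assert_eq!(range_portion(2, 3, 0..10).end, 10);
}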
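
The core of the new build_tables is a counting-sort style radix partition: count how many keys each input portion sends to each partition, prefix-sum those counts into write offsets, then scatter (key, row index) pairs into one flat buffer so every partition ends up contiguous. A safe, single-threaded sketch of that idea follows; the patch performs the scatter in parallel through SyncPtr, and to_partition below is a simplified stand-in rather than polars' hash_to_partition.

// Map a hash onto [0, n_partitions) without a modulo; a simplified stand-in
// for the partitioning function used by the patch. Keys are used directly as
// their own 64-bit hash here, mirroring the AsU64 bound in build_tables.
fn to_partition(hash: u64, n_partitions: usize) -> usize {
    ((hash as u128 * n_partitions as u128) >> 64) as usize
}

// Returns the scattered (key, original index) pairs plus partition boundaries:
// partition p occupies scattered[offsets[p]..offsets[p + 1]].
fn partition_keys(keys: &[u64], n_partitions: usize) -> (Vec<(u64, usize)>, Vec<usize>) {
    // 1. Count how many keys fall into each partition.
    let mut sizes = vec![0usize; n_partitions];
    for &k in keys {
        sizes[to_partition(k, n_partitions)] += 1;
    }

    // 2. An exclusive prefix sum turns the counts into write offsets.
    let mut offsets = vec![0usize; n_partitions + 1];
    for p in 0..n_partitions {
        offsets[p + 1] = offsets[p] + sizes[p];
    }

    // 3. Scatter every key (and its original row index) to its partition.
    let mut cursor = offsets.clone();
    let mut scattered = vec![(0u64, 0usize); keys.len()];
    for (i, &k) in keys.iter().enumerate() {
        let p = to_partition(k, n_partitions);
        scattered[cursor[p]] = (k, i);
        cursor[p] += 1;
    }

    (scattered, offsets)
}

Each contiguous slice can then be turned into its own hash table by an independent thread, which is what the "Build tables" step in the patch does.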
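
The "Build tables" step sizes each hash map with a hybrid strategy: start with a small capacity that already suffices for a heavily skewed, low-cardinality partition, and only if the map actually fills that capacity reserve room for the full partition in one jump, so at most one resize ever happens. A rough sketch of the same idea using std's HashMap rather than PlHashMap:

use std::collections::HashMap;

// keys: the already-partitioned keys of one partition; values: row indices.
fn build_one_table(keys: &[u64], init_size: usize) -> HashMap<u64, Vec<usize>> {
    let full_size = keys.len();
    let mut conservative_size = init_size.max(full_size / 64);
    let mut hm: HashMap<u64, Vec<usize>> = HashMap::with_capacity(conservative_size);
    for (idx, &key) in keys.iter().enumerate() {
        if hm.len() == conservative_size {
            // The low-cardinality assumption failed; reserve the rest at once.
            hm.reserve(full_size - conservative_size);
            conservative_size = usize::MAX; // Never take this branch again.
        }
        hm.entry(key).or_default().push(idx);
    }
    hm
}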