Skip to content

Commit

Permalink
feat(rust): add cluster_with_columns plan optimization (#16274)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored May 22, 2024
1 parent 31e2087 commit c75dba9
Show file tree
Hide file tree
Showing 13 changed files with 659 additions and 6 deletions.
64 changes: 64 additions & 0 deletions crates/polars-arrow/src/bitmap/bitmap_ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,50 @@ where
Bitmap::from_u8_vec(buffer, length)
}

/// Combines two [`Bitmap`]s chunk by chunk with `op` and reduces the per-chunk
/// results into a single value with `fold`.
///
/// The inputs are walked in `u64` chunks. `init` seeds the accumulator, every
/// pair of full chunks is passed through `op` and merged with `fold`, and `op`
/// is finally applied once more to the two remainders and merged last.
///
/// # Panics
/// Panics if `lhs` and `rhs` do not have the same length.
pub fn binary_fold<B, F, R>(lhs: &Bitmap, rhs: &Bitmap, op: F, init: B, fold: R) -> B
where
    F: Fn(u64, u64) -> B,
    R: Fn(B, B) -> B,
{
    assert_eq!(lhs.len(), rhs.len());

    let left_iter = lhs.chunks();
    let right_iter = rhs.chunks();

    // Read the remainders up front: `zip` below takes ownership of both iterators.
    let left_tail = left_iter.remainder();
    let right_tail = right_iter.remainder();

    let mut acc = init;
    for (left, right) in left_iter.zip(right_iter) {
        acc = fold(acc, op(left, right));
    }

    fold(acc, op(left_tail, right_tail))
}

/// Combines two [`MutableBitmap`]s chunk by chunk with `op` and reduces the
/// per-chunk results into a single value with `fold`.
///
/// The inputs are walked in `u64` chunks. `init` seeds the accumulator, every
/// pair of full chunks is passed through `op` and merged with `fold`, and `op`
/// is finally applied once more to the two remainders and merged last.
///
/// # Panics
/// Panics if `lhs` and `rhs` do not have the same length.
pub fn binary_fold_mut<B, F, R>(
    lhs: &MutableBitmap,
    rhs: &MutableBitmap,
    op: F,
    init: B,
    fold: R,
) -> B
where
    F: Fn(u64, u64) -> B,
    R: Fn(B, B) -> B,
{
    assert_eq!(lhs.len(), rhs.len());

    let left_iter = lhs.chunks();
    let right_iter = rhs.chunks();

    // Read the remainders up front: `zip` below takes ownership of both iterators.
    let left_tail = left_iter.remainder();
    let right_tail = right_iter.remainder();

    let mut acc = init;
    for (left, right) in left_iter.zip(right_iter) {
        acc = fold(acc, op(left, right));
    }

    fold(acc, op(left_tail, right_tail))
}

fn unary_impl<F, I>(iter: I, op: F, length: usize) -> Bitmap
where
I: BitChunkIterExact<u64>,
Expand Down Expand Up @@ -226,6 +270,26 @@ fn eq(lhs: &Bitmap, rhs: &Bitmap) -> bool {
lhs_remainder.zip(rhs_remainder).all(|(x, y)| x == y)
}

/// Returns `true` when at least one chunk pair of `lhs` and `rhs` has a bit
/// set in both inputs.
///
/// Implemented on top of [`binary_fold`], which panics when the two bitmaps
/// differ in length.
pub fn intersects_with(lhs: &Bitmap, rhs: &Bitmap) -> bool {
    // A chunk pair "hits" when its bitwise AND is non-zero.
    let chunk_hits = |left: u64, right: u64| (left & right) != 0;
    // Once any chunk pair has hit, the accumulated answer stays `true`.
    let any_hit = |seen: bool, hit: bool| seen || hit;

    binary_fold(lhs, rhs, chunk_hits, false, any_hit)
}

/// Returns `true` when at least one chunk pair of `lhs` and `rhs` has a bit
/// set in both inputs.
///
/// Implemented on top of [`binary_fold_mut`], which panics when the two
/// bitmaps differ in length.
pub fn intersects_with_mut(lhs: &MutableBitmap, rhs: &MutableBitmap) -> bool {
    // A chunk pair "hits" when its bitwise AND is non-zero.
    let chunk_hits = |left: u64, right: u64| (left & right) != 0;
    // Once any chunk pair has hit, the accumulated answer stays `true`.
    let any_hit = |seen: bool, hit: bool| seen || hit;

    binary_fold_mut(lhs, rhs, chunk_hits, false, any_hit)
}

impl PartialEq for Bitmap {
fn eq(&self, other: &Self) -> bool {
eq(self, other)
Expand Down
9 changes: 8 additions & 1 deletion crates/polars-arrow/src/bitmap/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use either::Either;
use polars_error::{polars_bail, PolarsResult};

use super::utils::{count_zeros, fmt, get_bit, get_bit_unchecked, BitChunk, BitChunks, BitmapIter};
use super::{chunk_iter_to_vec, IntoIter, MutableBitmap};
use super::{chunk_iter_to_vec, intersects_with, IntoIter, MutableBitmap};
use crate::bitmap::aligned::AlignedBitmapSlice;
use crate::bitmap::iterator::{
FastU32BitmapIter, FastU56BitmapIter, FastU64BitmapIter, TrueIdxIter,
Expand Down Expand Up @@ -474,6 +474,13 @@ impl Bitmap {
unset_bit_count_cache,
}
}

/// Checks whether two [`Bitmap`]s have shared set bits.
///
/// Returns `true` if at least one bit is set in both `self` and `other`.
/// This is an optimized version of `(self & other) != 0000..`: the result is
/// folded chunk by chunk instead of building the intermediate AND bitmap.
///
/// # Panics
/// Panics if `self` and `other` do not have the same length (the underlying
/// fold asserts equal lengths).
pub fn intersects_with(&self, other: &Self) -> bool {
// Resolves to the free function imported from the parent module, not to this method.
intersects_with(self, other)
}
}

impl<P: AsRef<[bool]>> From<P> for Bitmap {
Expand Down
24 changes: 22 additions & 2 deletions crates/polars-arrow/src/bitmap/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use std::sync::Arc;
use polars_error::{polars_bail, PolarsResult};

use super::utils::{
count_zeros, fmt, get_bit, set, set_bit, BitChunk, BitChunksExactMut, BitmapIter,
count_zeros, fmt, get_bit, set, set_bit, BitChunk, BitChunks, BitChunksExactMut, BitmapIter,
};
use super::Bitmap;
use super::{intersects_with_mut, Bitmap};
use crate::bitmap::utils::{get_bit_unchecked, merge_reversed, set_bit_unchecked};
use crate::trusted_len::TrustedLen;

Expand Down Expand Up @@ -246,6 +246,15 @@ impl MutableBitmap {
count_zeros(&self.buffer, 0, self.length)
}

/// Returns the number of set bits on this [`MutableBitmap`].
///
/// Guaranteed to be `<= self.len()`.
///
/// # Implementation
/// This function is `O(N)`: the count is derived as `self.length` minus
/// [`unset_bits`](Self::unset_bits).
pub fn set_bits(&self) -> usize {
self.length - self.unset_bits()
}

/// Returns the number of unset bits on this [`MutableBitmap`].
#[deprecated(since = "0.13.0", note = "use `unset_bits` instead")]
pub fn null_count(&self) -> usize {
Expand Down Expand Up @@ -335,11 +344,22 @@ impl MutableBitmap {
self.buffer.shrink_to_fit();
}

/// Returns an iterator over bits in bit chunks [`BitChunk`].
///
/// This iterator is useful to operate over multiple bits at once, e.g. with
/// bitwise operations. It is built over the whole buffer, starting at bit
/// offset `0` and covering `self.length` bits.
pub fn chunks<T: BitChunk>(&self) -> BitChunks<T> {
BitChunks::new(&self.buffer, 0, self.length)
}

/// Returns an iterator over mutable slices, [`BitChunksExactMut`].
///
/// The iterator is constructed from the mutable buffer and `self.length`.
pub(crate) fn bitchunks_exact_mut<T: BitChunk>(&mut self) -> BitChunksExactMut<T> {
BitChunksExactMut::new(&mut self.buffer, self.length)
}

/// Checks whether two [`MutableBitmap`]s have shared set bits.
///
/// Returns `true` if at least one bit is set in both `self` and `other`.
///
/// # Panics
/// Panics if `self` and `other` do not have the same length (the underlying
/// fold asserts equal lengths).
pub fn intersects_with(&self, other: &Self) -> bool {
intersects_with_mut(self, other)
}

/// Consumes this [`MutableBitmap`] and converts it into a [`Bitmap`] through
/// its `Into<Bitmap>` conversion.
pub fn freeze(self) -> Bitmap {
self.into()
}
Expand Down
7 changes: 7 additions & 0 deletions crates/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl LazyFrame {
self.with_optimizations(OptState {
projection_pushdown: false,
predicate_pushdown: false,
cluster_with_columns: false,
type_coercion: true,
simplify_expr: false,
slice_pushdown: false,
Expand All @@ -150,6 +151,12 @@ impl LazyFrame {
self
}

/// Toggle the `cluster_with_columns` optimization.
///
/// Sets `opt_state.cluster_with_columns` to `toggle` and returns the updated
/// frame, so the call can be chained with other builder-style methods.
pub fn with_cluster_with_columns(mut self, toggle: bool) -> Self {
self.opt_state.cluster_with_columns = toggle;
self
}

/// Toggle predicate pushdown optimization.
pub fn with_predicate_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.predicate_pushdown = toggle;
Expand Down
144 changes: 144 additions & 0 deletions crates/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,147 @@ fn test_flatten_unions() -> PolarsResult<()> {
}
Ok(())
}

/// Counts how many times `needle` occurs in `s`, including overlapping
/// occurrences (`"aa"` occurs twice in `"aaa"`).
///
/// An empty `needle` is reported as `0` occurrences.
fn num_occurrences(s: &str, needle: &str) -> usize {
    // After each match, resume right after the first character of that match.
    // Stepping by that character's UTF-8 width (instead of a fixed 1 byte) keeps
    // every slice on a char boundary while still finding overlapping matches.
    let step = match needle.chars().next() {
        Some(first) => first.len_utf8(),
        // An empty needle matches at every offset; treat it as "no occurrences"
        // rather than stepping past the end of `s`.
        None => return 0,
    };

    let mut rest = s;
    let mut num = 0;

    while let Some(pos) = rest.find(needle) {
        // `pos` is the start of a match, so `pos + step` is both in bounds and
        // on a char boundary.
        rest = &rest[pos + step..];
        num += 1;
    }

    num
}

/// Two consecutive `with_columns` calls on unrelated columns should end up in
/// a single `WITH_COLUMNS` node after optimization.
#[test]
fn test_cluster_with_columns() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let frame = df!("foo" => &[0.5, 1.7, 3.2],
                    "bar" => &[4.1, 1.5, 9.2])?;

    // Only the cluster_with_columns optimization is switched on.
    let lf = frame
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo") * lit(2.0)])
        .with_columns([col("bar") / lit(1.5)]);

    let plan_before = lf.clone().to_alp().unwrap();
    let plan_after = lf.clone().to_alp_optimized().unwrap();

    let unoptimized = plan_before.describe();
    let optimized = plan_after.describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    let nodes_before = num_occurrences(&unoptimized, "WITH_COLUMNS");
    let nodes_after = num_occurrences(&optimized, "WITH_COLUMNS");

    assert_eq!(nodes_before, 2);
    assert_eq!(nodes_after, 1);

    Ok(())
}

/// When the second `with_columns` reads a column produced by the first one,
/// both `WITH_COLUMNS` nodes are expected to remain after optimization.
#[test]
fn test_cluster_with_columns_dependency() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let frame = df!("foo" => &[0.5, 1.7, 3.2],
                    "bar" => &[4.1, 1.5, 9.2])?;

    // Only the cluster_with_columns optimization is switched on.
    let lf = frame
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo").alias("buzz")])
        .with_columns([col("buzz")]);

    let plan_before = lf.clone().to_alp().unwrap();
    let plan_after = lf.clone().to_alp_optimized().unwrap();

    let unoptimized = plan_before.describe();
    let optimized = plan_after.describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    let nodes_before = num_occurrences(&unoptimized, "WITH_COLUMNS");
    let nodes_after = num_occurrences(&optimized, "WITH_COLUMNS");

    assert_eq!(nodes_before, 2);
    assert_eq!(nodes_after, 2);

    Ok(())
}

/// When only some expressions of the second `with_columns` depend on the
/// first, the independent expression is expected to move into the first node
/// while the dependent one stays behind.
#[test]
fn test_cluster_with_columns_partial() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let frame = df!("foo" => &[0.5, 1.7, 3.2],
                    "bar" => &[4.1, 1.5, 9.2])?;

    // Only the cluster_with_columns optimization is switched on.
    let lf = frame
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo").alias("buzz")])
        .with_columns([col("buzz"), col("foo") * lit(2.0)]);

    let plan_before = lf.clone().to_alp().unwrap();
    let plan_after = lf.clone().to_alp_optimized().unwrap();

    let unoptimized = plan_before.describe();
    let optimized = plan_after.describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    // Before: the expressions sit exactly where the query put them.
    assert!(unoptimized.contains(r#"[col("buzz"), [(col("foo")) * (2.0)]]"#));
    assert!(unoptimized.contains(r#"[col("foo").alias("buzz")]"#));

    // After: the independent `foo * 2.0` joins the first node.
    assert!(optimized.contains(r#"[col("buzz")]"#));
    assert!(optimized.contains(r#"[col("foo").alias("buzz"), [(col("foo")) * (2.0)]]"#));

    Ok(())
}

/// A chain of four mutually independent `with_columns` calls should end up in
/// a single `WITH_COLUMNS` node after optimization.
#[test]
fn test_cluster_with_columns_chain() -> Result<(), Box<dyn std::error::Error>> {
    use polars_core::prelude::*;

    let frame = df!("foo" => &[0.5, 1.7, 3.2],
                    "bar" => &[4.1, 1.5, 9.2])?;

    // Only the cluster_with_columns optimization is switched on.
    let lf = frame
        .lazy()
        .without_optimizations()
        .with_cluster_with_columns(true)
        .with_columns([col("foo").alias("foo1")])
        .with_columns([col("foo").alias("foo2")])
        .with_columns([col("foo").alias("foo3")])
        .with_columns([col("foo").alias("foo4")]);

    let plan_before = lf.clone().to_alp().unwrap();
    let plan_after = lf.clone().to_alp_optimized().unwrap();

    let unoptimized = plan_before.describe();
    let optimized = plan_after.describe();

    println!("\n---\n");
    println!("Unoptimized:\n{unoptimized}");
    println!("\n---\n");
    println!("Optimized:\n{optimized}");

    let nodes_before = num_occurrences(&unoptimized, "WITH_COLUMNS");
    let nodes_after = num_occurrences(&optimized, "WITH_COLUMNS");

    assert_eq!(nodes_before, 4);
    assert_eq!(nodes_after, 1);

    Ok(())
}
3 changes: 3 additions & 0 deletions crates/polars-plan/src/frame/opt_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub struct OptState {
pub projection_pushdown: bool,
/// Apply predicates/filters as early as possible.
pub predicate_pushdown: bool,
/// Cluster sequential `with_columns` calls that are independent of each other into a single call.
pub cluster_with_columns: bool,
/// Run many type coercion optimization rules until fixed point.
pub type_coercion: bool,
/// Run many expression optimization rules until fixed point.
Expand Down Expand Up @@ -36,6 +38,7 @@ impl Default for OptState {
OptState {
projection_pushdown: true,
predicate_pushdown: true,
cluster_with_columns: true,
type_coercion: true,
simplify_expr: true,
slice_pushdown: true,
Expand Down
Loading

0 comments on commit c75dba9

Please sign in to comment.