From 10bd0477e4cc9ba6d884180d0b455f85d27028e2 Mon Sep 17 00:00:00 2001 From: Orson Peters Date: Thu, 10 Oct 2024 16:21:49 +0200 Subject: [PATCH] refactor(rust): Reduction -> GroupedReduction for the new streaming engine (#19176) --- Cargo.lock | 1 + crates/polars-arrow/src/bitmap/immutable.rs | 5 +- crates/polars-arrow/src/bitmap/mutable.rs | 69 ++- crates/polars-arrow/src/bitmap/utils/mod.rs | 43 +- .../src/chunked_array/ops/aggregate/mod.rs | 4 +- crates/polars-core/src/datatypes/mod.rs | 3 +- crates/polars-expr/Cargo.toml | 3 +- crates/polars-expr/src/reduce/convert.rs | 63 +-- crates/polars-expr/src/reduce/len.rs | 66 ++- crates/polars-expr/src/reduce/mean.rs | 166 +++++-- crates/polars-expr/src/reduce/min_max.rs | 451 +++++++++++++++--- crates/polars-expr/src/reduce/mod.rs | 327 ++++++++++++- crates/polars-expr/src/reduce/nan_min_max.rs | 141 ------ crates/polars-expr/src/reduce/sum.rs | 147 ++++-- crates/polars-stream/src/nodes/reduce.rs | 58 +-- crates/polars-utils/src/float.rs | 10 +- .../polars/tests/it/arrow/bitmap/utils/mod.rs | 32 +- 17 files changed, 1117 insertions(+), 472 deletions(-) delete mode 100644 crates/polars-expr/src/reduce/nan_min_max.rs diff --git a/Cargo.lock b/Cargo.lock index b889fde7503f..131162aeddcd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2995,6 +2995,7 @@ version = "0.43.1" dependencies = [ "ahash", "bitflags", + "num-traits", "once_cell", "polars-arrow", "polars-compute", diff --git a/crates/polars-arrow/src/bitmap/immutable.rs b/crates/polars-arrow/src/bitmap/immutable.rs index c9aa0b681b4a..127b64b800a5 100644 --- a/crates/polars-arrow/src/bitmap/immutable.rs +++ b/crates/polars-arrow/src/bitmap/immutable.rs @@ -5,7 +5,7 @@ use std::sync::LazyLock; use either::Either; use polars_error::{polars_bail, PolarsResult}; -use super::utils::{count_zeros, fmt, get_bit, get_bit_unchecked, BitChunk, BitChunks, BitmapIter}; +use super::utils::{count_zeros, fmt, get_bit_unchecked, BitChunk, BitChunks, BitmapIter}; use super::{chunk_iter_to_vec, intersects_with, num_intersections_with, IntoIter, MutableBitmap}; use crate::array::Splitable; use crate::bitmap::aligned::AlignedBitmapSlice; @@ -334,7 +334,8 @@ impl Bitmap { /// Panics iff `i >= self.len()`. #[inline] pub fn get_bit(&self, i: usize) -> bool { - get_bit(&self.storage, self.offset + i) + assert!(i < self.len()); + unsafe { self.get_bit_unchecked(i) } } /// Unsafely returns whether the bit at position `i` is set. 
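[Editor's note, not part of the patch: the bitmap changes in this commit replace the old free functions `set`/`set_bit`/`get_bit` with a single-byte helper `set_bit_in_byte` plus unchecked positional OR/AND updates on `MutableBitmap`. Below is a minimal standalone sketch of the masking identities involved, assuming nothing beyond the Rust standard library; the helper body mirrors the patch, while the `main` harness is ours for illustration only.]

    /// Returns `byte` with bit `i` set to `value`: clear the target bit,
    /// then OR in the new value. Mirrors the patch's `set_bit_in_byte`.
    fn set_bit_in_byte(byte: u8, i: usize, value: bool) -> u8 {
        debug_assert!(i < 8);
        let mask = !(1 << i);
        let insert = (value as u8) << i;
        (byte & mask) | insert
    }

    fn main() {
        // OR-update (as in `or_pos_unchecked`): can only turn a bit on,
        // so a plain shifted OR suffices.
        let mut byte = 0b0000_0100u8;
        byte |= (true as u8) << 1;
        assert_eq!(byte, 0b0000_0110);

        // AND-update (as in `and_pos_unchecked`): can only turn a bit off.
        // The mask must be negated so the other seven bits stay set;
        // `byte &= (value as u8) << i` would instead zero the rest of the byte.
        let (i, value) = (2, false);
        byte &= !(((!value) as u8) << i);
        assert_eq!(byte, 0b0000_0010);

        assert_eq!(set_bit_in_byte(0b0000_0000, 7, true), 0b1000_0000);
        assert_eq!(set_bit_in_byte(0b1111_1111, 0, false), 0b1111_1110);
    }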
diff --git a/crates/polars-arrow/src/bitmap/mutable.rs b/crates/polars-arrow/src/bitmap/mutable.rs
index d030682a63a7..900722a9e0b7 100644
--- a/crates/polars-arrow/src/bitmap/mutable.rs
+++ b/crates/polars-arrow/src/bitmap/mutable.rs
@@ -1,12 +1,11 @@
 use std::hint::unreachable_unchecked;
 
 use polars_error::{polars_bail, PolarsResult};
+use polars_utils::vec::PushUnchecked;
 
-use super::utils::{
-    count_zeros, fmt, get_bit, set, set_bit, BitChunk, BitChunks, BitChunksExactMut, BitmapIter,
-};
+use super::utils::{count_zeros, fmt, BitChunk, BitChunks, BitChunksExactMut, BitmapIter};
 use super::{intersects_with_mut, Bitmap};
-use crate::bitmap::utils::{get_bit_unchecked, merge_reversed, set_bit_unchecked};
+use crate::bitmap::utils::{get_bit_unchecked, merge_reversed, set_bit_in_byte};
 use crate::storage::SharedStorage;
 use crate::trusted_len::TrustedLen;
@@ -118,8 +117,8 @@ impl MutableBitmap {
         if self.length % 8 == 0 {
             self.buffer.push(0);
         }
-        let byte = unsafe { self.buffer.as_mut_slice().last_mut().unwrap_unchecked() };
-        *byte = set(*byte, self.length % 8, value);
+        let byte = unsafe { self.buffer.last_mut().unwrap_unchecked() };
+        *byte = set_bit_in_byte(*byte, self.length % 8, value);
         self.length += 1;
     }
@@ -144,7 +143,8 @@ impl MutableBitmap {
     /// Panics iff `index >= self.len()`.
     #[inline]
     pub fn get(&self, index: usize) -> bool {
-        get_bit(&self.buffer, index)
+        assert!(index < self.len());
+        unsafe { self.get_unchecked(index) }
     }
 
     /// Returns whether the position `index` is set.
@@ -161,7 +161,28 @@ impl MutableBitmap {
     /// Panics iff `index >= self.len()`.
     #[inline]
     pub fn set(&mut self, index: usize, value: bool) {
-        set_bit(self.buffer.as_mut_slice(), index, value)
+        assert!(index < self.len());
+        unsafe {
+            self.set_unchecked(index, value);
+        }
+    }
+
+    /// Sets the position `index` to the OR of its original value and `value`.
+    ///
+    /// # Safety
+    /// It's undefined behavior if index >= self.len().
+    #[inline]
+    pub unsafe fn or_pos_unchecked(&mut self, index: usize, value: bool) {
+        *self.buffer.get_unchecked_mut(index / 8) |= (value as u8) << (index % 8);
+    }
+
+    /// Sets the position `index` to the AND of its original value and `value`.
+    ///
+    /// # Safety
+    /// It's undefined behavior if index >= self.len().
+    #[inline]
+    pub unsafe fn and_pos_unchecked(&mut self, index: usize, value: bool) {
+        *self.buffer.get_unchecked_mut(index / 8) &= !(((!value) as u8) << (index % 8));
     }
 
     /// constructs a new iterator over the bits of [`MutableBitmap`].
@@ -192,6 +213,17 @@ impl MutableBitmap {
         }
     }
 
+    /// Resizes the [`MutableBitmap`] to the specified length, filling any new
+    /// entries with `value` when `length` is bigger than the current length.
+    pub fn resize(&mut self, length: usize, value: bool) {
+        if let Some(additional) = length.checked_sub(self.len()) {
+            self.extend_constant(additional, value);
+        } else {
+            self.buffer.truncate(length.saturating_add(7) / 8);
+            self.length = length;
+        }
+    }
+
     /// Initializes a zeroed [`MutableBitmap`].
#[inline] pub fn from_len_zeroed(length: usize) -> Self { @@ -230,10 +262,10 @@ impl MutableBitmap { #[inline] pub unsafe fn push_unchecked(&mut self, value: bool) { if self.length % 8 == 0 { - self.buffer.push(0); + self.buffer.push_unchecked(0); } - let byte = self.buffer.as_mut_slice().last_mut().unwrap(); - *byte = set(*byte, self.length % 8, value); + let byte = self.buffer.last_mut().unwrap_unchecked(); + *byte = set_bit_in_byte(*byte, self.length % 8, value); self.length += 1; } @@ -330,7 +362,8 @@ impl MutableBitmap { /// Caller must ensure that `index < self.len()` #[inline] pub unsafe fn set_unchecked(&mut self, index: usize, value: bool) { - set_bit_unchecked(self.buffer.as_mut_slice(), index, value) + let byte = self.buffer.get_unchecked_mut(index / 8); + *byte = set_bit_in_byte(*byte, index % 8, value); } /// Shrinks the capacity of the [`MutableBitmap`] to fit its current length. @@ -566,10 +599,10 @@ impl MutableBitmap { self.buffer.push(0); } // the iterator will not fill the last byte - let byte = self.buffer.as_mut_slice().last_mut().unwrap(); + let byte = self.buffer.last_mut().unwrap(); let mut i = bit_offset; for value in iterator { - *byte = set(*byte, i, value); + *byte = set_bit_in_byte(*byte, i, value); i += 1; } self.length += length; @@ -581,9 +614,9 @@ impl MutableBitmap { if bit_offset != 0 { // we are in the middle of a byte; lets finish it - let byte = self.buffer.as_mut_slice().last_mut().unwrap(); + let byte = self.buffer.last_mut().unwrap(); (bit_offset..8).for_each(|i| { - *byte = set(*byte, i, iterator.next().unwrap()); + *byte = set_bit_in_byte(*byte, i, iterator.next().unwrap()); }); self.length += 8 - bit_offset; length -= 8 - bit_offset; @@ -650,7 +683,7 @@ impl MutableBitmap { let data = buffer.as_mut_slice(); data[..chunks].iter_mut().try_for_each(|byte| { (0..8).try_for_each(|i| { - *byte = set(*byte, i, iterator.next().unwrap()?); + *byte = set_bit_in_byte(*byte, i, iterator.next().unwrap()?); Ok(()) }) })?; @@ -658,7 +691,7 @@ impl MutableBitmap { if reminder != 0 { let last = &mut data[chunks]; iterator.enumerate().try_for_each(|(i, value)| { - *last = set(*last, i, value?); + *last = set_bit_in_byte(*last, i, value?); Ok(()) })?; } diff --git a/crates/polars-arrow/src/bitmap/utils/mod.rs b/crates/polars-arrow/src/bitmap/utils/mod.rs index ebd47c2530dc..01cccd81bd68 100644 --- a/crates/polars-arrow/src/bitmap/utils/mod.rs +++ b/crates/polars-arrow/src/bitmap/utils/mod.rs @@ -25,53 +25,34 @@ pub fn is_set(byte: u8, i: usize) -> bool { } /// Sets bit at position `i` in `byte`. -#[inline] -pub fn set(byte: u8, i: usize, value: bool) -> u8 { +#[inline(always)] +pub fn set_bit_in_byte(byte: u8, i: usize, value: bool) -> u8 { debug_assert!(i < 8); - let mask = !(1 << i); let insert = (value as u8) << i; (byte & mask) | insert } -/// Sets bit at position `i` in `bytes`. -/// # Panics -/// This function panics iff `i >= bytes.len() * 8`. -#[inline] -pub fn set_bit(bytes: &mut [u8], i: usize, value: bool) { - bytes[i / 8] = set(bytes[i / 8], i % 8, value); -} - -/// Sets bit at position `i` in `bytes` without doing bound checks -/// # Safety -/// `i >= bytes.len() * 8` results in undefined behavior. -#[inline] -pub unsafe fn set_bit_unchecked(bytes: &mut [u8], i: usize, value: bool) { - let byte = bytes.get_unchecked_mut(i / 8); - *byte = set(*byte, i % 8, value); -} - -/// Returns whether bit at position `i` in `bytes` is set. -/// # Panic -/// This function panics iff `i >= bytes.len() * 8`. 
-#[inline] -pub fn get_bit(bytes: &[u8], i: usize) -> bool { - let byte = bytes[i / 8]; - let bit = (byte >> (i % 8)) & 1; - bit != 0 -} - /// Returns whether bit at position `i` in `bytes` is set or not. /// /// # Safety /// `i >= bytes.len() * 8` results in undefined behavior. -#[inline] +#[inline(always)] pub unsafe fn get_bit_unchecked(bytes: &[u8], i: usize) -> bool { let byte = *bytes.get_unchecked_release(i / 8); let bit = (byte >> (i % 8)) & 1; bit != 0 } +/// Sets bit at position `i` in `bytes` without doing bound checks. +/// # Safety +/// `i >= bytes.len() * 8` results in undefined behavior. +#[inline(always)] +pub unsafe fn set_bit_unchecked(bytes: &mut [u8], i: usize, value: bool) { + let byte = bytes.get_unchecked_mut(i / 8); + *byte = set_bit_in_byte(*byte, i % 8, value); +} + /// Returns the number of bytes required to hold `bits` bits. #[inline] pub fn bytes_for(bits: usize) -> usize { diff --git a/crates/polars-core/src/chunked_array/ops/aggregate/mod.rs b/crates/polars-core/src/chunked_array/ops/aggregate/mod.rs index cf79b0acb473..071073460ff3 100644 --- a/crates/polars-core/src/chunked_array/ops/aggregate/mod.rs +++ b/crates/polars-core/src/chunked_array/ops/aggregate/mod.rs @@ -563,7 +563,7 @@ impl ChunkAggSeries for CategoricalChunked { } impl BinaryChunked { - pub(crate) fn max_binary(&self) -> Option<&[u8]> { + pub fn max_binary(&self) -> Option<&[u8]> { if self.is_empty() { return None; } @@ -587,7 +587,7 @@ impl BinaryChunked { } } - pub(crate) fn min_binary(&self) -> Option<&[u8]> { + pub fn min_binary(&self) -> Option<&[u8]> { if self.is_empty() { return None; } diff --git a/crates/polars-core/src/datatypes/mod.rs b/crates/polars-core/src/datatypes/mod.rs index 64266a0066db..712466482ce2 100644 --- a/crates/polars-core/src/datatypes/mod.rs +++ b/crates/polars-core/src/datatypes/mod.rs @@ -35,7 +35,7 @@ use bytemuck::Zeroable; pub use dtype::*; pub use field::*; pub use into_scalar::*; -use num_traits::{Bounded, FromPrimitive, Num, NumCast, One, Zero}; +use num_traits::{AsPrimitive, Bounded, FromPrimitive, Num, NumCast, One, Zero}; use polars_compute::arithmetic::HasPrimitiveArithmeticKernel; use polars_compute::float_sum::FloatSum; use polars_utils::abs_diff::AbsDiff; @@ -356,6 +356,7 @@ pub trait NumericNative: + IsFloat + HasPrimitiveArithmeticKernel::Native> + FloatSum + + AsPrimitive + MinMax + IsNull { diff --git a/crates/polars-expr/Cargo.toml b/crates/polars-expr/Cargo.toml index 1b2b6063de9b..d53585b17d43 100644 --- a/crates/polars-expr/Cargo.toml +++ b/crates/polars-expr/Cargo.toml @@ -12,6 +12,7 @@ description = "Physical expression implementation of the Polars project." 
ahash = { workspace = true }
arrow = { workspace = true }
bitflags = { workspace = true }
+num-traits = { workspace = true }
once_cell = { workspace = true }
polars-compute = { workspace = true }
polars-core = { workspace = true, features = ["lazy", "zip_with", "random"] }
@@ -72,5 +73,5 @@ bitwise = ["polars-core/bitwise", "polars-plan/bitwise"]
round_series = ["polars-plan/round_series", "polars-ops/round_series"]
is_between = ["polars-plan/is_between"]
dynamic_group_by = ["polars-plan/dynamic_group_by", "polars-time", "temporal"]
-propagate_nans = ["polars-plan/propagate_nans"]
+propagate_nans = ["polars-plan/propagate_nans", "polars-ops/propagate_nans"]
panic_on_schema = ["polars-plan/panic_on_schema"]
diff --git a/crates/polars-expr/src/reduce/convert.rs b/crates/polars-expr/src/reduce/convert.rs
index af3f72733efd..3573192ae16f 100644
--- a/crates/polars-expr/src/reduce/convert.rs
+++ b/crates/polars-expr/src/reduce/convert.rs
@@ -1,21 +1,19 @@
-use polars_core::error::feature_gated;
+// use polars_core::error::feature_gated;
 use polars_plan::prelude::*;
 use polars_utils::arena::{Arena, Node};
 
-use super::len::LenReduce;
-use super::mean::MeanReduce;
-use super::min_max::{MaxReduce, MinReduce};
-#[cfg(feature = "propagate_nans")]
-use super::nan_min_max::{NanMaxReduce, NanMinReduce};
-use super::sum::SumReduce;
 use super::*;
+use crate::reduce::len::LenReduce;
+use crate::reduce::mean::new_mean_reduction;
+use crate::reduce::min_max::{new_max_reduction, new_min_reduction};
+use crate::reduce::sum::new_sum_reduction;
 
 /// Converts a node into a reduction + its associated selector expression.
 pub fn into_reduction(
     node: Node,
     expr_arena: &mut Arena<AExpr>,
     schema: &Schema,
-) -> PolarsResult<(Box<dyn Reduction>, Node)> {
+) -> PolarsResult<(Box<dyn GroupedReduction>, Node)> {
     let get_dt = |node| {
         expr_arena
             .get(node)
     };
     let out = match expr_arena.get(node) {
         AExpr::Agg(agg) => match agg {
-            IRAggExpr::Sum(input) => (
-                Box::new(SumReduce::new(get_dt(*input)?)) as Box<dyn Reduction>,
-                *input,
-            ),
+            IRAggExpr::Sum(input) => (new_sum_reduction(get_dt(*input)?), *input),
+            IRAggExpr::Mean(input) => (new_mean_reduction(get_dt(*input)?), *input),
             IRAggExpr::Min {
                 propagate_nans,
                 input,
-            } => {
-                let dt = get_dt(*input)?;
-                if *propagate_nans && dt.is_float() {
-                    feature_gated!("propagate_nans", {
-                        let out: Box<dyn Reduction> = match dt {
-                            DataType::Float32 => Box::new(NanMinReduce::<Float32Type>::new()),
-                            DataType::Float64 => Box::new(NanMinReduce::<Float64Type>::new()),
-                            _ => unreachable!(),
-                        };
-                        (out, *input)
-                    })
-                } else {
-                    (
-                        Box::new(MinReduce::new(dt.clone())) as Box<dyn Reduction>,
-                        *input,
-                    )
-                }
-            },
+            } => (new_min_reduction(get_dt(*input)?, *propagate_nans), *input),
             IRAggExpr::Max {
                 propagate_nans,
                 input,
-            } => {
-                let dt = get_dt(*input)?;
-                if *propagate_nans && dt.is_float() {
-                    feature_gated!("propagate_nans", {
-                        let out: Box<dyn Reduction> = match dt {
-                            DataType::Float32 => Box::new(NanMaxReduce::<Float32Type>::new()),
-                            DataType::Float64 => Box::new(NanMaxReduce::<Float64Type>::new()),
-                            _ => unreachable!(),
-                        };
-                        (out, *input)
-                    })
-                } else {
-                    (Box::new(MaxReduce::new(dt.clone())) as _, *input)
-                }
-            },
-            IRAggExpr::Mean(input) => {
-                let out: Box<dyn Reduction> = Box::new(MeanReduce::new(get_dt(*input)?));
-                (out, *input)
-            },
-            _ => unreachable!(),
+            } => (new_max_reduction(get_dt(*input)?, *propagate_nans), *input),
+            _ => todo!(),
         },
         AExpr::Len => {
             // Compute length on the first column, or if none exist we'll use
             // a zero-length dummy series.
-            let out: Box<dyn Reduction> = Box::new(LenReduce::new());
+            let out: Box<dyn GroupedReduction> = Box::new(LenReduce::default());
             let expr = if let Some(first_column) = schema.iter_names().next() {
                 expr_arena.add(AExpr::Column(first_column.as_str().into()))
             } else {
diff --git a/crates/polars-expr/src/reduce/len.rs b/crates/polars-expr/src/reduce/len.rs
index 1e11a505410d..db8aee647824 100644
--- a/crates/polars-expr/src/reduce/len.rs
+++ b/crates/polars-expr/src/reduce/len.rs
@@ -2,41 +2,63 @@ use polars_core::error::constants::LENGTH_LIMIT_MSG;
 
 use super::*;
 
-#[derive(Clone)]
-pub struct LenReduce {}
+#[derive(Default)]
+pub struct LenReduce {
+    groups: Vec<u64>,
+}
 
-impl LenReduce {
-    pub fn new() -> Self {
-        Self {}
+impl GroupedReduction for LenReduce {
+    fn new_empty(&self) -> Box<dyn GroupedReduction> {
+        Box::new(Self::default())
     }
-}
 
-impl Reduction for LenReduce {
-    fn new_reducer(&self) -> Box<dyn ReductionState> {
-        Box::new(LenReduceState { len: 0 })
+    fn resize(&mut self, num_groups: IdxSize) {
+        self.groups.resize(num_groups as usize, 0);
     }
-}
 
-pub struct LenReduceState {
-    len: u64,
-}
+    fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()> {
+        self.groups[group_idx as usize] += values.len() as u64;
+        Ok(())
+    }
 
-impl ReductionState for LenReduceState {
-    fn update(&mut self, batch: &Series) -> PolarsResult<()> {
-        self.len += batch.len() as u64;
+    unsafe fn update_groups(
+        &mut self,
+        values: &Series,
+        group_idxs: &[IdxSize],
+    ) -> PolarsResult<()> {
+        assert!(values.len() == group_idxs.len());
+        unsafe {
+            // SAFETY: indices are in-bounds guaranteed by trait.
+            for g in group_idxs.iter() {
+                *self.groups.get_unchecked_mut(*g as usize) += 1;
+            }
+        }
         Ok(())
     }
 
-    fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> {
+    unsafe fn combine(
+        &mut self,
+        other: &dyn GroupedReduction,
+        group_idxs: &[IdxSize],
+    ) -> PolarsResult<()> {
         let other = other.as_any().downcast_ref::<Self>().unwrap();
-        self.len += other.len;
+        assert!(self.groups.len() == other.groups.len());
+        unsafe {
+            // SAFETY: indices are in-bounds guaranteed by trait.
+            for (g, v) in group_idxs.iter().zip(other.groups.iter()) {
+                *self.groups.get_unchecked_mut(*g as usize) += v;
+            }
+        }
         Ok(())
     }
 
-    fn finalize(&self) -> PolarsResult<Scalar> {
-        #[allow(clippy::useless_conversion)]
-        let as_idx: IdxSize = self.len.try_into().expect(LENGTH_LIMIT_MSG);
-        Ok(Scalar::new(IDX_DTYPE, as_idx.into()))
+    fn finalize(&mut self) -> PolarsResult<Series> {
+        let ca: IdxCa = self
+            .groups
+            .drain(..)
+ .map(|l| IdxSize::try_from(l).expect(LENGTH_LIMIT_MSG)) + .collect_ca(PlSmallStr::EMPTY); + Ok(ca.into_series()) } fn as_any(&self) -> &dyn Any { diff --git a/crates/polars-expr/src/reduce/mean.rs b/crates/polars-expr/src/reduce/mean.rs index e8b19b342de6..0caa2ccabcb8 100644 --- a/crates/polars-expr/src/reduce/mean.rs +++ b/crates/polars-expr/src/reduce/mean.rs @@ -1,56 +1,148 @@ +use std::marker::PhantomData; + +use num_traits::{AsPrimitive, Zero}; +use polars_core::with_match_physical_numeric_polars_type; + use super::*; -#[derive(Clone)] -pub struct MeanReduce { - dtype: DataType, +pub fn new_mean_reduction(dtype: DataType) -> Box { + use DataType::*; + use VecGroupedReduction as VGR; + match dtype { + Boolean => Box::new(VGR::::new(dtype)), + _ if dtype.is_numeric() || dtype.is_temporal() => { + with_match_physical_numeric_polars_type!(dtype.to_physical(), |$T| { + Box::new(VGR::>::new(dtype)) + }) + }, + #[cfg(feature = "dtype-decimal")] + Decimal(_, _) => Box::new(VGR::>::new(dtype)), + _ => unimplemented!(), + } } -impl MeanReduce { - pub fn new(dtype: DataType) -> Self { - Self { dtype } +fn finish_output(values: Vec<(f64, usize)>, dtype: &DataType) -> Series { + match dtype { + DataType::Float32 => { + let ca: Float32Chunked = values + .into_iter() + .map(|(s, c)| (c != 0).then(|| (s / c as f64) as f32)) + .collect_ca(PlSmallStr::EMPTY); + ca.into_series() + }, + dt if dt.is_numeric() => { + let ca: Float64Chunked = values + .into_iter() + .map(|(s, c)| (c != 0).then(|| s / c as f64)) + .collect_ca(PlSmallStr::EMPTY); + ca.into_series() + }, + #[cfg(feature = "dtype-decimal")] + DataType::Decimal(_prec, scale) => { + let inv_scale_factor = 1.0 / 10u128.pow(scale.unwrap() as u32) as f64; + let ca: Float64Chunked = values + .into_iter() + .map(|(s, c)| (c != 0).then(|| s / c as f64 * inv_scale_factor)) + .collect_ca(PlSmallStr::EMPTY); + ca.into_series() + }, + #[cfg(feature = "dtype-datetime")] + DataType::Date => { + const MS_IN_DAY: i64 = 86_400_000; + let ca: Int64Chunked = values + .into_iter() + .map(|(s, c)| (c != 0).then(|| (s / c as f64 * MS_IN_DAY as f64) as i64)) + .collect_ca(PlSmallStr::EMPTY); + ca.into_datetime(TimeUnit::Milliseconds, None).into_series() + }, + DataType::Datetime(_, _) | DataType::Duration(_) | DataType::Time => { + let ca: Int64Chunked = values + .into_iter() + .map(|(s, c)| (c != 0).then(|| (s / c as f64) as i64)) + .collect_ca(PlSmallStr::EMPTY); + ca.into_series().cast(dtype).unwrap() + }, + _ => unimplemented!(), } } -impl Reduction for MeanReduce { - fn new_reducer(&self) -> Box { - Box::new(MeanReduceState { - dtype: self.dtype.clone(), - sum: 0.0, - count: 0, - }) +struct NumMeanReducer(PhantomData); + +impl Reducer for NumMeanReducer +where + T: PolarsNumericType, + ChunkedArray: ChunkAgg + IntoSeries, +{ + type Dtype = T; + type Value = (f64, usize); + + #[inline(always)] + fn init() -> Self::Value { + (0.0, 0) + } + + fn cast_series(s: &Series) -> Cow<'_, Series> { + s.to_physical_repr() + } + + #[inline(always)] + fn combine(a: &mut Self::Value, b: &Self::Value) { + a.0 += b.0; + a.1 += b.1; } -} -pub struct MeanReduceState { - dtype: DataType, - sum: f64, - count: u64, + #[inline(always)] + fn reduce_one(a: &mut Self::Value, b: Option) { + a.0 += b.unwrap_or(T::Native::zero()).as_(); + a.1 += b.is_some() as usize; + } + + fn reduce_ca(v: &mut Self::Value, ca: &ChunkedArray) { + v.0 += ChunkAgg::_sum_as_f64(ca); + v.1 += ca.len() - ca.null_count(); + } + + fn finish(v: Vec, m: Option, dtype: &DataType) -> PolarsResult { + 
assert!(m.is_none()); + Ok(finish_output(v, dtype)) + } } -impl ReductionState for MeanReduceState { - fn update(&mut self, batch: &Series) -> PolarsResult<()> { - let count = batch.len() as u64 - batch.null_count() as u64; - self.count += count; - self.sum += batch._sum_as_f64(); - Ok(()) +struct BoolMeanReducer; + +impl Reducer for BoolMeanReducer { + type Dtype = BooleanType; + type Value = (usize, usize); + + #[inline(always)] + fn init() -> Self::Value { + (0, 0) + } + + #[inline(always)] + fn combine(a: &mut Self::Value, b: &Self::Value) { + a.0 += b.0; + a.1 += b.1; } - fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> { - let other = other.as_any().downcast_ref::().unwrap(); - self.sum += other.sum; - self.count += other.count; - Ok(()) + #[inline(always)] + fn reduce_one(a: &mut Self::Value, b: Option) { + a.0 += b.unwrap_or(false) as usize; + a.1 += b.is_some() as usize; } - fn finalize(&self) -> PolarsResult { - let val = (self.count > 0).then(|| self.sum / self.count as f64); - Ok(polars_core::scalar::reduce::mean_reduce( - val, - self.dtype.clone(), - )) + fn reduce_ca(v: &mut Self::Value, ca: &ChunkedArray) { + v.0 += ca.sum().unwrap_or(0) as usize; + v.1 += ca.len() - ca.null_count(); } - fn as_any(&self) -> &dyn Any { - self + fn finish(v: Vec, m: Option, dtype: &DataType) -> PolarsResult { + assert!(m.is_none()); + assert!(dtype == &DataType::Boolean); + let ca: Float64Chunked = v + .into_iter() + .map(|(s, c)| s as f64 / c as f64) + .collect_ca(PlSmallStr::EMPTY); + Ok(ca.into_series()) } } diff --git a/crates/polars-expr/src/reduce/min_max.rs b/crates/polars-expr/src/reduce/min_max.rs index 27cf3d5b5727..f1ec0cbcc5d2 100644 --- a/crates/polars-expr/src/reduce/min_max.rs +++ b/crates/polars-expr/src/reduce/min_max.rs @@ -1,56 +1,342 @@ +use std::borrow::Cow; +use std::marker::PhantomData; + +use arrow::array::BooleanArray; +use arrow::bitmap::Bitmap; +use num_traits::Bounded; +use polars_core::with_match_physical_integer_polars_type; +#[cfg(feature = "propagate_nans")] +use polars_ops::prelude::nan_propagating_aggregate::ca_nan_agg; +use polars_utils::float::IsFloat; +use polars_utils::min_max::MinMax; + use super::*; -#[derive(Clone)] -pub struct MinReduce { - dtype: DataType, +pub fn new_min_reduction(dtype: DataType, propagate_nans: bool) -> Box { + use DataType::*; + use VecMaskGroupedReduction as VMGR; + match dtype { + Boolean => Box::new(BoolMinGroupedReduction::default()), + #[cfg(feature = "propagate_nans")] + Float32 if propagate_nans => Box::new(VMGR::>::new(dtype)), + #[cfg(feature = "propagate_nans")] + Float64 if propagate_nans => Box::new(VMGR::>::new(dtype)), + Float32 => Box::new(VMGR::>::new(dtype)), + Float64 => Box::new(VMGR::>::new(dtype)), + String | Binary => Box::new(VecGroupedReduction::::new(dtype)), + _ if dtype.is_integer() || dtype.is_temporal() => { + with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| { + Box::new(VMGR::>::new(dtype)) + }) + }, + #[cfg(feature = "dtype-decimal")] + Decimal(_, _) => Box::new(VMGR::>::new(dtype)), + _ => unimplemented!(), + } } -impl MinReduce { - pub fn new(dtype: DataType) -> Self { - Self { dtype } +pub fn new_max_reduction(dtype: DataType, propagate_nans: bool) -> Box { + use DataType::*; + use VecMaskGroupedReduction as VMGR; + match dtype { + Boolean => Box::new(BoolMaxGroupedReduction::default()), + #[cfg(feature = "propagate_nans")] + Float32 if propagate_nans => Box::new(VMGR::>::new(dtype)), + #[cfg(feature = "propagate_nans")] + Float64 if propagate_nans => 
Box::new(VMGR::>::new(dtype)), + Float32 => Box::new(VMGR::>::new(dtype)), + Float64 => Box::new(VMGR::>::new(dtype)), + String | Binary => Box::new(VecGroupedReduction::::new(dtype)), + _ if dtype.is_integer() || dtype.is_temporal() => { + with_match_physical_integer_polars_type!(dtype.to_physical(), |$T| { + Box::new(VMGR::>::new(dtype)) + }) + }, + #[cfg(feature = "dtype-decimal")] + Decimal(_, _) => Box::new(VMGR::>::new(dtype)), + _ => unimplemented!(), } } -impl Reduction for MinReduce { - fn new_reducer(&self) -> Box { - Box::new(MinReduceState { - value: Scalar::new(self.dtype.clone(), AnyValue::Null), - }) +// These two variants ignore nans. +struct MinReducer(PhantomData); +struct MaxReducer(PhantomData); + +// These two variants propagate nans. +#[cfg(feature = "propagate_nans")] +struct NanMinReducer(PhantomData); +#[cfg(feature = "propagate_nans")] +struct NanMaxReducer(PhantomData); + +impl NumericReducer for MinReducer +where + T: PolarsNumericType, + ChunkedArray: ChunkAgg, +{ + type Dtype = T; + + #[inline(always)] + fn init() -> T::Native { + if T::Native::is_float() { + T::Native::nan_value() + } else { + T::Native::max_value() + } + } + + #[inline(always)] + fn combine(a: T::Native, b: T::Native) -> T::Native { + MinMax::min_ignore_nan(a, b) + } + + #[inline(always)] + fn reduce_ca(ca: &ChunkedArray) -> Option { + ChunkAgg::min(ca) + } +} + +impl NumericReducer for MaxReducer +where + T: PolarsNumericType, + ChunkedArray: ChunkAgg, +{ + type Dtype = T; + + #[inline(always)] + fn init() -> T::Native { + if T::Native::is_float() { + T::Native::nan_value() + } else { + T::Native::min_value() + } + } + + #[inline(always)] + fn combine(a: T::Native, b: T::Native) -> T::Native { + MinMax::max_ignore_nan(a, b) + } + + #[inline(always)] + fn reduce_ca(ca: &ChunkedArray) -> Option { + ChunkAgg::max(ca) + } +} + +#[cfg(feature = "propagate_nans")] +impl NumericReducer for NanMinReducer { + type Dtype = T; + + #[inline(always)] + fn init() -> T::Native { + T::Native::max_value() + } + + #[inline(always)] + fn combine(a: T::Native, b: T::Native) -> T::Native { + MinMax::min_propagate_nan(a, b) + } + + #[inline(always)] + fn reduce_ca(ca: &ChunkedArray) -> Option { + ca_nan_agg(ca, MinMax::min_propagate_nan) } } -struct MinReduceState { - value: Scalar, +#[cfg(feature = "propagate_nans")] +impl NumericReducer for NanMaxReducer { + type Dtype = T; + + #[inline(always)] + fn init() -> T::Native { + T::Native::min_value() + } + + #[inline(always)] + fn combine(a: T::Native, b: T::Native) -> T::Native { + MinMax::max_propagate_nan(a, b) + } + + #[inline(always)] + fn reduce_ca(ca: &ChunkedArray) -> Option { + ca_nan_agg(ca, MinMax::max_propagate_nan) + } } -impl MinReduceState { - fn update_with_value(&mut self, other: &AnyValue<'static>) { - // AnyValue uses total ordering, so NaN is greater than any value. - // This means other < self.value.value() already ignores incoming NaNs. - // We still must check if self is NaN and if so replace. - if self.value.is_null() - || !other.is_null() && (other < self.value.value() || self.value.is_nan()) - { - self.value.update(other.clone()); +struct BinaryMinReducer; +struct BinaryMaxReducer; + +impl Reducer for BinaryMinReducer { + type Dtype = BinaryType; + type Value = Option>; // TODO: evaluate SmallVec. 
+ + fn init() -> Self::Value { + None + } + + #[inline(always)] + fn cast_series(s: &Series) -> Cow<'_, Series> { + Cow::Owned(s.cast(&DataType::Binary).unwrap()) + } + + fn combine(a: &mut Self::Value, b: &Self::Value) { + Self::reduce_one(a, b.as_deref()) + } + + fn reduce_one(a: &mut Self::Value, b: Option<&[u8]>) { + match (a, b) { + (_, None) => {}, + (l @ None, Some(r)) => *l = Some(r.to_owned()), + (Some(l), Some(r)) => { + if l.as_slice() > r { + l.clear(); + l.extend_from_slice(r); + } + }, } } + + fn reduce_ca(v: &mut Self::Value, ca: &BinaryChunked) { + Self::reduce_one(v, ca.min_binary()) + } + + fn finish(v: Vec, m: Option, dtype: &DataType) -> PolarsResult { + assert!(m.is_none()); // This should only be used with VecGroupedReduction. + let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY); + ca.into_series().cast(dtype) + } +} + +impl Reducer for BinaryMaxReducer { + type Dtype = BinaryType; + type Value = Option>; // TODO: evaluate SmallVec. + + #[inline(always)] + fn init() -> Self::Value { + None + } + + #[inline(always)] + fn cast_series(s: &Series) -> Cow<'_, Series> { + Cow::Owned(s.cast(&DataType::Binary).unwrap()) + } + + #[inline(always)] + fn combine(a: &mut Self::Value, b: &Self::Value) { + Self::reduce_one(a, b.as_deref()) + } + + #[inline(always)] + fn reduce_one(a: &mut Self::Value, b: Option<&[u8]>) { + match (a, b) { + (_, None) => {}, + (l @ None, Some(r)) => *l = Some(r.to_owned()), + (Some(l), Some(r)) => { + if l.as_slice() < r { + l.clear(); + l.extend_from_slice(r); + } + }, + } + } + + #[inline(always)] + fn reduce_ca(v: &mut Self::Value, ca: &BinaryChunked) { + Self::reduce_one(v, ca.max_binary()) + } + + #[inline(always)] + fn finish(v: Vec, m: Option, dtype: &DataType) -> PolarsResult { + assert!(m.is_none()); // This should only be used with VecGroupedReduction. + let ca: BinaryChunked = v.into_iter().collect_ca(PlSmallStr::EMPTY); + ca.into_series().cast(dtype) + } +} + +#[derive(Default)] +pub struct BoolMinGroupedReduction { + values: MutableBitmap, + mask: MutableBitmap, } -impl ReductionState for MinReduceState { - fn update(&mut self, batch: &Series) -> PolarsResult<()> { - let sc = batch.min_reduce()?; - self.update_with_value(sc.value()); +impl GroupedReduction for BoolMinGroupedReduction { + fn new_empty(&self) -> Box { + Box::new(Self::default()) + } + + fn resize(&mut self, num_groups: IdxSize) { + self.values.resize(num_groups as usize, true); + self.mask.resize(num_groups as usize, false); + } + + fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &DataType::Boolean); + let ca: &BooleanChunked = values.as_ref().as_ref(); + if !ca.all() { + self.values.set(group_idx as usize, false); + } + if ca.len() != ca.null_count() { + self.mask.set(group_idx as usize, true); + } Ok(()) } - fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> { + unsafe fn update_groups( + &mut self, + values: &Series, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &DataType::Boolean); + assert!(values.len() == group_idxs.len()); + let ca: &BooleanChunked = values.as_ref().as_ref(); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. 
+ for (g, ov) in group_idxs.iter().zip(ca.iter()) { + self.values + .and_pos_unchecked(*g as usize, ov.unwrap_or(true)); + self.mask.or_pos_unchecked(*g as usize, ov.is_some()); + } + } + Ok(()) + } + + unsafe fn combine( + &mut self, + other: &dyn GroupedReduction, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { let other = other.as_any().downcast_ref::().unwrap(); - self.update_with_value(other.value.value()); + assert!(self.values.len() == other.values.len()); + assert!(self.mask.len() == other.mask.len()); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. + for (g, (v, o)) in group_idxs + .iter() + .zip(other.values.iter().zip(other.mask.iter())) + { + self.values.and_pos_unchecked(*g as usize, v); + self.mask.or_pos_unchecked(*g as usize, o); + } + } Ok(()) } - fn finalize(&self) -> PolarsResult { - Ok(self.value.clone()) + fn finalize(&mut self) -> PolarsResult { + let v = core::mem::take(&mut self.values); + let m = core::mem::take(&mut self.mask); + let arr = BooleanArray::from(v.freeze()) + .with_validity(Some(m.freeze())) + .boxed(); + Ok(unsafe { + Series::from_chunks_and_dtype_unchecked( + PlSmallStr::EMPTY, + vec![arr], + &DataType::Boolean, + ) + }) } fn as_any(&self) -> &dyn Any { @@ -58,58 +344,91 @@ impl ReductionState for MinReduceState { } } -#[derive(Clone)] -pub struct MaxReduce { - dtype: DataType, +#[derive(Default)] +pub struct BoolMaxGroupedReduction { + values: MutableBitmap, + mask: MutableBitmap, } -impl MaxReduce { - pub fn new(dtype: DataType) -> Self { - Self { dtype } +impl GroupedReduction for BoolMaxGroupedReduction { + fn new_empty(&self) -> Box { + Box::new(Self::default()) } -} -impl Reduction for MaxReduce { - fn new_reducer(&self) -> Box { - Box::new(MaxReduceState { - value: Scalar::new(self.dtype.clone(), AnyValue::Null), - }) + fn resize(&mut self, num_groups: IdxSize) { + self.values.resize(num_groups as usize, false); + self.mask.resize(num_groups as usize, false); } -} -struct MaxReduceState { - value: Scalar, -} - -impl MaxReduceState { - fn update_with_value(&mut self, other: &AnyValue<'static>) { - // AnyValue uses total ordering, so NaN is greater than any value. - // This means other > self.value.value() might have false positives. - // We also must check if self is NaN and if so replace. - if self.value.is_null() - || !other.is_null() - && (other > self.value.value() && !other.is_nan() || self.value.is_nan()) - { - self.value.update(other.clone()); + fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &DataType::Boolean); + let ca: &BooleanChunked = values.as_ref().as_ref(); + if ca.any() { + self.values.set(group_idx as usize, true); + } + if ca.len() != ca.null_count() { + self.mask.set(group_idx as usize, true); } + Ok(()) } -} -impl ReductionState for MaxReduceState { - fn update(&mut self, batch: &Series) -> PolarsResult<()> { - let sc = batch.max_reduce()?; - self.update_with_value(sc.value()); + unsafe fn update_groups( + &mut self, + values: &Series, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &DataType::Boolean); + assert!(values.len() == group_idxs.len()); + let ca: &BooleanChunked = values.as_ref().as_ref(); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. 
+ for (g, ov) in group_idxs.iter().zip(ca.iter()) { + self.values + .or_pos_unchecked(*g as usize, ov.unwrap_or(false)); + self.mask.or_pos_unchecked(*g as usize, ov.is_some()); + } + } Ok(()) } - fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> { + unsafe fn combine( + &mut self, + other: &dyn GroupedReduction, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { let other = other.as_any().downcast_ref::().unwrap(); - self.update_with_value(other.value.value()); + assert!(self.values.len() == other.values.len()); + assert!(self.mask.len() == other.mask.len()); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. + for (g, (v, o)) in group_idxs + .iter() + .zip(other.values.iter().zip(other.mask.iter())) + { + self.values.or_pos_unchecked(*g as usize, v); + self.mask.or_pos_unchecked(*g as usize, o); + } + } Ok(()) } - fn finalize(&self) -> PolarsResult { - Ok(self.value.clone()) + fn finalize(&mut self) -> PolarsResult { + let v = core::mem::take(&mut self.values); + let m = core::mem::take(&mut self.mask); + let arr = BooleanArray::from(v.freeze()) + .with_validity(Some(m.freeze())) + .boxed(); + Ok(unsafe { + Series::from_chunks_and_dtype_unchecked( + PlSmallStr::EMPTY, + vec![arr], + &DataType::Boolean, + ) + }) } fn as_any(&self) -> &dyn Any { diff --git a/crates/polars-expr/src/reduce/mod.rs b/crates/polars-expr/src/reduce/mod.rs index 26f9749b4479..8fc0620f27fe 100644 --- a/crates/polars-expr/src/reduce/mod.rs +++ b/crates/polars-expr/src/reduce/mod.rs @@ -2,39 +2,326 @@ mod convert; mod len; mod mean; mod min_max; -#[cfg(feature = "propagate_nans")] -mod nan_min_max; +// #[cfg(feature = "propagate_nans")] +// mod nan_min_max; mod sum; use std::any::Any; +use std::borrow::Cow; +use std::marker::PhantomData; +use arrow::array::PrimitiveArray; +use arrow::bitmap::{Bitmap, MutableBitmap}; pub use convert::into_reduction; use polars_core::prelude::*; -pub trait Reduction: Send { - /// Create a new reducer for this Reduction. - fn new_reducer(&self) -> Box; -} +/// A reduction with groups. +/// +/// Each group has its own reduction state that values can be aggregated into. +pub trait GroupedReduction: Any + Send { + /// Returns a new empty reduction. + fn new_empty(&self) -> Box; + + /// Resizes this GroupedReduction to the given number of groups. + /// + /// While not an actual member of the trait, the safety preconditions below + /// refer to self.num_groups() as given by the last call of this function. + fn resize(&mut self, num_groups: IdxSize); -pub trait ReductionState: Any + Send { - /// Adds the given series into the reduction. - fn update(&mut self, batch: &Series) -> PolarsResult<()>; + /// Updates the specified group with the given values. + fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()>; - /// Adds the elements of the given series at the given indices into the reduction. + /// Updates this GroupedReduction with new values. values[i] should + /// be added to reduction self[group_idxs[i]]. /// /// # Safety - /// Implementations may elide bound checks. - unsafe fn update_gathered(&mut self, batch: &Series, idx: &[IdxSize]) -> PolarsResult<()> { - let batch = batch.take_unchecked_from_slice(idx); - self.update(&batch) - } + /// group_idxs[i] < self.num_groups() for all i. + unsafe fn update_groups(&mut self, values: &Series, group_idxs: &[IdxSize]) + -> PolarsResult<()>; - /// Combines this reduction with another. 
-    fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()>;
+    /// Combines this GroupedReduction with another. Group other[i]
+    /// should be combined into group self[group_idxs[i]].
+    ///
+    /// # Safety
+    /// group_idxs[i] < self.num_groups() for all i.
+    unsafe fn combine(
+        &mut self,
+        other: &dyn GroupedReduction,
+        group_idxs: &[IdxSize],
+    ) -> PolarsResult<()>;
 
-    /// Returns a final result from the reduction.
-    fn finalize(&self) -> PolarsResult<Scalar>;
+    /// Returns the finalized value per group as a Series.
+    ///
+    /// After this operation the number of groups is reset to 0.
+    fn finalize(&mut self) -> PolarsResult<Series>;
 
-    /// Returns this ReductionState as a dyn Any.
+    /// Returns this GroupedReduction as a dyn Any.
     fn as_any(&self) -> &dyn Any;
 }
+
+// Helper traits used in the VecGroupedReduction and VecMaskGroupedReduction to
+// reduce code duplication.
+pub trait Reducer: Send + Sync + 'static {
+    type Dtype: PolarsDataType;
+    type Value: Clone + Send + Sync + 'static;
+    fn init() -> Self::Value;
+    #[inline(always)]
+    fn cast_series(s: &Series) -> Cow<'_, Series> {
+        Cow::Borrowed(s)
+    }
+    fn combine(a: &mut Self::Value, b: &Self::Value);
+    fn reduce_one(a: &mut Self::Value, b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>);
+    fn reduce_ca(v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>);
+    fn finish(v: Vec<Self::Value>, m: Option<Bitmap>, dtype: &DataType) -> PolarsResult<Series>;
+}
+
+pub trait NumericReducer: Send + Sync + 'static {
+    type Dtype: PolarsNumericType;
+    fn init() -> <Self::Dtype as PolarsNumericType>::Native;
+    fn combine(
+        a: <Self::Dtype as PolarsNumericType>::Native,
+        b: <Self::Dtype as PolarsNumericType>::Native,
+    ) -> <Self::Dtype as PolarsNumericType>::Native;
+    fn reduce_ca(
+        ca: &ChunkedArray<Self::Dtype>,
+    ) -> Option<<Self::Dtype as PolarsNumericType>::Native>;
+}
+
+impl<T: NumericReducer> Reducer for T {
+    type Dtype = <T as NumericReducer>::Dtype;
+    type Value = <<T as NumericReducer>::Dtype as PolarsNumericType>::Native;
+
+    #[inline(always)]
+    fn init() -> Self::Value {
+        <T as NumericReducer>::init()
+    }
+
+    #[inline(always)]
+    fn cast_series(s: &Series) -> Cow<'_, Series> {
+        s.to_physical_repr()
+    }
+
+    #[inline(always)]
+    fn combine(a: &mut Self::Value, b: &Self::Value) {
+        *a = <T as NumericReducer>::combine(*a, *b);
+    }
+
+    #[inline(always)]
+    fn reduce_one(a: &mut Self::Value, b: Option<<Self::Dtype as PolarsDataType>::Physical<'_>>) {
+        if let Some(b) = b {
+            *a = <T as NumericReducer>::combine(*a, b);
+        }
+    }
+
+    #[inline(always)]
+    fn reduce_ca(v: &mut Self::Value, ca: &ChunkedArray<Self::Dtype>) {
+        if let Some(r) = <T as NumericReducer>::reduce_ca(ca) {
+            *v = <T as NumericReducer>::combine(*v, r);
+        }
+    }
+
+    fn finish(v: Vec<Self::Value>, m: Option<Bitmap>, dtype: &DataType) -> PolarsResult<Series> {
+        let arr = Box::new(PrimitiveArray::<Self::Value>::from_vec(v).with_validity(m));
+        Ok(unsafe { Series::from_chunks_and_dtype_unchecked(PlSmallStr::EMPTY, vec![arr], dtype) })
+    }
+}
+
+pub struct VecGroupedReduction<R: Reducer> {
+    values: Vec<R::Value>,
+    in_dtype: DataType,
+    reducer: PhantomData<R>,
+}
+
+impl<R: Reducer> VecGroupedReduction<R> {
+    fn new(in_dtype: DataType) -> Self {
+        Self {
+            values: Vec::new(),
+            in_dtype,
+            reducer: PhantomData,
+        }
+    }
+}
+
+impl<R> GroupedReduction for VecGroupedReduction<R>
+where
+    R: Reducer,
+{
+    fn new_empty(&self) -> Box<dyn GroupedReduction> {
+        Box::new(Self {
+            values: Vec::new(),
+            in_dtype: self.in_dtype.clone(),
+            reducer: PhantomData,
+        })
+    }
+
+    fn resize(&mut self, num_groups: IdxSize) {
+        self.values.resize(num_groups as usize, R::init());
+    }
+
+    fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()> {
+        // TODO: we should really implement a sum-as-other-type operation instead
+        // of doing this materialized cast.
+ assert!(values.dtype() == &self.in_dtype); + let values = R::cast_series(values); + let ca: &ChunkedArray = values.as_ref().as_ref().as_ref(); + R::reduce_ca(&mut self.values[group_idx as usize], ca); + Ok(()) + } + + unsafe fn update_groups( + &mut self, + values: &Series, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &self.in_dtype); + assert!(values.len() == group_idxs.len()); + let values = R::cast_series(values); + let ca: &ChunkedArray = values.as_ref().as_ref().as_ref(); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. + for (g, ov) in group_idxs.iter().zip(ca.iter()) { + let grp = self.values.get_unchecked_mut(*g as usize); + R::reduce_one(grp, ov); + } + } + Ok(()) + } + + unsafe fn combine( + &mut self, + other: &dyn GroupedReduction, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + let other = other.as_any().downcast_ref::().unwrap(); + assert!(self.in_dtype == other.in_dtype); + assert!(self.values.len() == other.values.len()); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. + for (g, v) in group_idxs.iter().zip(other.values.iter()) { + let grp = self.values.get_unchecked_mut(*g as usize); + R::combine(grp, v); + } + } + Ok(()) + } + + fn finalize(&mut self) -> PolarsResult { + let v = core::mem::take(&mut self.values); + R::finish(v, None, &self.in_dtype) + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub struct VecMaskGroupedReduction { + values: Vec, + mask: MutableBitmap, + in_dtype: DataType, + reducer: PhantomData, +} + +impl VecMaskGroupedReduction { + fn new(in_dtype: DataType) -> Self { + Self { + values: Vec::new(), + mask: MutableBitmap::new(), + in_dtype, + reducer: PhantomData, + } + } +} + +impl GroupedReduction for VecMaskGroupedReduction +where + R: Reducer, +{ + fn new_empty(&self) -> Box { + Box::new(Self { + values: Vec::new(), + mask: MutableBitmap::new(), + in_dtype: self.in_dtype.clone(), + reducer: PhantomData, + }) + } + + fn resize(&mut self, num_groups: IdxSize) { + self.values.resize(num_groups as usize, R::init()); + self.mask.resize(num_groups as usize, false); + } + + fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &self.in_dtype); + let values = values.to_physical_repr(); + let ca: &ChunkedArray = values.as_ref().as_ref().as_ref(); + R::reduce_ca(&mut self.values[group_idx as usize], ca); + if ca.len() != ca.null_count() { + self.mask.set(group_idx as usize, true); + } + Ok(()) + } + + unsafe fn update_groups( + &mut self, + values: &Series, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &self.in_dtype); + assert!(values.len() == group_idxs.len()); + let values = values.to_physical_repr(); + let ca: &ChunkedArray = values.as_ref().as_ref().as_ref(); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. 
+ for (g, ov) in group_idxs.iter().zip(ca.iter()) { + if let Some(v) = ov { + let grp = self.values.get_unchecked_mut(*g as usize); + R::reduce_one(grp, Some(v)); + self.mask.set_unchecked(*g as usize, true); + } + } + } + Ok(()) + } + + unsafe fn combine( + &mut self, + other: &dyn GroupedReduction, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + let other = other.as_any().downcast_ref::().unwrap(); + assert!(self.in_dtype == other.in_dtype); + assert!(self.values.len() == other.values.len()); + assert!(self.mask.len() == other.mask.len()); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. + for (g, (v, o)) in group_idxs + .iter() + .zip(other.values.iter().zip(other.mask.iter())) + { + if o { + let grp = self.values.get_unchecked_mut(*g as usize); + R::combine(grp, v); + self.mask.set_unchecked(*g as usize, true); + } + } + } + Ok(()) + } + + fn finalize(&mut self) -> PolarsResult { + let v = core::mem::take(&mut self.values); + let m = core::mem::take(&mut self.mask); + R::finish(v, Some(m.freeze()), &self.in_dtype) + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/crates/polars-expr/src/reduce/nan_min_max.rs b/crates/polars-expr/src/reduce/nan_min_max.rs deleted file mode 100644 index 4a42ce37d3a5..000000000000 --- a/crates/polars-expr/src/reduce/nan_min_max.rs +++ /dev/null @@ -1,141 +0,0 @@ -use std::marker::PhantomData; - -use polars_compute::min_max::MinMaxKernel; -use polars_core::datatypes::PolarsFloatType; -use polars_utils::min_max::MinMax; - -use super::*; - -#[derive(Clone)] -pub struct NanMinReduce { - _phantom: PhantomData, -} - -impl NanMinReduce { - pub fn new() -> Self { - Self { - _phantom: PhantomData, - } - } -} - -impl Reduction for NanMinReduce -where - F::Array: for<'a> MinMaxKernel = F::Native>, -{ - fn new_reducer(&self) -> Box { - Box::new(NanMinReduceState:: { value: None }) - } -} - -struct NanMinReduceState { - value: Option, -} - -impl NanMinReduceState { - fn update_with_value(&mut self, other: Option) { - if let Some(other) = other { - if let Some(value) = self.value { - self.value = Some(MinMax::min_propagate_nan(value, other)); - } else { - self.value = Some(other); - } - } - } -} - -impl ReductionState for NanMinReduceState -where - F::Array: for<'a> MinMaxKernel = F::Native>, -{ - fn update(&mut self, batch: &Series) -> PolarsResult<()> { - let ca = batch.unpack::().unwrap(); - let reduced = ca - .downcast_iter() - .filter_map(MinMaxKernel::min_propagate_nan_kernel) - .reduce(MinMax::min_propagate_nan); - self.update_with_value(reduced); - Ok(()) - } - - fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> { - let other = other.as_any().downcast_ref::().unwrap(); - self.update_with_value(other.value); - Ok(()) - } - - fn finalize(&self) -> PolarsResult { - Ok(Scalar::new(F::get_dtype(), AnyValue::from(self.value))) - } - - fn as_any(&self) -> &dyn Any { - self - } -} - -#[derive(Clone)] -pub struct NanMaxReduce { - _phantom: PhantomData, -} - -impl NanMaxReduce { - pub fn new() -> Self { - Self { - _phantom: PhantomData, - } - } -} - -impl Reduction for NanMaxReduce -where - F::Array: for<'a> MinMaxKernel = F::Native>, -{ - fn new_reducer(&self) -> Box { - Box::new(NanMaxReduceState:: { value: None }) - } -} - -struct NanMaxReduceState { - value: Option, -} - -impl NanMaxReduceState { - fn update_with_value(&mut self, other: Option) { - if let Some(other) = other { - if let Some(value) = self.value { - self.value = Some(MinMax::max_propagate_nan(value, other)); - } else { - self.value = 
Some(other);
-            }
-        }
-    }
-}
-
-impl<F: PolarsFloatType> ReductionState for NanMaxReduceState<F>
-where
-    F::Array: for<'a> MinMaxKernel<Scalar<'a> = F::Native>,
-{
-    fn update(&mut self, batch: &Series) -> PolarsResult<()> {
-        let ca = batch.unpack::<F>().unwrap();
-        let reduced = ca
-            .downcast_iter()
-            .filter_map(MinMaxKernel::max_propagate_nan_kernel)
-            .reduce(MinMax::max_propagate_nan);
-        self.update_with_value(reduced);
-        Ok(())
-    }
-
-    fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> {
-        let other = other.as_any().downcast_ref::<Self>().unwrap();
-        self.update_with_value(other.value);
-        Ok(())
-    }
-
-    fn finalize(&self) -> PolarsResult<Scalar> {
-        Ok(Scalar::new(F::get_dtype(), AnyValue::from(self.value)))
-    }
-
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-}
diff --git a/crates/polars-expr/src/reduce/sum.rs b/crates/polars-expr/src/reduce/sum.rs
index 0f1d094ded3f..2b5d9d79c13f 100644
--- a/crates/polars-expr/src/reduce/sum.rs
+++ b/crates/polars-expr/src/reduce/sum.rs
@@ -1,58 +1,141 @@
-use polars_core::prelude::{AnyValue, DataType};
+use std::borrow::Cow;
+
+use arrow::array::PrimitiveArray;
+use num_traits::Zero;
 
 use super::*;
 
-#[derive(Clone)]
-pub struct SumReduce {
-    dtype: DataType,
+pub struct SumReduce<T: PolarsNumericType> {
+    sums: Vec<T::Native>,
+    in_dtype: DataType,
 }
 
-impl SumReduce {
-    pub fn new(dtype: DataType) -> Self {
-        // We cast small dtypes up in the sum, we must also do this when
-        // returning the empty sum to be consistent.
-        use DataType::*;
-        let dtype = match dtype {
-            Boolean => IDX_DTYPE,
-            Int8 | UInt8 | Int16 | UInt16 => Int64,
-            dt => dt,
-        };
-        Self { dtype }
+impl<T: PolarsNumericType> SumReduce<T> {
+    fn new(in_dtype: DataType) -> Self {
+        SumReduce {
+            sums: Vec::new(),
+            in_dtype,
+        }
     }
 }
 
-impl Reduction for SumReduce {
-    fn new_reducer(&self) -> Box<dyn ReductionState> {
-        let value = Scalar::new(self.dtype.clone(), AnyValue::zero_sum(&self.dtype));
-        Box::new(SumReduceState { value })
+pub fn new_sum_reduction(dtype: DataType) -> Box<dyn GroupedReduction> {
+    use DataType::*;
+    match dtype {
+        Boolean => Box::new(SumReduce::<IdxType>::new(dtype)),
+        Int8 | UInt8 | Int16 | UInt16 => Box::new(SumReduce::<Int64Type>::new(dtype)),
+        UInt32 => Box::new(SumReduce::<UInt32Type>::new(dtype)),
+        UInt64 => Box::new(SumReduce::<UInt64Type>::new(dtype)),
+        Int32 => Box::new(SumReduce::<Int32Type>::new(dtype)),
+        Int64 => Box::new(SumReduce::<Int64Type>::new(dtype)),
+        Float32 => Box::new(SumReduce::<Float32Type>::new(dtype)),
+        Float64 => Box::new(SumReduce::<Float64Type>::new(dtype)),
+        #[cfg(feature = "dtype-decimal")]
+        Decimal(_, _) => Box::new(SumReduce::<Int128Type>::new(dtype)),
+        Duration(_) => Box::new(SumReduce::<Int64Type>::new(dtype)),
+        _ => unimplemented!(),
    }
 }
 
-struct SumReduceState {
-    value: Scalar,
+fn cast_sum_input<'a>(s: &'a Series, dt: &DataType) -> PolarsResult<Cow<'a, Series>> {
+    use DataType::*;
+    match dt {
+        Boolean => Ok(Cow::Owned(s.cast(&IDX_DTYPE)?)),
+        Int8 | UInt8 | Int16 | UInt16 => Ok(Cow::Owned(s.cast(&Int64)?)),
+        #[cfg(feature = "dtype-decimal")]
+        Decimal(_, _) => Ok(Cow::Owned(
+            s.decimal().unwrap().physical().clone().into_series(),
+        )),
+        #[cfg(feature = "dtype-duration")]
+        Duration(_) => Ok(Cow::Owned(
+            s.duration().unwrap().physical().clone().into_series(),
+        )),
+        _ => Ok(Cow::Borrowed(s)),
+    }
 }
 
-impl SumReduceState {
-    fn add_value(&mut self, other: &AnyValue<'_>) {
-        self.value.update(self.value.value().add(other));
+fn out_dtype(in_dtype: &DataType) -> DataType {
+    use DataType::*;
+    match in_dtype {
+        Boolean => IDX_DTYPE,
+        Int8 | UInt8 | Int16 | UInt16 => Int64,
+        dt => dt.clone(),
    }
 }
 
-impl ReductionState for SumReduceState {
-    fn update(&mut self, batch: &Series) -> PolarsResult<()> {
-        let reduced = batch.sum_reduce()?;
-
self.add_value(reduced.value()); +impl GroupedReduction for SumReduce +where + T: PolarsNumericType, + ChunkedArray: ChunkAgg + IntoSeries, +{ + fn new_empty(&self) -> Box { + Box::new(Self { + sums: Vec::new(), + in_dtype: self.in_dtype.clone(), + }) + } + + fn resize(&mut self, num_groups: IdxSize) { + self.sums.resize(num_groups as usize, T::Native::zero()); + } + + fn update_group(&mut self, values: &Series, group_idx: IdxSize) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &self.in_dtype); + let values = cast_sum_input(values, &self.in_dtype)?; + let ca: &ChunkedArray = values.as_ref().as_ref().as_ref(); + self.sums[group_idx as usize] += ChunkAgg::sum(ca).unwrap_or(T::Native::zero()); + Ok(()) + } + + unsafe fn update_groups( + &mut self, + values: &Series, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { + // TODO: we should really implement a sum-as-other-type operation instead + // of doing this materialized cast. + assert!(values.dtype() == &self.in_dtype); + let values = cast_sum_input(values, &self.in_dtype)?; + assert!(values.len() == group_idxs.len()); + let ca: &ChunkedArray = values.as_ref().as_ref().as_ref(); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. + for (g, v) in group_idxs.iter().zip(ca.iter()) { + *self.sums.get_unchecked_mut(*g as usize) += v.unwrap_or(T::Native::zero()); + } + } Ok(()) } - fn combine(&mut self, other: &dyn ReductionState) -> PolarsResult<()> { + unsafe fn combine( + &mut self, + other: &dyn GroupedReduction, + group_idxs: &[IdxSize], + ) -> PolarsResult<()> { let other = other.as_any().downcast_ref::().unwrap(); - self.add_value(other.value.value()); + assert!(self.in_dtype == other.in_dtype); + assert!(self.sums.len() == other.sums.len()); + unsafe { + // SAFETY: indices are in-bounds guaranteed by trait. 
+ for (g, v) in group_idxs.iter().zip(other.sums.iter()) { + *self.sums.get_unchecked_mut(*g as usize) += *v; + } + } Ok(()) } - fn finalize(&self) -> PolarsResult { - Ok(self.value.clone()) + fn finalize(&mut self) -> PolarsResult { + let v = core::mem::take(&mut self.sums); + let arr = Box::new(PrimitiveArray::::from_vec(v)); + Ok(unsafe { + Series::from_chunks_and_dtype_unchecked( + PlSmallStr::EMPTY, + vec![arr], + &out_dtype(&self.in_dtype), + ) + }) } fn as_any(&self) -> &dyn Any { diff --git a/crates/polars-stream/src/nodes/reduce.rs b/crates/polars-stream/src/nodes/reduce.rs index 15048daba4f8..ded581a4cf38 100644 --- a/crates/polars-stream/src/nodes/reduce.rs +++ b/crates/polars-stream/src/nodes/reduce.rs @@ -1,7 +1,9 @@ use std::sync::Arc; +use polars_core::frame::column::ScalarColumn; +use polars_core::prelude::Column; use polars_core::schema::{Schema, SchemaExt}; -use polars_expr::reduce::{Reduction, ReductionState}; +use polars_expr::reduce::GroupedReduction; use polars_utils::itertools::Itertools; use super::compute_node_prelude::*; @@ -11,8 +13,7 @@ use crate::morsel::SourceToken; enum ReduceState { Sink { selectors: Vec, - reductions: Vec>, - reduction_states: Vec>, + reductions: Vec>, }, Source(Option), Done, @@ -26,15 +27,13 @@ pub struct ReduceNode { impl ReduceNode { pub fn new( selectors: Vec, - reductions: Vec>, + reductions: Vec>, output_schema: Arc, ) -> Self { - let reduction_states = reductions.iter().map(|r| r.new_reducer()).collect(); Self { state: ReduceState::Sink { selectors, reductions, - reduction_states, }, output_schema, } @@ -42,8 +41,7 @@ impl ReduceNode { fn spawn_sink<'env, 's>( selectors: &'env [StreamExpr], - reductions: &'env mut [Box], - reduction_states: &'env mut [Box], + reductions: &'env mut [Box], scope: &'s TaskScope<'s, 'env>, recv: RecvPort<'_>, state: &'s ExecutionState, @@ -53,14 +51,20 @@ impl ReduceNode { .parallel() .into_iter() .map(|mut recv| { - let mut local_reducers: Vec<_> = - reductions.iter().map(|d| d.new_reducer()).collect(); + let mut local_reducers: Vec<_> = reductions + .iter() + .map(|d| { + let mut r = d.new_empty(); + r.resize(1); + r + }) + .collect(); scope.spawn_task(TaskPriority::High, async move { while let Ok(morsel) = recv.recv().await { for (reducer, selector) in local_reducers.iter_mut().zip(selectors) { let input = selector.evaluate(morsel.df(), state).await?; - reducer.update(&input)?; + reducer.update_group(&input, 0)?; } } @@ -72,8 +76,11 @@ impl ReduceNode { join_handles.push(scope.spawn_task(TaskPriority::High, async move { for task in parallel_tasks { let local_reducers = task.await?; - for (r1, r2) in reduction_states.iter_mut().zip(local_reducers) { - r1.combine(&*r2)?; + for (r1, r2) in reductions.iter_mut().zip(local_reducers) { + r1.resize(1); + unsafe { + r1.combine(&*r2, &[0])?; + } } } @@ -111,18 +118,14 @@ impl ComputeNode for ReduceNode { self.state = ReduceState::Done; }, // Input is done, transition to being a source. - ReduceState::Sink { - reduction_states, .. - } if matches!(recv[0], PortState::Done) => { - let columns = reduction_states + ReduceState::Sink { reductions, .. 
} if matches!(recv[0], PortState::Done) => { + let columns = reductions .iter_mut() .zip(self.output_schema.iter_fields()) .map(|(r, field)| { - r.finalize().map(|scalar| { - scalar - .into_column(field.name.clone()) - .cast(&field.dtype) - .unwrap() + r.finalize().map(|s| { + let s = s.with_name(field.name.clone()).cast(&field.dtype).unwrap(); + Column::Scalar(ScalarColumn::unit_scalar_from_series(s)) }) }) .try_collect_vec()?; @@ -169,19 +172,10 @@ impl ComputeNode for ReduceNode { ReduceState::Sink { selectors, reductions, - reduction_states, } => { assert!(send[0].is_none()); let recv_port = recv[0].take().unwrap(); - Self::spawn_sink( - selectors, - reductions, - reduction_states, - scope, - recv_port, - state, - join_handles, - ) + Self::spawn_sink(selectors, reductions, scope, recv_port, state, join_handles) }, ReduceState::Source(df) => { assert!(recv[0].is_none()); diff --git a/crates/polars-utils/src/float.rs b/crates/polars-utils/src/float.rs index 30d084985782..30d47397c28e 100644 --- a/crates/polars-utils/src/float.rs +++ b/crates/polars-utils/src/float.rs @@ -1,6 +1,6 @@ /// # Safety /// unsafe code downstream relies on the correct is_float call -pub unsafe trait IsFloat: private::Sealed { +pub unsafe trait IsFloat: private::Sealed + Sized { fn is_float() -> bool { false } @@ -13,6 +13,10 @@ pub unsafe trait IsFloat: private::Sealed { false } + fn nan_value() -> Self { + unimplemented!() + } + #[allow(clippy::wrong_self_convention)] fn is_nan(&self) -> bool where @@ -78,6 +82,10 @@ macro_rules! impl_is_float { $is_f64 } + fn nan_value() -> Self { + Self::NAN + } + #[inline] fn is_nan(&self) -> bool { <$tp>::is_nan(*self) diff --git a/crates/polars/tests/it/arrow/bitmap/utils/mod.rs b/crates/polars/tests/it/arrow/bitmap/utils/mod.rs index 12af43e4e949..ebd8d983dec0 100644 --- a/crates/polars/tests/it/arrow/bitmap/utils/mod.rs +++ b/crates/polars/tests/it/arrow/bitmap/utils/mod.rs @@ -16,22 +16,24 @@ fn get_bit_basics() { 0b00000000, 0b00000001, 0b00000010, 0b00000100, 0b00001000, 0b00010000, 0b00100000, 0b01000000, 0b11111111, ]; - for i in 0..8 { - assert!(!get_bit(input, i)); + unsafe { + for i in 0..8 { + assert!(!get_bit_unchecked(input, i)); + } + assert!(get_bit_unchecked(input, 8)); + for i in 8 + 1..2 * 8 { + assert!(!get_bit_unchecked(input, i)); + } + assert!(get_bit_unchecked(input, 2 * 8 + 1)); + for i in 2 * 8 + 2..3 * 8 { + assert!(!get_bit_unchecked(input, i)); + } + assert!(get_bit_unchecked(input, 3 * 8 + 2)); + for i in 3 * 8 + 3..4 * 8 { + assert!(!get_bit_unchecked(input, i)); + } + assert!(get_bit_unchecked(input, 4 * 8 + 3)); } - assert!(get_bit(input, 8)); - for i in 8 + 1..2 * 8 { - assert!(!get_bit(input, i)); - } - assert!(get_bit(input, 2 * 8 + 1)); - for i in 2 * 8 + 2..3 * 8 { - assert!(!get_bit(input, i)); - } - assert!(get_bit(input, 3 * 8 + 2)); - for i in 3 * 8 + 3..4 * 8 { - assert!(!get_bit(input, i)); - } - assert!(get_bit(input, 4 * 8 + 3)); } #[test]