Skip to content

Commit

Permalink
refactor: Use Scalar instead of Series some aggregations (pola-rs#16277)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored and Wouittone committed Jun 22, 2024
1 parent 190db23 commit e768cf3
Show file tree
Hide file tree
Showing 27 changed files with 391 additions and 374 deletions.
174 changes: 85 additions & 89 deletions crates/polars-core/src/chunked_array/ops/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,24 @@ use super::float_sorted_arg_max::{
use crate::chunked_array::ChunkedArray;
use crate::datatypes::{BooleanChunked, PolarsNumericType};
use crate::prelude::*;
use crate::series::implementations::SeriesWrap;
use crate::series::IsSorted;

/// Aggregations that return [`Series`] of unit length. Those can be used in broadcasting operations.
pub trait ChunkAggSeries {
/// Get the sum of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn sum_as_series(&self) -> Scalar {
fn sum_reduce(&self) -> Scalar {
unimplemented!()
}
/// Get the max of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn max_as_series(&self) -> Series {
fn max_reduce(&self) -> Scalar {
unimplemented!()
}
/// Get the min of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn min_as_series(&self) -> Series {
fn min_reduce(&self) -> Scalar {
unimplemented!()
}
/// Get the product of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn prod_as_series(&self) -> Series {
fn prod_reduce(&self) -> Scalar {
unimplemented!()
}
}
Expand Down Expand Up @@ -263,75 +262,70 @@ where
Add<Output = <T::Native as Simd>::Simd> + compute::aggregate::Sum<T::Native>,
ChunkedArray<T>: IntoSeries,
{
fn sum_as_series(&self) -> Scalar {
fn sum_reduce(&self) -> Scalar {
let v: Option<T::Native> = self.sum();
Scalar::new(T::get_dtype().clone(), v.into())
Scalar::new(T::get_dtype(), v.into())
}

fn max_as_series(&self) -> Series {
fn max_reduce(&self) -> Scalar {
let v = ChunkAgg::max(self);
let mut ca: ChunkedArray<T> = [v].iter().copied().collect();
ca.rename(self.name());
ca.into_series()
Scalar::new(T::get_dtype(), v.into())
}

fn min_as_series(&self) -> Series {
fn min_reduce(&self) -> Scalar {
let v = ChunkAgg::min(self);
let mut ca: ChunkedArray<T> = [v].iter().copied().collect();
ca.rename(self.name());
ca.into_series()
Scalar::new(T::get_dtype(), v.into())
}

fn prod_as_series(&self) -> Series {
fn prod_reduce(&self) -> Scalar {
let mut prod = T::Native::one();
for opt_v in self.into_iter().flatten() {
prod = prod * opt_v;

for arr in self.downcast_iter() {
for v in arr.into_iter().flatten() {
prod = prod * *v
}
}
Self::from_slice_options(self.name(), &[Some(prod)]).into_series()
Scalar::new(T::get_dtype(), prod.into())
}
}

fn as_series<T>(name: &str, v: Option<T::Native>) -> Series
where
T: PolarsNumericType,
SeriesWrap<ChunkedArray<T>>: SeriesTrait,
{
let mut ca: ChunkedArray<T> = [v].into_iter().collect();
ca.rename(name);
ca.into_series()
}

impl<T> VarAggSeries for ChunkedArray<T>
where
T: PolarsIntegerType,
ChunkedArray<T>: ChunkVar,
{
fn var_as_series(&self, ddof: u8) -> Series {
as_series::<Float64Type>(self.name(), self.var(ddof))
fn var_reduce(&self, ddof: u8) -> Scalar {
let v = self.var(ddof);
Scalar::new(DataType::Float64, v.into())
}

fn std_as_series(&self, ddof: u8) -> Series {
as_series::<Float64Type>(self.name(), self.std(ddof))
fn std_reduce(&self, ddof: u8) -> Scalar {
let v = self.std(ddof);
Scalar::new(DataType::Float64, v.into())
}
}

impl VarAggSeries for Float32Chunked {
fn var_as_series(&self, ddof: u8) -> Series {
as_series::<Float32Type>(self.name(), self.var(ddof).map(|x| x as f32))
fn var_reduce(&self, ddof: u8) -> Scalar {
let v = self.var(ddof).map(|v| v as f32);
Scalar::new(DataType::Float32, v.into())
}

fn std_as_series(&self, ddof: u8) -> Series {
as_series::<Float32Type>(self.name(), self.std(ddof).map(|x| x as f32))
fn std_reduce(&self, ddof: u8) -> Scalar {
let v = self.std(ddof).map(|v| v as f32);
Scalar::new(DataType::Float32, v.into())
}
}

impl VarAggSeries for Float64Chunked {
fn var_as_series(&self, ddof: u8) -> Series {
as_series::<Float64Type>(self.name(), self.var(ddof))
fn var_reduce(&self, ddof: u8) -> Scalar {
let v = self.var(ddof);
Scalar::new(DataType::Float64, v.into())
}

fn std_as_series(&self, ddof: u8) -> Series {
as_series::<Float64Type>(self.name(), self.std(ddof))
fn std_reduce(&self, ddof: u8) -> Scalar {
let v = self.std(ddof);
Scalar::new(DataType::Float64, v.into())
}
}

Expand All @@ -342,68 +336,65 @@ where
<T::Native as Simd>::Simd:
Add<Output = <T::Native as Simd>::Simd> + compute::aggregate::Sum<T::Native>,
{
fn quantile_as_series(
fn quantile_reduce(
&self,
quantile: f64,
interpol: QuantileInterpolOptions,
) -> PolarsResult<Series> {
Ok(as_series::<Float64Type>(
self.name(),
self.quantile(quantile, interpol)?,
))
) -> PolarsResult<Scalar> {
let v = self.quantile(quantile, interpol)?;
Ok(Scalar::new(DataType::Float64, v.into()))
}

fn median_as_series(&self) -> Series {
as_series::<Float64Type>(self.name(), self.median())
fn median_reduce(&self) -> Scalar {
let v = self.median();
Scalar::new(DataType::Float64, v.into())
}
}

impl QuantileAggSeries for Float32Chunked {
fn quantile_as_series(
fn quantile_reduce(
&self,
quantile: f64,
interpol: QuantileInterpolOptions,
) -> PolarsResult<Series> {
Ok(as_series::<Float32Type>(
self.name(),
self.quantile(quantile, interpol)?,
))
) -> PolarsResult<Scalar> {
let v = self.quantile(quantile, interpol)?;
Ok(Scalar::new(DataType::Float32, v.into()))
}

fn median_as_series(&self) -> Series {
as_series::<Float32Type>(self.name(), self.median())
fn median_reduce(&self) -> Scalar {
let v = self.median();
Scalar::new(DataType::Float32, v.into())
}
}

impl QuantileAggSeries for Float64Chunked {
fn quantile_as_series(
fn quantile_reduce(
&self,
quantile: f64,
interpol: QuantileInterpolOptions,
) -> PolarsResult<Series> {
Ok(as_series::<Float64Type>(
self.name(),
self.quantile(quantile, interpol)?,
))
) -> PolarsResult<Scalar> {
let v = self.quantile(quantile, interpol)?;
Ok(Scalar::new(DataType::Float64, v.into()))
}

fn median_as_series(&self) -> Series {
as_series::<Float64Type>(self.name(), self.median())
fn median_reduce(&self) -> Scalar {
let v = self.median();
Scalar::new(DataType::Float64, v.into())
}
}

impl ChunkAggSeries for BooleanChunked {
fn sum_as_series(&self) -> Scalar {
fn sum_reduce(&self) -> Scalar {
let v = self.sum();
Scalar::new(IDX_DTYPE, v.into())
}
fn max_as_series(&self) -> Series {
fn max_reduce(&self) -> Scalar {
let v = self.max();
Series::new(self.name(), [v])
Scalar::new(DataType::Boolean, v.into())
}
fn min_as_series(&self) -> Series {
fn min_reduce(&self) -> Scalar {
let v = self.min();
Series::new(self.name(), [v])
Scalar::new(DataType::Boolean, v.into())
}
}

Expand Down Expand Up @@ -457,14 +448,16 @@ impl StringChunked {
}

impl ChunkAggSeries for StringChunked {
fn sum_as_series(&self) -> Scalar {
fn sum_reduce(&self) -> Scalar {
Scalar::new(DataType::String, AnyValue::Null)
}
fn max_as_series(&self) -> Series {
Series::new(self.name(), &[self.max_str()])
fn max_reduce(&self) -> Scalar {
let av: AnyValue = self.max_str().into();
Scalar::new(DataType::String, av.into_static().unwrap())
}
fn min_as_series(&self) -> Series {
Series::new(self.name(), &[self.min_str()])
fn min_reduce(&self) -> Scalar {
let av: AnyValue = self.min_str().into();
Scalar::new(DataType::String, av.into_static().unwrap())
}
}

Expand Down Expand Up @@ -529,11 +522,13 @@ impl CategoricalChunked {

#[cfg(feature = "dtype-categorical")]
impl ChunkAggSeries for CategoricalChunked {
fn min_as_series(&self) -> Series {
Series::new(self.name(), &[self.min_categorical()])
fn min_reduce(&self) -> Scalar {
let av: AnyValue = self.min_categorical().into();
Scalar::new(DataType::String, av.into_static().unwrap())
}
fn max_as_series(&self) -> Series {
Series::new(self.name(), &[self.max_categorical()])
fn max_reduce(&self) -> Scalar {
let av: AnyValue = self.max_categorical().into();
Scalar::new(DataType::String, av.into_static().unwrap())
}
}

Expand Down Expand Up @@ -588,14 +583,16 @@ impl BinaryChunked {
}

impl ChunkAggSeries for BinaryChunked {
fn sum_as_series(&self) -> Scalar {
fn sum_reduce(&self) -> Scalar {
unimplemented!()
}
fn max_as_series(&self) -> Series {
Series::new(self.name(), [self.max_binary()])
fn max_reduce(&self) -> Scalar {
let av: AnyValue = self.max_binary().into();
Scalar::new(self.dtype().clone(), av.into_static().unwrap())
}
fn min_as_series(&self) -> Series {
Series::new(self.name(), [self.min_binary()])
fn min_reduce(&self) -> Scalar {
let av: AnyValue = self.min_binary().into();
Scalar::new(self.dtype().clone(), av.into_static().unwrap())
}
}

Expand Down Expand Up @@ -686,18 +683,17 @@ mod test {
assert_eq!(ca.mean().unwrap(), 1.5);
assert_eq!(
ca.into_series()
.mean_as_series()
.f32()
.unwrap()
.get(0)
.mean_reduce()
.value()
.extract::<f32>()
.unwrap(),
1.5
);
// all null values case
let ca = Float32Chunked::full_null("", 3);
assert_eq!(ca.mean(), None);
assert_eq!(
ca.into_series().mean_as_series().f32().unwrap().get(0),
ca.into_series().mean_reduce().value().extract::<f32>(),
None
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use super::*;

pub trait QuantileAggSeries {
/// Get the median of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn median_as_series(&self) -> Series;
fn median_reduce(&self) -> Scalar;
/// Get the quantile of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn quantile_as_series(
fn quantile_reduce(
&self,
_quantile: f64,
_interpol: QuantileInterpolOptions,
) -> PolarsResult<Series>;
) -> PolarsResult<Scalar>;
}

/// helper
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/ops/aggregate/var.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use super::*;

pub trait VarAggSeries {
/// Get the variance of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn var_as_series(&self, ddof: u8) -> Series;
fn var_reduce(&self, ddof: u8) -> Scalar;
/// Get the standard deviation of the [`ChunkedArray`] as a new [`Series`] of length 1.
fn std_as_series(&self, ddof: u8) -> Series;
fn std_reduce(&self, ddof: u8) -> Scalar;
}

impl<T> ChunkVar for ChunkedArray<T>
Expand Down
Loading

0 comments on commit e768cf3

Please sign in to comment.