Skip to content

Commit

Permalink
fix: Fix invalid list collection in expression engine (#19191)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 11, 2024
1 parent f8a7041 commit 0870a5d
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 104 deletions.
11 changes: 10 additions & 1 deletion crates/polars-core/src/chunked_array/builder/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ pub use null::*;
pub use primitive::*;

use super::*;
#[cfg(feature = "object")]
use crate::chunked_array::object::registry::get_object_builder;

pub trait ListBuilderTrait {
fn append_opt_series(&mut self, opt_s: Option<&Series>) -> PolarsResult<()> {
Expand Down Expand Up @@ -115,7 +117,14 @@ pub fn get_list_builder(

match &physical_type {
#[cfg(feature = "object")]
DataType::Object(_, _) => polars_bail!(opq = list_builder, &physical_type),
DataType::Object(_, _) => {
let builder = get_object_builder(PlSmallStr::EMPTY, 0).get_list_builder(
name,
value_capacity,
list_capacity,
);
Ok(Box::new(builder))
},
#[cfg(feature = "dtype-struct")]
DataType::Struct(_) => Ok(Box::new(AnonymousOwnedListBuilder::new(
name,
Expand Down
48 changes: 15 additions & 33 deletions crates/polars-core/src/chunked_array/from_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,42 +199,24 @@ impl FromIterator<Option<Series>> for ListChunked {
}
builder.finish()
} else {
match first_s.dtype() {
#[cfg(feature = "object")]
DataType::Object(_, _) => {
let mut builder =
first_s.get_list_builder(PlSmallStr::EMPTY, capacity * 5, capacity);
for _ in 0..init_null_count {
builder.append_null();
}
builder.append_series(first_s).unwrap();
// We don't know the needed capacity. We arbitrarily choose an average of 5 elements per series.
let mut builder = get_list_builder(
first_s.dtype(),
capacity * 5,
capacity,
PlSmallStr::EMPTY,
)
.unwrap();

for opt_s in it {
builder.append_opt_series(opt_s.as_ref()).unwrap();
}
builder.finish()
},
_ => {
// We don't know the needed capacity. We arbitrarily choose an average of 5 elements per series.
let mut builder = get_list_builder(
first_s.dtype(),
capacity * 5,
capacity,
PlSmallStr::EMPTY,
)
.unwrap();

for _ in 0..init_null_count {
builder.append_null();
}
builder.append_series(first_s).unwrap();
for _ in 0..init_null_count {
builder.append_null();
}
builder.append_series(first_s).unwrap();

for opt_s in it {
builder.append_opt_series(opt_s.as_ref()).unwrap();
}
builder.finish()
},
for opt_s in it {
builder.append_opt_series(opt_s.as_ref()).unwrap();
}
builder.finish()
}
},
}
Expand Down
32 changes: 6 additions & 26 deletions crates/polars-core/src/chunked_array/from_iterator_par.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,13 @@ fn materialize_list(
value_capacity: usize,
list_capacity: usize,
) -> ListChunked {
match &dtype {
#[cfg(feature = "object")]
DataType::Object(_, _) => {
let s = vectors
.iter()
.flatten()
.find_map(|opt_s| opt_s.as_ref())
.unwrap();
let mut builder = s.get_list_builder(name, value_capacity, list_capacity);

for v in vectors {
for val in v {
builder.append_opt_series(val.as_ref()).unwrap();
}
}
builder.finish()
},
dtype => {
let mut builder = get_list_builder(dtype, value_capacity, list_capacity, name).unwrap();
for v in vectors {
for val in v {
builder.append_opt_series(val.as_ref()).unwrap();
}
}
builder.finish()
},
let mut builder = get_list_builder(&dtype, value_capacity, list_capacity, name).unwrap();
for v in vectors {
for val in v {
builder.append_opt_series(val.as_ref()).unwrap();
}
}
builder.finish()
}

impl FromParallelIterator<Option<Series>> for ListChunked {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ impl<T: PolarsObject> ObjectChunked<T> {
}
}

struct ExtensionListBuilder<T: PolarsObject> {
pub(crate) struct ExtensionListBuilder<T: PolarsObject> {
values_builder: ObjectChunkedBuilder<T>,
offsets: Vec<i64>,
fast_explode: bool,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub(crate) mod drop;
mod list;
pub(super) mod list;
pub(crate) mod polars_extension;

use std::mem;
Expand Down
21 changes: 20 additions & 1 deletion crates/polars-core/src/chunked_array/object/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use polars_utils::pl_str::PlSmallStr;

use crate::chunked_array::object::builder::ObjectChunkedBuilder;
use crate::datatypes::AnyValue;
use crate::prelude::PolarsObject;
use crate::prelude::{ListBuilderTrait, PolarsObject};
use crate::series::{IntoSeries, Series};

/// Takes a `name` and `capacity` and constructs a new builder.
Expand Down Expand Up @@ -71,6 +71,13 @@ pub trait AnonymousObjectBuilder {
/// Take the current state and materialize it as a [`Series`].
/// The builder should not be used after that.
fn to_series(&mut self) -> Series;

/// Create a boxed [`ListBuilderTrait`] for building list columns from this
/// object builder.
///
/// The implementation for `ObjectChunkedBuilder<T>` forwards `name`,
/// `values_capacity` and `list_capacity` unchanged to
/// `ExtensionListBuilder::<T>::new`.
///
/// NOTE(review): `values_capacity` is presumably the expected total number of
/// inner values and `list_capacity` the expected number of list rows — confirm
/// against `ExtensionListBuilder::new`.
fn get_list_builder(
&self,
name: PlSmallStr,
values_capacity: usize,
list_capacity: usize,
) -> Box<dyn ListBuilderTrait>;
}

impl<T: PolarsObject> AnonymousObjectBuilder for ObjectChunkedBuilder<T> {
Expand All @@ -87,6 +94,18 @@ impl<T: PolarsObject> AnonymousObjectBuilder for ObjectChunkedBuilder<T> {
let builder = std::mem::take(self);
builder.finish().into_series()
}
/// Construct an `ExtensionListBuilder<T>` and hand it back as a boxed
/// [`ListBuilderTrait`] object.
///
/// `name`, `values_capacity` and `list_capacity` are passed straight through
/// to `ExtensionListBuilder::<T>::new`. The receiver's state is not read; only
/// the element type `T` of this builder determines the result.
fn get_list_builder(
    &self,
    name: PlSmallStr,
    values_capacity: usize,
    list_capacity: usize,
) -> Box<dyn ListBuilderTrait> {
    let list_builder =
        super::extension::list::ExtensionListBuilder::<T>::new(name, values_capacity, list_capacity);
    Box::new(list_builder)
}
}

pub fn register_object_builder(
Expand Down
66 changes: 32 additions & 34 deletions crates/polars-expr/src/expressions/apply.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::borrow::Cow;

use polars_core::chunked_array::builder::get_list_builder;
use polars_core::prelude::*;
use polars_core::POOL;
#[cfg(feature = "parquet")]
Expand Down Expand Up @@ -265,46 +266,43 @@ impl ApplyExpr {
// Length of the items to iterate over.
let len = iters[0].size_hint().0;

if len == 0 {
drop(iters);

// Take the first aggregation context, as that is the input series.
let mut ac = acs.swap_remove(0);
ac.with_update_groups(UpdateGroups::No);

let agg_state = if self.function_returns_scalar {
AggState::AggregatedScalar(Series::new_empty(field.name().clone(), &field.dtype))
} else {
match self.collect_groups {
ApplyOptions::ElementWise | ApplyOptions::ApplyList => ac
.agg_state()
.map(|_| Series::new_empty(field.name().clone(), &field.dtype)),
ApplyOptions::GroupWise => AggState::AggregatedList(Series::new_empty(
field.name().clone(),
&DataType::List(Box::new(field.dtype.clone())),
)),
}
};

ac.with_agg_state(agg_state);
return Ok(ac);
}

let ca = (0..len)
.map(|_| {
let ca = if len == 0 {
let mut builder = get_list_builder(&field.dtype, len * 5, len, field.name)?;
for _ in 0..len {
container.clear();
for iter in &mut iters {
match iter.next().unwrap() {
None => return Ok(None),
None => {
builder.append_null();
},
Some(s) => container.push(s.deep_clone().into()),
}
}
self.function
let out = self
.function
.call_udf(&mut container)
.map(|r| r.map(|c| c.as_materialized_series().clone()))
})
.collect::<PolarsResult<ListChunked>>()?
.with_name(field.name.clone());
.map(|r| r.map(|c| c.as_materialized_series().clone()))?;

builder.append_opt_series(out.as_ref())?
}
builder.finish()
} else {
(0..len)
.map(|_| {
container.clear();
for iter in &mut iters {
match iter.next().unwrap() {
None => return Ok(None),
Some(s) => container.push(s.deep_clone().into()),
}
}
self.function
.call_udf(&mut container)
.map(|r| r.map(|c| c.as_materialized_series().clone()))
})
.collect::<PolarsResult<ListChunked>>()?
.with_name(field.name.clone())
};

drop(iters);

Expand Down Expand Up @@ -443,7 +441,7 @@ impl PhysicalExpr for ApplyExpr {
self.expr.to_field(input_schema, Context::Default)
}
#[cfg(feature = "parquet")]
fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> {
fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> {
let function = match &self.expr {
Expr::Function { function, .. } => function,
_ => return None,
Expand Down
7 changes: 0 additions & 7 deletions crates/polars-expr/src/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ impl AggState {
AggState::NotAggregated(s) => AggState::NotAggregated(func(s)?),
})
}

/// Apply an infallible `func` to the series held by this state.
///
/// Wraps `func` so that it always yields `Ok` and delegates to `try_map`;
/// the result is unwrapped, so this panics if `try_map` returns an error.
fn map<F>(&self, func: F) -> Self
where
    F: FnOnce(&Series) -> Series,
{
    let mapped = self.try_map(|series| Ok(func(series)));
    mapped.unwrap()
}
}

// lazy update strategy
Expand Down
7 changes: 7 additions & 0 deletions py-polars/tests/unit/operations/test_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -1146,3 +1146,10 @@ def test_positional_by_with_list_or_tuple_17540() -> None:
pl.DataFrame({"a": [1, 2, 3]}).group_by(by=["a"])
with pytest.raises(TypeError, match="Hint: if you"):
pl.LazyFrame({"a": [1, 2, 3]}).group_by(by=["a"])


def test_group_by_agg_19173() -> None:
    """Aggregating a non-scalar expression over an empty frame keeps the list dtype.

    The frame is truncated to zero rows before grouping, so the result must be
    empty while still reporting `x` as `List(Float64)`.
    """
    empty = pl.DataFrame({"x": [1.0], "g": [0]}).head(0)
    squared_residual = (pl.col.x - pl.col.x.sum() * pl.col.x) ** 2

    out = empty.group_by("g").agg(squared_residual)

    assert out.to_dict(as_series=False) == {"g": [], "x": []}
    assert out.schema == pl.Schema([("g", pl.Int64), ("x", pl.List(pl.Float64))])

0 comments on commit 0870a5d

Please sign in to comment.