Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] add list.value_counts() #2902

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions daft/daft/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,7 @@ def dt_truncate(expr: PyExpr, interval: str, relative_to: PyExpr) -> PyExpr: ...
# ---
def explode(expr: PyExpr) -> PyExpr: ...
def list_sort(expr: PyExpr, desc: PyExpr) -> PyExpr: ...
def list_value_counts(expr: PyExpr) -> PyExpr: ...
def list_join(expr: PyExpr, delimiter: PyExpr) -> PyExpr: ...
def list_count(expr: PyExpr, mode: CountMode) -> PyExpr: ...
def list_get(expr: PyExpr, idx: PyExpr, default: PyExpr) -> PyExpr: ...
Expand Down
61 changes: 46 additions & 15 deletions daft/expressions/expressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2922,6 +2922,37 @@ def join(self, delimiter: str | Expression) -> Expression:
delimiter_expr = Expression._to_expression(delimiter)
return Expression._from_pyexpr(native.list_join(self._expr, delimiter_expr._expr))

# TODO: should the return type be a Map expression? If so, decide how to express it.
andrewgazelka marked this conversation as resolved.
Show resolved Hide resolved
def value_counts(self) -> Expression:
"""Counts the occurrences of each unique value in the list.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Give example usage

Returns:
Expression: A Map<X, UInt64> expression where the keys are unique elements from the
original list of type X, and the values are UInt64 counts representing
the number of times each element appears in the list.

Example:
>>> import daft
>>> df = daft.from_pydict({"letters": [["a", "b", "a"], ["b", "c", "b", "c"]]})
>>> df.with_column("value_counts", df["letters"].list.value_counts()).collect()
╭──────────────┬───────────────────╮
│ letters ┆ value_counts │
│ --- ┆ --- │
│ List[Utf8] ┆ Map[Utf8: UInt64] │
╞══════════════╪═══════════════════╡
│ [a, b, a] ┆ [{key: a, │
│ ┆ value: 2, │
│ ┆ }, {key: … │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ [b, c, b, c] ┆ [{key: b, │
│ ┆ value: 2, │
│ ┆ }, {key: … │
╰──────────────┴───────────────────╯
<BLANKLINE>
(Showing first 2 of 2 rows)
"""
return Expression._from_pyexpr(native.list_value_counts(self._expr))

def count(self, mode: CountMode = CountMode.Valid) -> Expression:
"""Counts the number of elements in each list

Expand Down Expand Up @@ -3069,21 +3100,21 @@ def get(self, key: Expression) -> Expression:
>>> df = daft.from_arrow(pa.table({"map_col": pa_array}))
>>> df = df.with_column("a", df["map_col"].map.get("a"))
>>> df.show()
╭──────────────────────────────────────┬───────╮
│ map_col ┆ a │
│ --- ┆ --- │
│ Map[Struct[key: Utf8, value: Int64]] ┆ Int64 │
╞══════════════════════════════════════╪═══════╡
│ [{key: a, ┆ 1 │
│ value: 1, ┆ │
│ }] ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [] ┆ None │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [{key: b, ┆ None │
│ value: 2, ┆ │
│ }] ┆ │
╰──────────────────────────────────────┴───────╯
╭──────────────────┬───────╮
│ map_col ┆ a │
│ --- ┆ --- │
│ Map[Utf8: Int64] ┆ Int64 │
╞══════════════════╪═══════╡
│ [{key: a, ┆ 1 │
│ value: 1, ┆ │
│ }] ┆ │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [] ┆ None │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ [{key: b, ┆ None │
│ value: 2, ┆ │
│ }] ┆ │
╰──────────────────┴───────╯
<BLANKLINE>
(Showing first 3 of 3 rows)

Expand Down
1 change: 1 addition & 0 deletions src/arrow2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ streaming-iterator = {version = "0.1", optional = true}
# for division/remainder optimization at runtime
strength_reduce = {version = "0.2", optional = true}
thiserror = {workspace = true}
tracing = "0.1.40"
zstd = {version = "0.12", optional = true}

# parquet support
Expand Down
10 changes: 8 additions & 2 deletions src/arrow2/src/array/list/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,18 @@ impl<O: Offset> ListArray<O> {
if O::IS_LARGE {
match data_type.to_logical_type() {
DataType::LargeList(child) => Ok(child.as_ref()),
_ => Err(Error::oos("ListArray<i64> expects DataType::LargeList")),
got => {
let msg = format!("ListArray<i64> expects DataType::LargeList, but got {got:?}");
Err(Error::oos(msg))
},
}
} else {
match data_type.to_logical_type() {
DataType::List(child) => Ok(child.as_ref()),
_ => Err(Error::oos("ListArray<i32> expects DataType::List")),
got => {
let msg = format!("ListArray<i32> expects DataType::List, but got {got:?}");
Err(Error::oos(msg))
},
}
}
}
Expand Down
68 changes: 56 additions & 12 deletions src/arrow2/src/array/map/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use super::{new_empty_array, specification::try_check_offsets_bounds, Array, ListArray};
use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
error::Error,
offset::OffsetsBuffer,
};

use super::{new_empty_array, specification::try_check_offsets_bounds, Array};

mod ffi;
pub(super) mod fmt;
mod iterator;
#[allow(unused)]
pub use iterator::*;


/// An array representing a (key, value), both of arbitrary logical types.
#[derive(Clone)]
pub struct MapArray {
Expand Down Expand Up @@ -41,20 +41,27 @@ impl MapArray {
try_check_offsets_bounds(&offsets, field.len())?;

let inner_field = Self::try_get_field(&data_type)?;
if let DataType::Struct(inner) = inner_field.data_type() {
if inner.len() != 2 {
return Err(Error::InvalidArgumentError(
"MapArray's inner `Struct` must have 2 fields (keys and maps)".to_string(),
));
}
} else {

let inner_data_type = inner_field.data_type();
let DataType::Struct(inner) = inner_data_type else {
return Err(Error::InvalidArgumentError(
"MapArray expects `DataType::Struct` as its inner logical type".to_string(),
format!("MapArray expects `DataType::Struct` as its inner logical type, but found {inner_data_type:?}"),
));
};

let inner_len = inner.len();
if inner_len != 2 {
let msg = format!(
"MapArray's inner `Struct` must have 2 fields (keys and maps), but found {} fields",
inner_len
);
return Err(Error::InvalidArgumentError(msg));
}
if field.data_type() != inner_field.data_type() {

let field_data_type = field.data_type();
if field_data_type != inner_field.data_type() {
return Err(Error::InvalidArgumentError(
"MapArray expects `field.data_type` to match its inner DataType".to_string(),
format!("MapArray expects `field.data_type` to match its inner DataType, but found \n{field_data_type:?}\nvs\n\n\n{inner_field:?}"),
));
}

Expand Down Expand Up @@ -195,6 +202,43 @@ impl MapArray {
impl Array for MapArray {
impl_common_array!();

/// Re-labels this [`MapArray`] with the logical type `target`.
///
/// * When `target` is a `DataType::Map`, the array is boxed with `to_boxed` and its type is
///   replaced through `change_type`.
/// * When `target` is a `DataType::LargeList`, a [`ListArray`] is built from this array's
///   inner field and validity, with every offset cast to `i64`.
///
/// # Panics
/// Panics if `target` is neither `Map` nor `LargeList`, or if the physical type of the
/// `LargeList` item differs from the physical type of this map's inner field.
fn convert_logical_type(&self, target: DataType) -> Box<dyn Array> {
    tracing::trace!("converting logical type to\n{target:#?}");

    if matches!(target, DataType::Map { .. }) {
        // Map -> Map: only the logical label changes, so a plain type swap is enough.
        let mut new = self.to_boxed();
        new.change_type(target);
        return new;
    }

    let DataType::LargeList(target_inner) = &target else {
        panic!("MapArray can only be converted to Map or LargeList, but got {target:?}");
    };

    let DataType::Map(current_inner, _) = self.data_type() else {
        unreachable!("Somehow DataType is not Map for a MapArray");
    };

    let current_inner_physical = current_inner.data_type.to_physical_type();
    let target_inner_physical = target_inner.data_type.to_physical_type();

    if current_inner_physical != target_inner_physical {
        // Report both sides so the mismatch can be diagnosed from the panic alone.
        panic!(
            "inner types are not equal: {current_inner_physical:?} vs {target_inner_physical:?}"
        );
    }

    // Re-label the entries with the item type requested by the target list.
    let mut field = self.field.clone();
    field.change_type(target_inner.data_type.clone());

    // The list is built with i64 offsets, so cast each of the map's offsets.
    let offsets = self.offsets().clone().map(|offset| offset as i64);

    let list = ListArray::new(target, offsets, field, self.validity.clone());

    Box::new(list)
}

fn validity(&self) -> Option<&Bitmap> {
self.validity.as_ref()
}
Expand Down
99 changes: 95 additions & 4 deletions src/arrow2/src/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// When the validity is [`None`], all slots are valid.
fn validity(&self) -> Option<&Bitmap>;

/// Returns an iterator over mutable references to the arrays nested directly inside this one.
///
/// The default implementation yields nothing; implementors that hold child arrays
/// are expected to override it.
fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item=&'a mut dyn Array> + 'a> {
Box::new(core::iter::empty())
}

/// The number of null slots on this [`Array`].
/// # Implementation
/// This is `O(1)` since the number of null elements is pre-computed.
Expand Down Expand Up @@ -183,7 +187,7 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static {
/// ```
/// # Panics
/// Panics iff the `data_type`'s [`PhysicalType`] is not equal to array's `PhysicalType`.
fn to_type(&self, data_type: DataType) -> Box<dyn Array> {
fn convert_logical_type(&self, data_type: DataType) -> Box<dyn Array> {
let mut new = self.to_boxed();
new.change_type(data_type);
new
Expand Down Expand Up @@ -634,15 +638,26 @@ macro_rules! impl_common_array {
fn change_type(&mut self, data_type: DataType) {
if data_type.to_physical_type() != self.data_type().to_physical_type() {
panic!(
"Converting array with logical type {:?} to logical type {:?} failed, physical types do not match: {:?} -> {:?}",
"Converting array with logical type\n{:#?}\n\nto logical type\n{:#?}\nfailed, physical types do not match: {:?} -> {:?}",
self.data_type(),
data_type,
self.data_type().to_physical_type(),
data_type.to_physical_type(),
);
}
self.data_type = data_type;

self.data_type = data_type.clone();

let mut children = self.direct_children();

data_type.direct_children(|child| {
let Some(child_elem) = children.next() else {
return;
};
child_elem.change_type(child.clone());
})
}

};
}

Expand Down Expand Up @@ -737,7 +752,7 @@ pub(crate) use self::ffi::ToFfi;
/// This is similar to [`Extend`], but accepted the creation to error.
pub trait TryExtend<A> {
/// Fallible version of [`Extend::extend`].
fn try_extend<I: IntoIterator<Item = A>>(&mut self, iter: I) -> Result<()>;
fn try_extend<I: IntoIterator<Item=A>>(&mut self, iter: I) -> Result<()>;
}

/// A trait describing the ability of a struct to receive new items.
Expand Down Expand Up @@ -774,3 +789,79 @@ pub unsafe trait GenericBinaryArray<O: crate::offset::Offset>: Array {
/// The offsets of the array
fn offsets(&self) -> &[O];
}


#[cfg(test)]
mod tests {
    //! Checks that `Array::convert_logical_type` re-labels arrays whose physical type matches
    //! the requested logical type, and panics when it does not.

    use super::*;
    // Only the items the tests actually use are imported, to keep the module warning-free.
    use crate::array::{BooleanArray, Int32Array, Int64Array, ListArray, StructArray};
    use crate::datatypes::{DataType, Field, TimeUnit};

    #[test]
    fn test_int32_to_date32() {
        let array = Int32Array::from_slice([1, 2, 3]);
        let result = array.convert_logical_type(DataType::Date32);
        assert_eq!(result.data_type(), &DataType::Date32);
    }

    #[test]
    fn test_int64_to_timestamp() {
        let target = DataType::Timestamp(TimeUnit::Millisecond, None);
        let array = Int64Array::from_slice([1000, 2000, 3000]);
        let result = array.convert_logical_type(target.clone());
        assert_eq!(result.data_type(), &target);
    }

    #[test]
    fn test_boolean_to_boolean() {
        let array = BooleanArray::from_slice([true, false, true]);
        let result = array.convert_logical_type(DataType::Boolean);
        assert_eq!(result.data_type(), &DataType::Boolean);
    }

    #[test]
    fn test_list_to_list() {
        let data_type = DataType::List(Box::new(Field::new("item", DataType::Int32, true)));
        let values = Int32Array::from_slice([1, 2, 3, 4, 5]);
        let offsets = vec![0, 2, 5];
        let list_array = ListArray::try_new(
            data_type.clone(),
            offsets.try_into().unwrap(),
            Box::new(values),
            None,
        )
        .unwrap();
        let result = list_array.convert_logical_type(data_type.clone());
        assert_eq!(result.data_type(), &data_type);
    }

    #[test]
    fn test_struct_to_struct() {
        let data_type = DataType::Struct(vec![
            Field::new("b", DataType::Boolean, true),
            Field::new("i", DataType::Int32, true),
        ]);
        let boolean = BooleanArray::from_slice([true, false, true]);
        let int = Int32Array::from_slice([1, 2, 3]);
        let struct_array = StructArray::try_new(
            data_type.clone(),
            vec![
                Box::new(boolean) as Box<dyn Array>,
                Box::new(int) as Box<dyn Array>,
            ],
            None,
        )
        .unwrap();
        let result = struct_array.convert_logical_type(data_type.clone());
        assert_eq!(result.data_type(), &data_type);
    }

    #[test]
    #[should_panic]
    fn test_invalid_conversion() {
        // Int32 and Utf8 have different physical types, so the conversion must panic.
        let array = Int32Array::from_slice([1, 2, 3]);
        let _ = array.convert_logical_type(DataType::Utf8);
    }
}

9 changes: 9 additions & 0 deletions src/arrow2/src/array/struct_/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::ops::DerefMut;
use crate::{
bitmap::Bitmap,
datatypes::{DataType, Field},
Expand Down Expand Up @@ -246,6 +247,14 @@ impl StructArray {
impl Array for StructArray {
impl_common_array!();

/// Yields a mutable reference to each array stored in `self.values`, in order.
fn direct_children<'a>(&'a mut self) -> Box<dyn Iterator<Item=&'a mut dyn Array> + 'a> {
    // Each boxed child is dereferenced to the trait object it owns.
    Box::new(self.values.iter_mut().map(|child| child.deref_mut()))
}

fn validity(&self) -> Option<&Bitmap> {
self.validity.as_ref()
}
Expand Down
Loading
Loading