Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): defensively invalidate metadata and start on copying of min_value, max_value and distinct_count #16593

Merged
merged 1 commit into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions crates/polars-core/src/chunked_array/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use arrow::compute::cast::CastOptions;

use crate::chunked_array::metadata::MetadataProperties;
#[cfg(feature = "timezones")]
use crate::chunked_array::temporal::validate_time_zone;
#[cfg(feature = "dtype-datetime")]
Expand Down Expand Up @@ -306,13 +307,13 @@ impl BinaryChunked {
.map(|arr| arr.to_utf8view_unchecked().boxed())
.collect();
let field = Arc::new(Field::new(self.name(), DataType::String));
StringChunked::from_chunks_and_metadata(
chunks,
field,
Arc::new(self.effective_metadata().cast()),
true,
true,
)

let mut ca = StringChunked::new_with_compute_len(field, chunks);

use MetadataProperties as P;
ca.copy_metadata_cast(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca
}
}

Expand All @@ -323,15 +324,13 @@ impl StringChunked {
.map(|arr| arr.to_binview().boxed())
.collect();
let field = Arc::new(Field::new(self.name(), DataType::Binary));
unsafe {
BinaryChunked::from_chunks_and_metadata(
chunks,
field,
Arc::new(self.effective_metadata().cast()),
true,
true,
)
}

let mut ca = BinaryChunked::new_with_compute_len(field, chunks);

use MetadataProperties as P;
ca.copy_metadata_cast(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca
}
}

Expand Down
27 changes: 0 additions & 27 deletions crates/polars-core/src/chunked_array/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,33 +214,6 @@ where
ChunkedArray::new_with_compute_len(field, chunks)
}

/// Create a new ChunkedArray from self, where the chunks are replaced.
///
/// # Safety
/// The caller must ensure the dtypes of the chunks are correct
pub(crate) unsafe fn from_chunks_and_metadata(
chunks: Vec<ArrayRef>,
field: Arc<Field>,
metadata: Arc<Metadata<T>>,
keep_sorted: bool,
keep_fast_explode: bool,
) -> Self {
let mut out = ChunkedArray::new_with_compute_len(field, chunks);

let mut md = metadata;
if !keep_sorted {
let inner = Arc::make_mut(&mut md);
inner.set_sorted_flag(IsSorted::Not);
}
if !keep_fast_explode {
let inner = Arc::make_mut(&mut md);
inner.set_fast_explode_list(false);
}
out.md = Some(md);

out
}

pub(crate) unsafe fn from_chunks_and_dtype_unchecked(
name: &str,
chunks: Vec<ArrayRef>,
Expand Down
39 changes: 22 additions & 17 deletions crates/polars-core/src/chunked_array/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@ use serde::{Deserialize, Serialize};
use super::PolarsDataType;
use crate::series::IsSorted;

bitflags! {
    /// Selects which [`Metadata`] properties an operation should act on.
    ///
    /// Every constant occupies its own bit, so several properties can be combined with `|`.
    #[derive(Default, Debug, Clone, Copy, PartialEq)]
    pub struct MetadataProperties: u32 {
        /// The sortedness flag.
        const SORTED = 1 << 0;
        /// The fast-explode-list flag.
        const FAST_EXPLODE_LIST = 1 << 1;
        /// The stored minimum value.
        const MIN_VALUE = 1 << 2;
        /// The stored maximum value.
        const MAX_VALUE = 1 << 3;
        /// The stored distinct count.
        const DISTINCT_COUNT = 1 << 4;
    }
}

pub struct Metadata<T: PolarsDataType> {
flags: MetadataFlags,

Expand All @@ -18,7 +29,7 @@ pub struct Metadata<T: PolarsDataType> {
}

bitflags! {
#[derive(Default, Debug, Clone, Copy,PartialEq)]
#[derive(Default, Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(transparent))]
pub struct MetadataFlags: u8 {
const SORTED_ASC = 0x01;
Expand Down Expand Up @@ -53,6 +64,10 @@ impl MetadataFlags {
IsSorted::Not
}
}

pub fn get_fast_explode_list(&self) -> bool {
self.contains(MetadataFlags::FAST_EXPLODE_LIST)
}
}

impl<T: PolarsDataType> Default for Metadata<T> {
Expand Down Expand Up @@ -93,16 +108,6 @@ impl<T: PolarsDataType> Metadata<T> {
distinct_count: None,
};

pub fn cast<R: PolarsDataType>(&self) -> Metadata<R> {
// @TODO: It might be possible to cast the min_value, max_value and distinct_values
Metadata {
flags: self.flags,
min_value: None,
max_value: None,
distinct_count: None,
}
}

pub fn is_sorted_ascending(&self) -> bool {
self.flags.contains(MetadataFlags::SORTED_ASC)
}
Expand Down Expand Up @@ -150,14 +155,14 @@ impl<T: PolarsDataType> Metadata<T> {
self.set_sorted_descending(descending);
}

pub fn set_distinct_count(&mut self, distinct_count: IdxSize) {
self.distinct_count = Some(distinct_count);
pub fn set_distinct_count(&mut self, distinct_count: Option<IdxSize>) {
self.distinct_count = distinct_count;
}
pub fn set_min_value(&mut self, min_value: T::OwnedPhysical) {
self.min_value = Some(min_value);
pub fn set_min_value(&mut self, min_value: Option<T::OwnedPhysical>) {
self.min_value = min_value;
}
pub fn set_max_value(&mut self, max_value: T::OwnedPhysical) {
self.max_value = Some(max_value);
pub fn set_max_value(&mut self, max_value: Option<T::OwnedPhysical>) {
self.max_value = max_value;
}

pub fn set_flags(&mut self, flags: MetadataFlags) {
Expand Down
168 changes: 143 additions & 25 deletions crates/polars-core/src/chunked_array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use std::slice::Iter;
use arrow::legacy::kernels::concatenate::concatenate_owned_unchecked;
use arrow::legacy::prelude::*;

use self::metadata::{Metadata, MetadataFlags};
use self::metadata::{Metadata, MetadataFlags, MetadataProperties};
use crate::series::IsSorted;
use crate::utils::{first_non_null, last_non_null};

Expand Down Expand Up @@ -176,6 +176,7 @@ impl<T: PolarsDataType> ChunkedArray<T> {
/// Borrow the [`Metadata`] that applies to this [`ChunkedArray`].
///
/// If no metadata has been set for this [`ChunkedArray`], a reference to the empty
/// `Metadata::DEFAULT` is returned instead.
#[inline(always)]
pub fn effective_metadata(&self) -> &Metadata<T> {
    match &self.md {
        Some(md) => md.as_ref(),
        None => &Metadata::DEFAULT,
    }
}
Expand Down Expand Up @@ -216,7 +217,15 @@ impl<T: PolarsDataType> ChunkedArray<T> {
}

pub fn unset_fast_explode_list(&mut self) {
Arc::make_mut(self.metadata_mut()).set_fast_explode_list(false)
self.set_fast_explode_list(false)
}

pub fn set_fast_explode_list(&mut self, value: bool) {
Arc::make_mut(self.metadata_mut()).set_fast_explode_list(value)
}

/// Returns whether the fast-explode-list flag is set in this array's flags.
pub fn get_fast_explode_list(&self) -> bool {
    let flags = self.get_flags();
    flags.get_fast_explode_list()
}

pub fn get_flags(&self) -> MetadataFlags {
Expand All @@ -231,7 +240,7 @@ impl<T: PolarsDataType> ChunkedArray<T> {
}

pub fn is_sorted_flag(&self) -> IsSorted {
self.effective_metadata().is_sorted()
self.md.as_ref().map_or(IsSorted::Not, |md| md.is_sorted())
}

/// Set the 'sorted' bit meta info.
Expand All @@ -246,6 +255,125 @@ impl<T: PolarsDataType> ChunkedArray<T> {
out
}

/// Returns the minimum value stored in the metadata.
///
/// Yields `None` when this array has no metadata or the metadata holds no minimum.
pub fn get_min_value(&self) -> Option<&T::OwnedPhysical> {
    self.md.as_ref().and_then(|md| md.get_min_value())
}

/// Returns the maximum value stored in the metadata.
///
/// Yields `None` when this array has no metadata or the metadata holds no maximum.
pub fn get_max_value(&self) -> Option<&T::OwnedPhysical> {
    self.md.as_ref().and_then(|md| md.get_max_value())
}

/// Returns the distinct count stored in the metadata.
///
/// Yields `None` when this array has no metadata or the metadata holds no distinct count.
pub fn get_distinct_count(&self) -> Option<IdxSize> {
    self.md.as_ref().and_then(|md| md.get_distinct_count())
}

/// Copies the [`Metadata`] properties selected by `props` from `other` into `self`, where
/// `other` may have a different underlying [`PolarsDataType`].
///
/// Only properties whose representation does not depend on the data type can be copied this
/// way: the sorted flag, the fast-explode-list flag and the distinct count. `min_value` and
/// `max_value` are typed per [`PolarsDataType`] and therefore cannot be transferred.
///
/// # Panics
///
/// On debug builds this panics if `props` contains `MIN_VALUE` or `MAX_VALUE`. The check is
/// performed before any early return so that misuse is caught even when neither array
/// carries metadata.
#[inline(always)]
pub fn copy_metadata_cast<O: PolarsDataType>(
    &mut self,
    other: &ChunkedArray<O>,
    props: MetadataProperties,
) {
    use MetadataProperties as P;

    // If you add a property, add it here and below to ensure that metadata is copied
    // properly.
    debug_assert!(
        {
            props
                - (P::SORTED
                    | P::FAST_EXPLODE_LIST
                    | P::MIN_VALUE
                    | P::MAX_VALUE
                    | P::DISTINCT_COUNT)
        }
        .is_empty(),
        "A MetadataProperty was not added to the copy_metadata_cast check"
    );

    // These properties are typed by the physical type and cannot cross a cast. Validate
    // them up front, before the fast path below can hide an invalid request.
    debug_assert!(!props.contains(P::MIN_VALUE));
    debug_assert!(!props.contains(P::MAX_VALUE));

    // We add a fast path here for if both metadatas are empty, as this is quite a common case.
    if props.is_empty() || (self.md.is_none() && other.md.is_none()) {
        return;
    }

    let md = Arc::make_mut(self.metadata_mut());
    let other_md = other.effective_metadata();

    if props.contains(P::SORTED) {
        md.set_sorted_flag(other_md.is_sorted());
    }
    if props.contains(P::FAST_EXPLODE_LIST) {
        md.set_fast_explode_list(other_md.get_fast_explode_list());
    }
    if props.contains(P::DISTINCT_COUNT) {
        md.set_distinct_count(other_md.get_distinct_count());
    }
}

/// Copies the [`Metadata`] properties selected by `props` from `other` into `self`.
///
/// Properties that are not selected keep the value they currently have in `self`.
///
/// When the outcome would be identical to `other`'s metadata as a whole (every property is
/// either selected or already equal), the metadata `Arc` is cloned and no per-property copy
/// is made.
#[inline(always)]
pub fn copy_metadata(&mut self, other: &Self, props: MetadataProperties) {
    use MetadataProperties as P;

    // If you add a property add it here and below to ensure that metadata is copied properly.
    debug_assert!(
        {
            props
                - (P::SORTED
                    | P::FAST_EXPLODE_LIST
                    | P::MIN_VALUE
                    | P::MAX_VALUE
                    | P::DISTINCT_COUNT)
        }
        .is_empty(),
        "A MetadataProperty was not added to the copy_metadata check"
    );

    // We add a fast path here for if both metadatas are empty, as this is quite a common case.
    if props.is_empty() || (self.md.is_none() && other.md.is_none()) {
        return;
    }

    // This checks whether we are okay to just clone the Arc: for every property, either it
    // is being copied or `self` already agrees with `other` on it.
    if props.is_all()
        || ((props.contains(P::SORTED) || self.is_sorted_flag() == other.is_sorted_flag())
            && (props.contains(P::FAST_EXPLODE_LIST)
                || self.get_fast_explode_list() == other.get_fast_explode_list())
            && (props.contains(P::MIN_VALUE) || self.get_min_value() == other.get_min_value())
            && (props.contains(P::MAX_VALUE) || self.get_max_value() == other.get_max_value())
            && (props.contains(P::DISTINCT_COUNT)
                || self.get_distinct_count() == other.get_distinct_count()))
    {
        self.md.clone_from(&other.md);
        // Nothing left to do. Falling through would call `Arc::make_mut` on the now-shared
        // Arc and copy the metadata again for no benefit.
        return;
    }

    let md = Arc::make_mut(self.metadata_mut());
    let other_md = other.effective_metadata();

    if props.contains(P::SORTED) {
        md.set_sorted_flag(other_md.is_sorted());
    }
    if props.contains(P::FAST_EXPLODE_LIST) {
        md.set_fast_explode_list(other_md.get_fast_explode_list());
    }
    if props.contains(P::MIN_VALUE) {
        md.set_min_value(other_md.get_min_value().cloned());
    }
    if props.contains(P::MAX_VALUE) {
        md.set_max_value(other_md.get_max_value().cloned());
    }
    if props.contains(P::DISTINCT_COUNT) {
        md.set_distinct_count(other_md.get_distinct_count());
    }
}

/// Get the index of the first non null value in this [`ChunkedArray`].
pub fn first_non_null(&self) -> Option<usize> {
if self.null_count() == self.len() {
Expand Down Expand Up @@ -328,15 +456,16 @@ impl<T: PolarsDataType> ChunkedArray<T> {

pub fn clear(&self) -> Self {
// SAFETY: we keep the correct dtype
unsafe {
self.copy_with_chunks(
vec![new_empty_array(
self.chunks.first().unwrap().data_type().clone(),
)],
true,
true,
)
}
let mut ca = unsafe {
self.copy_with_chunks(vec![new_empty_array(
self.chunks.first().unwrap().data_type().clone(),
)])
};

use MetadataProperties as P;
ca.copy_metadata(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca
}

/// Unpack a [`Series`] to the same physical type.
Expand Down Expand Up @@ -410,19 +539,8 @@ impl<T: PolarsDataType> ChunkedArray<T> {
///
/// # Safety
/// The caller must ensure the dtypes of the chunks are correct
unsafe fn copy_with_chunks(
&self,
chunks: Vec<ArrayRef>,
keep_sorted: bool,
keep_fast_explode: bool,
) -> Self {
Self::from_chunks_and_metadata(
chunks,
self.field.clone(),
self.metadata_owned_arc(),
keep_sorted,
keep_fast_explode,
)
unsafe fn copy_with_chunks(&self, chunks: Vec<ArrayRef>) -> Self {
Self::new_with_compute_len(self.field.clone(), chunks)
}

/// Get data type of [`ChunkedArray`].
Expand Down
Loading