refactor(rust): defensively invalidate metadata
With this PR, I change the way we clone `ChunkedArray`s. After a clone, we now have to state explicitly which metadata properties are okay to copy over. This clears the way to properly handle other kinds of metadata, and I implemented such handling for `slice` and `rechunk`.
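For illustration, the call-site pattern after this change looks roughly like the following (a minimal sketch based on the diff below; `source`, `field`, and `chunks` are placeholder names):

```rust
use MetadataProperties as P;

// Build the new ChunkedArray without inheriting any metadata from `source`...
let mut ca = ChunkedArray::new_with_compute_len(field, chunks);

// ...then opt in, explicitly, to the properties that are still valid after the operation.
ca.copy_metadata(&source, P::SORTED | P::FAST_EXPLODE_LIST);
```

When the source has a different dtype (as in the binary/string casts below), `copy_metadata_cast` plays the same role but refuses the typed properties (`MIN_VALUE` and `MAX_VALUE`).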
coastalwhite committed May 30, 2024
1 parent 84ba2d0 commit f422695
Showing 9 changed files with 224 additions and 125 deletions.
31 changes: 15 additions & 16 deletions crates/polars-core/src/chunked_array/cast.rs
@@ -2,6 +2,7 @@

use arrow::compute::cast::CastOptions;

use crate::chunked_array::metadata::MetadataProperties;
#[cfg(feature = "timezones")]
use crate::chunked_array::temporal::validate_time_zone;
#[cfg(feature = "dtype-datetime")]
@@ -306,13 +307,13 @@ impl BinaryChunked {
.map(|arr| arr.to_utf8view_unchecked().boxed())
.collect();
let field = Arc::new(Field::new(self.name(), DataType::String));
StringChunked::from_chunks_and_metadata(
chunks,
field,
Arc::new(self.effective_metadata().cast()),
true,
true,
)

let mut ca = StringChunked::new_with_compute_len(field, chunks);

use MetadataProperties as P;
ca.copy_metadata_cast(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca
}
}

@@ -323,15 +324,13 @@ impl StringChunked {
.map(|arr| arr.to_binview().boxed())
.collect();
let field = Arc::new(Field::new(self.name(), DataType::Binary));
unsafe {
BinaryChunked::from_chunks_and_metadata(
chunks,
field,
Arc::new(self.effective_metadata().cast()),
true,
true,
)
}

let mut ca = BinaryChunked::new_with_compute_len(field, chunks);

use MetadataProperties as P;
ca.copy_metadata_cast(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca
}
}

27 changes: 0 additions & 27 deletions crates/polars-core/src/chunked_array/from.rs
@@ -214,33 +214,6 @@
ChunkedArray::new_with_compute_len(field, chunks)
}

/// Create a new ChunkedArray from self, where the chunks are replaced.
///
/// # Safety
/// The caller must ensure the dtypes of the chunks are correct
pub(crate) unsafe fn from_chunks_and_metadata(
chunks: Vec<ArrayRef>,
field: Arc<Field>,
metadata: Arc<Metadata<T>>,
keep_sorted: bool,
keep_fast_explode: bool,
) -> Self {
let mut out = ChunkedArray::new_with_compute_len(field, chunks);

let mut md = metadata;
if !keep_sorted {
let inner = Arc::make_mut(&mut md);
inner.set_sorted_flag(IsSorted::Not);
}
if !keep_fast_explode {
let inner = Arc::make_mut(&mut md);
inner.set_fast_explode_list(false);
}
out.md = Some(md);

out
}

pub(crate) unsafe fn from_chunks_and_dtype_unchecked(
name: &str,
chunks: Vec<ArrayRef>,
29 changes: 22 additions & 7 deletions crates/polars-core/src/chunked_array/metadata.rs
@@ -8,6 +8,17 @@ use serde::{Deserialize, Serialize};
use super::PolarsDataType;
use crate::series::IsSorted;

bitflags! {
#[derive(Default, Debug, Clone, Copy, PartialEq)]
pub struct MetadataProperties: u32 {
const SORTED = 0x01;
const FAST_EXPLODE_LIST = 0x02;
const MIN_VALUE = 0x04;
const MAX_VALUE = 0x08;
const DISTINCT_COUNT = 0x10;
}
}

pub struct Metadata<T: PolarsDataType> {
flags: MetadataFlags,

@@ -18,7 +29,7 @@ pub struct Metadata<T: PolarsDataType> {
}

bitflags! {
#[derive(Default, Debug, Clone, Copy,PartialEq)]
#[derive(Default, Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize), serde(transparent))]
pub struct MetadataFlags: u8 {
const SORTED_ASC = 0x01;
@@ -53,6 +64,10 @@ impl MetadataFlags {
IsSorted::Not
}
}

pub fn get_fast_explode_list(&self) -> bool {
self.contains(MetadataFlags::FAST_EXPLODE_LIST)
}
}

impl<T: PolarsDataType> Default for Metadata<T> {
Expand Down Expand Up @@ -150,14 +165,14 @@ impl<T: PolarsDataType> Metadata<T> {
self.set_sorted_descending(descending);
}

pub fn set_distinct_count(&mut self, distinct_count: IdxSize) {
self.distinct_count = Some(distinct_count);
pub fn set_distinct_count(&mut self, distinct_count: Option<IdxSize>) {
self.distinct_count = distinct_count;
}
pub fn set_min_value(&mut self, min_value: T::OwnedPhysical) {
self.min_value = Some(min_value);
pub fn set_min_value(&mut self, min_value: Option<T::OwnedPhysical>) {
self.min_value = min_value;
}
pub fn set_max_value(&mut self, max_value: T::OwnedPhysical) {
self.max_value = Some(max_value);
pub fn set_max_value(&mut self, max_value: Option<T::OwnedPhysical>) {
self.max_value = max_value;
}

pub fn set_flags(&mut self, flags: MetadataFlags) {
113 changes: 89 additions & 24 deletions crates/polars-core/src/chunked_array/mod.rs
@@ -50,7 +50,7 @@ use std::slice::Iter;
use arrow::legacy::kernels::concatenate::concatenate_owned_unchecked;
use arrow::legacy::prelude::*;

use self::metadata::{Metadata, MetadataFlags};
use self::metadata::{Metadata, MetadataFlags, MetadataProperties};
use crate::series::IsSorted;
use crate::utils::{first_non_null, last_non_null};

@@ -216,7 +216,15 @@ impl<T: PolarsDataType> ChunkedArray<T> {
}

pub fn unset_fast_explode_list(&mut self) {
Arc::make_mut(self.metadata_mut()).set_fast_explode_list(false)
self.set_fast_explode_list(false)
}

pub fn set_fast_explode_list(&mut self, value: bool) {
Arc::make_mut(self.metadata_mut()).set_fast_explode_list(value)
}

pub fn get_fast_explode_list(&self) -> bool {
self.get_flags().get_fast_explode_list()
}

pub fn get_flags(&self) -> MetadataFlags {
@@ -246,6 +254,73 @@ impl<T: PolarsDataType> ChunkedArray<T> {
out
}

/// Copies [`Metadata`] properties into `self` from an `other` with a different underlying
/// [`PolarsDataType`].
///
/// Properties whose type differs between the two [`Metadata`]s (e.g. `min_value` and
/// `max_value`) are not copied; attempting to copy them panics on debug builds.
#[inline(always)]
pub fn copy_metadata_cast<O: PolarsDataType>(
&mut self,
other: &ChunkedArray<O>,
properties: MetadataProperties,
) {
use MetadataProperties as P;

if properties.is_empty() {
return;
}

debug_assert!(!properties.contains(P::MIN_VALUE));
debug_assert!(!properties.contains(P::MAX_VALUE));

let md = Arc::make_mut(self.metadata_mut());
let other_md = other.effective_metadata();

if properties.contains(P::SORTED) {
md.set_sorted_flag(other_md.is_sorted());
}
if properties.contains(P::FAST_EXPLODE_LIST) {
md.set_fast_explode_list(other_md.get_fast_explode_list());
}
if properties.contains(P::DISTINCT_COUNT) {
md.set_distinct_count(other_md.get_distinct_count());
}
}

/// Copies [`Metadata`] properties from `other` into `self`.
#[inline(always)]
pub fn copy_metadata(&mut self, other: &Self, properties: MetadataProperties) {
use MetadataProperties as P;

if properties.is_empty() {
return;
}

if properties.is_all() {
self.md.clone_from(&other.md);
}

let md = Arc::make_mut(self.metadata_mut());
let other_md = other.effective_metadata();

if properties.contains(P::SORTED) {
md.set_sorted_flag(other_md.is_sorted());
}
if properties.contains(P::FAST_EXPLODE_LIST) {
md.set_fast_explode_list(other_md.get_fast_explode_list());
}
if properties.contains(P::MIN_VALUE) {
md.set_min_value(other_md.get_min_value().cloned());
}
if properties.contains(P::MAX_VALUE) {
md.set_max_value(other_md.get_max_value().cloned());
}
if properties.contains(P::DISTINCT_COUNT) {
md.set_distinct_count(other_md.get_distinct_count());
}
}

/// Get the index of the first non null value in this [`ChunkedArray`].
pub fn first_non_null(&self) -> Option<usize> {
if self.null_count() == self.len() {
@@ -328,15 +403,16 @@ impl<T: PolarsDataType> ChunkedArray<T> {

pub fn clear(&self) -> Self {
// SAFETY: we keep the correct dtype
unsafe {
self.copy_with_chunks(
vec![new_empty_array(
self.chunks.first().unwrap().data_type().clone(),
)],
true,
true,
)
}
let mut ca = unsafe {
self.copy_with_chunks(vec![new_empty_array(
self.chunks.first().unwrap().data_type().clone(),
)])
};

use MetadataProperties as P;
ca.copy_metadata(self, P::SORTED | P::FAST_EXPLODE_LIST);

ca
}

/// Unpack a [`Series`] to the same physical type.
@@ -410,19 +486,8 @@ impl<T: PolarsDataType> ChunkedArray<T> {
///
/// # Safety
/// The caller must ensure the dtypes of the chunks are correct
unsafe fn copy_with_chunks(
&self,
chunks: Vec<ArrayRef>,
keep_sorted: bool,
keep_fast_explode: bool,
) -> Self {
Self::from_chunks_and_metadata(
chunks,
self.field.clone(),
self.metadata_owned_arc(),
keep_sorted,
keep_fast_explode,
)
unsafe fn copy_with_chunks(&self, chunks: Vec<ArrayRef>) -> Self {
Self::new_with_compute_len(self.field.clone(), chunks)
}

/// Get data type of [`ChunkedArray`].
23 changes: 21 additions & 2 deletions crates/polars-core/src/chunked_array/ops/arity.rs
@@ -4,6 +4,7 @@ use arrow::array::{Array, StaticArray};
use arrow::compute::utils::combine_validities_and;
use polars_error::PolarsResult;

use crate::chunked_array::metadata::MetadataProperties;
use crate::datatypes::{ArrayCollectIterExt, ArrayFromIter};
use crate::prelude::{ChunkedArray, PolarsDataType, Series};
use crate::utils::{align_chunks_binary, align_chunks_binary_owned, align_chunks_ternary};
@@ -491,7 +492,17 @@
.zip(rhs.downcast_iter())
.map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
.collect();
lhs.copy_with_chunks(chunks, keep_sorted, keep_fast_explode)

let mut ca = lhs.copy_with_chunks(chunks);

use MetadataProperties as P;

let mut properties = P::empty();
properties.set(P::SORTED, keep_sorted);
properties.set(P::FAST_EXPLODE_LIST, keep_fast_explode);
ca.copy_metadata(&lhs, properties);

ca
}

#[inline]
@@ -538,7 +549,15 @@
.zip(rhs.downcast_iter())
.map(|(lhs_arr, rhs_arr)| op(lhs_arr, rhs_arr))
.collect::<Result<Vec<_>, E>>()?;
Ok(lhs.copy_with_chunks(chunks, keep_sorted, keep_fast_explode))
let mut ca = lhs.copy_with_chunks(chunks);

use MetadataProperties as P;
let mut properties = P::empty();
properties.set(P::SORTED, keep_sorted);
properties.set(P::FAST_EXPLODE_LIST, keep_fast_explode);
ca.copy_metadata(&lhs, properties);

Ok(ca)
}

#[inline]
40 changes: 38 additions & 2 deletions crates/polars-core/src/chunked_array/ops/chunkops.rs
@@ -2,6 +2,7 @@ use arrow::legacy::kernels::concatenate::concatenate_owned_unchecked;
use polars_error::constants::LENGTH_LIMIT_MSG;

use super::*;
use crate::chunked_array::metadata::MetadataProperties;
#[cfg(feature = "object")]
use crate::chunked_array::object::builder::ObjectChunkedBuilder;
use crate::utils::slice_offsets;
@@ -116,7 +117,20 @@ impl<T: PolarsDataType> ChunkedArray<T> {
self.clone()
} else {
let chunks = inner_rechunk(&self.chunks);
unsafe { self.copy_with_chunks(chunks, true, true) }

let mut ca = unsafe { self.copy_with_chunks(chunks) };

use MetadataProperties as P;
ca.copy_metadata(
self,
P::SORTED
| P::FAST_EXPLODE_LIST
| P::MIN_VALUE
| P::MAX_VALUE
| P::DISTINCT_COUNT,
);

ca
}
},
}
@@ -133,8 +147,30 @@ impl<T: PolarsDataType> ChunkedArray<T> {
// A normal slice, slice the buffers and thus keep the whole memory allocated.
let exec = || {
let (chunks, len) = slice(&self.chunks, offset, length, self.len());
let mut out = unsafe { self.copy_with_chunks(chunks, true, true) };
let mut out = unsafe { self.copy_with_chunks(chunks) };

use MetadataProperties as P;
let mut properties = P::SORTED | P::FAST_EXPLODE_LIST;

if length != 0 {
let is_ascending = self.is_sorted_ascending_flag();
let is_descending = self.is_sorted_descending_flag();

let (raw_offset, slice_len) = slice_offsets(offset, length, self.len());
let is_until_end = raw_offset + slice_len == self.len();

let copy_min_value =
(raw_offset == 0 && is_ascending) || (is_until_end && is_descending);
let copy_max_value =
(raw_offset == 0 && is_descending) || (is_until_end && is_ascending);

properties.set(P::MIN_VALUE, copy_min_value);
properties.set(P::MAX_VALUE, copy_max_value);
}

out.copy_metadata(self, properties);
out.length = len as IdxSize;

out
};

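As background for the `slice` handling above: sortedness and the fast-explode flag survive any slice, but `MIN_VALUE`/`MAX_VALUE` survive only when the slice still touches the end of the array that holds that extreme. A standalone sketch of that rule, assuming the data is sorted in the stated direction (illustrative names, not Polars APIs):

```rust
/// Which of (MIN_VALUE, MAX_VALUE) survive slicing a sorted sequence.
/// `ascending == false` means sorted descending; an unsorted array keeps neither.
fn keeps_bounds(len: usize, offset: usize, slice_len: usize, ascending: bool) -> (bool, bool) {
    let until_end = offset + slice_len == len;
    let keep_min = (offset == 0 && ascending) || (until_end && !ascending);
    let keep_max = (offset == 0 && !ascending) || (until_end && ascending);
    (keep_min, keep_max)
}

fn main() {
    // Ascending [1, 2, 3, 4, 5]: the prefix [1, 2, 3] still starts at the global
    // minimum, so only MIN_VALUE is kept; the suffix [3, 4, 5] still ends at the
    // global maximum, so only MAX_VALUE is kept.
    assert_eq!(keeps_bounds(5, 0, 3, true), (true, false));
    assert_eq!(keeps_bounds(5, 2, 3, true), (false, true));
}
```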
