Skip to content

Commit

Permalink
feat(rust,py): add additional control to write_parquet::statistics
Browse files Browse the repository at this point in the history
…parameter (#16575)
  • Loading branch information
coastalwhite authored Jun 3, 2024
1 parent c6d8136 commit 9223b10
Show file tree
Hide file tree
Showing 34 changed files with 522 additions and 167 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions crates/polars-compute/src/distinct_count.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use arrow::array::{Array, BooleanArray};

/// Kernel to calculate the number of unique non-null elements
pub trait DistinctCountKernel {
/// Calculate the number of unique non-null elements in [`Self`]
fn distinct_count(&self) -> usize;
}

impl DistinctCountKernel for BooleanArray {
fn distinct_count(&self) -> usize {
if self.len() - self.null_count() == 0 {
return 0;
}

let unset_bits = self.values().unset_bits();
2 - usize::from(unset_bits == 0 || unset_bits == self.values().len())
}
}
1 change: 1 addition & 0 deletions crates/polars-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use arrow::types::NativeType;

pub mod arithmetic;
pub mod comparisons;
pub mod distinct_count;
pub mod filter;
pub mod float_sum;
pub mod if_then_else;
Expand Down
34 changes: 33 additions & 1 deletion crates/polars-compute/src/min_max/scalar.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use arrow::array::{Array, BinaryViewArray, PrimitiveArray, Utf8ViewArray};
use arrow::array::{Array, BinaryViewArray, BooleanArray, PrimitiveArray, Utf8ViewArray};
use arrow::types::NativeType;
use polars_utils::min_max::MinMax;

Expand Down Expand Up @@ -56,6 +56,38 @@ impl<T: NativeType + MinMax + super::NotSimdPrimitive> MinMaxKernel for [T] {
}
}

impl MinMaxKernel for BooleanArray {
type Scalar<'a> = bool;

fn min_ignore_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
if self.len() - self.null_count() == 0 {
return None;
}

let unset_bits = self.values().unset_bits();
Some(unset_bits == 0)
}

fn max_ignore_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
if self.len() - self.null_count() == 0 {
return None;
}

let set_bits = self.values().set_bits();
Some(set_bits > 0)
}

#[inline(always)]
fn min_propagate_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
self.min_ignore_nan_kernel()
}

#[inline(always)]
fn max_propagate_nan_kernel(&self) -> Option<Self::Scalar<'_>> {
self.max_ignore_nan_kernel()
}
}

impl MinMaxKernel for BinaryViewArray {
type Scalar<'a> = &'a [u8];

Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/chunked_array/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub struct Metadata<T: PolarsDataType> {
min_value: Option<T::OwnedPhysical>,
max_value: Option<T::OwnedPhysical>,

/// Number of unique non-null values
distinct_count: Option<IdxSize>,
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/series/comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ macro_rules! impl_compare {
_ => unimplemented!(),
};
out.rename(lhs.name());
Ok(out) as PolarsResult<BooleanChunked>
PolarsResult::Ok(out)
}};
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ json = [
"dtype-struct",
"csv",
]
serde = ["dep:serde", "polars-core/serde-lazy"]
serde = ["dep:serde", "polars-core/serde-lazy", "polars-parquet/serde"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrows streaming ipc file parsing
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ mod writer;

pub use batched_writer::BatchedWriter;
pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
pub use polars_parquet::write::RowGroupIterColumns;
pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
pub use writer::ParquetWriter;
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/write/options.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use polars_error::PolarsResult;
use polars_parquet::write::{
BrotliLevel as BrotliLevelParquet, CompressionOptions, GzipLevel as GzipLevelParquet,
ZstdLevel as ZstdLevelParquet,
StatisticsOptions, ZstdLevel as ZstdLevelParquet,
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
Expand All @@ -12,7 +12,7 @@ pub struct ParquetWriteOptions {
/// Data page compression
pub compression: ParquetCompression,
/// Compute and write column statistics.
pub statistics: bool,
pub statistics: StatisticsOptions,
/// If `None` will be all written to a single row group.
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
Expand Down
11 changes: 6 additions & 5 deletions crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::sync::Mutex;
use arrow::datatypes::PhysicalType;
use polars_core::prelude::*;
use polars_parquet::write::{
to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, Version, WriteOptions,
to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, StatisticsOptions,
Version, WriteOptions,
};

use super::batched_writer::BatchedWriter;
Expand All @@ -18,7 +19,7 @@ pub struct ParquetWriter<W> {
/// Data page compression
compression: CompressionOptions,
/// Compute and write column statistics.
statistics: bool,
statistics: StatisticsOptions,
/// if `None` will be 512^2 rows
row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
Expand All @@ -39,7 +40,7 @@ where
ParquetWriter {
writer,
compression: ParquetCompression::default().into(),
statistics: true,
statistics: StatisticsOptions::default(),
row_group_size: None,
data_page_size: None,
parallel: true,
Expand All @@ -56,7 +57,7 @@ where
}

/// Compute and write statistic
pub fn with_statistics(mut self, statistics: bool) -> Self {
pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
self.statistics = statistics;
self
}
Expand Down Expand Up @@ -100,7 +101,7 @@ where

fn materialize_options(&self) -> WriteOptions {
WriteOptions {
write_statistics: self.statistics,
statistics: self.statistics,
compression: self.compression,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn init_files() {
#[cfg(feature = "parquet")]
{
ParquetWriter::new(f)
.with_statistics(true)
.with_statistics(StatisticsOptions::full())
.finish(&mut df)
.unwrap();
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ethnum = { workspace = true }
fallible-streaming-iterator = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
num-traits = { workspace = true }
polars-compute = { workspace = true }
polars-error = { workspace = true }
polars-utils = { workspace = true }
simdutf8 = { workspace = true }
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-parquet/src/arrow/write/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::parquet::encoding::{delta_bitpacked, Encoding};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
use crate::write::utils::invalid_encoding;
use crate::write::Page;
use crate::write::{Page, StatisticsOptions};

pub(crate) fn encode_non_null_values<'a, I: Iterator<Item = &'a [u8]>>(
iter: I,
Expand Down Expand Up @@ -65,8 +65,8 @@ pub fn array_to_page<O: Offset>(
_ => return Err(invalid_encoding(encoding, array.data_type())),
}

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand All @@ -89,21 +89,32 @@ pub fn array_to_page<O: Offset>(
pub(crate) fn build_statistics<O: Offset>(
array: &BinaryArray<O>,
primitive_type: PrimitiveType,
options: &StatisticsOptions,
) -> ParquetStatistics {
BinaryStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
null_count: options.null_count.then_some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
min_value: array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
max_value: options
.max_value
.then(|| {
array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
min_value: options
.min_value
.then(|| {
array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
}
.serialize()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ where

encode_plain(array, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-parquet/src/arrow/write/binview/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
use crate::read::schema::is_nullable;
use crate::write::binary::{encode_non_null_values, ord_binary};
use crate::write::utils::invalid_encoding;
use crate::write::{utils, Encoding, Page, WriteOptions};
use crate::write::{utils, Encoding, Page, StatisticsOptions, WriteOptions};

pub(crate) fn encode_plain(array: &BinaryViewArray, buffer: &mut Vec<u8>) {
let capacity =
Expand Down Expand Up @@ -56,8 +56,8 @@ pub fn array_to_page(
_ => return Err(invalid_encoding(encoding, array.data_type())),
}

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand All @@ -81,21 +81,32 @@ pub fn array_to_page(
pub(crate) fn build_statistics(
array: &BinaryViewArray,
primitive_type: PrimitiveType,
options: &StatisticsOptions,
) -> ParquetStatistics {
BinaryStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
null_count: options.null_count.then_some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
min_value: array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
max_value: options
.max_value
.then(|| {
array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
min_value: options
.min_value
.then(|| {
array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
}
.serialize()
}
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/binview/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub fn array_to_page(

encode_plain(array, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand Down
30 changes: 23 additions & 7 deletions crates/polars-parquet/src/arrow/write/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::parquet::encoding::Encoding;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{BooleanStatistics, ParquetStatistics};
use crate::write::StatisticsOptions;

fn encode(iterator: impl Iterator<Item = bool>, buffer: &mut Vec<u8>) -> PolarsResult<()> {
// encode values using bitpacking
Expand Down Expand Up @@ -59,8 +60,8 @@ pub fn array_to_page(

encode_plain(array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
let statistics = if options.has_statistics() {
Some(build_statistics(array, &options.statistics))
} else {
None
};
Expand All @@ -79,12 +80,27 @@ pub fn array_to_page(
)
}

pub(super) fn build_statistics(array: &BooleanArray) -> ParquetStatistics {
pub(super) fn build_statistics(
array: &BooleanArray,
options: &StatisticsOptions,
) -> ParquetStatistics {
use polars_compute::distinct_count::DistinctCountKernel;
use polars_compute::min_max::MinMaxKernel;

BooleanStatistics {
null_count: Some(array.null_count() as i64),
distinct_count: None,
max_value: array.iter().flatten().max(),
min_value: array.iter().flatten().min(),
null_count: options.null_count.then(|| array.null_count() as i64),
distinct_count: options
.distinct_count
.then(|| array.distinct_count().try_into().ok())
.flatten(),
max_value: options
.max_value
.then(|| array.max_propagate_nan_kernel())
.flatten(),
min_value: options
.min_value
.then(|| array.min_propagate_nan_kernel())
.flatten(),
}
.serialize()
}
Loading

0 comments on commit 9223b10

Please sign in to comment.