diff --git a/crates/polars-arrow/src/array/boolean/mod.rs b/crates/polars-arrow/src/array/boolean/mod.rs index 0737dcfece55d..fc59d30c9d837 100644 --- a/crates/polars-arrow/src/array/boolean/mod.rs +++ b/crates/polars-arrow/src/array/boolean/mod.rs @@ -375,6 +375,59 @@ impl BooleanArray { validity, } } + + /// Calculate the number of unique non-null elements in [`Self`] + pub fn distinct_count(&self) -> usize { + if self.null_count() == 0 { + let mut seen = 0b00u8; + + let mut fast_iter = self.values.fast_iter_u64(); + + for value in &mut fast_iter { + let has_one = value.count_ones() != 0; + let has_zero = value.count_zeros() != 0; + + seen |= u8::from(has_one) << 1; + seen |= u8::from(has_zero); + + if seen == 0b11 { + return 2; + } + } + + let ([rem_lhs, rem_rhs], num_bits) = fast_iter.remainder(); + let num_bits = num_bits as u32; + + let (has_one, has_zero) = if num_bits >= 64 { + ( + rem_lhs.count_ones() != 0 && rem_rhs.count_ones() != 0, + rem_rhs.count_zeros() != 0 && (rem_rhs.count_zeros() - (num_bits - 64)) != 0, + ) + } else { + ( + rem_lhs.count_ones() != 0, + (rem_rhs.count_zeros() - num_bits) != 0, + ) + }; + + seen |= u8::from(has_one) << 1; + seen |= u8::from(has_zero); + + seen.count_ones() as usize + } else { + let mut seen = 0b00u8; + + for value in self.non_null_values_iter() { + seen |= 1 << u64::from(value); + + if seen == 0b111 { + return 3; + } + } + + seen.count_ones() as usize + } + } } impl Array for BooleanArray { diff --git a/crates/polars-core/src/chunked_array/metadata.rs b/crates/polars-core/src/chunked_array/metadata.rs index adae50d3ffabc..7b506c801a472 100644 --- a/crates/polars-core/src/chunked_array/metadata.rs +++ b/crates/polars-core/src/chunked_array/metadata.rs @@ -14,6 +14,7 @@ pub struct Metadata { min_value: Option, max_value: Option, + /// Number of unique non-null values distinct_count: Option, } diff --git a/crates/polars-core/src/series/comparison.rs b/crates/polars-core/src/series/comparison.rs index 
364e865869389..7b49da5b48dd4 100644 --- a/crates/polars-core/src/series/comparison.rs +++ b/crates/polars-core/src/series/comparison.rs @@ -79,7 +79,7 @@ macro_rules! impl_compare { _ => unimplemented!(), }; out.rename(lhs.name()); - Ok(out) as PolarsResult + PolarsResult::Ok(out) }}; } diff --git a/crates/polars-io/Cargo.toml b/crates/polars-io/Cargo.toml index 70784b8485b67..ca188cecad1bb 100644 --- a/crates/polars-io/Cargo.toml +++ b/crates/polars-io/Cargo.toml @@ -64,7 +64,7 @@ json = [ "dtype-struct", "csv", ] -serde = ["dep:serde", "polars-core/serde-lazy"] +serde = ["dep:serde", "polars-core/serde-lazy", "polars-parquet/serde"] # support for arrows ipc file parsing ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"] # support for arrows streaming ipc file parsing diff --git a/crates/polars-io/src/parquet/write/mod.rs b/crates/polars-io/src/parquet/write/mod.rs index 5c58f749b512f..705cf2a96d6a3 100644 --- a/crates/polars-io/src/parquet/write/mod.rs +++ b/crates/polars-io/src/parquet/write/mod.rs @@ -6,5 +6,5 @@ mod writer; pub use batched_writer::BatchedWriter; pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel}; -pub use polars_parquet::write::RowGroupIterColumns; +pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions}; pub use writer::ParquetWriter; diff --git a/crates/polars-io/src/parquet/write/options.rs b/crates/polars-io/src/parquet/write/options.rs index ba5eade46a02b..d65a325522337 100644 --- a/crates/polars-io/src/parquet/write/options.rs +++ b/crates/polars-io/src/parquet/write/options.rs @@ -1,7 +1,7 @@ use polars_error::PolarsResult; use polars_parquet::write::{ BrotliLevel as BrotliLevelParquet, CompressionOptions, GzipLevel as GzipLevelParquet, - ZstdLevel as ZstdLevelParquet, + StatisticsOptions, ZstdLevel as ZstdLevelParquet, }; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -12,7 +12,7 @@ pub struct ParquetWriteOptions { /// Data page compression pub compression: 
ParquetCompression, /// Compute and write column statistics. - pub statistics: bool, + pub statistics: StatisticsOptions, /// If `None` will be all written to a single row group. pub row_group_size: Option, /// if `None` will be 1024^2 bytes diff --git a/crates/polars-io/src/parquet/write/writer.rs b/crates/polars-io/src/parquet/write/writer.rs index 620ac11c33512..4a6688c07fcc2 100644 --- a/crates/polars-io/src/parquet/write/writer.rs +++ b/crates/polars-io/src/parquet/write/writer.rs @@ -4,7 +4,8 @@ use std::sync::Mutex; use arrow::datatypes::PhysicalType; use polars_core::prelude::*; use polars_parquet::write::{ - to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, Version, WriteOptions, + to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, StatisticsOptions, + Version, WriteOptions, }; use super::batched_writer::BatchedWriter; @@ -18,7 +19,7 @@ pub struct ParquetWriter { /// Data page compression compression: CompressionOptions, /// Compute and write column statistics. 
- statistics: bool, + statistics: StatisticsOptions, /// if `None` will be 512^2 rows row_group_size: Option, /// if `None` will be 1024^2 bytes @@ -39,7 +40,7 @@ where ParquetWriter { writer, compression: ParquetCompression::default().into(), - statistics: true, + statistics: StatisticsOptions::default(), row_group_size: None, data_page_size: None, parallel: true, @@ -56,7 +57,7 @@ where } /// Compute and write statistic - pub fn with_statistics(mut self, statistics: bool) -> Self { + pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self { self.statistics = statistics; self } @@ -100,7 +101,7 @@ where fn materialize_options(&self) -> WriteOptions { WriteOptions { - write_statistics: self.statistics, + statistics: self.statistics, compression: self.compression, version: Version::V1, data_pagesize_limit: self.data_page_size, diff --git a/crates/polars-lazy/src/tests/mod.rs b/crates/polars-lazy/src/tests/mod.rs index fb7c04050cbb7..8b1a51212d189 100644 --- a/crates/polars-lazy/src/tests/mod.rs +++ b/crates/polars-lazy/src/tests/mod.rs @@ -94,7 +94,7 @@ fn init_files() { #[cfg(feature = "parquet")] { ParquetWriter::new(f) - .with_statistics(true) + .with_statistics(StatisticsOptions::full()) .finish(&mut df) .unwrap(); } diff --git a/crates/polars-parquet/src/arrow/write/binary/basic.rs b/crates/polars-parquet/src/arrow/write/binary/basic.rs index 8fdb053fe768e..895c1c3a762e3 100644 --- a/crates/polars-parquet/src/arrow/write/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/write/binary/basic.rs @@ -9,7 +9,7 @@ use crate::parquet::encoding::{delta_bitpacked, Encoding}; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics}; use crate::write::utils::invalid_encoding; -use crate::write::Page; +use crate::write::{Page, StatisticsOptions}; pub(crate) fn encode_non_null_values<'a, I: Iterator>( iter: I, @@ -65,8 +65,8 @@ pub fn array_to_page( _ => return Err(invalid_encoding(encoding, 
array.data_type())), } - let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + let statistics = if options.has_statistics() { + Some(build_statistics(array, type_.clone(), &options.statistics)) } else { None }; @@ -89,21 +89,32 @@ pub fn array_to_page( pub(crate) fn build_statistics( array: &BinaryArray, primitive_type: PrimitiveType, + options: &StatisticsOptions, ) -> ParquetStatistics { BinaryStatistics { primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .iter() - .flatten() - .max_by(|x, y| ord_binary(x, y)) - .map(|x| x.to_vec()), - min_value: array - .iter() - .flatten() - .min_by(|x, y| ord_binary(x, y)) - .map(|x| x.to_vec()), + max_value: options + .max_value + .then(|| { + array + .iter() + .flatten() + .max_by(|x, y| ord_binary(x, y)) + .map(|x| x.to_vec()) + }) + .flatten(), + min_value: options + .min_value + .then(|| { + array + .iter() + .flatten() + .min_by(|x, y| ord_binary(x, y)) + .map(|x| x.to_vec()) + }) + .flatten(), } .serialize() } diff --git a/crates/polars-parquet/src/arrow/write/binary/nested.rs b/crates/polars-parquet/src/arrow/write/binary/nested.rs index b97f719dcc232..afc487f423334 100644 --- a/crates/polars-parquet/src/arrow/write/binary/nested.rs +++ b/crates/polars-parquet/src/arrow/write/binary/nested.rs @@ -24,8 +24,8 @@ where encode_plain(array, &mut buffer); - let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + let statistics = if options.has_statistics() { + Some(build_statistics(array, type_.clone(), &options.statistics)) } else { None }; diff --git a/crates/polars-parquet/src/arrow/write/binview/basic.rs b/crates/polars-parquet/src/arrow/write/binview/basic.rs index bbcf0845c0782..55b51d26b5f7e 100644 --- a/crates/polars-parquet/src/arrow/write/binview/basic.rs +++ 
b/crates/polars-parquet/src/arrow/write/binview/basic.rs @@ -7,7 +7,7 @@ use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics}; use crate::read::schema::is_nullable; use crate::write::binary::{encode_non_null_values, ord_binary}; use crate::write::utils::invalid_encoding; -use crate::write::{utils, Encoding, Page, WriteOptions}; +use crate::write::{utils, Encoding, Page, StatisticsOptions, WriteOptions}; pub(crate) fn encode_plain(array: &BinaryViewArray, buffer: &mut Vec) { let capacity = @@ -56,8 +56,8 @@ pub fn array_to_page( _ => return Err(invalid_encoding(encoding, array.data_type())), } - let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + let statistics = if options.has_statistics() { + Some(build_statistics(array, type_.clone(), &options.statistics)) } else { None }; @@ -81,21 +81,32 @@ pub fn array_to_page( pub(crate) fn build_statistics( array: &BinaryViewArray, primitive_type: PrimitiveType, + options: &StatisticsOptions, ) -> ParquetStatistics { BinaryStatistics { primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .iter() - .flatten() - .max_by(|x, y| ord_binary(x, y)) - .map(|x| x.to_vec()), - min_value: array - .iter() - .flatten() - .min_by(|x, y| ord_binary(x, y)) - .map(|x| x.to_vec()), + max_value: options + .max_value + .then(|| { + array + .iter() + .flatten() + .max_by(|x, y| ord_binary(x, y)) + .map(|x| x.to_vec()) + }) + .flatten(), + min_value: options + .min_value + .then(|| { + array + .iter() + .flatten() + .min_by(|x, y| ord_binary(x, y)) + .map(|x| x.to_vec()) + }) + .flatten(), } .serialize() } diff --git a/crates/polars-parquet/src/arrow/write/binview/nested.rs b/crates/polars-parquet/src/arrow/write/binview/nested.rs index 0cf7389b20110..9e76b23e6b190 100644 --- a/crates/polars-parquet/src/arrow/write/binview/nested.rs +++ 
b/crates/polars-parquet/src/arrow/write/binview/nested.rs @@ -20,8 +20,8 @@ pub fn array_to_page( encode_plain(array, &mut buffer); - let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone())) + let statistics = if options.has_statistics() { + Some(build_statistics(array, type_.clone(), &options.statistics)) } else { None }; diff --git a/crates/polars-parquet/src/arrow/write/boolean/basic.rs b/crates/polars-parquet/src/arrow/write/boolean/basic.rs index 0f4414aaf16d6..f9fc4cf81efd9 100644 --- a/crates/polars-parquet/src/arrow/write/boolean/basic.rs +++ b/crates/polars-parquet/src/arrow/write/boolean/basic.rs @@ -8,6 +8,7 @@ use crate::parquet::encoding::Encoding; use crate::parquet::page::DataPage; use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::{BooleanStatistics, ParquetStatistics}; +use crate::write::StatisticsOptions; fn encode(iterator: impl Iterator, buffer: &mut Vec) -> PolarsResult<()> { // encode values using bitpacking @@ -59,8 +60,8 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer)?; - let statistics = if options.write_statistics { - Some(build_statistics(array)) + let statistics = if options.has_statistics() { + Some(build_statistics(array, &options.statistics)) } else { None }; @@ -79,12 +80,24 @@ pub fn array_to_page( ) } -pub(super) fn build_statistics(array: &BooleanArray) -> ParquetStatistics { +pub(super) fn build_statistics( + array: &BooleanArray, + options: &StatisticsOptions, +) -> ParquetStatistics { BooleanStatistics { - null_count: Some(array.null_count() as i64), - distinct_count: None, - max_value: array.iter().flatten().max(), - min_value: array.iter().flatten().min(), + null_count: options.null_count.then(|| array.null_count() as i64), + distinct_count: options + .distinct_count + .then(|| array.distinct_count().try_into().ok()) + .flatten(), + max_value: options + .max_value + .then(|| array.iter().flatten().max()) + .flatten(), + min_value: 
options + .min_value + .then(|| array.iter().flatten().min()) + .flatten(), } .serialize() } diff --git a/crates/polars-parquet/src/arrow/write/boolean/nested.rs b/crates/polars-parquet/src/arrow/write/boolean/nested.rs index eb7a66cfd32c7..3560bc167369b 100644 --- a/crates/polars-parquet/src/arrow/write/boolean/nested.rs +++ b/crates/polars-parquet/src/arrow/write/boolean/nested.rs @@ -23,8 +23,8 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer)?; - let statistics = if options.write_statistics { - Some(build_statistics(array)) + let statistics = if options.has_statistics() { + Some(build_statistics(array, &options.statistics)) } else { None }; diff --git a/crates/polars-parquet/src/arrow/write/dictionary.rs b/crates/polars-parquet/src/arrow/write/dictionary.rs index 1d26e41d9ba7c..5b397ed232778 100644 --- a/crates/polars-parquet/src/arrow/write/dictionary.rs +++ b/crates/polars-parquet/src/arrow/write/dictionary.rs @@ -190,8 +190,12 @@ macro_rules! dyn_prim { let buffer = primitive_encode_plain::<$from, $to>(values, false, vec![]); - let stats: Option = if $options.write_statistics { - let mut stats = primitive_build_statistics::<$from, $to>(values, $type_.clone()); + let stats: Option = if !$options.statistics.is_empty() { + let mut stats = primitive_build_statistics::<$from, $to>( + values, + $type_.clone(), + &$options.statistics, + ); stats.null_count = Some($array.null_count() as i64); Some(stats.serialize()) } else { @@ -240,8 +244,12 @@ pub fn array_to_pages( let mut buffer = vec![]; binary_encode_plain::(array, &mut buffer); - let stats = if options.write_statistics { - Some(binary_build_statistics(array, type_.clone())) + let stats = if options.has_statistics() { + Some(binary_build_statistics( + array, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -256,8 +264,12 @@ pub fn array_to_pages( let mut buffer = vec![]; binview::encode_plain(array, &mut buffer); - let stats = if options.write_statistics { - 
Some(binview::build_statistics(array, type_.clone())) + let stats = if options.has_statistics() { + Some(binview::build_statistics( + array, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -273,8 +285,12 @@ pub fn array_to_pages( let mut buffer = vec![]; binview::encode_plain(&array, &mut buffer); - let stats = if options.write_statistics { - Some(binview::build_statistics(&array, type_.clone())) + let stats = if options.has_statistics() { + Some(binview::build_statistics( + &array, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -285,8 +301,12 @@ pub fn array_to_pages( let mut buffer = vec![]; binary_encode_plain::(values, &mut buffer); - let stats = if options.write_statistics { - Some(binary_build_statistics(values, type_.clone())) + let stats = if options.has_statistics() { + Some(binary_build_statistics( + values, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -296,8 +316,12 @@ pub fn array_to_pages( let mut buffer = vec![]; let array = array.values().as_any().downcast_ref().unwrap(); fixed_binary_encode_plain(array, false, &mut buffer); - let stats = if options.write_statistics { - let stats = fixed_binary_build_statistics(array, type_.clone()); + let stats = if options.has_statistics() { + let stats = fixed_binary_build_statistics( + array, + type_.clone(), + &options.statistics, + ); Some(stats.serialize()) } else { None diff --git a/crates/polars-parquet/src/arrow/write/file.rs b/crates/polars-parquet/src/arrow/write/file.rs index cce916b0ae9af..d4162b8c08d5f 100644 --- a/crates/polars-parquet/src/arrow/write/file.rs +++ b/crates/polars-parquet/src/arrow/write/file.rs @@ -61,7 +61,7 @@ impl FileWriter { parquet_schema, FileWriteOptions { version: options.version, - write_statistics: options.write_statistics, + write_statistics: options.has_statistics(), }, created_by, ), diff --git a/crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs b/crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs index 
46291e7312d63..bf15c0ab50ccf 100644 --- a/crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs +++ b/crates/polars-parquet/src/arrow/write/fixed_len_bytes.rs @@ -3,7 +3,7 @@ use arrow::types::i256; use polars_error::PolarsResult; use super::binary::ord_binary; -use super::{utils, WriteOptions}; +use super::{utils, StatisticsOptions, WriteOptions}; use crate::arrow::read::schema::is_nullable; use crate::parquet::encoding::Encoding; use crate::parquet::page::DataPage; @@ -62,21 +62,32 @@ pub fn array_to_page( pub(super) fn build_statistics( array: &FixedSizeBinaryArray, primitive_type: PrimitiveType, + options: &StatisticsOptions, ) -> FixedLenStatistics { FixedLenStatistics { primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .iter() - .flatten() - .max_by(|x, y| ord_binary(x, y)) - .map(|x| x.to_vec()), - min_value: array - .iter() - .flatten() - .min_by(|x, y| ord_binary(x, y)) - .map(|x| x.to_vec()), + max_value: options + .max_value + .then(|| { + array + .iter() + .flatten() + .max_by(|x, y| ord_binary(x, y)) + .map(|x| x.to_vec()) + }) + .flatten(), + min_value: options + .min_value + .then(|| { + array + .iter() + .flatten() + .min_by(|x, y| ord_binary(x, y)) + .map(|x| x.to_vec()) + }) + .flatten(), } } @@ -84,21 +95,32 @@ pub(super) fn build_statistics_decimal( array: &PrimitiveArray, primitive_type: PrimitiveType, size: usize, + options: &StatisticsOptions, ) -> FixedLenStatistics { FixedLenStatistics { primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .iter() - .flatten() - .max() - .map(|x| x.to_be_bytes()[16 - size..].to_vec()), - min_value: array - .iter() - .flatten() - .min() - .map(|x| x.to_be_bytes()[16 - size..].to_vec()), + max_value: options + .max_value + .then(|| { + array + .iter() + 
.flatten() + .max() + .map(|x| x.to_be_bytes()[16 - size..].to_vec()) + }) + .flatten(), + min_value: options + .min_value + .then(|| { + array + .iter() + .flatten() + .min() + .map(|x| x.to_be_bytes()[16 - size..].to_vec()) + }) + .flatten(), } } @@ -106,21 +128,32 @@ pub(super) fn build_statistics_decimal256_with_i128( array: &PrimitiveArray, primitive_type: PrimitiveType, size: usize, + options: &StatisticsOptions, ) -> FixedLenStatistics { FixedLenStatistics { primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .iter() - .flatten() - .max() - .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()), - min_value: array - .iter() - .flatten() - .min() - .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()), + max_value: options + .max_value + .then(|| { + array + .iter() + .flatten() + .max() + .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()) + }) + .flatten(), + min_value: options + .min_value + .then(|| { + array + .iter() + .flatten() + .min() + .map(|x| x.0.low().to_be_bytes()[16 - size..].to_vec()) + }) + .flatten(), } } @@ -128,20 +161,31 @@ pub(super) fn build_statistics_decimal256( array: &PrimitiveArray, primitive_type: PrimitiveType, size: usize, + options: &StatisticsOptions, ) -> FixedLenStatistics { FixedLenStatistics { primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .iter() - .flatten() - .max() - .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()), - min_value: array - .iter() - .flatten() - .min() - .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()), + max_value: options + .max_value + .then(|| { + array + .iter() + .flatten() + .max() + .map(|x| x.0.to_be_bytes()[32 - size..].to_vec()) + }) + .flatten(), + min_value: options + .min_value + .then(|| { + array + .iter() + .flatten() + .min() + 
.map(|x| x.0.to_be_bytes()[32 - size..].to_vec()) + }) + .flatten(), } } diff --git a/crates/polars-parquet/src/arrow/write/mod.rs b/crates/polars-parquet/src/arrow/write/mod.rs index e5f46b39a476a..90f876baed99a 100644 --- a/crates/polars-parquet/src/arrow/write/mod.rs +++ b/crates/polars-parquet/src/arrow/write/mod.rs @@ -50,11 +50,32 @@ pub use crate::parquet::write::{ }; pub use crate::parquet::{fallible_streaming_iterator, FallibleStreamingIterator}; +/// The statistics to write +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct StatisticsOptions { + pub min_value: bool, + pub max_value: bool, + pub distinct_count: bool, + pub null_count: bool, +} + +impl Default for StatisticsOptions { + fn default() -> Self { + Self { + min_value: true, + max_value: true, + distinct_count: false, + null_count: true, + } + } +} + /// Currently supported options to write to parquet #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct WriteOptions { /// Whether to write statistics - pub write_statistics: bool, + pub statistics: StatisticsOptions, /// The page and file version to use pub version: Version, /// The compression to apply to every page @@ -75,6 +96,40 @@ pub use sink::FileSink; use crate::write::dictionary::encode_as_dictionary_optional; +impl StatisticsOptions { + pub fn empty() -> Self { + Self { + min_value: false, + max_value: false, + distinct_count: false, + null_count: false, + } + } + + pub fn full() -> Self { + Self { + min_value: true, + max_value: true, + distinct_count: true, + null_count: true, + } + } + + pub fn is_empty(&self) -> bool { + !(self.min_value || self.max_value || self.distinct_count || self.null_count) + } + + pub fn is_full(&self) -> bool { + self.min_value && self.max_value && self.distinct_count && self.null_count + } +} + +impl WriteOptions { + pub fn has_statistics(&self) -> bool { + !self.statistics.is_empty() + } +} + /// returns offset and 
length to slice the leaf values pub fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) { // find the deepest recursive dremel structure as that one determines how many values we must @@ -441,8 +496,12 @@ pub fn array_to_page_simple( values.into(), array.validity().cloned(), ); - let statistics = if options.write_statistics { - Some(fixed_len_bytes::build_statistics(&array, type_.clone())) + let statistics = if options.has_statistics() { + Some(fixed_len_bytes::build_statistics( + &array, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -464,8 +523,12 @@ pub fn array_to_page_simple( values.into(), array.validity().cloned(), ); - let statistics = if options.write_statistics { - Some(fixed_len_bytes::build_statistics(&array, type_.clone())) + let statistics = if options.has_statistics() { + Some(fixed_len_bytes::build_statistics( + &array, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -473,8 +536,12 @@ pub fn array_to_page_simple( }, ArrowDataType::FixedSizeBinary(_) => { let array = array.as_any().downcast_ref().unwrap(); - let statistics = if options.write_statistics { - Some(fixed_len_bytes::build_statistics(array, type_.clone())) + let statistics = if options.has_statistics() { + Some(fixed_len_bytes::build_statistics( + array, + type_.clone(), + &options.statistics, + )) } else { None }; @@ -521,11 +588,12 @@ pub fn array_to_page_simple( ); } else if precision <= 38 { let size = decimal_length_from_precision(precision); - let statistics = if options.write_statistics { + let statistics = if options.has_statistics() { let stats = fixed_len_bytes::build_statistics_decimal256_with_i128( array, type_.clone(), size, + &options.statistics, ); Some(stats) } else { @@ -549,9 +617,13 @@ pub fn array_to_page_simple( .as_any() .downcast_ref::>() .unwrap(); - let statistics = if options.write_statistics { - let stats = - fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size); + let statistics = if 
options.has_statistics() { + let stats = fixed_len_bytes::build_statistics_decimal256( + array, + type_.clone(), + size, + &options.statistics, + ); Some(stats) } else { None @@ -611,9 +683,13 @@ pub fn array_to_page_simple( } else { let size = decimal_length_from_precision(precision); - let statistics = if options.write_statistics { - let stats = - fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size); + let statistics = if options.has_statistics() { + let stats = fixed_len_bytes::build_statistics_decimal( + array, + type_.clone(), + size, + &options.statistics, + ); Some(stats) } else { None @@ -750,9 +826,13 @@ fn array_to_page_nested( } else { let size = decimal_length_from_precision(precision); - let statistics = if options.write_statistics { - let stats = - fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size); + let statistics = if options.has_statistics() { + let stats = fixed_len_bytes::build_statistics_decimal( + array, + type_.clone(), + size, + &options.statistics, + ); Some(stats) } else { None @@ -807,11 +887,12 @@ fn array_to_page_nested( primitive::nested_array_to_page::(&array, options, type_, nested) } else if precision <= 38 { let size = decimal_length_from_precision(precision); - let statistics = if options.write_statistics { + let statistics = if options.has_statistics() { let stats = fixed_len_bytes::build_statistics_decimal256_with_i128( array, type_.clone(), size, + &options.statistics, ); Some(stats) } else { @@ -835,9 +916,13 @@ fn array_to_page_nested( .as_any() .downcast_ref::>() .unwrap(); - let statistics = if options.write_statistics { - let stats = - fixed_len_bytes::build_statistics_decimal256(array, type_.clone(), size); + let statistics = if options.has_statistics() { + let stats = fixed_len_bytes::build_statistics_decimal256( + array, + type_.clone(), + size, + &options.statistics, + ); Some(stats) } else { None diff --git a/crates/polars-parquet/src/arrow/write/primitive/basic.rs 
b/crates/polars-parquet/src/arrow/write/primitive/basic.rs index 7ea284d74dfcf..81a2101b37d6e 100644 --- a/crates/polars-parquet/src/arrow/write/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/write/primitive/basic.rs @@ -12,6 +12,7 @@ use crate::parquet::schema::types::PrimitiveType; use crate::parquet::statistics::PrimitiveStatistics; use crate::parquet::types::NativeType as ParquetNativeType; use crate::read::Page; +use crate::write::StatisticsOptions; pub(crate) fn encode_plain( array: &PrimitiveArray, @@ -136,8 +137,8 @@ where let buffer = encode(array, is_optional, buffer); - let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone()).serialize()) + let statistics = if options.has_statistics() { + Some(build_statistics(array, type_.clone(), &options.statistics).serialize()) } else { None }; @@ -159,6 +160,7 @@ where pub fn build_statistics( array: &PrimitiveArray, primitive_type: PrimitiveType, + options: &StatisticsOptions, ) -> PrimitiveStatistics

where T: NativeType, @@ -167,21 +169,31 @@ where { PrimitiveStatistics::

{ primitive_type, - null_count: Some(array.null_count() as i64), + null_count: options.null_count.then_some(array.null_count() as i64), distinct_count: None, - max_value: array - .non_null_values_iter() - .map(|x| { - let x: P = x.as_(); - x + max_value: options + .max_value + .then(|| { + array + .non_null_values_iter() + .map(|x| { + let x: P = x.as_(); + x + }) + .max_by(|x, y| x.ord(y)) }) - .max_by(|x, y| x.ord(y)), - min_value: array - .non_null_values_iter() - .map(|x| { - let x: P = x.as_(); - x + .flatten(), + min_value: options + .min_value + .then(|| { + array + .non_null_values_iter() + .map(|x| { + let x: P = x.as_(); + x + }) + .min_by(|x, y| x.ord(y)) }) - .min_by(|x, y| x.ord(y)), + .flatten(), } } diff --git a/crates/polars-parquet/src/arrow/write/primitive/nested.rs b/crates/polars-parquet/src/arrow/write/primitive/nested.rs index 012846521d881..918afa6a4dc68 100644 --- a/crates/polars-parquet/src/arrow/write/primitive/nested.rs +++ b/crates/polars-parquet/src/arrow/write/primitive/nested.rs @@ -31,8 +31,8 @@ where let buffer = encode_plain(array, is_optional, buffer); - let statistics = if options.write_statistics { - Some(build_statistics(array, type_.clone()).serialize()) + let statistics = if options.has_statistics() { + Some(build_statistics(array, type_.clone(), &options.statistics).serialize()) } else { None }; diff --git a/crates/polars-parquet/src/arrow/write/sink.rs b/crates/polars-parquet/src/arrow/write/sink.rs index 8c645a8de2e16..ca93975a4c464 100644 --- a/crates/polars-parquet/src/arrow/write/sink.rs +++ b/crates/polars-parquet/src/arrow/write/sink.rs @@ -58,7 +58,7 @@ where parquet_schema.clone(), ParquetWriteOptions { version: options.version, - write_statistics: options.write_statistics, + write_statistics: options.has_statistics(), }, created_by, ); diff --git a/crates/polars/tests/it/io/parquet/arrow/mod.rs b/crates/polars/tests/it/io/parquet/arrow/mod.rs index abd7d8e1d9b41..f70866a58a502 100644 --- 
a/crates/polars/tests/it/io/parquet/arrow/mod.rs +++ b/crates/polars/tests/it/io/parquet/arrow/mod.rs @@ -564,7 +564,7 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { max_value: Box::new(Utf8ViewArray::from_slice([Some("def")])), }, "bool" => Statistics { - distinct_count: UInt64Array::from([None]).boxed(), + distinct_count: UInt64Array::from([Some(2)]).boxed(), null_count: UInt64Array::from([Some(4)]).boxed(), min_value: Box::new(BooleanArray::from_slice([false])), max_value: Box::new(BooleanArray::from_slice([true])), @@ -701,7 +701,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { max_value: new_list(Box::new(Int16Array::from_slice([10])), true).boxed(), }, "list_bool" => Statistics { - distinct_count: new_list(UInt64Array::from([None]).boxed(), true).boxed(), + distinct_count: new_list(UInt64Array::from([Some(2)]).boxed(), true).boxed(), null_count: new_list(UInt64Array::from([Some(1)]).boxed(), true).boxed(), min_value: new_list(Box::new(BooleanArray::from_slice([false])), true).boxed(), max_value: new_list(Box::new(BooleanArray::from_slice([true])), true).boxed(), @@ -1096,7 +1096,7 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics { distinct_count: new_struct( vec![ Box::new(UInt64Array::from([None])), - Box::new(UInt64Array::from([None])), + Box::new(UInt64Array::from([Some(2)])), ], names.clone(), ) @@ -1130,7 +1130,7 @@ pub fn pyarrow_struct_statistics(column: &str) -> Statistics { new_struct( vec![ Box::new(UInt64Array::from([None])), - Box::new(UInt64Array::from([None])), + Box::new(UInt64Array::from([Some(2)])), ], names.clone(), ) @@ -1257,7 +1257,7 @@ fn integration_write( chunks: &[RecordBatchT>], ) -> PolarsResult> { let options = WriteOptions { - write_statistics: true, + statistics: StatisticsOptions::full(), compression: CompressionOptions::Uncompressed, version: Version::V1, data_pagesize_limit: None, diff --git a/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs 
b/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs index 28fa8afe31cea..c824ff7b38fc3 100644 --- a/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs +++ b/crates/polars/tests/it/io/parquet/arrow/read_indexes.rs @@ -29,7 +29,7 @@ fn pages( let parquet_schema = to_parquet_schema(&schema)?; let options = WriteOptions { - write_statistics: true, + statistics: StatisticsOptions::full(), compression: CompressionOptions::Uncompressed, version: Version::V1, data_pagesize_limit: None, @@ -79,7 +79,7 @@ fn read_with_indexes( expected: Box, ) -> PolarsResult<()> { let options = WriteOptions { - write_statistics: true, + statistics: StatisticsOptions::full(), compression: CompressionOptions::Uncompressed, version: Version::V1, data_pagesize_limit: None, diff --git a/crates/polars/tests/it/io/parquet/arrow/write.rs b/crates/polars/tests/it/io/parquet/arrow/write.rs index ed50a92dfcf61..f03e4e79e2b1c 100644 --- a/crates/polars/tests/it/io/parquet/arrow/write.rs +++ b/crates/polars/tests/it/io/parquet/arrow/write.rs @@ -45,7 +45,7 @@ fn round_trip_opt_stats( let schema = ArrowSchema::from(vec![field]); let options = WriteOptions { - write_statistics: true, + statistics: StatisticsOptions::full(), compression, version, data_pagesize_limit: None, diff --git a/crates/polars/tests/it/io/parquet/roundtrip.rs b/crates/polars/tests/it/io/parquet/roundtrip.rs index f283f2abcde70..dd55eac0e9f5a 100644 --- a/crates/polars/tests/it/io/parquet/roundtrip.rs +++ b/crates/polars/tests/it/io/parquet/roundtrip.rs @@ -6,7 +6,9 @@ use arrow::record_batch::RecordBatchT; use polars_error::PolarsResult; use polars_parquet::arrow::write::{FileWriter, WriteOptions}; use polars_parquet::read::read_metadata; -use polars_parquet::write::{CompressionOptions, Encoding, RowGroupIterator, Version}; +use polars_parquet::write::{ + CompressionOptions, Encoding, RowGroupIterator, StatisticsOptions, Version, +}; fn round_trip( array: &ArrayRef, @@ -18,7 +20,7 @@ fn round_trip( let schema = 
ArrowSchema::from(vec![field]); let options = WriteOptions { - write_statistics: true, + statistics: StatisticsOptions::full(), compression, version, data_pagesize_limit: None, diff --git a/py-polars/polars/dataframe/frame.py b/py-polars/polars/dataframe/frame.py index 86d858bb68c6d..96f3672aaac62 100644 --- a/py-polars/polars/dataframe/frame.py +++ b/py-polars/polars/dataframe/frame.py @@ -3287,7 +3287,7 @@ def write_parquet( *, compression: ParquetCompression = "zstd", compression_level: int | None = None, - statistics: bool = True, + statistics: bool | str | dict[str, bool] = True, row_group_size: int | None = None, data_page_size: int | None = None, use_pyarrow: bool = False, @@ -3315,6 +3315,19 @@ def write_parquet( statistics Write statistics to the parquet headers. This is the default behavior. + + Possible values: + + - `True`: enable default set of statistics (default) + - `False`: disable all statistics + - "full": calculate and write all available statistics. Cannot be + combined with `use_pyarrow`. + - `{ "statistic-key": True / False, ... }`. Cannot be combined with + `use_pyarrow`. Available keys: + - "min": column minimum value (default: `True`) + - "max": column maximum value (default: `True`) + - "distinct_count": number of unique column values (default: `False`) + - "null_count": number of null values in column (default: `True`) row_group_size Size of the row groups in number of rows. Defaults to 512^2 rows. 
data_page_size @@ -3366,6 +3379,10 @@ def write_parquet( file = normalize_filepath(file) if use_pyarrow: + if statistics == "full" or isinstance(statistics, dict): + msg = "write_parquet with `use_pyarrow=True` allows only boolean values for `statistics`" + raise ValueError(msg) + tbl = self.to_arrow() data = {} @@ -3405,6 +3422,23 @@ def write_parquet( ) else: + if isinstance(statistics, bool) and statistics: + statistics = { + "min": True, + "max": True, + "distinct_count": False, + "null_count": True, + } + elif isinstance(statistics, bool) and not statistics: + statistics = {} + elif statistics == "full": + statistics = { + "min": True, + "max": True, + "distinct_count": True, + "null_count": True, + } + self._df.write_parquet( file, compression, diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py index aa2a3025e9243..796e105be3555 100644 --- a/py-polars/polars/lazyframe/frame.py +++ b/py-polars/polars/lazyframe/frame.py @@ -2035,7 +2035,7 @@ def sink_parquet( *, compression: str = "zstd", compression_level: int | None = None, - statistics: bool = True, + statistics: bool | str | dict[str, bool] = True, row_group_size: int | None = None, data_pagesize_limit: int | None = None, maintain_order: bool = True, @@ -2073,6 +2073,19 @@ def sink_parquet( - "zstd" : min-level: 1, max-level: 22. statistics Write statistics to the parquet headers. This is the default behavior. + + Possible values: + + - `True`: enable default set of statistics (default) + - `False`: disable all statistics + - "full": calculate and write all available statistics, including + the distinct count. + - `{ "statistic-key": True / False, ... }`: enable or disable + individual statistics.
Available keys: + - "min": column minimum value (default: `True`) + - "max": column maximum value (default: `True`) + - "distinct_count": number of unique column values (default: `False`) + - "null_count": number of null values in column (default: `True`) row_group_size Size of the row groups in number of rows. If None (default), the chunks of the `DataFrame` are @@ -2115,6 +2128,23 @@ def sink_parquet( no_optimization=no_optimization, ) + if isinstance(statistics, bool) and statistics: + statistics = { + "min": True, + "max": True, + "distinct_count": False, + "null_count": True, + } + elif isinstance(statistics, bool) and not statistics: + statistics = {} + elif statistics == "full": + statistics = { + "min": True, + "max": True, + "distinct_count": True, + "null_count": True, + } + return lf.sink_parquet( path=normalize_filepath(path), compression=compression, diff --git a/py-polars/src/conversion/mod.rs b/py-polars/src/conversion/mod.rs index 83623612a5cb3..4d90648c37e66 100644 --- a/py-polars/src/conversion/mod.rs +++ b/py-polars/src/conversion/mod.rs @@ -17,6 +17,7 @@ use polars_core::utils::arrow::array::Array; use polars_core::utils::arrow::types::NativeType; use polars_core::utils::materialize_dyn_int; use polars_lazy::prelude::*; +use polars_parquet::write::StatisticsOptions; use polars_utils::total_ord::{TotalEq, TotalHash}; use pyo3::basic::CompareOp; use pyo3::exceptions::{PyTypeError, PyValueError}; @@ -437,6 +438,32 @@ impl ToPyObject for Wrap { } } +impl<'s> FromPyObject<'s> for Wrap { + fn extract_bound(ob: &Bound<'s, PyAny>) -> PyResult { + let mut statistics = StatisticsOptions::empty(); + + let dict = ob.downcast::()?; + for (key, val) in dict { + let key = key.extract::()?; + let val = val.extract::()?; + + match key.as_ref() { + "min" => statistics.min_value = val, + "max" => statistics.max_value = val, + "distinct_count" => statistics.distinct_count = val, + "null_count" => statistics.null_count = val, + _ => { + return 
Err(PyTypeError::new_err(format!( + "'{key}' is not a valid statistic option", + ))) + }, + } + } + + Ok(Wrap(statistics)) + } +} + impl<'s> FromPyObject<'s> for Wrap> { fn extract_bound(ob: &Bound<'s, PyAny>) -> PyResult { let vals = ob.extract::>>>()?; diff --git a/py-polars/src/dataframe/io.rs b/py-polars/src/dataframe/io.rs index 151e80435c806..a9552610a62a4 100644 --- a/py-polars/src/dataframe/io.rs +++ b/py-polars/src/dataframe/io.rs @@ -6,6 +6,8 @@ use std::ops::Deref; use polars::io::avro::AvroCompression; use polars::io::mmap::{try_create_file, ReaderBytes}; use polars::io::RowIndex; +#[cfg(feature = "parquet")] +use polars_parquet::arrow::write::StatisticsOptions; use pyo3::prelude::*; use pyo3::pybacked::PyBackedStr; @@ -442,7 +444,7 @@ impl PyDataFrame { py_f: PyObject, compression: &str, compression_level: Option, - statistics: bool, + statistics: Wrap, row_group_size: Option, data_page_size: Option, ) -> PyResult<()> { @@ -453,7 +455,7 @@ impl PyDataFrame { py.allow_threads(|| { ParquetWriter::new(f) .with_compression(compression) - .with_statistics(statistics) + .with_statistics(statistics.0) .with_row_group_size(row_group_size) .with_data_page_size(data_page_size) .finish(&mut self.df) @@ -463,7 +465,7 @@ impl PyDataFrame { let buf = get_file_like(py_f, true)?; ParquetWriter::new(buf) .with_compression(compression) - .with_statistics(statistics) + .with_statistics(statistics.0) .with_row_group_size(row_group_size) .with_data_page_size(data_page_size) .finish(&mut self.df) diff --git a/py-polars/src/lazyframe/mod.rs b/py-polars/src/lazyframe/mod.rs index 7aa96c9640a96..c54a0cc6576c7 100644 --- a/py-polars/src/lazyframe/mod.rs +++ b/py-polars/src/lazyframe/mod.rs @@ -11,6 +11,8 @@ use polars::io::cloud::CloudOptions; use polars::io::{HiveOptions, RowIndex}; use polars::time::*; use polars_core::prelude::*; +#[cfg(feature = "parquet")] +use polars_parquet::arrow::write::StatisticsOptions; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use 
pyo3::pybacked::{PyBackedBytes, PyBackedStr}; @@ -659,7 +661,7 @@ impl PyLazyFrame { path: PathBuf, compression: &str, compression_level: Option, - statistics: bool, + statistics: Wrap, row_group_size: Option, data_pagesize_limit: Option, maintain_order: bool, @@ -668,7 +670,7 @@ impl PyLazyFrame { let options = ParquetWriteOptions { compression, - statistics, + statistics: statistics.0, row_group_size, data_pagesize_limit, maintain_order,