Skip to content

Commit

Permalink
feat(rust,py): detail write_parquet statistics parameters
Browse files Browse the repository at this point in the history
Adds additional control over which statistics are written into Parquet files
through the `write_parquet` parameter `statistics`.

It is now possible to specify `"full"` to also attempt to add the
`distinct_count` statistic (currently only added for `Booleans`). It is also
possible to give a `dict[str, bool]` to specify individual statistics `min`,
`max`, `distinct_count` and `null_count`.

Fixes #16441
  • Loading branch information
coastalwhite committed May 31, 2024
1 parent 0eb8384 commit 5a335d0
Show file tree
Hide file tree
Showing 30 changed files with 518 additions and 166 deletions.
53 changes: 53 additions & 0 deletions crates/polars-arrow/src/array/boolean/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,59 @@ impl BooleanArray {
validity,
}
}

/// Calculate the number of unique non-null elements in [`Self`].
///
/// A boolean column can contain at most two distinct non-null values
/// (`false` and `true`), so the result is always in `0..=2`.
pub fn distinct_count(&self) -> usize {
    if self.null_count() == 0 {
        // Fast path: no validity mask, scan the value bitmap 64 bits at a time.
        // `seen` tracks bit 1 = "a one was observed", bit 0 = "a zero was observed".
        let mut seen = 0b00u8;

        let mut fast_iter = self.values.fast_iter_u64();

        for value in &mut fast_iter {
            seen |= u8::from(value.count_ones() != 0) << 1;
            seen |= u8::from(value.count_zeros() != 0);

            // Both values observed: the count can never exceed 2, stop early.
            if seen == 0b11 {
                return 2;
            }
        }

        // The remainder holds `num_bits` valid bits (< 128) spread over two
        // words; all padding bits beyond `num_bits` are guaranteed zero.
        let ([rem_lhs, rem_rhs], num_bits) = fast_iter.remainder();
        let num_bits = num_bits as u32;

        // Padding bits are zero, so any set bit in either word is a valid one.
        let has_one = rem_lhs.count_ones() != 0 || rem_rhs.count_ones() != 0;

        // For zeros we must exclude the zero padding from the count.
        let has_zero = if num_bits > 64 {
            // `rem_lhs` is fully valid; `rem_rhs` has `num_bits - 64` valid
            // bits and `128 - num_bits` padding zeros.
            rem_lhs.count_zeros() != 0 || (rem_rhs.count_zeros() - (128 - num_bits)) != 0
        } else {
            // All valid bits live in `rem_lhs`; `64 - num_bits` of its zeros
            // are padding.
            (rem_lhs.count_zeros() - (64 - num_bits)) != 0
        };

        seen |= u8::from(has_one) << 1;
        seen |= u8::from(has_zero);

        seen.count_ones() as usize
    } else {
        // Slow path: walk only the non-null values.
        let mut seen = 0b00u8;

        for value in self.non_null_values_iter() {
            seen |= 1 << u8::from(value);

            // Both `false` and `true` observed: result is saturated at 2.
            if seen == 0b11 {
                return 2;
            }
        }

        seen.count_ones() as usize
    }
}
}

impl Array for BooleanArray {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-core/src/chunked_array/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Metadata<T: PolarsDataType> {
min_value: Option<T::OwnedPhysical>,
max_value: Option<T::OwnedPhysical>,

/// Number of unique non-null values
distinct_count: Option<IdxSize>,
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-core/src/series/comparison.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ macro_rules! impl_compare {
_ => unimplemented!(),
};
out.rename(lhs.name());
Ok(out) as PolarsResult<BooleanChunked>
PolarsResult::Ok(out)
}};
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ json = [
"dtype-struct",
"csv",
]
serde = ["dep:serde", "polars-core/serde-lazy"]
serde = ["dep:serde", "polars-core/serde-lazy", "polars-parquet/serde"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrows streaming ipc file parsing
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ mod writer;

pub use batched_writer::BatchedWriter;
pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
pub use polars_parquet::write::RowGroupIterColumns;
pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
pub use writer::ParquetWriter;
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/write/options.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use polars_error::PolarsResult;
use polars_parquet::write::{
BrotliLevel as BrotliLevelParquet, CompressionOptions, GzipLevel as GzipLevelParquet,
ZstdLevel as ZstdLevelParquet,
StatisticsOptions, ZstdLevel as ZstdLevelParquet,
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
Expand All @@ -12,7 +12,7 @@ pub struct ParquetWriteOptions {
/// Data page compression
pub compression: ParquetCompression,
/// Compute and write column statistics.
pub statistics: bool,
pub statistics: StatisticsOptions,
/// If `None` will be all written to a single row group.
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
Expand Down
11 changes: 6 additions & 5 deletions crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::sync::Mutex;
use arrow::datatypes::PhysicalType;
use polars_core::prelude::*;
use polars_parquet::write::{
to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, Version, WriteOptions,
to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, StatisticsOptions,
Version, WriteOptions,
};

use super::batched_writer::BatchedWriter;
Expand All @@ -18,7 +19,7 @@ pub struct ParquetWriter<W> {
/// Data page compression
compression: CompressionOptions,
/// Compute and write column statistics.
statistics: bool,
statistics: StatisticsOptions,
/// if `None` will be 512^2 rows
row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
Expand All @@ -39,7 +40,7 @@ where
ParquetWriter {
writer,
compression: ParquetCompression::default().into(),
statistics: true,
statistics: StatisticsOptions::default(),
row_group_size: None,
data_page_size: None,
parallel: true,
Expand All @@ -56,7 +57,7 @@ where
}

/// Compute and write statistic
pub fn with_statistics(mut self, statistics: bool) -> Self {
pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
self.statistics = statistics;
self
}
Expand Down Expand Up @@ -100,7 +101,7 @@ where

fn materialize_options(&self) -> WriteOptions {
WriteOptions {
write_statistics: self.statistics,
statistics: self.statistics,
compression: self.compression,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn init_files() {
#[cfg(feature = "parquet")]
{
ParquetWriter::new(f)
.with_statistics(true)
.with_statistics(StatisticsOptions::full())
.finish(&mut df)
.unwrap();
}
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-parquet/src/arrow/write/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::parquet::encoding::{delta_bitpacked, Encoding};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
use crate::write::utils::invalid_encoding;
use crate::write::Page;
use crate::write::{Page, StatisticsOptions};

pub(crate) fn encode_non_null_values<'a, I: Iterator<Item = &'a [u8]>>(
iter: I,
Expand Down Expand Up @@ -65,8 +65,8 @@ pub fn array_to_page<O: Offset>(
_ => return Err(invalid_encoding(encoding, array.data_type())),
}

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand All @@ -89,21 +89,32 @@ pub fn array_to_page<O: Offset>(
/// Build Parquet statistics for a binary array, computing only the
/// statistics enabled in `options`.
pub(crate) fn build_statistics<O: Offset>(
    array: &BinaryArray<O>,
    primitive_type: PrimitiveType,
    options: &StatisticsOptions,
) -> ParquetStatistics {
    // Min/max scan non-null values only; disabled statistics stay `None`.
    let min_value = if options.min_value {
        array
            .iter()
            .flatten()
            .min_by(|x, y| ord_binary(x, y))
            .map(|v| v.to_vec())
    } else {
        None
    };

    let max_value = if options.max_value {
        array
            .iter()
            .flatten()
            .max_by(|x, y| ord_binary(x, y))
            .map(|v| v.to_vec())
    } else {
        None
    };

    BinaryStatistics {
        primitive_type,
        null_count: options.null_count.then_some(array.null_count() as i64),
        // Distinct counts are not computed for binary data.
        distinct_count: None,
        max_value,
        min_value,
    }
    .serialize()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ where

encode_plain(array, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-parquet/src/arrow/write/binview/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
use crate::read::schema::is_nullable;
use crate::write::binary::{encode_non_null_values, ord_binary};
use crate::write::utils::invalid_encoding;
use crate::write::{utils, Encoding, Page, WriteOptions};
use crate::write::{utils, Encoding, Page, StatisticsOptions, WriteOptions};

pub(crate) fn encode_plain(array: &BinaryViewArray, buffer: &mut Vec<u8>) {
let capacity =
Expand Down Expand Up @@ -56,8 +56,8 @@ pub fn array_to_page(
_ => return Err(invalid_encoding(encoding, array.data_type())),
}

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand All @@ -81,21 +81,32 @@ pub fn array_to_page(
/// Build Parquet statistics for a binary-view array, computing only the
/// statistics enabled in `options`.
pub(crate) fn build_statistics(
    array: &BinaryViewArray,
    primitive_type: PrimitiveType,
    options: &StatisticsOptions,
) -> ParquetStatistics {
    // Min/max scan non-null values only; disabled statistics stay `None`.
    let min_value = if options.min_value {
        array
            .iter()
            .flatten()
            .min_by(|x, y| ord_binary(x, y))
            .map(|v| v.to_vec())
    } else {
        None
    };

    let max_value = if options.max_value {
        array
            .iter()
            .flatten()
            .max_by(|x, y| ord_binary(x, y))
            .map(|v| v.to_vec())
    } else {
        None
    };

    BinaryStatistics {
        primitive_type,
        null_count: options.null_count.then_some(array.null_count() as i64),
        // Distinct counts are not computed for binary data.
        distinct_count: None,
        max_value,
        min_value,
    }
    .serialize()
}
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/binview/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub fn array_to_page(

encode_plain(array, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand Down
27 changes: 20 additions & 7 deletions crates/polars-parquet/src/arrow/write/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::parquet::encoding::Encoding;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{BooleanStatistics, ParquetStatistics};
use crate::write::StatisticsOptions;

fn encode(iterator: impl Iterator<Item = bool>, buffer: &mut Vec<u8>) -> PolarsResult<()> {
// encode values using bitpacking
Expand Down Expand Up @@ -59,8 +60,8 @@ pub fn array_to_page(

encode_plain(array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
let statistics = if options.has_statistics() {
Some(build_statistics(array, &options.statistics))
} else {
None
};
Expand All @@ -79,12 +80,24 @@ pub fn array_to_page(
)
}

pub(super) fn build_statistics(array: &BooleanArray) -> ParquetStatistics {
/// Build Parquet statistics for a boolean array, computing only the
/// statistics enabled in `options`.
pub(super) fn build_statistics(
    array: &BooleanArray,
    options: &StatisticsOptions,
) -> ParquetStatistics {
    // `distinct_count` is a usize; `try_into` converts to the statistics'
    // integer type and silently drops the value if it does not fit.
    let distinct_count = if options.distinct_count {
        array.distinct_count().try_into().ok()
    } else {
        None
    };

    let max_value = if options.max_value {
        array.iter().flatten().max()
    } else {
        None
    };

    let min_value = if options.min_value {
        array.iter().flatten().min()
    } else {
        None
    };

    BooleanStatistics {
        null_count: options.null_count.then(|| array.null_count() as i64),
        distinct_count,
        max_value,
        min_value,
    }
    .serialize()
}
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub fn array_to_page(

encode_plain(array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
let statistics = if options.has_statistics() {
Some(build_statistics(array, &options.statistics))
} else {
None
};
Expand Down
Loading

0 comments on commit 5a335d0

Please sign in to comment.