feat(python,rust!): Allow specifying Hive schema in read/scan_parquet #15434

Merged 26 commits on Apr 2, 2024
Changes from 16 commits
131 changes: 86 additions & 45 deletions crates/polars-io/src/predicates.rs
@@ -38,24 +38,24 @@ pub fn apply_predicate(
     Ok(())
 }
 
-/// The statistics for a column in a Parquet file
-/// or Hive partition.
-/// they typically hold
-/// - max value
-/// - min value
-/// - null_count
+/// Statistics of the values in a column.
+///
+/// The following statistics are tracked for each row group:
+/// - Null count
+/// - Minimum value
+/// - Maximum value
 #[derive(Debug)]
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 pub struct ColumnStats {
     field: Field,
-    // The array may hold the null count for every row group,
-    // or for a single row group.
+    // Each Series contains the stats for each row group.
     null_count: Option<Series>,
     min_value: Option<Series>,
     max_value: Option<Series>,
 }
 
 impl ColumnStats {
+    /// Constructs a new [`ColumnStats`].
     pub fn new(
         field: Field,
         null_count: Option<Series>,
@@ -70,6 +70,17 @@ impl ColumnStats {
         }
     }
 
+    /// Constructs a new [`ColumnStats`] with only the [`Field`] information and no statistics.
+    pub fn from_field(field: Field) -> Self {
+        Self {
+            field,
+            null_count: None,
+            min_value: None,
+            max_value: None,
+        }
+    }
+
+    /// Constructs a new [`ColumnStats`] from a single-value Series.
     pub fn from_column_literal(s: Series) -> Self {
         debug_assert_eq!(s.len(), 1);
         Self {
@@ -80,16 +91,33 @@ impl ColumnStats {
         }
     }
 
+    /// Returns the [`DataType`] of the column.
     pub fn dtype(&self) -> &DataType {
         self.field.data_type()
     }
 
+    /// Returns the null count of each row group of the column.
+    pub fn get_null_count_state(&self) -> Option<&Series> {
+        self.null_count.as_ref()
+    }
+
+    /// Returns the minimum value of each row group of the column.
+    pub fn get_min_state(&self) -> Option<&Series> {
+        self.min_value.as_ref()
+    }
+
+    /// Returns the maximum value of each row group of the column.
+    pub fn get_max_state(&self) -> Option<&Series> {
+        self.max_value.as_ref()
+    }
+
+    /// Returns the null count of the column.
     pub fn null_count(&self) -> Option<usize> {
-        match self.field.data_type() {
+        match self.dtype() {
             #[cfg(feature = "dtype-struct")]
             DataType::Struct(_) => None,
             _ => {
-                let s = self.null_count.as_ref()?;
+                let s = self.get_null_count_state()?;
                 // if all null, there are no statistics.
                 if s.null_count() != s.len() {
                     s.sum().ok()
@@ -100,34 +128,33 @@ impl ColumnStats {
         }
     }
 
+    /// Returns the minimum and maximum values of the column as a single [`Series`].
     pub fn to_min_max(&self) -> Option<Series> {
-        let max_val = self.max_value.as_ref()?;
-        let min_val = self.min_value.as_ref()?;
+        let min_val = self.get_min_state()?;
+        let max_val = self.get_max_state()?;
+        let dtype = self.dtype();
 
-        let dtype = min_val.dtype();
+        if !use_min_max(dtype) {
+            return None;
+        }
 
-        if Self::use_min_max(dtype) {
-            let mut min_max_values = min_val.clone();
-            min_max_values.append(max_val).unwrap();
-            if min_max_values.null_count() > 0 {
-                None
-            } else {
-                Some(min_max_values)
-            }
-        } else {
+        let mut min_max_values = min_val.clone();
+        min_max_values.append(max_val).unwrap();
+        if min_max_values.null_count() > 0 {
             None
+        } else {
+            Some(min_max_values)
         }
     }
 
-    pub fn get_min_state(&self) -> Option<&Series> {
-        self.min_value.as_ref()
-    }
-
+    /// Returns the minimum value of the column as a single-value [`Series`].
+    ///
+    /// Returns `None` if no maximum value is available.
     pub fn to_min(&self) -> Option<&Series> {
         let min_val = self.min_value.as_ref()?;
         let dtype = min_val.dtype();
 
-        if !Self::use_min_max(dtype) || min_val.len() != 1 {
+        if !use_min_max(dtype) || min_val.len() != 1 {
             return None;
         }
 
@@ -138,11 +165,14 @@ impl ColumnStats {
         }
     }
 
+    /// Returns the maximum value of the column as a single-value [`Series`].
+    ///
+    /// Returns `None` if no maximum value is available.
     pub fn to_max(&self) -> Option<&Series> {
         let max_val = self.max_value.as_ref()?;
         let dtype = max_val.dtype();
 
-        if !Self::use_min_max(dtype) || max_val.len() != 1 {
+        if !use_min_max(dtype) || max_val.len() != 1 {
             return None;
         }
 
@@ -152,14 +182,15 @@ impl ColumnStats {
             Some(max_val)
         }
     }
+}
 
-    fn use_min_max(dtype: &DataType) -> bool {
-        dtype.is_numeric()
-            || matches!(
-                dtype,
-                DataType::String | DataType::Binary | DataType::Boolean
-            )
-    }
+/// Returns whether the [`DataType`] supports minimum/maximum operations.
+fn use_min_max(dtype: &DataType) -> bool {
+    dtype.is_numeric()
+        || matches!(
+            dtype,
+            DataType::String | DataType::Binary | DataType::Boolean
+        )
 }
 
 /// A collection of column stats with a known schema.
@@ -168,12 +199,14 @@ impl ColumnStats {
 pub struct BatchStats {
     schema: SchemaRef,
     stats: Vec<ColumnStats>,
-    // This might not be available,
-    // as when prunnign hive partitions.
+    // This might not be available, as when pruning hive partitions.
     num_rows: Option<usize>,
 }
 
 impl BatchStats {
+    /// Constructs a new [`BatchStats`].
+    ///
+    /// The `stats` should match the order of the `schema`.
     pub fn new(schema: SchemaRef, stats: Vec<ColumnStats>, num_rows: Option<usize>) -> Self {
         Self {
             schema,
@@ -182,19 +215,27 @@ impl BatchStats {
             stats,
             num_rows,
         }
     }
 
-    pub fn get_stats(&self, column: &str) -> polars_core::error::PolarsResult<&ColumnStats> {
-        self.schema.try_index_of(column).map(|i| &self.stats[i])
-    }
-
-    pub fn num_rows(&self) -> Option<usize> {
-        self.num_rows
-    }
-
+    /// Returns the [`Schema`] of the batch.
     pub fn schema(&self) -> &SchemaRef {
         &self.schema
     }
 
+    /// Returns the [`ColumnStats`] of all columns in the batch, if known.
     pub fn column_stats(&self) -> &[ColumnStats] {
         self.stats.as_ref()
     }
 
+    /// Returns the [`ColumnStats`] of a single column in the batch.
+    ///
+    /// Returns an `Err` if no statistics are available for the given column.
+    pub fn get_stats(&self, column: &str) -> PolarsResult<&ColumnStats> {
+        self.schema.try_index_of(column).map(|i| &self.stats[i])
+    }
+
+    /// Returns the number of rows in the batch.
+    ///
+    /// Returns `None` if the number of rows is unknown.
+    pub fn num_rows(&self) -> Option<usize> {
+        self.num_rows
+    }
 }
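
To make the refactored statistics API above concrete, here is a minimal usage sketch. It is not part of the PR; it assumes a dependency on `polars-core` and `polars-io` as of this change, and it only uses constructors and getters visible in the diff (`ColumnStats::new`, `to_min_max`, `BatchStats::new`, `get_stats`, `null_count`). The column name and values are made up for illustration.

```rust
use std::sync::Arc;

use polars_core::prelude::*;
use polars_io::predicates::{BatchStats, ColumnStats};

fn main() -> PolarsResult<()> {
    // Per-row-group statistics for a single Int64 column "a":
    // two row groups, each tracking null count, min, and max.
    let field = Field::new("a", DataType::Int64);
    let stats = ColumnStats::new(
        field.clone(),
        Some(Series::new("null_count", &[0u32, 1])),
        Some(Series::new("min", &[1i64, 10])),
        Some(Series::new("max", &[9i64, 20])),
    );

    // min/max pruning is only attempted for supported dtypes and
    // when no statistic is missing (null).
    if let Some(min_max) = stats.to_min_max() {
        println!("min/max per row group: {min_max:?}");
    }

    let schema: SchemaRef = Arc::new(Schema::from_iter([field]));
    let batch = BatchStats::new(schema, vec![stats], Some(42));
    println!("total nulls in 'a': {:?}", batch.get_stats("a")?.null_count());
    Ok(())
}
```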
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
@@ -18,6 +18,7 @@ pub struct ScanArgsParquet {
     pub cloud_options: Option<CloudOptions>,
     pub use_statistics: bool,
     pub hive_partitioning: bool,
+    pub hive_schema: Option<SchemaRef>,
 }
 
 impl Default for ScanArgsParquet {
@@ -32,6 +33,7 @@ impl Default for ScanArgsParquet {
             cloud_options: None,
             use_statistics: true,
             hive_partitioning: false,
+            hive_schema: None,
         }
    }
}
@@ -84,6 +86,7 @@ impl LazyFileListReader for LazyParquetReader {
            self.args.cloud_options,
            self.args.use_statistics,
            self.args.hive_partitioning,
+           self.args.hive_schema,
        )?
        .build()
        .into();
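
The new `hive_schema` field slots into the usual `ScanArgsParquet` pattern. A hedged sketch of how a caller might set it (the field names come from the diff; the glob path, column names, and dtypes are illustrative assumptions, and the scan call follows the existing `LazyFrame::scan_parquet` API):

```rust
use std::sync::Arc;

use polars::prelude::*;

fn scan_with_hive_schema() -> PolarsResult<LazyFrame> {
    // Force the `year` and `month` partition columns to specific dtypes
    // instead of letting them be inferred from the path segments.
    let mut hive_schema = Schema::new();
    hive_schema.with_column("year".into(), DataType::Int32);
    hive_schema.with_column("month".into(), DataType::String);

    let args = ScanArgsParquet {
        hive_partitioning: true,
        hive_schema: Some(Arc::new(hive_schema)),
        ..Default::default()
    };
    LazyFrame::scan_parquet("data/year=*/month=*/*.parquet", args)
}
```

Passing `None` keeps the previous behavior of inferring the partition dtypes from the path values.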
13 changes: 9 additions & 4 deletions crates/polars-plan/src/logical_plan/builder.rs
@@ -116,7 +116,9 @@ impl LogicalPlanBuilder {
            row_index: None,
            rechunk: false,
            file_counter: Default::default(),
+           // TODO: Support Hive partitioning.
            hive_partitioning: false,
+           hive_schema: None,
        };
 
        Ok(LogicalPlan::Scan {
@@ -148,6 +150,7 @@ impl LogicalPlanBuilder {
        cloud_options: Option<CloudOptions>,
        use_statistics: bool,
        hive_partitioning: bool,
+       hive_schema: Option<SchemaRef>,
    ) -> PolarsResult<Self> {
        use polars_io::{is_cloud_url, SerReader as _};
 
@@ -197,10 +200,8 @@ impl LogicalPlanBuilder {
            (num_rows, num_rows.unwrap_or(0)),
        );
 
-       // We set the hive partitions of the first path to determine the schema.
-       // On iteration the partition values will be re-set per file.
        if hive_partitioning {
-           file_info.init_hive_partitions(path.as_path())?;
+           file_info.init_hive_partitions(path.as_path(), hive_schema.clone())?
        }
 
        let options = FileScanOptions {
@@ -211,6 +212,7 @@ impl LogicalPlanBuilder {
            row_index,
            file_counter: Default::default(),
            hive_partitioning,
+           hive_schema,
        };
        Ok(LogicalPlan::Scan {
            paths,
@@ -285,7 +287,9 @@ impl LogicalPlanBuilder {
                rechunk,
                row_index,
                file_counter: Default::default(),
+               // TODO: Support Hive partitioning.
                hive_partitioning: false,
+               hive_schema: None,
            },
            predicate: None,
            scan_type: FileScan::Ipc {
@@ -393,8 +397,9 @@ impl LogicalPlanBuilder {
            rechunk,
            row_index,
            file_counter: Default::default(),
-           // TODO! add
+           // TODO: Support Hive partitioning.
            hive_partitioning: false,
+           hive_schema: None,
        };
        Ok(LogicalPlan::Scan {
            paths,
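
`init_hive_partitions` itself is not shown in this diff, so the following is a simplified, hypothetical model of what the new `hive_schema` argument changes: values parsed from `key=value` path segments are cast to the user-supplied dtype rather than inferred, so for example `month=09` can stay the string `"09"` instead of becoming the integer 9. The helper name and signature below are invented for illustration.

```rust
use polars_core::prelude::*;

fn partition_series(
    key: &str,
    raw_values: &[&str],
    hive_schema: Option<&Schema>,
) -> PolarsResult<Series> {
    // Start from the raw string values found in `key=value` path segments.
    let s = Series::new(key, raw_values);
    match hive_schema.and_then(|schema| schema.get(key)) {
        // An explicit schema wins: cast, and fail loudly on a mismatch.
        Some(dtype) => s.cast(dtype),
        // Otherwise keep plain strings here; the real implementation
        // infers a dtype from the value instead.
        None => Ok(s),
    }
}

fn main() -> PolarsResult<()> {
    let mut schema = Schema::new();
    schema.with_column("month".into(), DataType::String);
    // "09" stays "09" instead of being inferred as the integer 9.
    let s = partition_series("month", &["09"], Some(&schema))?;
    println!("{s:?}");
    Ok(())
}
```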