Skip to content

Commit

Permalink
feat: Partition Binding and safe PartitionSpecBuilder (#491)
Browse files Browse the repository at this point in the history
* Initial commit

* Fixes

* Replace UnboundPartitionSpec Builder

* Fix tests, allow year, month day partition

* Comments

* typos

* Fix UnboundBuild setting partition_id

* Add test for unbound spec without partition ids

* Fix into_unbound fn name

* Split bound & unbound Partition builder, change add_partition_fields

* Improve comment

* Fix fmt

* Review fixes

* Remove partition_names() HashSet creation
  • Loading branch information
c-thiel authored Aug 14, 2024
1 parent 4434909 commit 9862026
Show file tree
Hide file tree
Showing 9 changed files with 945 additions and 173 deletions.
3 changes: 1 addition & 2 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,8 @@ mod tests {

assert_eq!(metadata.current_schema().as_ref(), expected_schema);

let expected_partition_spec = PartitionSpec::builder()
let expected_partition_spec = PartitionSpec::builder(expected_schema)
.with_spec_id(0)
.with_fields(vec![])
.build()
.unwrap();

Expand Down
6 changes: 3 additions & 3 deletions crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1467,13 +1467,13 @@ mod tests {
.properties(HashMap::from([("owner".to_string(), "testx".to_string())]))
.partition_spec(
UnboundPartitionSpec::builder()
.with_fields(vec![UnboundPartitionField::builder()
.add_partition_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.transform(Transform::Truncate(3))
.name("id".to_string())
.build()])
.build()
.unwrap(),
.unwrap()
.build(),
)
.sort_order(
SortOrder::builder()
Expand Down
34 changes: 9 additions & 25 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub struct TableCreation {
/// The schema of the table.
pub schema: Schema,
/// The partition spec of the table, could be None.
#[builder(default, setter(strip_option))]
#[builder(default, setter(strip_option, into))]
pub partition_spec: Option<UnboundPartitionSpec>,
/// The sort order of the table.
#[builder(default, setter(strip_option))]
Expand Down Expand Up @@ -476,7 +476,7 @@ mod tests {
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
TableMetadataBuilder, Transform, Type, UnboundPartitionField, UnboundPartitionSpec,
TableMetadataBuilder, Transform, Type, UnboundPartitionSpec,
};
use crate::{NamespaceIdent, TableCreation, TableIdent, TableRequirement, TableUpdate};

Expand Down Expand Up @@ -820,29 +820,13 @@ mod tests {
"#,
TableUpdate::AddSpec {
spec: UnboundPartitionSpec::builder()
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(4)
.name("ts_day".to_string())
.transform(Transform::Day)
.build(),
)
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(1)
.name("id_bucket".to_string())
.transform(Transform::Bucket(16))
.build(),
)
.with_unbound_partition_field(
UnboundPartitionField::builder()
.source_id(2)
.name("id_truncate".to_string())
.transform(Transform::Truncate(4))
.build(),
)
.build()
.unwrap(),
.add_partition_field(4, "ts_day".to_string(), Transform::Day)
.unwrap()
.add_partition_field(1, "id_bucket".to_string(), Transform::Bucket(16))
.unwrap()
.add_partition_field(2, "id_truncate".to_string(), Transform::Truncate(4))
.unwrap()
.build(),
},
);
}
Expand Down
14 changes: 8 additions & 6 deletions crates/iceberg/src/expr/visitors/expression_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,9 @@ mod tests {
UnaryExpression,
};
use crate::spec::{
DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionField,
PartitionSpec, PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
DataContentType, DataFile, DataFileFormat, Datum, Literal, NestedField, PartitionSpec,
PartitionSpecRef, PrimitiveType, Schema, SchemaRef, Struct, Transform, Type,
UnboundPartitionField,
};
use crate::Result;

Expand All @@ -274,14 +275,15 @@ mod tests {
))])
.build()?;

let spec = PartitionSpec::builder()
let spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.partition_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
.build()
.unwrap();

Expand All @@ -298,7 +300,7 @@ mod tests {
let partition_fields = partition_type.fields().to_owned();

let partition_schema = Schema::builder()
.with_schema_id(partition_spec.spec_id)
.with_schema_id(partition_spec.spec_id())
.with_fields(partition_fields)
.build()?;

Expand Down
11 changes: 6 additions & 5 deletions crates/iceberg/src/expr/visitors/inclusive_metrics_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,8 @@ mod test {
UnaryExpression,
};
use crate::spec::{
DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionField,
PartitionSpec, PrimitiveType, Schema, Struct, Transform, Type,
DataContentType, DataFile, DataFileFormat, Datum, NestedField, PartitionSpec,
PrimitiveType, Schema, Struct, Transform, Type, UnboundPartitionField,
};

const INT_MIN_VALUE: i32 = 30;
Expand Down Expand Up @@ -1656,14 +1656,15 @@ mod test {
.unwrap();
let table_schema_ref = Arc::new(table_schema);

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&table_schema_ref)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.partition_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
.build()
.unwrap();
let partition_spec_ref = Arc::new(partition_spec);
Expand Down
110 changes: 59 additions & 51 deletions crates/iceberg/src/expr/visitors/inclusive_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl InclusiveProjection {
fn get_parts_for_field_id(&mut self, field_id: i32) -> &Vec<PartitionField> {
if let std::collections::hash_map::Entry::Vacant(e) = self.cached_parts.entry(field_id) {
let mut parts: Vec<PartitionField> = vec![];
for partition_spec_field in &self.partition_spec.fields {
for partition_spec_field in self.partition_spec.fields() {
if partition_spec_field.source_id == field_id {
parts.push(partition_spec_field.clone())
}
Expand Down Expand Up @@ -236,6 +236,7 @@ mod tests {
use crate::expr::{Bind, Predicate, Reference};
use crate::spec::{
Datum, NestedField, PartitionField, PartitionSpec, PrimitiveType, Schema, Transform, Type,
UnboundPartitionField,
};

fn build_test_schema() -> Schema {
Expand Down Expand Up @@ -265,9 +266,8 @@ mod tests {
fn test_inclusive_projection_logic_ops() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![])
.build()
.unwrap();

Expand Down Expand Up @@ -296,14 +296,17 @@ mod tests {
fn test_inclusive_projection_identity_transform() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.transform(Transform::Identity)
.build()])
.add_unbound_field(
UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.transform(Transform::Identity)
.build(),
)
.unwrap()
.build()
.unwrap();

Expand All @@ -330,30 +333,29 @@ mod tests {
fn test_inclusive_projection_date_transforms() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
.with_spec_id(1)
.with_fields(vec![
PartitionField::builder()
.source_id(2)
.name("year".to_string())
.field_id(2)
.transform(Transform::Year)
.build(),
PartitionField::builder()
.source_id(2)
.name("month".to_string())
.field_id(2)
.transform(Transform::Month)
.build(),
PartitionField::builder()
.source_id(2)
.name("day".to_string())
.field_id(2)
.transform(Transform::Day)
.build(),
])
.build()
.unwrap();
let partition_spec = PartitionSpec {
spec_id: 1,
fields: vec![
PartitionField {
source_id: 2,
name: "year".to_string(),
field_id: 1000,
transform: Transform::Year,
},
PartitionField {
source_id: 2,
name: "month".to_string(),
field_id: 1001,
transform: Transform::Month,
},
PartitionField {
source_id: 2,
name: "day".to_string(),
field_id: 1002,
transform: Transform::Day,
},
],
};

let arc_schema = Arc::new(schema);
let arc_partition_spec = Arc::new(partition_spec);
Expand All @@ -378,14 +380,17 @@ mod tests {
fn test_inclusive_projection_truncate_transform() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.source_id(3)
.name("name".to_string())
.field_id(3)
.transform(Transform::Truncate(4))
.build()])
.add_unbound_field(
UnboundPartitionField::builder()
.source_id(3)
.name("name_truncate".to_string())
.partition_id(3)
.transform(Transform::Truncate(4))
.build(),
)
.unwrap()
.build()
.unwrap();

Expand All @@ -398,15 +403,15 @@ mod tests {

// applying InclusiveProjection to bound_predicate
// should result in the 'name STARTS WITH "Testy McTest"'
// predicate being transformed to 'name STARTS WITH "Test"',
// predicate being transformed to 'name_truncate STARTS WITH "Test"',
// since a `Truncate(4)` partition will map values of
// name that start with "Testy McTest" into a partition
// for values of name that start with the first four letters
// of that, ie "Test".
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "name STARTS WITH \"Test\"".to_string();
let expected = "name_truncate STARTS WITH \"Test\"".to_string();

assert_eq!(result.to_string(), expected)
}
Expand All @@ -415,14 +420,17 @@ mod tests {
fn test_inclusive_projection_bucket_transform() {
let schema = build_test_schema();

let partition_spec = PartitionSpec::builder()
let partition_spec = PartitionSpec::builder(&schema)
.with_spec_id(1)
.with_fields(vec![PartitionField::builder()
.source_id(1)
.name("a".to_string())
.field_id(1)
.transform(Transform::Bucket(7))
.build()])
.add_unbound_field(
UnboundPartitionField::builder()
.source_id(1)
.name("a_bucket[7]".to_string())
.partition_id(1)
.transform(Transform::Bucket(7))
.build(),
)
.unwrap()
.build()
.unwrap();

Expand All @@ -440,7 +448,7 @@ mod tests {
let mut inclusive_projection = InclusiveProjection::new(arc_partition_spec.clone());
let result = inclusive_projection.project(&bound_predicate).unwrap();

let expected = "a = 2".to_string();
let expected = "a_bucket[7] = 2".to_string();

assert_eq!(result.to_string(), expected)
}
Expand Down
8 changes: 4 additions & 4 deletions crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,14 @@ impl ManifestWriter {
)?;
avro_writer.add_user_metadata(
"partition-spec".to_string(),
to_vec(&manifest.metadata.partition_spec.fields).map_err(|err| {
to_vec(&manifest.metadata.partition_spec.fields()).map_err(|err| {
Error::new(ErrorKind::DataInvalid, "Fail to serialize partition spec")
.with_source(err)
})?,
)?;
avro_writer.add_user_metadata(
"partition-spec-id".to_string(),
manifest.metadata.partition_spec.spec_id.to_string(),
manifest.metadata.partition_spec.spec_id().to_string(),
)?;
avro_writer.add_user_metadata(
"format-version".to_string(),
Expand Down Expand Up @@ -300,12 +300,12 @@ impl ManifestWriter {
self.output.write(Bytes::from(content)).await?;

let partition_summary =
self.get_field_summary_vec(&manifest.metadata.partition_spec.fields);
self.get_field_summary_vec(manifest.metadata.partition_spec.fields());

Ok(ManifestFile {
manifest_path: self.output.location().to_string(),
manifest_length: length as i64,
partition_spec_id: manifest.metadata.partition_spec.spec_id,
partition_spec_id: manifest.metadata.partition_spec.spec_id(),
content: manifest.metadata.content,
// sequence_number and min_sequence_number with UNASSIGNED_SEQUENCE_NUMBER will be replace with
// real sequence number in `ManifestListWriter`.
Expand Down
Loading

0 comments on commit 9862026

Please sign in to comment.