From ced661f72532a9fe9ce8284461aadef1128cfbb8 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sun, 8 Sep 2024 22:43:38 +0800 Subject: [PATCH 1/6] chore(deps): Bump crate-ci/typos from 1.24.3 to 1.24.5 (#616) Bumps [crate-ci/typos](https://github.com/crate-ci/typos) from 1.24.3 to 1.24.5. - [Release notes](https://github.com/crate-ci/typos/releases) - [Changelog](https://github.com/crate-ci/typos/blob/master/CHANGELOG.md) - [Commits](https://github.com/crate-ci/typos/compare/v1.24.3...v1.24.5) --- updated-dependencies: - dependency-name: crate-ci/typos dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/ci_typos.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci_typos.yml b/.github/workflows/ci_typos.yml index a293d715f..da72929dd 100644 --- a/.github/workflows/ci_typos.yml +++ b/.github/workflows/ci_typos.yml @@ -42,4 +42,4 @@ jobs: steps: - uses: actions/checkout@v4 - name: Check typos - uses: crate-ci/typos@v1.24.3 + uses: crate-ci/typos@v1.24.5 From ede472051aa67cd3f7d8712a5c8291dca1a91fae Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 8 Sep 2024 16:49:39 +0200 Subject: [PATCH 2/6] fix: Less Panics for Snapshot timestamps (#614) --- crates/iceberg/src/spec/snapshot.rs | 20 +++++++++++++++----- crates/iceberg/src/spec/table_metadata.rs | 2 +- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 704a43b5f..f42e736ea 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -22,16 +22,19 @@ use std::collections::HashMap; use std::sync::Arc; use _serde::SnapshotV2; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use 
typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; -use crate::error::Result; +use crate::error::{timestamp_ms_to_utc, Result}; use crate::io::FileIO; use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; use crate::{Error, ErrorKind}; +/// The ref name of the main branch of the table. +pub const MAIN_BRANCH: &str = "main"; + /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] @@ -125,8 +128,14 @@ impl Snapshot { } /// Get the timestamp of when the snapshot was created #[inline] - pub fn timestamp(&self) -> DateTime { - Utc.timestamp_millis_opt(self.timestamp_ms).unwrap() + pub fn timestamp(&self) -> Result> { + timestamp_ms_to_utc(self.timestamp_ms) + } + + /// Get the timestamp of when the snapshot was created in milliseconds + #[inline] + pub fn timestamp_ms(&self) -> i64 { + self.timestamp_ms } /// Get the schema id of this snapshot. @@ -386,8 +395,9 @@ mod tests { assert_eq!(3051729675574597004, result.snapshot_id()); assert_eq!( Utc.timestamp_millis_opt(1515100955770).unwrap(), - result.timestamp() + result.timestamp().unwrap() ); + assert_eq!(1515100955770, result.timestamp_ms()); assert_eq!( Summary { operation: Operation::Append, diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index dacd5bcd7..16deaac22 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -258,7 +258,7 @@ impl TableMetadata { /// Append snapshot to table pub fn append_snapshot(&mut self, snapshot: Snapshot) { - self.last_updated_ms = snapshot.timestamp().timestamp_millis(); + self.last_updated_ms = snapshot.timestamp_ms(); self.last_sequence_number = snapshot.sequence_number(); self.refs From 58123990c304b72139651a16f45f6505bc868cc3 Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 8 Sep 2024 18:18:41 +0200 Subject: [PATCH 3/6] feat: partition compatibility (#612) * 
Partition compatibility * Partition compatibility * Rename compatible_with -> is_compatible_with --- crates/iceberg/src/spec/partition.rs | 414 ++++++++++++++++++++++++++- 1 file changed, 413 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/spec/partition.rs b/crates/iceberg/src/spec/partition.rs index f262a8c7c..36763df7e 100644 --- a/crates/iceberg/src/spec/partition.rs +++ b/crates/iceberg/src/spec/partition.rs @@ -118,9 +118,63 @@ impl PartitionSpec { /// Turn this partition spec into an unbound partition spec. /// /// The `field_id` is retained as `partition_id` in the unbound partition spec. - pub fn to_unbound(self) -> UnboundPartitionSpec { + pub fn into_unbound(self) -> UnboundPartitionSpec { self.into() } + + /// Check if this partition spec is compatible with another partition spec. + /// + /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and + /// spec_id ignored. The following must be identical: + /// * The number of fields + /// * Field order + /// * Field names + /// * Source column ids + /// * Transforms + pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool { + if self.fields.len() != other.fields.len() { + return false; + } + + for (this_field, other_field) in self.fields.iter().zip(&other.fields) { + if this_field.source_id != other_field.source_id + || this_field.transform != other_field.transform + || this_field.name != other_field.name + { + return false; + } + } + + true + } + + /// Check if this partition spec has sequential partition ids. + /// Sequential ids start from 1000 and increment by 1 for each field. 
+ /// This is required for spec version 1 + pub fn has_sequential_ids(&self) -> bool { + for (index, field) in self.fields.iter().enumerate() { + let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64) + .checked_add(1) + .and_then(|id| id.checked_add(index as i64)) + .unwrap_or(i64::MAX); + + if field.field_id as i64 != expected_id { + return false; + } + } + + true + } + + /// Get the highest field id in the partition spec. + /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). + pub fn highest_field_id(&self) -> i32 { + self.fields + .iter() + .map(|f| f.field_id) + .max() + .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID) + } } /// Reference to [`UnboundPartitionSpec`]. @@ -171,6 +225,14 @@ impl UnboundPartitionSpec { pub fn fields(&self) -> &[UnboundPartitionField] { &self.fields } + + /// Change the spec id of the partition spec + pub fn with_spec_id(self, spec_id: i32) -> Self { + Self { + spec_id: Some(spec_id), + ..self + } + } } impl From for UnboundPartitionField { @@ -1263,4 +1325,354 @@ mod tests { }] }); } + + #[test] + fn test_is_compatible_with() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + 
assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_not_compatible_with_transform_different() { + let schema = Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(32), + }) + .unwrap() + .build() + .unwrap(); + + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_not_compatible_with_source_id_different() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_not_compatible_with_order_different() { + let 
schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_highest_field_id_unpartitioned() { + let spec = PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap()) + .with_spec_id(1) + .build() + .unwrap(); + + assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id()); + } + + #[test] + fn test_highest_field_id() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1001), + name: "id".to_string(), + 
transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1000), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1001, spec.highest_field_id()); + } + + #[test] + fn test_has_sequential_ids() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1000), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1001), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1000, spec.fields[0].field_id); + assert_eq!(1001, spec.fields[1].field_id); + assert!(spec.has_sequential_ids()); + } + + #[test] + fn test_sequential_ids_must_start_at_1000() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(999), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1000), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(999, spec.fields[0].field_id); + assert_eq!(1000, 
spec.fields[1].field_id); + assert!(!spec.has_sequential_ids()); + } + + #[test] + fn test_sequential_ids_must_have_no_gaps() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1000), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1002), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1000, spec.fields[0].field_id); + assert_eq!(1002, spec.fields[1].field_id); + assert!(!spec.has_sequential_ids()); + } } From a5aba9a02c06494d9614a5a556bdcde5871829ef Mon Sep 17 00:00:00 2001 From: Christian Date: Sun, 8 Sep 2024 18:36:05 +0200 Subject: [PATCH 4/6] feat: SortOrder methods should take schema ref if possible (#613) * SortOrder methods should take schema ref if possible * Fix test type * with_order_id should not take reference --- crates/catalog/memory/src/catalog.rs | 2 +- crates/iceberg/src/spec/sort.rs | 49 ++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 05038cb66..1da044821 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -371,7 +371,7 @@ mod tests { let expected_sorted_order = SortOrder::builder() .with_order_id(0) .with_fields(vec![]) - .build(expected_schema.clone()) + .build(expected_schema) .unwrap(); assert_eq!( diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 82909344a..5e50a175c 100644 --- 
a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -133,6 +133,14 @@ impl SortOrder { pub fn is_unsorted(&self) -> bool { self.fields.is_empty() } + + /// Set the order id for the sort order + pub fn with_order_id(self, order_id: i64) -> SortOrder { + SortOrder { + order_id, + fields: self.fields, + } + } } impl SortOrderBuilder { @@ -160,13 +168,13 @@ impl SortOrderBuilder { } /// Creates a new bound sort order. - pub fn build(&self, schema: Schema) -> Result { + pub fn build(&self, schema: &Schema) -> Result { let unbound_sort_order = self.build_unbound()?; SortOrderBuilder::check_compatibility(unbound_sort_order, schema) } /// Returns the given sort order if it is compatible with the given schema - fn check_compatibility(sort_order: SortOrder, schema: Schema) -> Result { + fn check_compatibility(sort_order: SortOrder, schema: &Schema) -> Result { let sort_fields = &sort_order.fields; for sort_field in sort_fields { match schema.field_by_id(sort_field.source_id) { @@ -290,6 +298,35 @@ mod tests { ) } + #[test] + fn test_build_unbound_returns_correct_default_order_id_for_no_fields() { + assert_eq!( + SortOrder::builder() + .build_unbound() + .expect("Expected an Ok value") + .order_id, + SortOrder::UNSORTED_ORDER_ID + ) + } + + #[test] + fn test_build_unbound_returns_correct_default_order_id_for_fields() { + let sort_field = SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(); + assert_ne!( + SortOrder::builder() + .with_sort_field(sort_field.clone()) + .build_unbound() + .expect("Expected an Ok value") + .order_id, + SortOrder::UNSORTED_ORDER_ID + ) + } + #[test] fn test_build_unbound_should_return_unsorted_sort_order() { assert_eq!( @@ -367,7 +404,7 @@ mod tests { .transform(Transform::Identity) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -406,7 +443,7 @@ mod tests { 
.transform(Transform::Identity) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -438,7 +475,7 @@ mod tests { .transform(Transform::Year) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -468,7 +505,7 @@ mod tests { let sort_order_builder_result = SortOrder::builder() .with_sort_field(sort_field.clone()) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result.expect("Expected an Ok value"), From f78c59b19bbcf4f8c2f2be500e0eb8d2883600ca Mon Sep 17 00:00:00 2001 From: Jack <56563911+jdockerty@users.noreply.github.com> Date: Mon, 9 Sep 2024 03:35:16 +0100 Subject: [PATCH 5/6] feat: add `client.region` (#623) --- crates/iceberg/src/io/storage_s3.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 115640131..60e97ab45 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -36,6 +36,10 @@ pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key"; pub const S3_SESSION_TOKEN: &str = "s3.session-token"; /// S3 region. pub const S3_REGION: &str = "s3.region"; +/// Region to use for the S3 client. +/// +/// This takes precedence over [`S3_REGION`]. +pub const CLIENT_REGION: &str = "client.region"; /// S3 Path Style Access. pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access"; /// S3 Server Side Encryption Type. 
@@ -73,6 +77,9 @@ pub(crate) fn s3_config_parse(mut m: HashMap) -> Result Date: Mon, 9 Sep 2024 11:57:22 +0800 Subject: [PATCH 6/6] fix: Correctly calculate highest_field_id in schema (#590) --- crates/iceberg/src/spec/schema.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 106bfb1d8..63a9e3cb4 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -106,8 +106,6 @@ impl SchemaBuilder { /// Builds the schema. pub fn build(self) -> Result { - let highest_field_id = self.fields.iter().map(|f| f.id).max().unwrap_or(0); - let field_id_to_accessor = self.build_accessors(); let r#struct = StructType::new(self.fields); @@ -130,12 +128,13 @@ impl SchemaBuilder { .map(|(k, v)| (k.to_lowercase(), *v)) .collect(); + let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0); + Ok(Schema { r#struct, schema_id: self.schema_id, highest_field_id, identifier_field_ids: self.identifier_field_ids, - alias_to_id: self.alias_to_id, id_to_field, @@ -2229,4 +2228,13 @@ table { assert!(result.is_ok()); assert_eq!(result.unwrap(), Type::Struct(schema.as_struct().clone())); } + + #[test] + fn test_highest_field_id() { + let schema = table_schema_nested(); + assert_eq!(17, schema.highest_field_id()); + + let schema = table_schema_simple().0; + assert_eq!(3, schema.highest_field_id()); + } }