diff --git a/.github/workflows/ci_typos.yml b/.github/workflows/ci_typos.yml index a293d715f..da72929dd 100644 --- a/.github/workflows/ci_typos.yml +++ b/.github/workflows/ci_typos.yml @@ -42,4 +42,4 @@ jobs: steps: - uses: actions/checkout@v4 - name: Check typos - uses: crate-ci/typos@v1.24.3 + uses: crate-ci/typos@v1.24.5 diff --git a/crates/catalog/memory/src/catalog.rs b/crates/catalog/memory/src/catalog.rs index 05038cb66..1da044821 100644 --- a/crates/catalog/memory/src/catalog.rs +++ b/crates/catalog/memory/src/catalog.rs @@ -371,7 +371,7 @@ mod tests { let expected_sorted_order = SortOrder::builder() .with_order_id(0) .with_fields(vec![]) - .build(expected_schema.clone()) + .build(expected_schema) .unwrap(); assert_eq!( diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs index 115640131..60e97ab45 100644 --- a/crates/iceberg/src/io/storage_s3.rs +++ b/crates/iceberg/src/io/storage_s3.rs @@ -36,6 +36,10 @@ pub const S3_SECRET_ACCESS_KEY: &str = "s3.secret-access-key"; pub const S3_SESSION_TOKEN: &str = "s3.session-token"; /// S3 region. pub const S3_REGION: &str = "s3.region"; +/// Region to use for the S3 client. +/// +/// This takes precedence over [`S3_REGION`]. +pub const CLIENT_REGION: &str = "client.region"; /// S3 Path Style Access. pub const S3_PATH_STYLE_ACCESS: &str = "s3.path-style-access"; /// S3 Server Side Encryption Type. @@ -73,6 +77,9 @@ pub(crate) fn s3_config_parse(mut m: HashMap) -> Result UnboundPartitionSpec { + pub fn into_unbound(self) -> UnboundPartitionSpec { self.into() } + + /// Check if this partition spec is compatible with another partition spec. + /// + /// Returns true if the partition spec is equal to the other spec with partition field ids ignored and + /// spec_id ignored. The following must be identical: + /// * The number of fields + /// * Field order + /// * Field names + /// * Source column ids + /// * Transforms + pub fn is_compatible_with(&self, other: &UnboundPartitionSpec) -> bool { + if self.fields.len() != other.fields.len() { + return false; + } + + for (this_field, other_field) in self.fields.iter().zip(&other.fields) { + if this_field.source_id != other_field.source_id + || this_field.transform != other_field.transform + || this_field.name != other_field.name + { + return false; + } + } + + true + } + + /// Check if this partition spec has sequential partition ids. + /// Sequential ids start from 1000 and increment by 1 for each field. + /// This is required for spec version 1 + pub fn has_sequential_ids(&self) -> bool { + for (index, field) in self.fields.iter().enumerate() { + let expected_id = (UNPARTITIONED_LAST_ASSIGNED_ID as i64) + .checked_add(1) + .and_then(|id| id.checked_add(index as i64)) + .unwrap_or(i64::MAX); + + if field.field_id as i64 != expected_id { + return false; + } + } + + true + } + + /// Get the highest field id in the partition spec. + /// If the partition spec is unpartitioned, it returns the last unpartitioned last assigned id (999). + pub fn highest_field_id(&self) -> i32 { + self.fields + .iter() + .map(|f| f.field_id) + .max() + .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID) + } } /// Reference to [`UnboundPartitionSpec`]. @@ -171,6 +225,14 @@ impl UnboundPartitionSpec { pub fn fields(&self) -> &[UnboundPartitionField] { &self.fields } + + /// Change the spec id of the partition spec + pub fn with_spec_id(self, spec_id: i32) -> Self { + Self { + spec_id: Some(spec_id), + ..self + } + } } impl From for UnboundPartitionField { @@ -1263,4 +1325,354 @@ mod tests { }] }); } + + #[test] + fn test_is_compatible_with() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + assert!(partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_not_compatible_with_transform_different() { + let schema = Schema::builder() + .with_fields(vec![NestedField::required( + 1, + "id", + Type::Primitive(crate::spec::PrimitiveType::Int), + ) + .into()]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(32), + }) + .unwrap() + .build() + .unwrap(); + + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_not_compatible_with_source_id_different() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_not_compatible_with_order_different() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec_1 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + let partition_spec_2 = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: None, + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: None, + name: "id_bucket".to_string(), + transform: Transform::Bucket(16), + }) + .unwrap() + .build() + .unwrap(); + + assert!(!partition_spec_1.is_compatible_with(&partition_spec_2.into_unbound())); + } + + #[test] + fn test_highest_field_id_unpartitioned() { + let spec = PartitionSpec::builder(&Schema::builder().with_fields(vec![]).build().unwrap()) + .with_spec_id(1) + .build() + .unwrap(); + + assert_eq!(UNPARTITIONED_LAST_ASSIGNED_ID, spec.highest_field_id()); + } + + #[test] + fn test_highest_field_id() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1001), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1000), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1001, spec.highest_field_id()); + } + + #[test] + fn test_has_sequential_ids() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1000), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1001), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1000, spec.fields[0].field_id); + assert_eq!(1001, spec.fields[1].field_id); + assert!(spec.has_sequential_ids()); + } + + #[test] + fn test_sequential_ids_must_start_at_1000() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(999), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1000), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(999, spec.fields[0].field_id); + assert_eq!(1000, spec.fields[1].field_id); + assert!(!spec.has_sequential_ids()); + } + + #[test] + fn test_sequential_ids_must_have_no_gaps() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(crate::spec::PrimitiveType::Int)) + .into(), + NestedField::required( + 2, + "name", + Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let spec = PartitionSpec::builder(&schema) + .with_spec_id(1) + .add_unbound_field(UnboundPartitionField { + source_id: 1, + field_id: Some(1000), + name: "id".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .add_unbound_field(UnboundPartitionField { + source_id: 2, + field_id: Some(1002), + name: "name".to_string(), + transform: Transform::Identity, + }) + .unwrap() + .build() + .unwrap(); + + assert_eq!(1000, spec.fields[0].field_id); + assert_eq!(1002, spec.fields[1].field_id); + assert!(!spec.has_sequential_ids()); + } } diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 7b24aa25f..d0fa3f25d 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -117,11 +117,9 @@ impl SchemaBuilder { /// Builds the schema. pub fn build(mut self) -> Result { - let mut highest_field_id = None; if let Some(start_from) = self.reassign_field_ids_from { let mut id_reassigner = ReassignFieldIds::new(start_from); self.fields = id_reassigner.reassign_field_ids(self.fields); - highest_field_id = Some(id_reassigner.next_field_id - 1); self.identifier_field_ids = id_reassigner.apply_to_identifier_fields(self.identifier_field_ids)?; @@ -132,8 +130,6 @@ impl SchemaBuilder { let r#struct = StructType::new(self.fields); let id_to_field = index_by_id(&r#struct)?; - let highest_field_id = - highest_field_id.unwrap_or(id_to_field.keys().max().cloned().unwrap_or(0)); Self::validate_identifier_ids( &r#struct, @@ -152,12 +148,13 @@ impl SchemaBuilder { .map(|(k, v)| (k.to_lowercase(), *v)) .collect(); + let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0); + Ok(Schema { r#struct, schema_id: self.schema_id, highest_field_id, identifier_field_ids: self.identifier_field_ids, - alias_to_id: self.alias_to_id, id_to_field, @@ -1477,12 +1474,6 @@ table { assert_eq!(original_schema, schema); } - #[test] - fn test_highest_field_id() { - let schema = table_schema_nested(); - assert_eq!(17, schema.highest_field_id()); - } - #[test] fn test_schema_index_by_name() { let expected_name_to_id = HashMap::from( @@ -2378,6 +2369,21 @@ table { assert_eq!(result.unwrap(), Type::Struct(schema.as_struct().clone())); } + #[test] + fn test_highest_field_id() { + let schema = table_schema_nested(); + assert_eq!(17, schema.highest_field_id()); + + let schema = table_schema_simple().0; + assert_eq!(3, schema.highest_field_id()); + } + + #[test] + fn test_highest_field_id_no_fields() { + let schema = Schema::builder().with_schema_id(1).build().unwrap(); + assert_eq!(0, schema.highest_field_id()); + } + #[test] fn test_reassign_ids() { let schema = Schema::builder() diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 704a43b5f..f42e736ea 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -22,16 +22,19 @@ use std::collections::HashMap; use std::sync::Arc; use _serde::SnapshotV2; -use chrono::{DateTime, TimeZone, Utc}; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; -use crate::error::Result; +use crate::error::{timestamp_ms_to_utc, Result}; use crate::io::FileIO; use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; use crate::{Error, ErrorKind}; +/// The ref name of the main branch of the table. +pub const MAIN_BRANCH: &str = "main"; + /// Reference to [`Snapshot`]. pub type SnapshotRef = Arc; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] @@ -125,8 +128,14 @@ impl Snapshot { } /// Get the timestamp of when the snapshot was created #[inline] - pub fn timestamp(&self) -> DateTime { - Utc.timestamp_millis_opt(self.timestamp_ms).unwrap() + pub fn timestamp(&self) -> Result> { + timestamp_ms_to_utc(self.timestamp_ms) + } + + /// Get the timestamp of when the snapshot was created in milliseconds + #[inline] + pub fn timestamp_ms(&self) -> i64 { + self.timestamp_ms } /// Get the schema id of this snapshot. @@ -386,8 +395,9 @@ mod tests { assert_eq!(3051729675574597004, result.snapshot_id()); assert_eq!( Utc.timestamp_millis_opt(1515100955770).unwrap(), - result.timestamp() + result.timestamp().unwrap() ); + assert_eq!(1515100955770, result.timestamp_ms()); assert_eq!( Summary { operation: Operation::Append, diff --git a/crates/iceberg/src/spec/sort.rs b/crates/iceberg/src/spec/sort.rs index 82909344a..5e50a175c 100644 --- a/crates/iceberg/src/spec/sort.rs +++ b/crates/iceberg/src/spec/sort.rs @@ -133,6 +133,14 @@ impl SortOrder { pub fn is_unsorted(&self) -> bool { self.fields.is_empty() } + + /// Set the order id for the sort order + pub fn with_order_id(self, order_id: i64) -> SortOrder { + SortOrder { + order_id, + fields: self.fields, + } + } } impl SortOrderBuilder { @@ -160,13 +168,13 @@ impl SortOrderBuilder { } /// Creates a new bound sort order. - pub fn build(&self, schema: Schema) -> Result { + pub fn build(&self, schema: &Schema) -> Result { let unbound_sort_order = self.build_unbound()?; SortOrderBuilder::check_compatibility(unbound_sort_order, schema) } /// Returns the given sort order if it is compatible with the given schema - fn check_compatibility(sort_order: SortOrder, schema: Schema) -> Result { + fn check_compatibility(sort_order: SortOrder, schema: &Schema) -> Result { let sort_fields = &sort_order.fields; for sort_field in sort_fields { match schema.field_by_id(sort_field.source_id) { @@ -290,6 +298,35 @@ mod tests { ) } + #[test] + fn test_build_unbound_returns_correct_default_order_id_for_no_fields() { + assert_eq!( + SortOrder::builder() + .build_unbound() + .expect("Expected an Ok value") + .order_id, + SortOrder::UNSORTED_ORDER_ID + ) + } + + #[test] + fn test_build_unbound_returns_correct_default_order_id_for_fields() { + let sort_field = SortField::builder() + .source_id(2) + .direction(SortDirection::Ascending) + .null_order(NullOrder::First) + .transform(Transform::Identity) + .build(); + assert_ne!( + SortOrder::builder() + .with_sort_field(sort_field.clone()) + .build_unbound() + .expect("Expected an Ok value") + .order_id, + SortOrder::UNSORTED_ORDER_ID + ) + } + #[test] fn test_build_unbound_should_return_unsorted_sort_order() { assert_eq!( @@ -367,7 +404,7 @@ mod tests { .transform(Transform::Identity) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -406,7 +443,7 @@ mod tests { .transform(Transform::Identity) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -438,7 +475,7 @@ mod tests { .transform(Transform::Year) .build(), ) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result @@ -468,7 +505,7 @@ mod tests { let sort_order_builder_result = SortOrder::builder() .with_sort_field(sort_field.clone()) - .build(schema); + .build(&schema); assert_eq!( sort_order_builder_result.expect("Expected an Ok value"), diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index dacd5bcd7..16deaac22 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -258,7 +258,7 @@ impl TableMetadata { /// Append snapshot to table pub fn append_snapshot(&mut self, snapshot: Snapshot) { - self.last_updated_ms = snapshot.timestamp().timestamp_millis(); + self.last_updated_ms = snapshot.timestamp_ms(); self.last_sequence_number = snapshot.sequence_number(); self.refs