From 0bbe8689482529a78f1e9ccf66acea6b48a944a6 Mon Sep 17 00:00:00 2001 From: Christian Thiel Date: Wed, 25 Sep 2024 12:24:19 +0200 Subject: [PATCH] Address comments --- crates/iceberg/src/spec/schema.rs | 133 +++++++++++++++++++++--------- 1 file changed, 92 insertions(+), 41 deletions(-) diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index d0fa3f25d..430744ba9 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -88,12 +88,12 @@ impl SchemaBuilder { } /// Reassign all field-ids (nested) on build. - /// If `start_from` is provided, it will start reassigning from that id (inclusive). - /// If not provided, it will start from 0. + /// Reassignment starts from the field-id specified in `start_from` (inclusive). /// /// All specified aliases and identifier fields will be updated to the new field-ids. - pub fn with_reassigned_field_ids(mut self, start_from: Option) -> Self { - self.reassign_field_ids_from = Some(start_from.unwrap_or(0)); + #[allow(dead_code)] // Will be needed in TableMetadataBuilder + pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self { + self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX)); self } @@ -116,16 +116,7 @@ impl SchemaBuilder { } /// Builds the schema. - pub fn build(mut self) -> Result { - if let Some(start_from) = self.reassign_field_ids_from { - let mut id_reassigner = ReassignFieldIds::new(start_from); - self.fields = id_reassigner.reassign_field_ids(self.fields); - - self.identifier_field_ids = - id_reassigner.apply_to_identifier_fields(self.identifier_field_ids)?; - self.alias_to_id = id_reassigner.apply_to_aliases(self.alias_to_id)?; - } - + pub fn build(self) -> Result { let field_id_to_accessor = self.build_accessors(); let r#struct = StructType::new(self.fields); @@ -150,7 +141,7 @@ impl SchemaBuilder { let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0); - Ok(Schema { + let mut schema = Schema { r#struct, schema_id: self.schema_id, highest_field_id, @@ -163,7 +154,26 @@ impl SchemaBuilder { id_to_name, field_id_to_accessor, - }) + }; + + if let Some(start_from) = self.reassign_field_ids_from { + let mut id_reassigner = ReassignFieldIds::new(start_from); + let mew_fields = id_reassigner.reassign_field_ids(schema.r#struct.fields().to_vec())?; + let new_identifier_field_ids = + id_reassigner.apply_to_identifier_fields(schema.identifier_field_ids)?; + let new_alias_to_id = id_reassigner.apply_to_aliases(schema.alias_to_id.clone())?; + + schema = SchemaBuilder { + schema_id: schema.schema_id, + fields: mew_fields, + alias_to_id: new_alias_to_id, + identifier_field_ids: new_identifier_field_ids, + reassign_field_ids_from: None, + } + .build()?; + } + + Ok(schema) } fn build_accessors(&self) -> HashMap> { @@ -980,74 +990,95 @@ impl ReassignFieldIds { } } - fn reassign_field_ids(&mut self, fields: Vec) -> Vec { + fn reassign_field_ids(&mut self, fields: Vec) -> Result> { // Visit fields on the same level first let outer_fields = fields .into_iter() .map(|field| { - self.old_to_new_id.insert(field.id, self.next_field_id); + self + .old_to_new_id + .insert(field.id, self.next_field_id) + .map_or_else(|| Ok(()), |_| Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Error reassigning field ids: Found duplicate 'field.id' {}. Field ids must be unique.", + field.id + )) + ))?; + let new_field = Arc::unwrap_or_clone(field).with_id(self.next_field_id); - self.next_field_id += 1; - Arc::new(new_field) + self.increase_next_field_id()?; + Ok(Arc::new(new_field)) }) - .collect::>(); + .collect::>>()?; // Now visit nested fields outer_fields .into_iter() .map(|field| { if field.field_type.is_primitive() { - field + Ok(field) } else { let mut new_field = Arc::unwrap_or_clone(field); - *new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type); - Arc::new(new_field) + *new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type)?; + Ok(Arc::new(new_field)) } }) .collect() } - fn reassign_ids_visit_type(&mut self, field_type: Type) -> Type { + fn reassign_ids_visit_type(&mut self, field_type: Type) -> Result { match field_type { - Type::Primitive(s) => Type::Primitive(s), + Type::Primitive(s) => Ok(Type::Primitive(s)), Type::Struct(s) => { - let new_fields = self.reassign_field_ids(s.fields().to_vec()); - Type::Struct(StructType::new(new_fields)) + let new_fields = self.reassign_field_ids(s.fields().to_vec())?; + Ok(Type::Struct(StructType::new(new_fields))) } Type::List(l) => { self.old_to_new_id .insert(l.element_field.id, self.next_field_id); let mut element_field = Arc::unwrap_or_clone(l.element_field); element_field.id = self.next_field_id; - self.next_field_id += 1; - *element_field.field_type = self.reassign_ids_visit_type(*element_field.field_type); - Type::List(ListType { + self.increase_next_field_id()?; + *element_field.field_type = + self.reassign_ids_visit_type(*element_field.field_type)?; + Ok(Type::List(ListType { element_field: Arc::new(element_field), - }) + })) } Type::Map(m) => { self.old_to_new_id .insert(m.key_field.id, self.next_field_id); let mut key_field = Arc::unwrap_or_clone(m.key_field); key_field.id = self.next_field_id; - self.next_field_id += 1; - *key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type); + self.increase_next_field_id()?; + *key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type)?; self.old_to_new_id .insert(m.value_field.id, self.next_field_id); let mut value_field = Arc::unwrap_or_clone(m.value_field); value_field.id = self.next_field_id; - self.next_field_id += 1; - *value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type); + self.increase_next_field_id()?; + *value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type)?; - Type::Map(MapType { + Ok(Type::Map(MapType { key_field: Arc::new(key_field), value_field: Arc::new(value_field), - }) + })) } } } + fn increase_next_field_id(&mut self) -> Result<()> { + self.next_field_id = self.next_field_id.checked_add(1).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Field ID overflowed, cannot add more fields", + ) + })?; + Ok(()) + } + fn apply_to_identifier_fields(&self, field_ids: HashSet) -> Result> { field_ids .into_iter() @@ -2400,7 +2431,7 @@ table { let reassigned_schema = schema .into_builder() - .with_reassigned_field_ids(Some(0)) + .with_reassigned_field_ids(0) .build() .unwrap(); @@ -2426,7 +2457,7 @@ table { let reassigned_schema = schema .into_builder() .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 2)])) - .with_reassigned_field_ids(Some(0)) + .with_reassigned_field_ids(0) .build() .unwrap(); @@ -2528,13 +2559,33 @@ table { assert_eq!(reassigned_schema.field_by_id(16).unwrap().name, "age"); } + #[test] + fn test_reassign_ids_fails_with_duplicate_ids() { + let reassigned_schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![3]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)])) + .with_fields(vec![ + NestedField::optional(5, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .with_reassigned_field_ids(0) + .build() + .unwrap_err(); + + assert!(reassigned_schema + .message() + .contains("Found duplicate 'field.id' 3")); + } + #[test] fn test_reassign_ids_empty_schema() { let schema = Schema::builder().with_schema_id(1).build().unwrap(); let reassigned_schema = schema .clone() .into_builder() - .with_reassigned_field_ids(Some(0)) + .with_reassigned_field_ids(0) .build() .unwrap();