From 2bc66c43cf0aa7cf79e6c1b82a47c39598616d77 Mon Sep 17 00:00:00 2001 From: Christian Date: Fri, 4 Oct 2024 05:22:10 +0200 Subject: [PATCH] feat: Reassign field ids for schema (#615) * Reassign field ids for schema * Address comments * Schema ensure unique field ids * Fix tests with duplicate nested field ids * Use Schema::builder() for reassigned ids * Better docs --- crates/iceberg/src/arrow/schema.rs | 20 +- crates/iceberg/src/spec/datatypes.rs | 6 + crates/iceberg/src/spec/schema.rs | 395 ++++++++++++++++++++++++++- crates/iceberg/src/spec/values.rs | 22 +- 4 files changed, 411 insertions(+), 32 deletions(-) diff --git a/crates/iceberg/src/arrow/schema.rs b/crates/iceberg/src/arrow/schema.rs index a32c10a22..e73b409c6 100644 --- a/crates/iceberg/src/arrow/schema.rs +++ b/crates/iceberg/src/arrow/schema.rs @@ -826,8 +826,8 @@ mod tests { fn arrow_schema_for_arrow_schema_to_schema_test() -> ArrowSchema { let fields = Fields::from(vec![ - simple_field("key", DataType::Int32, false, "17"), - simple_field("value", DataType::Utf8, true, "18"), + simple_field("key", DataType::Int32, false, "28"), + simple_field("value", DataType::Utf8, true, "29"), ]); let r#struct = DataType::Struct(fields); @@ -1057,9 +1057,9 @@ mod tests { "required": true, "type": { "type": "map", - "key-id": 17, + "key-id": 28, "key": "int", - "value-id": 18, + "value-id": 29, "value-required": false, "value": "string" } @@ -1110,8 +1110,8 @@ mod tests { fn arrow_schema_for_schema_to_arrow_schema_test() -> ArrowSchema { let fields = Fields::from(vec![ - simple_field("key", DataType::Int32, false, "17"), - simple_field("value", DataType::Utf8, true, "18"), + simple_field("key", DataType::Int32, false, "28"), + simple_field("value", DataType::Utf8, true, "29"), ]); let r#struct = DataType::Struct(fields); @@ -1200,7 +1200,7 @@ mod tests { ), simple_field("map", map, false, "16"), simple_field("struct", r#struct, false, "17"), - simple_field("uuid", DataType::FixedSizeBinary(16), false, "26"), + simple_field("uuid", DataType::FixedSizeBinary(16), false, "30"), ]) } @@ -1344,9 +1344,9 @@ mod tests { "required": true, "type": { "type": "map", - "key-id": 17, + "key-id": 28, "key": "int", - "value-id": 18, + "value-id": 29, "value-required": false, "value": "string" } @@ -1380,7 +1380,7 @@ mod tests { } }, { - "id":26, + "id":30, "name":"uuid", "required":true, "type":"uuid" diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs index d38245960..bce10ad5f 100644 --- a/crates/iceberg/src/spec/datatypes.rs +++ b/crates/iceberg/src/spec/datatypes.rs @@ -668,6 +668,12 @@ impl NestedField { self.write_default = Some(value); self } + + /// Set the id of the field. + pub(crate) fn with_id(mut self, id: i32) -> Self { + self.id = id; + self + } } impl fmt::Display for NestedField { diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 63a9e3cb4..cf86874dc 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -39,7 +39,7 @@ use crate::{ensure_data_valid, Error, ErrorKind}; pub type SchemaId = i32; /// Reference to [`Schema`]. pub type SchemaRef = Arc; -const DEFAULT_SCHEMA_ID: SchemaId = 0; +pub(crate) const DEFAULT_SCHEMA_ID: SchemaId = 0; /// Defines schema in iceberg. #[derive(Debug, Serialize, Deserialize, Clone)] @@ -77,6 +77,7 @@ pub struct SchemaBuilder { fields: Vec, alias_to_id: BiHashMap, identifier_field_ids: HashSet, + reassign_field_ids_from: Option, } impl SchemaBuilder { @@ -86,6 +87,16 @@ impl SchemaBuilder { self } + /// Reassign all field-ids (including nested) on build. + /// Reassignment starts from the field-id specified in `start_from` (inclusive). + /// + /// All specified aliases and identifier fields will be updated to the new field-ids. + #[allow(dead_code)] // Will be needed in TableMetadataBuilder + pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self { + self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX)); + self + } + /// Set schema id. pub fn with_schema_id(mut self, schema_id: i32) -> Self { self.schema_id = schema_id; @@ -130,7 +141,7 @@ impl SchemaBuilder { let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0); - Ok(Schema { + let mut schema = Schema { r#struct, schema_id: self.schema_id, highest_field_id, @@ -143,7 +154,24 @@ impl SchemaBuilder { id_to_name, field_id_to_accessor, - }) + }; + + if let Some(start_from) = self.reassign_field_ids_from { + let mut id_reassigner = ReassignFieldIds::new(start_from); + let new_fields = id_reassigner.reassign_field_ids(schema.r#struct.fields().to_vec())?; + let new_identifier_field_ids = + id_reassigner.apply_to_identifier_fields(schema.identifier_field_ids)?; + let new_alias_to_id = id_reassigner.apply_to_aliases(schema.alias_to_id.clone())?; + + schema = Schema::builder() + .with_schema_id(schema.schema_id) + .with_fields(new_fields) + .with_identifier_field_ids(new_identifier_field_ids) + .with_alias(new_alias_to_id) + .build()?; + } + + Ok(schema) } fn build_accessors(&self) -> HashMap> { @@ -265,6 +293,7 @@ impl Schema { fields: vec![], identifier_field_ids: HashSet::default(), alias_to_id: BiHashMap::default(), + reassign_field_ids_from: None, } } @@ -275,6 +304,7 @@ impl Schema { fields: self.r#struct.fields().to_vec(), alias_to_id: self.alias_to_id, identifier_field_ids: self.identifier_field_ids, + reassign_field_ids_from: None, } } @@ -475,8 +505,7 @@ pub fn index_by_id(r#struct: &StructType) -> Result } fn field(&mut self, field: &NestedFieldRef, _value: ()) -> Result<()> { - self.0.insert(field.id, field.clone()); - Ok(()) + try_insert_field(&mut self.0, field.id, field.clone()) } fn r#struct(&mut self, _struct: &StructType, _results: Vec) -> Result { @@ -484,15 +513,16 @@ pub fn index_by_id(r#struct: &StructType) -> Result } fn list(&mut self, list: &ListType, _value: Self::T) -> Result { - self.0 - .insert(list.element_field.id, list.element_field.clone()); - Ok(()) + try_insert_field( + &mut self.0, + list.element_field.id, + list.element_field.clone(), + ) } fn map(&mut self, map: &MapType, _key_value: Self::T, _value: Self::T) -> Result { - self.0.insert(map.key_field.id, map.key_field.clone()); - self.0.insert(map.value_field.id, map.value_field.clone()); - Ok(()) + try_insert_field(&mut self.0, map.key_field.id, map.key_field.clone())?; + try_insert_field(&mut self.0, map.value_field.id, map.value_field.clone()) } fn primitive(&mut self, _: &PrimitiveType) -> Result { @@ -943,6 +973,148 @@ impl SchemaVisitor for PruneColumn { } } +struct ReassignFieldIds { + next_field_id: i32, + old_to_new_id: HashMap, +} + +fn try_insert_field(map: &mut HashMap, field_id: i32, value: V) -> Result<()> { + map.insert(field_id, value).map_or_else( + || Ok(()), + |_| { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Found duplicate 'field.id' {}. Field ids must be unique.", + field_id + ), + )) + }, + ) +} + +// We are not using the visitor here, as post order traversal is not desired. +// Instead we want to re-assign all fields on one level first before diving deeper. +impl ReassignFieldIds { + fn new(start_from: i32) -> Self { + Self { + next_field_id: start_from, + old_to_new_id: HashMap::new(), + } + } + + fn reassign_field_ids(&mut self, fields: Vec) -> Result> { + // Visit fields on the same level first + let outer_fields = fields + .into_iter() + .map(|field| { + try_insert_field(&mut self.old_to_new_id, field.id, self.next_field_id)?; + let new_field = Arc::unwrap_or_clone(field).with_id(self.next_field_id); + self.increase_next_field_id()?; + Ok(Arc::new(new_field)) + }) + .collect::>>()?; + + // Now visit nested fields + outer_fields + .into_iter() + .map(|field| { + if field.field_type.is_primitive() { + Ok(field) + } else { + let mut new_field = Arc::unwrap_or_clone(field); + *new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type)?; + Ok(Arc::new(new_field)) + } + }) + .collect() + } + + fn reassign_ids_visit_type(&mut self, field_type: Type) -> Result { + match field_type { + Type::Primitive(s) => Ok(Type::Primitive(s)), + Type::Struct(s) => { + let new_fields = self.reassign_field_ids(s.fields().to_vec())?; + Ok(Type::Struct(StructType::new(new_fields))) + } + Type::List(l) => { + self.old_to_new_id + .insert(l.element_field.id, self.next_field_id); + let mut element_field = Arc::unwrap_or_clone(l.element_field); + element_field.id = self.next_field_id; + self.increase_next_field_id()?; + *element_field.field_type = + self.reassign_ids_visit_type(*element_field.field_type)?; + Ok(Type::List(ListType { + element_field: Arc::new(element_field), + })) + } + Type::Map(m) => { + self.old_to_new_id + .insert(m.key_field.id, self.next_field_id); + let mut key_field = Arc::unwrap_or_clone(m.key_field); + key_field.id = self.next_field_id; + self.increase_next_field_id()?; + *key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type)?; + + self.old_to_new_id + .insert(m.value_field.id, self.next_field_id); + let mut value_field = Arc::unwrap_or_clone(m.value_field); + value_field.id = self.next_field_id; + self.increase_next_field_id()?; + *value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type)?; + + Ok(Type::Map(MapType { + key_field: Arc::new(key_field), + value_field: Arc::new(value_field), + })) + } + } + } + + fn increase_next_field_id(&mut self) -> Result<()> { + self.next_field_id = self.next_field_id.checked_add(1).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Field ID overflowed, cannot add more fields", + ) + })?; + Ok(()) + } + + fn apply_to_identifier_fields(&self, field_ids: HashSet) -> Result> { + field_ids + .into_iter() + .map(|id| { + self.old_to_new_id.get(&id).copied().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Identifier Field ID {} not found", id), + ) + }) + }) + .collect() + } + + fn apply_to_aliases(&self, alias: BiHashMap) -> Result> { + alias + .into_iter() + .map(|(name, id)| { + self.old_to_new_id + .get(&id) + .copied() + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Field with id {} for alias {} not found", id, name), + ) + }) + .map(|new_id| (name, new_id)) + }) + .collect() + } +} + pub(super) mod _serde { /// This is a helper module that defines types to help with serialization/deserialization. /// For deserialization the input first gets read into either the [SchemaV1] or [SchemaV2] struct @@ -1062,6 +1234,8 @@ pub(super) mod _serde { mod tests { use std::collections::{HashMap, HashSet}; + use bimap::BiHashMap; + use super::DEFAULT_SCHEMA_ID; use crate::spec::datatypes::Type::{List, Map, Primitive, Struct}; use crate::spec::datatypes::{ @@ -2237,4 +2411,203 @@ table { let schema = table_schema_simple().0; assert_eq!(3, schema.highest_field_id()); } + + #[test] + fn test_highest_field_id_no_fields() { + let schema = Schema::builder().with_schema_id(1).build().unwrap(); + assert_eq!(0, schema.highest_field_id()); + } + + #[test] + fn test_reassign_ids() { + let schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![3]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)])) + .with_fields(vec![ + NestedField::optional(5, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(4, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + let reassigned_schema = schema + .into_builder() + .with_reassigned_field_ids(0) + .build() + .unwrap(); + + let expected = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 1)])) + .with_fields(vec![ + NestedField::optional(0, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(1, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap(); + + pretty_assertions::assert_eq!(expected, reassigned_schema); + assert_eq!(reassigned_schema.highest_field_id(), 2); + } + + #[test] + fn test_reassigned_ids_nested() { + let schema = table_schema_nested(); + let reassigned_schema = schema + .into_builder() + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 2)])) + .with_reassigned_field_ids(0) + .build() + .unwrap(); + + let expected = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![1]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 1)])) + .with_fields(vec![ + NestedField::optional(0, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(1, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + NestedField::required( + 3, + "qux", + Type::List(ListType { + element_field: NestedField::list_element( + 7, + Type::Primitive(PrimitiveType::String), + true, + ) + .into(), + }), + ) + .into(), + NestedField::required( + 4, + "quux", + Type::Map(MapType { + key_field: NestedField::map_key_element( + 8, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 9, + Type::Map(MapType { + key_field: NestedField::map_key_element( + 10, + Type::Primitive(PrimitiveType::String), + ) + .into(), + value_field: NestedField::map_value_element( + 11, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + true, + ) + .into(), + }), + ) + .into(), + NestedField::required( + 5, + "location", + Type::List(ListType { + element_field: NestedField::list_element( + 12, + Type::Struct(StructType::new(vec![ + NestedField::optional( + 13, + "latitude", + Type::Primitive(PrimitiveType::Float), + ) + .into(), + NestedField::optional( + 14, + "longitude", + Type::Primitive(PrimitiveType::Float), + ) + .into(), + ])), + true, + ) + .into(), + }), + ) + .into(), + NestedField::optional( + 6, + "person", + Type::Struct(StructType::new(vec![ + NestedField::optional(15, "name", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(16, "age", Type::Primitive(PrimitiveType::Int)) + .into(), + ])), + ) + .into(), + ]) + .build() + .unwrap(); + + pretty_assertions::assert_eq!(expected, reassigned_schema); + assert_eq!(reassigned_schema.highest_field_id(), 16); + assert_eq!(reassigned_schema.field_by_id(6).unwrap().name, "person"); + assert_eq!(reassigned_schema.field_by_id(16).unwrap().name, "age"); + } + + #[test] + fn test_reassign_ids_fails_with_duplicate_ids() { + let reassigned_schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![5]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)])) + .with_fields(vec![ + NestedField::required(5, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .with_reassigned_field_ids(0) + .build() + .unwrap_err(); + + assert!(reassigned_schema.message().contains("'field.id' 3")); + } + + #[test] + fn test_field_ids_must_be_unique() { + let reassigned_schema = Schema::builder() + .with_schema_id(1) + .with_identifier_field_ids(vec![5]) + .with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)])) + .with_fields(vec![ + NestedField::required(5, "foo", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "bar", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(), + ]) + .build() + .unwrap_err(); + + assert!(reassigned_schema.message().contains("'field.id' 3")); + } + + #[test] + fn test_reassign_ids_empty_schema() { + let schema = Schema::builder().with_schema_id(1).build().unwrap(); + let reassigned_schema = schema + .clone() + .into_builder() + .with_reassigned_field_ids(0) + .build() + .unwrap(); + + assert_eq!(schema, reassigned_schema); + assert_eq!(schema.highest_field_id(), 0); + } } diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 3568d3dcd..3c6e2aa68 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -3192,10 +3192,10 @@ mod tests { (Literal::Primitive(PrimitiveLiteral::Int(3)), None), ])), &Type::Map(MapType { - key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::Int)) + key_field: NestedField::map_key_element(2, Type::Primitive(PrimitiveType::Int)) .into(), value_field: NestedField::map_value_element( - 1, + 3, Type::Primitive(PrimitiveType::Long), false, ) @@ -3219,10 +3219,10 @@ mod tests { ), ])), &Type::Map(MapType { - key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::Int)) + key_field: NestedField::map_key_element(2, Type::Primitive(PrimitiveType::Int)) .into(), value_field: NestedField::map_value_element( - 1, + 3, Type::Primitive(PrimitiveType::Long), true, ) @@ -3249,10 +3249,10 @@ mod tests { ), ])), &Type::Map(MapType { - key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + key_field: NestedField::map_key_element(2, Type::Primitive(PrimitiveType::String)) .into(), value_field: NestedField::map_value_element( - 1, + 3, Type::Primitive(PrimitiveType::Int), false, ) @@ -3276,10 +3276,10 @@ mod tests { ), ])), &Type::Map(MapType { - key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + key_field: NestedField::map_key_element(2, Type::Primitive(PrimitiveType::String)) .into(), value_field: NestedField::map_value_element( - 1, + 3, Type::Primitive(PrimitiveType::Int), true, ) @@ -3299,9 +3299,9 @@ mod tests { None, ])), &Type::Struct(StructType::new(vec![ - NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), - NestedField::optional(3, "address", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(3, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(4, "address", Type::Primitive(PrimitiveType::String)).into(), ])), ); }