Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel committed Sep 25, 2024
1 parent a0deaae commit 0bbe868
Showing 1 changed file with 92 additions and 41 deletions.
133 changes: 92 additions & 41 deletions crates/iceberg/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,12 @@ impl SchemaBuilder {
}

/// Reassign all field-ids (nested) on build.
/// If `start_from` is provided, it will start reassigning from that id (inclusive).
/// If not provided, it will start from 0.
/// Reassignment starts from the field-id specified in `start_from` (inclusive).
///
/// All specified aliases and identifier fields will be updated to the new field-ids.
pub fn with_reassigned_field_ids(mut self, start_from: Option<i32>) -> Self {
self.reassign_field_ids_from = Some(start_from.unwrap_or(0));
#[allow(dead_code)] // Will be needed in TableMetadataBuilder
pub(crate) fn with_reassigned_field_ids(mut self, start_from: u32) -> Self {
self.reassign_field_ids_from = Some(start_from.try_into().unwrap_or(i32::MAX));
self
}

Expand All @@ -116,16 +116,7 @@ impl SchemaBuilder {
}

/// Builds the schema.
pub fn build(mut self) -> Result<Schema> {
if let Some(start_from) = self.reassign_field_ids_from {
let mut id_reassigner = ReassignFieldIds::new(start_from);
self.fields = id_reassigner.reassign_field_ids(self.fields);

self.identifier_field_ids =
id_reassigner.apply_to_identifier_fields(self.identifier_field_ids)?;
self.alias_to_id = id_reassigner.apply_to_aliases(self.alias_to_id)?;
}

pub fn build(self) -> Result<Schema> {
let field_id_to_accessor = self.build_accessors();

let r#struct = StructType::new(self.fields);
Expand All @@ -150,7 +141,7 @@ impl SchemaBuilder {

let highest_field_id = id_to_field.keys().max().cloned().unwrap_or(0);

Ok(Schema {
let mut schema = Schema {
r#struct,
schema_id: self.schema_id,
highest_field_id,
Expand All @@ -163,7 +154,26 @@ impl SchemaBuilder {
id_to_name,

field_id_to_accessor,
})
};

if let Some(start_from) = self.reassign_field_ids_from {
let mut id_reassigner = ReassignFieldIds::new(start_from);
let mew_fields = id_reassigner.reassign_field_ids(schema.r#struct.fields().to_vec())?;
let new_identifier_field_ids =
id_reassigner.apply_to_identifier_fields(schema.identifier_field_ids)?;
let new_alias_to_id = id_reassigner.apply_to_aliases(schema.alias_to_id.clone())?;

schema = SchemaBuilder {
schema_id: schema.schema_id,
fields: mew_fields,
alias_to_id: new_alias_to_id,
identifier_field_ids: new_identifier_field_ids,
reassign_field_ids_from: None,
}
.build()?;
}

Ok(schema)
}

fn build_accessors(&self) -> HashMap<i32, Arc<StructAccessor>> {
Expand Down Expand Up @@ -980,74 +990,95 @@ impl ReassignFieldIds {
}
}

fn reassign_field_ids(&mut self, fields: Vec<NestedFieldRef>) -> Vec<NestedFieldRef> {
fn reassign_field_ids(&mut self, fields: Vec<NestedFieldRef>) -> Result<Vec<NestedFieldRef>> {
// Visit fields on the same level first
let outer_fields = fields
.into_iter()
.map(|field| {
self.old_to_new_id.insert(field.id, self.next_field_id);
self
.old_to_new_id
.insert(field.id, self.next_field_id)
.map_or_else(|| Ok(()), |_| Err(Error::new(
ErrorKind::DataInvalid,
format!(
"Error reassigning field ids: Found duplicate 'field.id' {}. Field ids must be unique.",
field.id
))
))?;

let new_field = Arc::unwrap_or_clone(field).with_id(self.next_field_id);
self.next_field_id += 1;
Arc::new(new_field)
self.increase_next_field_id()?;
Ok(Arc::new(new_field))
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;

// Now visit nested fields
outer_fields
.into_iter()
.map(|field| {
if field.field_type.is_primitive() {
field
Ok(field)
} else {
let mut new_field = Arc::unwrap_or_clone(field);
*new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type);
Arc::new(new_field)
*new_field.field_type = self.reassign_ids_visit_type(*new_field.field_type)?;
Ok(Arc::new(new_field))
}
})
.collect()
}

fn reassign_ids_visit_type(&mut self, field_type: Type) -> Type {
fn reassign_ids_visit_type(&mut self, field_type: Type) -> Result<Type> {
match field_type {
Type::Primitive(s) => Type::Primitive(s),
Type::Primitive(s) => Ok(Type::Primitive(s)),
Type::Struct(s) => {
let new_fields = self.reassign_field_ids(s.fields().to_vec());
Type::Struct(StructType::new(new_fields))
let new_fields = self.reassign_field_ids(s.fields().to_vec())?;
Ok(Type::Struct(StructType::new(new_fields)))
}
Type::List(l) => {
self.old_to_new_id
.insert(l.element_field.id, self.next_field_id);
let mut element_field = Arc::unwrap_or_clone(l.element_field);
element_field.id = self.next_field_id;
self.next_field_id += 1;
*element_field.field_type = self.reassign_ids_visit_type(*element_field.field_type);
Type::List(ListType {
self.increase_next_field_id()?;
*element_field.field_type =
self.reassign_ids_visit_type(*element_field.field_type)?;
Ok(Type::List(ListType {
element_field: Arc::new(element_field),
})
}))
}
Type::Map(m) => {
self.old_to_new_id
.insert(m.key_field.id, self.next_field_id);
let mut key_field = Arc::unwrap_or_clone(m.key_field);
key_field.id = self.next_field_id;
self.next_field_id += 1;
*key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type);
self.increase_next_field_id()?;
*key_field.field_type = self.reassign_ids_visit_type(*key_field.field_type)?;

self.old_to_new_id
.insert(m.value_field.id, self.next_field_id);
let mut value_field = Arc::unwrap_or_clone(m.value_field);
value_field.id = self.next_field_id;
self.next_field_id += 1;
*value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type);
self.increase_next_field_id()?;
*value_field.field_type = self.reassign_ids_visit_type(*value_field.field_type)?;

Type::Map(MapType {
Ok(Type::Map(MapType {
key_field: Arc::new(key_field),
value_field: Arc::new(value_field),
})
}))
}
}
}

fn increase_next_field_id(&mut self) -> Result<()> {
self.next_field_id = self.next_field_id.checked_add(1).ok_or_else(|| {
Error::new(
ErrorKind::DataInvalid,
"Field ID overflowed, cannot add more fields",
)
})?;
Ok(())
}

fn apply_to_identifier_fields(&self, field_ids: HashSet<i32>) -> Result<HashSet<i32>> {
field_ids
.into_iter()
Expand Down Expand Up @@ -2400,7 +2431,7 @@ table {

let reassigned_schema = schema
.into_builder()
.with_reassigned_field_ids(Some(0))
.with_reassigned_field_ids(0)
.build()
.unwrap();

Expand All @@ -2426,7 +2457,7 @@ table {
let reassigned_schema = schema
.into_builder()
.with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 2)]))
.with_reassigned_field_ids(Some(0))
.with_reassigned_field_ids(0)
.build()
.unwrap();

Expand Down Expand Up @@ -2528,13 +2559,33 @@ table {
assert_eq!(reassigned_schema.field_by_id(16).unwrap().name, "age");
}

#[test]
fn test_reassign_ids_fails_with_duplicate_ids() {
let reassigned_schema = Schema::builder()
.with_schema_id(1)
.with_identifier_field_ids(vec![3])
.with_alias(BiHashMap::from_iter(vec![("bar_alias".to_string(), 3)]))
.with_fields(vec![
NestedField::optional(5, "foo", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(3, "bar", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::optional(3, "baz", Type::Primitive(PrimitiveType::Boolean)).into(),
])
.with_reassigned_field_ids(0)
.build()
.unwrap_err();

assert!(reassigned_schema
.message()
.contains("Found duplicate 'field.id' 3"));
}

#[test]
fn test_reassign_ids_empty_schema() {
let schema = Schema::builder().with_schema_id(1).build().unwrap();
let reassigned_schema = schema
.clone()
.into_builder()
.with_reassigned_field_ids(Some(0))
.with_reassigned_field_ids(0)
.build()
.unwrap();

Expand Down

0 comments on commit 0bbe868

Please sign in to comment.