Skip to content

Commit

Permalink
Improve readability & comments
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel committed Sep 9, 2024
1 parent fd93fc3 commit 65074a1
Showing 1 changed file with 34 additions and 44 deletions.
78 changes: 34 additions & 44 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,82 +338,69 @@ impl TableMetadata {
/// We run this method after json deserialization.
/// All constructors for `TableMetadata` which are part of `iceberg-rust`
/// should return normalized `TableMetadata`.
///
/// It does:
/// * Validate the current schema is set and valid
/// * Validate that all refs are valid (snapshot exists)
/// * Validate logs are chronological
/// * Normalize location (remove trailing slash)
/// * Validate that for V1 Metadata the last_sequence_number is 0
/// * If the default partition spec is specified but the spec is not present in specs, add it
/// * If the default sort order is unsorted but the sort order is not present, add it
pub(super) fn try_normalize(&mut self) -> Result<&mut Self> {
    // The steps run in a fixed order; the first one that fails returns its
    // error through `?`, so no later step (including the location rewrite)
    // is executed in that case.
    self.validate_current_schema()?;
    self.normalize_current_snapshot()?;
    self.validate_refs()?;
    self.validate_chronological_snapshot_logs()?;
    self.validate_chronological_metadata_logs()?;

    // Normalize location (remove trailing slash): shorten the string in place
    // to the length it has once every trailing '/' is stripped.
    let trimmed_len = self.location.trim_end_matches('/').len();
    self.location.truncate(trimmed_len);

    self.validate_format_version_specifics()?;
    self.try_normalize_partition_spec()?;
    self.try_normalize_sort_order()?;

    Ok(self)
}

/// If the default partition spec is specified but the spec is not present in specs, add it
fn try_normalize_partition_spec(&mut self) -> Result<()> {
if self.partition_spec_by_id(self.default_spec_id).is_none() {
if self.default_spec_id == DEFAULT_PARTITION_SPEC_ID {
let partition_spec = PartitionSpec {
spec_id: DEFAULT_PARTITION_SPEC_ID,
fields: vec![],
};
self.partition_specs
.insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec));
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"No partition spec exists with the default spec id {}.",
self.default_spec_id
),
));
}
if self.partition_spec_by_id(self.default_spec_id).is_some() {
return Ok(());
}
if self.partition_specs.is_empty() {

if self.default_spec_id != DEFAULT_PARTITION_SPEC_ID {
return Err(Error::new(
ErrorKind::DataInvalid,
"Partition specs cannot be null or empty",
format!(
"No partition spec exists with the default spec id {}.",
self.default_spec_id
),
));
}

let partition_spec = PartitionSpec {
spec_id: DEFAULT_PARTITION_SPEC_ID,
fields: vec![],
};
self.partition_specs
.insert(DEFAULT_PARTITION_SPEC_ID, Arc::new(partition_spec));

Ok(())
}

/// If the default sort order is unsorted but the sort order is not present, add it
fn try_normalize_sort_order(&mut self) -> Result<()> {
if self.sort_order_by_id(self.default_sort_order_id).is_none() {
if self.default_sort_order_id == SortOrder::UNSORTED_ORDER_ID {
let sort_order = SortOrder::unsorted_order();
self.sort_orders
.insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
} else {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
"No sort order exists with the default sort order id {}.",
self.default_sort_order_id
),
));
}
if self.sort_order_by_id(self.default_sort_order_id).is_some() {
return Ok(());
}

if self.sort_orders.is_empty() {
if self.default_sort_order_id != SortOrder::UNSORTED_ORDER_ID {
return Err(Error::new(
ErrorKind::DataInvalid,
"Sort orders cannot be null or empty",
format!(
"No sort order exists with the default sort order id {}.",
self.default_sort_order_id
),
));
}

let sort_order = SortOrder::unsorted_order();
self.sort_orders
.insert(SortOrder::UNSORTED_ORDER_ID, Arc::new(sort_order));
Ok(())
}

/// Validate the current schema is set and exists.
fn validate_current_schema(&self) -> Result<()> {
if self.schema_by_id(self.current_schema_id).is_none() {
return Err(Error::new(
Expand Down Expand Up @@ -445,6 +432,7 @@ impl TableMetadata {
Ok(())
}

/// Validate that all refs are valid (snapshot exists)
fn validate_refs(&self) -> Result<()> {
for (name, snapshot_ref) in self.refs.iter() {
if self.snapshot_by_id(snapshot_ref.snapshot_id).is_none() {
Expand Down Expand Up @@ -481,6 +469,7 @@ impl TableMetadata {
Ok(())
}

/// Validate that for V1 Metadata the last_sequence_number is 0
fn validate_format_version_specifics(&self) -> Result<()> {
if self.format_version < FormatVersion::V2 && self.last_sequence_number != 0 {
return Err(Error::new(
Expand All @@ -495,6 +484,7 @@ impl TableMetadata {
Ok(())
}

/// Validate that snapshot logs are chronological and that last updated is after the last snapshot log.
fn validate_chronological_snapshot_logs(&self) -> Result<()> {
for window in self.snapshot_log.windows(2) {
let (prev, curr) = (&window[0], &window[1]);
Expand Down

0 comments on commit 65074a1

Please sign in to comment.