diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 54abe8083..854c1269c 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -22,6 +22,7 @@ use std::fmt::Debug; use std::mem::take; use std::ops::Deref; +use _serde::deserialize_snapshot; use async_trait::async_trait; use serde_derive::{Deserialize, Serialize}; use typed_builder::TypedBuilder; @@ -401,6 +402,7 @@ pub enum TableUpdate { #[serde(rename_all = "kebab-case")] AddSnapshot { /// Snapshot to add. + #[serde(deserialize_with = "deserialize_snapshot")] snapshot: Snapshot, }, /// Set table's snapshot ref. @@ -451,6 +453,65 @@ impl TableUpdate { } } +pub(super) mod _serde { + use serde::{Deserialize as _, Deserializer}; + + use super::*; + use crate::spec::{SchemaId, Summary}; + + pub(super) fn deserialize_snapshot<'de, D>( + deserializer: D, + ) -> std::result::Result + where D: Deserializer<'de> { + let buf = CatalogSnapshot::deserialize(deserializer)?; + Ok(buf.into()) + } + + #[derive(Debug, Deserialize, PartialEq, Eq)] + #[serde(rename_all = "kebab-case")] + /// Defines the structure of a v2 snapshot for the catalog. + /// Main difference to SnapshotV2 is that sequence-number is optional + /// in the rest catalog spec to allow for backwards compatibility with v1. + struct CatalogSnapshot { + snapshot_id: i64, + #[serde(skip_serializing_if = "Option::is_none")] + parent_snapshot_id: Option, + #[serde(default)] + sequence_number: i64, + timestamp_ms: i64, + manifest_list: String, + summary: Summary, + #[serde(skip_serializing_if = "Option::is_none")] + schema_id: Option, + } + + impl From for Snapshot { + fn from(snapshot: CatalogSnapshot) -> Self { + let CatalogSnapshot { + snapshot_id, + parent_snapshot_id, + sequence_number, + timestamp_ms, + manifest_list, + schema_id, + summary, + } = snapshot; + let builder = Snapshot::builder() + .with_snapshot_id(snapshot_id) + .with_parent_snapshot_id(parent_snapshot_id) + .with_sequence_number(sequence_number) + .with_timestamp_ms(timestamp_ms) + .with_manifest_list(manifest_list) + .with_summary(summary); + if let Some(schema_id) = schema_id { + builder.with_schema_id(schema_id).build() + } else { + builder.build() + } + } + } +} + /// ViewCreation represents the creation of a view in the catalog. #[derive(Debug, TypedBuilder)] pub struct ViewCreation { @@ -968,6 +1029,41 @@ mod tests { test_serde_json(json, update); } + #[test] + fn test_add_snapshot_v1() { + let json = r#" +{ + "action": "add-snapshot", + "snapshot": { + "snapshot-id": 3055729675574597000, + "parent-snapshot-id": 3051729675574597000, + "timestamp-ms": 1555100955770, + "summary": { + "operation": "append" + }, + "manifest-list": "s3://a/b/2.avro" + } +} + "#; + + let update = TableUpdate::AddSnapshot { + snapshot: Snapshot::builder() + .with_snapshot_id(3055729675574597000) + .with_parent_snapshot_id(Some(3051729675574597000)) + .with_timestamp_ms(1555100955770) + .with_sequence_number(0) + .with_manifest_list("s3://a/b/2.avro") + .with_summary(Summary { + operation: Operation::Append, + other: HashMap::default(), + }) + .build(), + }; + + let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json"); + assert_eq!(actual, update, "Parsed value is not equal to expected"); + } + #[test] fn test_remove_snapshots() { let json = r#"