Skip to content

Commit

Permalink
fix: TableUpdate Snapshot deserialization for v1 (#656)
Browse files Browse the repository at this point in the history
  • Loading branch information
c-thiel authored Oct 3, 2024
1 parent cda4a0c commit d3b3ab1
Showing 1 changed file with 96 additions and 0 deletions.
96 changes: 96 additions & 0 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::fmt::Debug;
use std::mem::take;
use std::ops::Deref;

use _serde::deserialize_snapshot;
use async_trait::async_trait;
use serde_derive::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
Expand Down Expand Up @@ -401,6 +402,7 @@ pub enum TableUpdate {
#[serde(rename_all = "kebab-case")]
AddSnapshot {
/// Snapshot to add.
#[serde(deserialize_with = "deserialize_snapshot")]
snapshot: Snapshot,
},
/// Set table's snapshot ref.
Expand Down Expand Up @@ -451,6 +453,65 @@ impl TableUpdate {
}
}

pub(super) mod _serde {
use serde::{Deserialize as _, Deserializer};

use super::*;
use crate::spec::{SchemaId, Summary};

pub(super) fn deserialize_snapshot<'de, D>(
deserializer: D,
) -> std::result::Result<Snapshot, D::Error>
where D: Deserializer<'de> {
let buf = CatalogSnapshot::deserialize(deserializer)?;
Ok(buf.into())
}

#[derive(Debug, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Defines the structure of a v2 snapshot for the catalog.
/// Main difference to SnapshotV2 is that sequence-number is optional
/// in the rest catalog spec to allow for backwards compatibility with v1.
struct CatalogSnapshot {
snapshot_id: i64,
#[serde(skip_serializing_if = "Option::is_none")]
parent_snapshot_id: Option<i64>,
#[serde(default)]
sequence_number: i64,
timestamp_ms: i64,
manifest_list: String,
summary: Summary,
#[serde(skip_serializing_if = "Option::is_none")]
schema_id: Option<SchemaId>,
}

impl From<CatalogSnapshot> for Snapshot {
fn from(snapshot: CatalogSnapshot) -> Self {
let CatalogSnapshot {
snapshot_id,
parent_snapshot_id,
sequence_number,
timestamp_ms,
manifest_list,
schema_id,
summary,
} = snapshot;
let builder = Snapshot::builder()
.with_snapshot_id(snapshot_id)
.with_parent_snapshot_id(parent_snapshot_id)
.with_sequence_number(sequence_number)
.with_timestamp_ms(timestamp_ms)
.with_manifest_list(manifest_list)
.with_summary(summary);
if let Some(schema_id) = schema_id {
builder.with_schema_id(schema_id).build()
} else {
builder.build()
}
}
}
}

/// ViewCreation represents the creation of a view in the catalog.
#[derive(Debug, TypedBuilder)]
pub struct ViewCreation {
Expand Down Expand Up @@ -968,6 +1029,41 @@ mod tests {
test_serde_json(json, update);
}

#[test]
fn test_add_snapshot_v1() {
let json = r#"
{
"action": "add-snapshot",
"snapshot": {
"snapshot-id": 3055729675574597000,
"parent-snapshot-id": 3051729675574597000,
"timestamp-ms": 1555100955770,
"summary": {
"operation": "append"
},
"manifest-list": "s3://a/b/2.avro"
}
}
"#;

let update = TableUpdate::AddSnapshot {
snapshot: Snapshot::builder()
.with_snapshot_id(3055729675574597000)
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(0)
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
})
.build(),
};

let actual: TableUpdate = serde_json::from_str(json).expect("Failed to parse from json");
assert_eq!(actual, update, "Parsed value is not equal to expected");
}

#[test]
fn test_remove_snapshots() {
let json = r#"
Expand Down

0 comments on commit d3b3ab1

Please sign in to comment.