From ad6e283984a22a6f8a2f5645da2ff615efcf7e07 Mon Sep 17 00:00:00 2001 From: Yue Deng Date: Thu, 15 Feb 2024 13:57:03 +0800 Subject: [PATCH] refactor: remove support of manifest list format as a list of file paths#158 --- crates/iceberg/src/catalog/mod.rs | 3 +- crates/iceberg/src/spec/snapshot.rs | 116 ++++++++-------------- crates/iceberg/src/spec/table_metadata.rs | 16 ++- 3 files changed, 49 insertions(+), 86 deletions(-) diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index b68837593..8e493f553 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -429,7 +429,6 @@ pub enum TableUpdate { #[cfg(test)] mod tests { - use crate::spec::ManifestListLocation::ManifestListFile; use crate::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary, @@ -911,7 +910,7 @@ mod tests { .with_parent_snapshot_id(Some(3051729675574597000)) .with_timestamp_ms(1555100955770) .with_sequence_number(1) - .with_manifest_list(ManifestListFile("s3://a/b/2.avro".to_string())) + .with_manifest_list("s3://a/b/2.avro".to_string()) .with_schema_id(1) .with_summary(Summary { operation: Operation::Append, diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index 781b757fb..dc814d9b9 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -84,7 +84,7 @@ pub struct Snapshot { timestamp_ms: i64, /// The location of a manifest list for this snapshot that /// tracks manifest files with additional metadata. - manifest_list: ManifestListLocation, + manifest_list: String, /// A string map that summarizes the snapshot changes, including operation. summary: Summary, /// ID of the table’s current schema when the snapshot was created. @@ -92,16 +92,6 @@ pub struct Snapshot { schema_id: Option, } -/// Type to distinguish between a path to a manifestlist file or a vector of manifestfile locations -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] -#[serde(untagged)] -pub enum ManifestListLocation { - /// Location of manifestlist file - ManifestListFile(String), - /// Manifestfile locations - ManifestFiles(Vec), -} - impl Snapshot { /// Get the id of the snapshot #[inline] @@ -122,7 +112,7 @@ impl Snapshot { } /// Get location of manifest_list file #[inline] - pub fn manifest_list(&self) -> &ManifestListLocation { + pub fn manifest_list(&self) -> &String { &self.manifest_list } @@ -131,13 +121,7 @@ impl Snapshot { /// It will return an error if the manifest list is not a file but a list of manifest file paths. #[inline] pub fn manifest_list_file_path(&self) -> Result<&str> { - match &self.manifest_list { - ManifestListLocation::ManifestListFile(s) => Ok(s), - _ => Err(Error::new( - ErrorKind::DataInvalid, - "Manifest list is not a file but a list of manifest files.", - )), - } + Ok(&self.manifest_list) } /// Get summary of the snapshot #[inline] @@ -187,31 +171,28 @@ impl Snapshot { file_io: &FileIO, table_metadata: &TableMetadata, ) -> Result { - match &self.manifest_list { - ManifestListLocation::ManifestListFile(file) => { - let mut manifest_list_content= Vec::new(); - file_io - .new_input(file)? - .reader().await? - .read_to_end(&mut manifest_list_content) - .await?; - - let schema = self.schema(table_metadata)?; - - let partition_type_provider = |partition_spec_id: i32| -> Result> { - table_metadata.partition_spec_by_id(partition_spec_id).map(|partition_spec| { - partition_spec.partition_type(&schema) - }).transpose() - }; - - ManifestList::parse_with_version(&manifest_list_content, table_metadata.format_version(), - partition_type_provider, ) - } - ManifestListLocation::ManifestFiles(_) => Err(Error::new( - ErrorKind::FeatureUnsupported, - "Loading manifests from `manifests` is currently not supported, we only support loading from `manifest-list` file, see https://iceberg.apache.org/spec/#snapshots for more information.", - )), - } + let mut manifest_list_content = Vec::new(); + file_io + .new_input(&self.manifest_list)? + .reader() + .await? + .read_to_end(&mut manifest_list_content) + .await?; + + let schema = self.schema(table_metadata)?; + + let partition_type_provider = |partition_spec_id: i32| -> Result> { + table_metadata + .partition_spec_by_id(partition_spec_id) + .map(|partition_spec| partition_spec.partition_type(&schema)) + .transpose() + }; + + ManifestList::parse_with_version( + &manifest_list_content, + table_metadata.format_version(), + partition_type_provider, + ) } pub(crate) fn log(&self) -> SnapshotLog { @@ -234,7 +215,7 @@ pub(super) mod _serde { use crate::spec::SchemaId; use crate::{Error, ErrorKind}; - use super::{ManifestListLocation, Operation, Snapshot, Summary}; + use super::{Operation, Snapshot, Summary}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "kebab-case")] @@ -262,8 +243,6 @@ pub(super) mod _serde { #[serde(skip_serializing_if = "Option::is_none")] pub manifest_list: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub manifests: Option>, - #[serde(skip_serializing_if = "Option::is_none")] pub summary: Option, #[serde(skip_serializing_if = "Option::is_none")] pub schema_id: Option, @@ -276,7 +255,7 @@ pub(super) mod _serde { parent_snapshot_id: v2.parent_snapshot_id, sequence_number: v2.sequence_number, timestamp_ms: v2.timestamp_ms, - manifest_list: ManifestListLocation::ManifestListFile(v2.manifest_list), + manifest_list: v2.manifest_list, summary: v2.summary, schema_id: v2.schema_id, } @@ -286,17 +265,14 @@ pub(super) mod _serde { impl From for SnapshotV2 { fn from(v2: Snapshot) -> Self { SnapshotV2 { - snapshot_id: v2.snapshot_id, - parent_snapshot_id: v2.parent_snapshot_id, - sequence_number: v2.sequence_number, - timestamp_ms: v2.timestamp_ms, - manifest_list: match v2.manifest_list { - ManifestListLocation::ManifestListFile(file) => file, - ManifestListLocation::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.") - }, - summary: v2.summary, - schema_id: v2.schema_id, - } + snapshot_id: v2.snapshot_id, + parent_snapshot_id: v2.parent_snapshot_id, + sequence_number: v2.sequence_number, + timestamp_ms: v2.timestamp_ms, + manifest_list: v2.manifest_list, + summary: v2.summary, + schema_id: v2.schema_id, + } } } @@ -309,10 +285,9 @@ pub(super) mod _serde { parent_snapshot_id: v1.parent_snapshot_id, sequence_number: 0, timestamp_ms: v1.timestamp_ms, - manifest_list: match (v1.manifest_list, v1.manifests) { - (Some(file), _) => ManifestListLocation::ManifestListFile(file), - (None, Some(files)) => ManifestListLocation::ManifestFiles(files), - (None, None) => { + manifest_list: match v1.manifest_list { + Some(file) => file, + None => { return Err(Error::new( ErrorKind::DataInvalid, "Neither manifestlist file or manifest files are provided.", @@ -330,16 +305,14 @@ pub(super) mod _serde { impl From for SnapshotV1 { fn from(v2: Snapshot) -> Self { - let (manifest_list, manifests) = match v2.manifest_list { - ManifestListLocation::ManifestListFile(file) => (Some(file), None), - ManifestListLocation::ManifestFiles(files) => (None, Some(files)), + let manifest_list = match v2.manifest_list { + file => Some(file), }; SnapshotV1 { snapshot_id: v2.snapshot_id, parent_snapshot_id: v2.parent_snapshot_id, timestamp_ms: v2.timestamp_ms, manifest_list, - manifests, summary: Some(v2.summary), schema_id: v2.schema_id, } @@ -403,9 +376,7 @@ mod tests { use chrono::{TimeZone, Utc}; use std::collections::HashMap; - use crate::spec::snapshot::{ - ManifestListLocation, Operation, Snapshot, Summary, _serde::SnapshotV1, - }; + use crate::spec::snapshot::{Operation, Snapshot, Summary, _serde::SnapshotV1}; #[test] fn schema() { @@ -437,9 +408,6 @@ mod tests { }, *result.summary() ); - assert_eq!( - ManifestListLocation::ManifestListFile("s3://b/wh/.../s1.avro".to_string()), - *result.manifest_list() - ); + assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list()); } } diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 18e9ce82d..22cb23285 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -839,9 +839,9 @@ mod tests { use pretty_assertions::assert_eq; use crate::spec::{ - table_metadata::TableMetadata, ManifestListLocation, NestedField, NullOrder, Operation, - PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, - SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type, + table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField, + PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention, + SortDirection, SortField, SortOrder, Summary, Transform, Type, }; use super::{FormatVersion, MetadataLog, SnapshotLog}; @@ -1104,7 +1104,7 @@ mod tests { .with_timestamp_ms(1662532818843) .with_sequence_number(0) .with_schema_id(0) - .with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())) + .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()) .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) }) .build(); @@ -1228,9 +1228,7 @@ mod tests { .with_snapshot_id(3051729675574597004) .with_timestamp_ms(1515100955770) .with_sequence_number(0) - .with_manifest_list(ManifestListLocation::ManifestListFile( - "s3://a/b/1.avro".to_string(), - )) + .with_manifest_list("s3://a/b/1.avro".to_string()) .with_summary(Summary { operation: Operation::Append, other: HashMap::new(), @@ -1243,9 +1241,7 @@ mod tests { .with_timestamp_ms(1555100955770) .with_sequence_number(1) .with_schema_id(1) - .with_manifest_list(ManifestListLocation::ManifestListFile( - "s3://a/b/2.avro".to_string(), - )) + .with_manifest_list("s3://a/b/2.avro".to_string()) .with_summary(Summary { operation: Operation::Append, other: HashMap::new(),