Skip to content

Commit

Permalink
refactor: remove support of manifest list format as a list of file paths #158
Browse files Browse the repository at this point in the history
  • Loading branch information
Dysprosium0626 committed Feb 17, 2024
1 parent 03f77b5 commit ad6e283
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 86 deletions.
3 changes: 1 addition & 2 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,6 @@ pub enum TableUpdate {

#[cfg(test)]
mod tests {
use crate::spec::ManifestListLocation::ManifestListFile;
use crate::spec::{
FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot,
SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Summary,
Expand Down Expand Up @@ -911,7 +910,7 @@ mod tests {
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_manifest_list(ManifestListFile("s3://a/b/2.avro".to_string()))
.with_manifest_list("s3://a/b/2.avro".to_string())
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
Expand Down
116 changes: 42 additions & 74 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,14 @@ pub struct Snapshot {
timestamp_ms: i64,
/// The location of a manifest list for this snapshot that
/// tracks manifest files with additional metadata.
manifest_list: ManifestListLocation,
manifest_list: String,
/// A string map that summarizes the snapshot changes, including operation.
summary: Summary,
/// ID of the table’s current schema when the snapshot was created.
#[builder(setter(strip_option), default = None)]
schema_id: Option<SchemaId>,
}

/// Where a snapshot's manifests are recorded: either the path to a single
/// manifest list file, or a vector of individual manifest file locations.
///
/// Marked `#[serde(untagged)]`, so the value is (de)serialized without a
/// variant tag.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(untagged)]
pub enum ManifestListLocation {
/// Location of the manifest list file.
ManifestListFile(String),
/// Locations of the individual manifest files.
ManifestFiles(Vec<String>),
}

impl Snapshot {
/// Get the id of the snapshot
#[inline]
Expand All @@ -122,7 +112,7 @@ impl Snapshot {
}
/// Get location of manifest_list file
#[inline]
pub fn manifest_list(&self) -> &ManifestListLocation {
pub fn manifest_list(&self) -> &String {
&self.manifest_list
}

Expand All @@ -131,13 +121,7 @@ impl Snapshot {
/// The manifest list is stored as a plain file path, so this always returns `Ok`.
#[inline]
pub fn manifest_list_file_path(&self) -> Result<&str> {
match &self.manifest_list {
ManifestListLocation::ManifestListFile(s) => Ok(s),
_ => Err(Error::new(
ErrorKind::DataInvalid,
"Manifest list is not a file but a list of manifest files.",
)),
}
Ok(&self.manifest_list)
}
/// Get summary of the snapshot
#[inline]
Expand Down Expand Up @@ -187,31 +171,28 @@ impl Snapshot {
file_io: &FileIO,
table_metadata: &TableMetadata,
) -> Result<ManifestList> {
match &self.manifest_list {
ManifestListLocation::ManifestListFile(file) => {
let mut manifest_list_content= Vec::new();
file_io
.new_input(file)?
.reader().await?
.read_to_end(&mut manifest_list_content)
.await?;

let schema = self.schema(table_metadata)?;

let partition_type_provider = |partition_spec_id: i32| -> Result<Option<StructType>> {
table_metadata.partition_spec_by_id(partition_spec_id).map(|partition_spec| {
partition_spec.partition_type(&schema)
}).transpose()
};

ManifestList::parse_with_version(&manifest_list_content, table_metadata.format_version(),
partition_type_provider, )
}
ManifestListLocation::ManifestFiles(_) => Err(Error::new(
ErrorKind::FeatureUnsupported,
"Loading manifests from `manifests` is currently not supported, we only support loading from `manifest-list` file, see https://iceberg.apache.org/spec/#snapshots for more information.",
)),
}
let mut manifest_list_content = Vec::new();
file_io
.new_input(&self.manifest_list)?
.reader()
.await?
.read_to_end(&mut manifest_list_content)
.await?;

let schema = self.schema(table_metadata)?;

let partition_type_provider = |partition_spec_id: i32| -> Result<Option<StructType>> {
table_metadata
.partition_spec_by_id(partition_spec_id)
.map(|partition_spec| partition_spec.partition_type(&schema))
.transpose()
};

ManifestList::parse_with_version(
&manifest_list_content,
table_metadata.format_version(),
partition_type_provider,
)
}

pub(crate) fn log(&self) -> SnapshotLog {
Expand All @@ -234,7 +215,7 @@ pub(super) mod _serde {
use crate::spec::SchemaId;
use crate::{Error, ErrorKind};

use super::{ManifestListLocation, Operation, Snapshot, Summary};
use super::{Operation, Snapshot, Summary};

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -262,8 +243,6 @@ pub(super) mod _serde {
#[serde(skip_serializing_if = "Option::is_none")]
pub manifest_list: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub manifests: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub summary: Option<Summary>,
#[serde(skip_serializing_if = "Option::is_none")]
pub schema_id: Option<SchemaId>,
Expand All @@ -276,7 +255,7 @@ pub(super) mod _serde {
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: ManifestListLocation::ManifestListFile(v2.manifest_list),
manifest_list: v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
}
Expand All @@ -286,17 +265,14 @@ pub(super) mod _serde {
impl From<Snapshot> for SnapshotV2 {
fn from(v2: Snapshot) -> Self {
SnapshotV2 {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: match v2.manifest_list {
ManifestListLocation::ManifestListFile(file) => file,
ManifestListLocation::ManifestFiles(_) => panic!("Wrong table format version. Can't convert a list of manifest files into a location of a manifest file.")
},
summary: v2.summary,
schema_id: v2.schema_id,
}
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
}
}
}

Expand All @@ -309,10 +285,9 @@ pub(super) mod _serde {
parent_snapshot_id: v1.parent_snapshot_id,
sequence_number: 0,
timestamp_ms: v1.timestamp_ms,
manifest_list: match (v1.manifest_list, v1.manifests) {
(Some(file), _) => ManifestListLocation::ManifestListFile(file),
(None, Some(files)) => ManifestListLocation::ManifestFiles(files),
(None, None) => {
manifest_list: match v1.manifest_list {
Some(file) => file,
None => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Neither manifestlist file or manifest files are provided.",
Expand All @@ -330,16 +305,14 @@ pub(super) mod _serde {

impl From<Snapshot> for SnapshotV1 {
fn from(v2: Snapshot) -> Self {
let (manifest_list, manifests) = match v2.manifest_list {
ManifestListLocation::ManifestListFile(file) => (Some(file), None),
ManifestListLocation::ManifestFiles(files) => (None, Some(files)),
let manifest_list = match v2.manifest_list {
file => Some(file),
};
SnapshotV1 {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
timestamp_ms: v2.timestamp_ms,
manifest_list,
manifests,
summary: Some(v2.summary),
schema_id: v2.schema_id,
}
Expand Down Expand Up @@ -403,9 +376,7 @@ mod tests {
use chrono::{TimeZone, Utc};
use std::collections::HashMap;

use crate::spec::snapshot::{
ManifestListLocation, Operation, Snapshot, Summary, _serde::SnapshotV1,
};
use crate::spec::snapshot::{Operation, Snapshot, Summary, _serde::SnapshotV1};

#[test]
fn schema() {
Expand Down Expand Up @@ -437,9 +408,6 @@ mod tests {
},
*result.summary()
);
assert_eq!(
ManifestListLocation::ManifestListFile("s3://b/wh/.../s1.avro".to_string()),
*result.manifest_list()
);
assert_eq!("s3://b/wh/.../s1.avro".to_string(), *result.manifest_list());
}
}
16 changes: 6 additions & 10 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,9 +839,9 @@ mod tests {
use pretty_assertions::assert_eq;

use crate::spec::{
table_metadata::TableMetadata, ManifestListLocation, NestedField, NullOrder, Operation,
PartitionField, PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference,
SnapshotRetention, SortDirection, SortField, SortOrder, Summary, Transform, Type,
table_metadata::TableMetadata, NestedField, NullOrder, Operation, PartitionField,
PartitionSpec, PrimitiveType, Schema, Snapshot, SnapshotReference, SnapshotRetention,
SortDirection, SortField, SortOrder, Summary, Transform, Type,
};

use super::{FormatVersion, MetadataLog, SnapshotLog};
Expand Down Expand Up @@ -1104,7 +1104,7 @@ mod tests {
.with_timestamp_ms(1662532818843)
.with_sequence_number(0)
.with_schema_id(0)
.with_manifest_list(ManifestListLocation::ManifestListFile("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string()))
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro".to_string())
.with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.build();

Expand Down Expand Up @@ -1228,9 +1228,7 @@ mod tests {
.with_snapshot_id(3051729675574597004)
.with_timestamp_ms(1515100955770)
.with_sequence_number(0)
.with_manifest_list(ManifestListLocation::ManifestListFile(
"s3://a/b/1.avro".to_string(),
))
.with_manifest_list("s3://a/b/1.avro".to_string())
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
Expand All @@ -1243,9 +1241,7 @@ mod tests {
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_schema_id(1)
.with_manifest_list(ManifestListLocation::ManifestListFile(
"s3://a/b/2.avro".to_string(),
))
.with_manifest_list("s3://a/b/2.avro".to_string())
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
Expand Down

0 comments on commit ad6e283

Please sign in to comment.