Skip to content

Commit

Permalink
Resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Dec 19, 2023
1 parent f2ecf9c commit 67205d9
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 108 deletions.
181 changes: 89 additions & 92 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,18 @@ impl ManifestList {
pub fn parse_with_version(
bs: &[u8],
version: FormatVersion,
partition_types: &HashMap<i32, StructType>,
partition_type_provider: impl Fn(i32) -> Result<Option<StructType>>,
) -> Result<ManifestList> {
match version {
FormatVersion::V1 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?;
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_types)
from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider)
}
FormatVersion::V2 => {
let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?;
let values = Value::Array(reader.collect::<std::result::Result<Vec<Value>, _>>()?);
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_types)
from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider)
}
}
}
Expand Down Expand Up @@ -683,17 +683,15 @@ pub struct FieldSummary {
/// and then converted into the [ManifestListEntry] struct. Serialization works the other way around.
/// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization.
pub(super) mod _serde {
use std::collections::HashMap;

pub use serde_bytes::ByteBuf;
use serde_derive::{Deserialize, Serialize};

use crate::{
spec::{Literal, StructType, Type},
Error,
};
pub use serde_bytes::ByteBuf;
use serde_derive::{Deserialize, Serialize};

use super::ManifestListEntry;
use crate::error::Result;

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
Expand All @@ -712,16 +710,16 @@ pub(super) mod _serde {
/// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(
self,
partition_types: &HashMap<i32, StructType>,
) -> Result<super::ManifestList, Error> {
partition_type_provider: impl Fn(i32) -> Result<Option<StructType>>,
) -> Result<super::ManifestList> {
Ok(super::ManifestList {
entries: self
.entries
.into_iter()
.map(|v| {
let partition_spec_id = v.partition_spec_id;
let manifest_path = v.manifest_path.clone();
v.try_into(partition_types.get(&partition_spec_id))
v.try_into(partition_type_provider(partition_spec_id)?.as_ref())
.map_err(|err| {
err.with_context("manifest file path", manifest_path)
.with_context(
Expand All @@ -730,21 +728,21 @@ pub(super) mod _serde {
)
})
})
.collect::<Result<Vec<_>, _>>()?,
.collect::<Result<Vec<_>>>()?,
})
}
}

impl TryFrom<super::ManifestList> for ManifestListV2 {
type Error = Error;

fn try_from(value: super::ManifestList) -> Result<Self, Self::Error> {
fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
Ok(Self {
entries: value
.entries
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?,
.collect::<std::result::Result<Vec<_>, _>>()?,
})
}
}
Expand All @@ -754,16 +752,16 @@ pub(super) mod _serde {
/// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(
self,
partition_types: &HashMap<i32, StructType>,
) -> Result<super::ManifestList, Error> {
partition_type_provider: impl Fn(i32) -> Result<Option<StructType>>,
) -> Result<super::ManifestList> {
Ok(super::ManifestList {
entries: self
.entries
.into_iter()
.map(|v| {
let partition_spec_id = v.partition_spec_id;
let manifest_path = v.manifest_path.clone();
v.try_into(partition_types.get(&partition_spec_id))
v.try_into(partition_type_provider(partition_spec_id)?.as_ref())
.map_err(|err| {
err.with_context("manifest file path", manifest_path)
.with_context(
Expand All @@ -772,21 +770,21 @@ pub(super) mod _serde {
)
})
})
.collect::<Result<Vec<_>, _>>()?,
.collect::<Result<Vec<_>>>()?,
})
}
}

impl TryFrom<super::ManifestList> for ManifestListV1 {
type Error = Error;

fn try_from(value: super::ManifestList) -> Result<Self, Self::Error> {
fn try_from(value: super::ManifestList) -> std::result::Result<Self, Self::Error> {
Ok(Self {
entries: value
.entries
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()?,
.collect::<std::result::Result<Vec<_>, _>>()?,
})
}
}
Expand Down Expand Up @@ -838,7 +836,7 @@ pub(super) mod _serde {
/// Converts the [FieldSummary] into a [super::FieldSummary].
/// [lower_bound] and [upper_bound] are converted into [Literal]s need the type info so use
/// this function instead of [std::TryFrom] trait.
pub(crate) fn try_into(self, r#type: &Type) -> Result<super::FieldSummary, Error> {
pub(crate) fn try_into(self, r#type: &Type) -> Result<super::FieldSummary> {
Ok(super::FieldSummary {
contains_null: self.contains_null,
contains_nan: self.contains_nan,
Expand All @@ -857,7 +855,7 @@ pub(super) mod _serde {
fn try_convert_to_field_summary(
partitions: Option<Vec<FieldSummary>>,
partition_type: Option<&StructType>,
) -> Result<Vec<super::FieldSummary>, Error> {
) -> Result<Vec<super::FieldSummary>> {
if let Some(partitions) = partitions {
if let Some(partition_type) = partition_type {
let partition_types = partition_type.fields();
Expand All @@ -875,7 +873,7 @@ pub(super) mod _serde {
.into_iter()
.zip(partition_types)
.map(|(v, field)| v.try_into(&field.field_type))
.collect::<Result<Vec<_>, _>>()
.collect::<Result<Vec<_>>>()
} else {
Err(Error::new(
crate::ErrorKind::DataInvalid,
Expand All @@ -890,10 +888,7 @@ pub(super) mod _serde {
impl ManifestListEntryV2 {
/// Converts the [ManifestListEntryV2] into a [ManifestListEntry].
/// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(
self,
partition_type: Option<&StructType>,
) -> Result<ManifestListEntry, Error> {
pub fn try_into(self, partition_type: Option<&StructType>) -> Result<ManifestListEntry> {
let partitions = try_convert_to_field_summary(self.partitions, partition_type)?;
Ok(ManifestListEntry {
manifest_path: self.manifest_path,
Expand All @@ -918,10 +913,7 @@ pub(super) mod _serde {
impl ManifestListEntryV1 {
/// Converts the [ManifestListEntryV1] into a [ManifestListEntry].
/// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait.
pub fn try_into(
self,
partition_type: Option<&StructType>,
) -> Result<ManifestListEntry, Error> {
pub fn try_into(self, partition_type: Option<&StructType>) -> Result<ManifestListEntry> {
let partitions = try_convert_to_field_summary(self.partitions, partition_type)?;
Ok(ManifestListEntry {
manifest_path: self.manifest_path,
Expand Down Expand Up @@ -988,7 +980,7 @@ pub(super) mod _serde {
impl TryFrom<ManifestListEntry> for ManifestListEntryV2 {
type Error = Error;

fn try_from(value: ManifestListEntry) -> Result<Self, Self::Error> {
fn try_from(value: ManifestListEntry) -> std::result::Result<Self, Self::Error> {
let partitions = convert_to_serde_field_summary(value.partitions);
let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
Ok(Self {
Expand Down Expand Up @@ -1062,7 +1054,7 @@ pub(super) mod _serde {
impl TryFrom<ManifestListEntry> for ManifestListEntryV1 {
type Error = Error;

fn try_from(value: ManifestListEntry) -> Result<Self, Self::Error> {
fn try_from(value: ManifestListEntry) -> std::result::Result<Self, Self::Error> {
let partitions = convert_to_serde_field_summary(value.partitions);
let key_metadata = convert_to_serde_key_metadata(value.key_metadata);
Ok(Self {
Expand Down Expand Up @@ -1159,7 +1151,7 @@ mod test {
let bs = fs::read(full_path).expect("read_file must succeed");

let parsed_manifest_list =
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, &HashMap::new())
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, |_id| Ok(None))
.unwrap();

assert_eq!(manifest_list, parsed_manifest_list);
Expand Down Expand Up @@ -1226,29 +1218,30 @@ mod test {

let bs = fs::read(full_path).expect("read_file must succeed");

let parsed_manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V2,
&HashMap::from([
(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
),
(
2,
StructType::new(vec![Arc::new(NestedField::required(
let parsed_manifest_list =
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, |id| {
Ok(HashMap::from([
(
1,
"test",
Type::Primitive(PrimitiveType::Float),
))]),
),
]),
)
.unwrap();
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
),
(
2,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Float),
))]),
),
])
.get(&id)
.cloned())
})
.unwrap();

assert_eq!(manifest_list, parsed_manifest_list);
}
Expand Down Expand Up @@ -1343,19 +1336,21 @@ mod test {
writer.close().await.unwrap();

let bs = fs::read(path).unwrap();
let manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V1,
&HashMap::from([(

let partition_types = HashMap::from([(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]),
)
.unwrap();
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]);

let manifest_list =
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, move |id| {
Ok(partition_types.get(&id).cloned())
})
.unwrap();
assert_eq!(manifest_list, expected_manifest_list);

temp_dir.close().unwrap();
Expand Down Expand Up @@ -1397,19 +1392,19 @@ mod test {
writer.close().await.unwrap();

let bs = fs::read(path).unwrap();
let manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V2,
&HashMap::from([(
let partition_types = HashMap::from([(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]),
)
.unwrap();
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]);
let manifest_list =
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, move |id| {
Ok(partition_types.get(&id).cloned())
})
.unwrap();
expected_manifest_list.entries[0].sequence_number = seq_num;
expected_manifest_list.entries[0].min_sequence_number = seq_num;
assert_eq!(manifest_list, expected_manifest_list);
Expand Down Expand Up @@ -1451,19 +1446,21 @@ mod test {
writer.close().await.unwrap();

let bs = fs::read(path).unwrap();
let manifest_list = ManifestList::parse_with_version(
&bs,
crate::spec::FormatVersion::V2,
&HashMap::from([(

let partition_types = HashMap::from([(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
StructType::new(vec![Arc::new(NestedField::required(
1,
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]),
)
.unwrap();
"test",
Type::Primitive(PrimitiveType::Long),
))]),
)]);

let manifest_list =
ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, move |id| {
Ok(partition_types.get(&id).cloned())
})
.unwrap();
assert_eq!(manifest_list, expected_manifest_list);

temp_dir.close().unwrap();
Expand Down
Loading

0 comments on commit 67205d9

Please sign in to comment.