fix: Manifest parsing should consider schema evolution. (#171)
* fix: Manifest parsing should consider schema evolution.

* Fix ut
liurenjie1024 authored Jan 25, 2024
1 parent a9104dc commit e4f55d1
Showing 1 changed file with 156 additions and 15 deletions.
171 changes: 156 additions & 15 deletions crates/iceberg/src/spec/manifest.rs
@@ -1328,16 +1328,11 @@ mod _serde {
     ) -> Result<HashMap<i32, Literal>, Error> {
         let mut m = HashMap::with_capacity(v.len());
         for entry in v {
-            let data_type = &schema
-                .field_by_id(entry.key)
-                .ok_or_else(|| {
-                    Error::new(
-                        ErrorKind::DataInvalid,
-                        format!("Can't find field id {} for upper/lower_bounds", entry.key),
-                    )
-                })?
-                .field_type;
-            m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+            // We ignore the entry if the field is not found in the schema, due to schema evolution.
+            if let Some(field) = schema.field_by_id(entry.key) {
+                let data_type = &field.field_type;
+                m.insert(entry.key, Literal::try_from_bytes(&entry.value, data_type)?);
+            }
         }
         Ok(m)
     }
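
For context, the hunk above replaces a hard DataInvalid error with a skip when a bounds entry references a field id that the current schema no longer contains, which happens when a column is dropped after the data file was written. Below is a minimal, self-contained Rust sketch of that skip-instead-of-error pattern; ToySchema and parse_bounds are illustrative stand-ins, not the crate's actual types.

use std::collections::HashMap;

// Illustrative stand-in for the real `Schema`; maps field id -> type name.
struct ToySchema {
    fields: HashMap<i32, &'static str>,
}

impl ToySchema {
    fn field_by_id(&self, id: i32) -> Option<&&'static str> {
        self.fields.get(&id)
    }
}

// Mirrors the fixed logic: entries whose field id is unknown to the current
// schema are skipped instead of failing the whole parse.
fn parse_bounds(raw: &[(i32, Vec<u8>)], schema: &ToySchema) -> HashMap<i32, Vec<u8>> {
    let mut m = HashMap::with_capacity(raw.len());
    for (id, bytes) in raw {
        if schema.field_by_id(*id).is_some() {
            m.insert(*id, bytes.clone());
        }
    }
    m
}

fn main() {
    // The table now has only fields 1 and 2; field 3 was dropped by schema evolution.
    let schema = ToySchema {
        fields: HashMap::from([(1, "long"), (2, "int")]),
    };
    // An old manifest still carries bounds for field 3.
    let raw = vec![(1, vec![1u8]), (2, vec![2u8]), (3, b"x".to_vec())];
    let bounds = parse_bounds(&raw, &schema);
    assert!(bounds.contains_key(&1) && bounds.contains_key(&2) && !bounds.contains_key(&3));
    println!("kept bounds for field ids: {:?}", bounds.keys().collect::<Vec<_>>());
}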
@@ -1822,10 +1817,160 @@ mod tests {
         assert_eq!(entry.partitions[0].upper_bound, Some(Literal::string("x")));
     }
 
+    #[tokio::test]
+    async fn test_parse_manifest_with_schema_evolution() {
+        let manifest = Manifest {
+            metadata: ManifestMetadata {
+                schema_id: 0,
+                schema: Schema::builder()
+                    .with_fields(vec![
+                        Arc::new(NestedField::optional(
+                            1,
+                            "id",
+                            Type::Primitive(PrimitiveType::Long),
+                        )),
+                        Arc::new(NestedField::optional(
+                            2,
+                            "v_int",
+                            Type::Primitive(PrimitiveType::Int),
+                        )),
+                    ])
+                    .build()
+                    .unwrap(),
+                partition_spec: PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![],
+                },
+                content: ManifestContentType::Data,
+                format_version: FormatVersion::V2,
+            },
+            entries: vec![Arc::new(ManifestEntry {
+                status: ManifestStatus::Added,
+                snapshot_id: None,
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: DataFile {
+                    content: DataContentType::Data,
+                    file_format: DataFileFormat::Parquet,
+                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+                    partition: Struct::empty(),
+                    record_count: 1,
+                    file_size_in_bytes: 5442,
+                    column_sizes: HashMap::from([
+                        (1, 61),
+                        (2, 73),
+                        (3, 61),
+                    ]),
+                    value_counts: HashMap::default(),
+                    null_value_counts: HashMap::default(),
+                    nan_value_counts: HashMap::new(),
+                    lower_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                        (3, Literal::string("x"))
+                    ]),
+                    upper_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                        (3, Literal::string("x"))
+                    ]),
+                    key_metadata: vec![],
+                    split_offsets: vec![4],
+                    equality_ids: vec![],
+                    sort_order_id: None,
+                },
+            })],
+        };
+
+        let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]);
+
+        let (avro_bytes, _) = write_manifest(&manifest, writer).await;
+
+        // The parse should succeed.
+        let actual_manifest = Manifest::parse_avro(avro_bytes.as_slice()).unwrap();
+
+        // Compared with the original manifest, lower_bounds and upper_bounds no longer have data for field 3;
+        // everything else should be the same.
+        let expected_manifest = Manifest {
+            metadata: ManifestMetadata {
+                schema_id: 0,
+                schema: Schema::builder()
+                    .with_fields(vec![
+                        Arc::new(NestedField::optional(
+                            1,
+                            "id",
+                            Type::Primitive(PrimitiveType::Long),
+                        )),
+                        Arc::new(NestedField::optional(
+                            2,
+                            "v_int",
+                            Type::Primitive(PrimitiveType::Int),
+                        )),
+                    ])
+                    .build()
+                    .unwrap(),
+                partition_spec: PartitionSpec {
+                    spec_id: 0,
+                    fields: vec![],
+                },
+                content: ManifestContentType::Data,
+                format_version: FormatVersion::V2,
+            },
+            entries: vec![Arc::new(ManifestEntry {
+                status: ManifestStatus::Added,
+                snapshot_id: None,
+                sequence_number: None,
+                file_sequence_number: None,
+                data_file: DataFile {
+                    content: DataContentType::Data,
+                    file_format: DataFileFormat::Parquet,
+                    file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(),
+                    partition: Struct::empty(),
+                    record_count: 1,
+                    file_size_in_bytes: 5442,
+                    column_sizes: HashMap::from([
+                        (1, 61),
+                        (2, 73),
+                        (3, 61),
+                    ]),
+                    value_counts: HashMap::default(),
+                    null_value_counts: HashMap::default(),
+                    nan_value_counts: HashMap::new(),
+                    lower_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                    ]),
+                    upper_bounds: HashMap::from([
+                        (1, Literal::long(1)),
+                        (2, Literal::int(2)),
+                    ]),
+                    key_metadata: vec![],
+                    split_offsets: vec![4],
+                    equality_ids: vec![],
+                    sort_order_id: None,
+                },
+            })],
+        };
+
+        assert_eq!(actual_manifest, expected_manifest);
+    }
+
     async fn test_manifest_read_write(
         manifest: Manifest,
         writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
     ) -> ManifestListEntry {
+        let (bs, res) = write_manifest(&manifest, writer_builder).await;
+        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
+
+        assert_eq!(actual_manifest, manifest);
+        res
+    }
+
+    /// Utility method which writes out a manifest and returns the bytes.
+    async fn write_manifest(
+        manifest: &Manifest,
+        writer_builder: impl FnOnce(OutputFile) -> ManifestWriter,
+    ) -> (Vec<u8>, ManifestListEntry) {
         let temp_dir = TempDir::new().unwrap();
         let path = temp_dir.path().join("test_manifest.avro");
         let io = FileIOBuilder::new_fs_io().build().unwrap();
@@ -1834,10 +1979,6 @@ mod tests {
         let res = writer.write(manifest.clone()).await.unwrap();
 
         // Verify manifest
-        let bs = fs::read(path).expect("read_file must succeed");
-        let actual_manifest = Manifest::parse_avro(bs.as_slice()).unwrap();
-
-        assert_eq!(actual_manifest, manifest);
-        res
+        (fs::read(path).expect("read_file must succeed"), res)
     }
 }
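
For readers who want to exercise the fixed parser outside the test suite, here is a small usage sketch. It assumes Manifest and its parse_avro constructor are publicly exported at iceberg::spec (the module this commit edits) and that a manifest Avro file already exists on disk; the file name is hypothetical.

use std::fs;

// Assumed export path; this commit edits crates/iceberg/src/spec/manifest.rs.
use iceberg::spec::Manifest;

fn main() {
    // Read manifest Avro bytes from disk and parse them, mirroring the
    // fs::read + Manifest::parse_avro round trip used by the tests above.
    let bytes = fs::read("test_manifest.avro").expect("read manifest file");
    let manifest = Manifest::parse_avro(bytes.as_slice()).expect("parse manifest");

    // With this fix, bounds entries whose field ids were dropped from the
    // schema are skipped rather than failing the whole parse.
    println!("parsed manifest: {:#?}", manifest);
}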
