From 7229e59f35c54b8554c99787cc27733c846b783b Mon Sep 17 00:00:00 2001
From: ZENOTME
Date: Thu, 26 Sep 2024 19:29:18 +0800
Subject: [PATCH] refactor append action of transaction

---
 crates/catalog/rest/src/catalog.rs            |   2 +-
 .../e2e_test/tests/append_data_file_test.rs   |  22 +-
 crates/e2e_test/tests/conflict_commit_test.rs |  22 +-
 crates/iceberg/src/catalog/mod.rs             |   2 +-
 crates/iceberg/src/io/object_cache.rs         |   6 +-
 crates/iceberg/src/spec/datatypes.rs          |  51 ---
 crates/iceberg/src/spec/manifest_list.rs      |  30 +-
 crates/iceberg/src/spec/snapshot.rs           |  11 +-
 crates/iceberg/src/spec/table_metadata.rs     |  18 +-
 crates/iceberg/src/spec/values.rs             |  10 +
 crates/iceberg/src/transaction.rs             | 391 +++++++++++-------
 11 files changed, 312 insertions(+), 253 deletions(-)

diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 1181c3cc1..bd73f6d03 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -1376,7 +1376,7 @@ mod tests {
             .with_schema_id(0)
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::from_iter([
+                additional_properties: HashMap::from_iter([
                     ("spark.app.id", "local-1646787004168"),
                     ("added-data-files", "1"),
                     ("added-records", "1"),
diff --git a/crates/e2e_test/tests/append_data_file_test.rs b/crates/e2e_test/tests/append_data_file_test.rs
index 101101151..92d91b696 100644
--- a/crates/e2e_test/tests/append_data_file_test.rs
+++ b/crates/e2e_test/tests/append_data_file_test.rs
@@ -17,6 +17,9 @@

 //! Integration tests for rest catalog.

+use std::collections::HashMap;
+use std::sync::Arc;
+
 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
@@ -35,8 +38,6 @@ use iceberg_test_utils::{normalize_test_name, set_up};
 use parquet::arrow::arrow_reader::ArrowReaderOptions;
 use parquet::file::properties::WriterProperties;
 use port_scanner::scan_port_addr;
-use std::collections::HashMap;
-use std::sync::Arc;
 use tokio::time::sleep;

 const REST_CATALOG_PORT: u16 = 8181;
@@ -80,7 +81,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
             (S3_REGION.to_string(), "us-east-1".to_string()),
         ]))
         .build();
-    let rest_catalog = RestCatalog::new(config).await.unwrap();
+    let rest_catalog = RestCatalog::new(config);

     TestFixture {
         _docker_compose: docker_compose,
@@ -145,7 +146,7 @@ async fn test_append_data_file() {
     );
     let parquet_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::default(),
-        schema.clone(),
+        table.metadata().current_schema().clone(),
         table.file_io().clone(),
         location_generator.clone(),
         file_name_generator.clone(),
@@ -158,14 +159,11 @@ async fn test_append_data_file() {
     let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
     let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
     let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
-    let batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(col1) as ArrayRef,
-            Arc::new(col2) as ArrayRef,
-            Arc::new(col3) as ArrayRef,
-        ],
-    )
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
     .unwrap();
     data_file_writer.write(batch.clone()).await.unwrap();
     let data_file = data_file_writer.close().await.unwrap();
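For orientation, the whole flow this test exercises reduces to a handful of calls against the refactored transaction API. A minimal sketch, reusing the bindings from the test above (`table`, the catalog handle, and the `data_file` vector returned by the writer; names assumed from the test body):

    // Append the freshly written data files and commit a new snapshot.
    let tx = Transaction::new(&table);
    let mut append_action = tx.fast_append(None, vec![]).unwrap();
    append_action.add_data_files(data_file.clone()).unwrap();
    let tx = append_action.apply().await.unwrap();
    let table = tx.commit(&rest_catalog).await.unwrap();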
diff --git a/crates/e2e_test/tests/conflict_commit_test.rs b/crates/e2e_test/tests/conflict_commit_test.rs
index e9e4606a2..7890b2b12 100644
--- a/crates/e2e_test/tests/conflict_commit_test.rs
+++ b/crates/e2e_test/tests/conflict_commit_test.rs
@@ -17,6 +17,9 @@

 //! Integration tests for rest catalog.

+use std::collections::HashMap;
+use std::sync::Arc;
+
 use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
 use futures::TryStreamExt;
 use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
@@ -34,8 +37,6 @@ use iceberg_test_utils::docker::DockerCompose;
 use iceberg_test_utils::{normalize_test_name, set_up};
 use parquet::file::properties::WriterProperties;
 use port_scanner::scan_port_addr;
-use std::collections::HashMap;
-use std::sync::Arc;
 use tokio::time::sleep;

 const REST_CATALOG_PORT: u16 = 8181;
@@ -79,7 +80,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
             (S3_REGION.to_string(), "us-east-1".to_string()),
         ]))
         .build();
-    let rest_catalog = RestCatalog::new(config).await.unwrap();
+    let rest_catalog = RestCatalog::new(config);

     TestFixture {
         _docker_compose: docker_compose,
@@ -144,7 +145,7 @@ async fn test_append_data_file_conflict() {
     );
     let parquet_writer_builder = ParquetWriterBuilder::new(
         WriterProperties::default(),
-        schema.clone(),
+        table.metadata().current_schema().clone(),
         table.file_io().clone(),
         location_generator.clone(),
         file_name_generator.clone(),
@@ -157,14 +158,11 @@ async fn test_append_data_file_conflict() {
     let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
     let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
     let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
-    let batch = RecordBatch::try_new(
-        schema.clone(),
-        vec![
-            Arc::new(col1) as ArrayRef,
-            Arc::new(col2) as ArrayRef,
-            Arc::new(col3) as ArrayRef,
-        ],
-    )
+    let batch = RecordBatch::try_new(schema.clone(), vec![
+        Arc::new(col1) as ArrayRef,
+        Arc::new(col2) as ArrayRef,
+        Arc::new(col3) as ArrayRef,
+    ])
     .unwrap();
     data_file_writer.write(batch.clone()).await.unwrap();
     let data_file = data_file_writer.close().await.unwrap();
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 54abe8083..3d9f0db8d 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -960,7 +960,7 @@ mod tests {
                 .with_schema_id(1)
                 .with_summary(Summary {
                     operation: Operation::Append,
-                    other: HashMap::default(),
+                    additional_properties: HashMap::default(),
                 })
                 .build(),
         };
diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs
index 731072a5a..2ea75b637 100644
--- a/crates/iceberg/src/io/object_cache.rs
+++ b/crates/iceberg/src/io/object_cache.rs
@@ -185,7 +185,7 @@ mod tests {
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
-        ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
+        ManifestWriter, Struct, TableMetadata,
     };
     use crate::table::Table;
     use crate::TableIdent;
@@ -293,9 +293,7 @@ mod tests {
                 .new_output(current_snapshot.manifest_list())
                 .unwrap(),
             current_snapshot.snapshot_id(),
-            current_snapshot
-                .parent_snapshot_id()
-                .unwrap_or(EMPTY_SNAPSHOT_ID),
+            current_snapshot.parent_snapshot_id(),
             current_snapshot.sequence_number(),
         );
         manifest_list_write
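`EMPTY_SNAPSHOT_ID` sentinels disappear at call sites like the one above because the manifest-list writers now take the parent snapshot id as an `Option<i64>`. A short sketch, assuming a `file_io` handle and a first snapshot that has no parent (path and id are placeholders):

    let output = file_io.new_output("memory:/metadata/manifest-list.avro")?;
    // For a table's very first snapshot there is simply no parent to record.
    let mut writer = ManifestListWriter::v1(output, 3051729675574597004, None);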
diff --git a/crates/iceberg/src/spec/datatypes.rs b/crates/iceberg/src/spec/datatypes.rs
index a43ed668b..d38245960 100644
--- a/crates/iceberg/src/spec/datatypes.rs
+++ b/crates/iceberg/src/spec/datatypes.rs
@@ -102,23 +102,6 @@ impl fmt::Display for Type {
 }

 impl Type {
-    /// Check whether literal is compatible with the type.
-    pub fn compatible(&self, literal: &Literal) -> bool {
-        match (self, literal) {
-            (Type::Primitive(primitive), Literal::Primitive(primitive_literal)) => {
-                primitive.compatible(primitive_literal)
-            }
-            (Type::Struct(struct_type), Literal::Struct(struct_literal)) => {
-                struct_type.compatible(struct_literal)
-            }
-            (Type::List(list_type), Literal::List(list_literal)) => {
-                list_type.compatible(list_literal)
-            }
-            (Type::Map(map_type), Literal::Map(map_literal)) => map_type.compatible(map_literal),
-            _ => false,
-        }
-    }
-
     /// Whether the type is primitive type.
     #[inline(always)]
     pub fn is_primitive(&self) -> bool {
@@ -311,29 +294,6 @@ impl<'de> Deserialize<'de> for PrimitiveType {
     }
 }

-impl PrimitiveType {
-    /// Check whether literal is compatible with the type.
-    pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool {
-        matches!(
-            (self, literal),
-            (PrimitiveType::Boolean, PrimitiveLiteral::Boolean(_))
-                | (PrimitiveType::Int, PrimitiveLiteral::Int(_))
-                | (PrimitiveType::Long, PrimitiveLiteral::Long(_))
-                | (PrimitiveType::Float, PrimitiveLiteral::Float(_))
-                | (PrimitiveType::Double, PrimitiveLiteral::Double(_))
-                | (PrimitiveType::Decimal { .. }, PrimitiveLiteral::Decimal(_))
-                | (PrimitiveType::Date, PrimitiveLiteral::Date(_))
-                | (PrimitiveType::Time, PrimitiveLiteral::Time(_))
-                | (PrimitiveType::Timestamp, PrimitiveLiteral::Timestamp(_))
-                | (PrimitiveType::Timestamptz, PrimitiveLiteral::TimestampTZ(_))
-                | (PrimitiveType::String, PrimitiveLiteral::String(_))
-                | (PrimitiveType::Uuid, PrimitiveLiteral::UUID(_))
-                | (PrimitiveType::Fixed(_), PrimitiveLiteral::Fixed(_))
-                | (PrimitiveType::Binary, PrimitiveLiteral::Binary(_))
-        )
-    }
-}
-
 impl Serialize for PrimitiveType {
     fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
     where S: Serializer {
@@ -521,17 +481,6 @@ impl StructType {
     pub fn fields(&self) -> &[NestedFieldRef] {
         &self.fields
     }
-
-    /// Check whether literal is compatible with the type.
-    pub fn compatible(&self, struct_literal: &Struct) -> bool {
-        if self.fields().len() != struct_literal.fields().len() {
-            return false;
-        }
-        self.fields()
-            .iter()
-            .zip(struct_literal.fields())
-            .all(|(field, literal)| field.field_type.compatible(literal))
-    }
 }

 impl PartialEq for StructType {
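The removed struct-level check survives in spirit: partition tuples are still validated, but field by field at the primitive level (see `validate_partition_value` in the transaction changes below). A sketch of the equivalent logic, assuming primitive-only partition types and assuming the primitive-level `compatible` helper remains available, as the transaction code below relies on it:

    fn partition_compatible(partition_type: &StructType, value: &Struct) -> bool {
        partition_type.fields().len() == value.fields().len()
            && partition_type
                .fields()
                .iter()
                .zip(value.fields())
                .all(|(field, literal)| {
                    field
                        .field_type
                        .as_primitive_type()
                        .zip(literal.as_primitive_literal().as_ref())
                        // A non-primitive type or value counts as incompatible
                        // rather than panicking, a slightly defensive variant
                        // of the patch's `unwrap()`-based check.
                        .map(|(ty, lit)| ty.compatible(lit))
                        .unwrap_or(false)
                })
    }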
diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs
index 5363d10a8..7ef6b8c31 100644
--- a/crates/iceberg/src/spec/manifest_list.rs
+++ b/crates/iceberg/src/spec/manifest_list.rs
@@ -106,15 +106,17 @@ impl std::fmt::Debug for ManifestListWriter {
 impl ManifestListWriter {
     /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
-    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
-        let metadata = HashMap::from_iter([
+    pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
+        let mut metadata = HashMap::from_iter([
             ("snapshot-id".to_string(), snapshot_id.to_string()),
-            (
-                "parent-snapshot-id".to_string(),
-                parent_snapshot_id.to_string(),
-            ),
             ("format-version".to_string(), "1".to_string()),
         ]);
+        if let Some(parent_snapshot_id) = parent_snapshot_id {
+            metadata.insert(
+                "parent-snapshot-id".to_string(),
+                parent_snapshot_id.to_string(),
+            );
+        }
         Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
     }

@@ -582,6 +584,18 @@ pub struct ManifestFile {
     pub key_metadata: Vec<u8>,
 }

+impl ManifestFile {
+    /// Checks if the manifest file has any added files.
+    pub fn has_added_files(&self) -> bool {
+        self.added_files_count.is_none() || self.added_files_count.unwrap() > 0
+    }
+
+    /// Checks if the manifest file has any existing files.
+    pub fn has_existing_files(&self) -> bool {
+        self.existing_files_count.is_none() || self.existing_files_count.unwrap() > 0
+    }
+}
+
 /// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
 #[derive(Debug, PartialEq, Clone, Eq)]
 pub enum ManifestContentType {
@@ -1148,7 +1162,7 @@ mod test {
         let mut writer = ManifestListWriter::v1(
             file_io.new_output(full_path.clone()).unwrap(),
             1646658105718557341,
-            1646658105718557341,
+            Some(1646658105718557341),
         );

         writer
@@ -1337,7 +1351,7 @@ mod test {
         let io = FileIOBuilder::new_fs_io().build().unwrap();
         let output_file = io.new_output(path.to_str().unwrap()).unwrap();

-        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
+        let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
         writer
             .add_manifests(expected_manifest_list.entries.clone().into_iter())
             .unwrap();
diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs
index 09fa7dc87..0240afaba 100644
--- a/crates/iceberg/src/spec/snapshot.rs
+++ b/crates/iceberg/src/spec/snapshot.rs
@@ -59,7 +59,7 @@ pub struct Summary {
     pub operation: Operation,
     /// Other summary data.
     #[serde(flatten)]
-    pub other: HashMap<String, String>,
+    pub additional_properties: HashMap<String, String>,
 }

 impl Default for Operation {
@@ -291,7 +291,7 @@ pub(super) mod _serde {
                 },
                 summary: v1.summary.unwrap_or(Summary {
                     operation: Operation::default(),
-                    other: HashMap::new(),
+                    additional_properties: HashMap::new(),
                 }),
                 schema_id: v1.schema_id,
             })
@@ -378,11 +378,6 @@ impl SnapshotRetention {
             max_ref_age_ms,
         }
     }
-
-    /// Create a new tag retention policy
-    pub fn tag(max_ref_age_ms: i64) -> Self {
-        SnapshotRetention::Tag { max_ref_age_ms }
-    }
 }

 #[cfg(test)]
@@ -421,7 +416,7 @@ mod tests {
         assert_eq!(
             Summary {
                 operation: Operation::Append,
                additional_properties: HashMap::new()
             },
             *result.summary()
         );
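The `ManifestFile` predicates added above are deliberately conservative: a manifest whose counts were never populated reports `None`, and `None` must be treated as "may contain files" so the manifest is kept. An illustration (field values assumed):

    // added_files_count == None    -> has_added_files() == true  (unknown, keep it)
    // added_files_count == Some(0) -> has_added_files() == false (provably empty)
    // added_files_count == Some(3) -> has_added_files() == true
    let keep = manifest.has_added_files() || manifest.has_existing_files();

This is exactly the filter `existing_manifest` applies in the transaction code below.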
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index cde709375..71ed32260 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -189,6 +189,18 @@ impl TableMetadata {
         self.last_sequence_number
     }

+    /// Returns the next sequence number for the table.
+    ///
+    /// For format version 1, it always returns the initial sequence number.
+    /// For other versions, it returns the last sequence number incremented by 1.
+    #[inline]
+    pub fn next_sequence_number(&self) -> i64 {
+        match self.format_version {
+            FormatVersion::V1 => INITIAL_SEQUENCE_NUMBER,
+            _ => self.last_sequence_number + 1,
+        }
+    }
+
     /// Returns last updated time.
     #[inline]
     pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
@@ -1463,7 +1475,7 @@ mod tests {
             .with_sequence_number(0)
             .with_schema_id(0)
             .with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
-            .with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
+            .with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
             .build();

         let expected = TableMetadata {
@@ -1864,7 +1876,7 @@ mod tests {
             .with_manifest_list("s3://a/b/1.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();

@@ -1877,7 +1889,7 @@ mod tests {
             .with_manifest_list("s3://a/b/2.avro")
             .with_summary(Summary {
                 operation: Operation::Append,
-                other: HashMap::new(),
+                additional_properties: HashMap::new(),
             })
             .build();

diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs
index 7f163ac03..809d70df4 100644
--- a/crates/iceberg/src/spec/values.rs
+++ b/crates/iceberg/src/spec/values.rs
@@ -1537,6 +1537,14 @@ impl Literal {
             })?;
         Ok(Self::decimal(decimal.mantissa()))
     }
+
+    /// Attempts to convert the Literal to a PrimitiveLiteral
+    pub fn as_primitive_literal(&self) -> Option<PrimitiveLiteral> {
+        match self {
+            Literal::Primitive(primitive) => Some(primitive.clone()),
+            _ => None,
+        }
+    }
 }

 /// The partition struct stores the tuple of partition values for each file.
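`as_primitive_literal` is the shim that lets the transaction code compare partition values against primitive types without the removed recursive helpers. Its behavior in brief (a sketch; values are arbitrary):

    let lit = Literal::long(42);
    assert_eq!(lit.as_primitive_literal(), Some(PrimitiveLiteral::Long(42)));
    // Struct, list, and map literals are not primitive, so they yield None.
    let nested = Literal::Struct(Struct::from_iter([Some(Literal::long(1))]));
    assert_eq!(nested.as_primitive_literal(), None);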
@@ -1575,6 +1583,8 @@ impl Struct {
     /// returns true if the field at position `index` is null
     pub fn is_null_at_index(&self, index: usize) -> bool {
         self.null_bitmap[index]
     }
+
+    /// Return fields in the struct.
     pub fn fields(&self) -> &[Literal] {
         &self.fields
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index 6c3590e0d..6d7f687a4 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -19,22 +19,26 @@

 use std::cmp::Ordering;
 use std::collections::HashMap;
+use std::future::Future;
 use std::mem::discriminant;
+use std::ops::RangeFrom;
+use std::sync::Arc;
+
+use uuid::Uuid;

 use crate::error::Result;
+use crate::io::OutputFile;
 use crate::spec::{
     DataFile, DataFileFormat, FormatVersion, Manifest, ManifestEntry, ManifestFile,
-    ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, PartitionSpec, Schema,
-    Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, Struct,
-    StructType, Summary, Transform,
+    ManifestListWriter, ManifestMetadata, ManifestWriter, NullOrder, Operation, PartitionSpec,
+    Schema, Snapshot, SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder,
+    Struct, StructType, Summary, Transform, MAIN_BRANCH,
 };
 use crate::table::Table;
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};

-const INITIAL_SEQUENCE_NUMBER: i64 = 0;
 const META_ROOT_PATH: &str = "metadata";
-const MAIN_BRANCH: &str = "main";

 /// Table transaction.
 pub struct Transaction<'a> {
@@ -142,12 +146,7 @@ impl<'a> Transaction<'a> {
         let schema = self.table.metadata().current_schema().as_ref().clone();
         let schema_id = schema.schema_id();
         let format_version = self.table.metadata().format_version();
-        let partition_spec = self
-            .table
-            .metadata()
-            .default_partition_spec()
-            .map(|spec| spec.as_ref().clone())
-            .unwrap_or_default();
+        let partition_spec = self.table.metadata().default_partition_spec().clone();
-        let commit_uuid = commit_uuid.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
+        let commit_uuid = commit_uuid.unwrap_or_else(|| Uuid::new_v4().to_string());

         FastAppendAction::new(
@@ -160,6 +159,7 @@ impl<'a> Transaction<'a> {
             partition_spec,
             key_metadata,
             commit_uuid,
+            HashMap::new(),
         )
     }

@@ -191,47 +191,174 @@ impl<'a> Transaction<'a> {

 /// FastAppendAction is a transaction action for fast append data files to the table.
 pub struct FastAppendAction<'a> {
+    snapshot_produce_action: SnapshotProduceAction<'a>,
+}
+
+impl<'a> FastAppendAction<'a> {
+    pub(crate) fn new(
+        tx: Transaction<'a>,
+        parent_snapshot_id: Option<i64>,
+        snapshot_id: i64,
+        schema: Schema,
+        schema_id: i32,
+        format_version: FormatVersion,
+        partition_spec: Arc<PartitionSpec>,
+        key_metadata: Vec<u8>,
+        commit_uuid: String,
+        snapshot_properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        Ok(Self {
+            snapshot_produce_action: SnapshotProduceAction::new(
+                tx,
+                snapshot_id,
+                parent_snapshot_id,
+                schema_id,
+                format_version,
+                partition_spec,
+                schema,
+                key_metadata,
+                commit_uuid,
+                snapshot_properties,
+            )?,
+        })
+    }
+
+    /// Add data files to the snapshot.
+    pub fn add_data_files(
+        &mut self,
+        data_files: impl IntoIterator<Item = DataFile>,
+    ) -> Result<&mut Self> {
+        self.snapshot_produce_action.add_data_files(data_files)?;
+        Ok(self)
+    }
+
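Because `add_data_files` only forwards to `SnapshotProduceAction`, partition validation happens eagerly at add time rather than at commit (see `validate_partition_value` below). A sketch of feeding the action a hand-built file for an unpartitioned table, using the same builder calls the updated tests rely on (`action` assumed to be a `FastAppendAction`):

    let data_file = DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("test/1.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(100)
        .record_count(1)
        .partition(Struct::empty()) // empty tuple for an unpartitioned spec
        .build()
        .unwrap();
    action.add_data_files(vec![data_file])?;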
+    /// Finish building the action and apply it to the transaction.
+    pub async fn apply(self) -> Result<Transaction<'a>> {
+        self.snapshot_produce_action
+            .apply(FastAppendOperation, DefaultManifestProcess)
+            .await
+    }
+}
+
+struct FastAppendOperation;
+
+impl SnapshotProduceOperation for FastAppendOperation {
+    fn operation(&self) -> Operation {
+        Operation::Append
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce
+            .parent_snapshot_id
+            .and_then(|id| snapshot_produce.tx.table.metadata().snapshot_by_id(id))
+        else {
+            return Ok(vec![]);
+        };
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.tx.table.file_io(),
+                &snapshot_produce.tx.table.metadata_ref(),
+            )
+            .await?;
+
+        Ok(manifest_list
+            .entries()
+            .iter()
+            .filter(|entry| {
+                entry.has_added_files()
+                    || entry.has_existing_files()
+                    || entry.added_snapshot_id == snapshot_produce.snapshot_id
+            })
+            .cloned()
+            .collect())
+    }
+}
+
+trait SnapshotProduceOperation: Send + Sync {
+    fn operation(&self) -> Operation;
+    #[allow(unused)]
+    fn delete_entries(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
+    fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProduceAction,
+    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+}
+
+struct DefaultManifestProcess;
+
+impl ManifestProcess for DefaultManifestProcess {
+    fn process_manifest(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile> {
+        manifests
+    }
+}
+
+trait ManifestProcess: Send + Sync {
+    fn process_manifest(&self, manifests: Vec<ManifestFile>) -> Vec<ManifestFile>;
+}
+
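The trait pair above is the extension point of this refactor: `SnapshotProduceOperation` supplies the operation-specific manifests and `ManifestProcess` post-processes the combined list. A hypothetical second operation (not part of this patch, for illustration only) would slot in the same way `FastAppendOperation` does:

    struct OverwriteOperation; // hypothetical example

    impl SnapshotProduceOperation for OverwriteOperation {
        fn operation(&self) -> Operation {
            Operation::Overwrite
        }

        async fn delete_entries(
            &self,
            _snapshot_produce: &SnapshotProduceAction<'_>,
        ) -> Result<Vec<ManifestEntry>> {
            // A real overwrite would report the entries it deletes here.
            Ok(vec![])
        }

        async fn existing_manifest(
            &self,
            _snapshot_produce: &SnapshotProduceAction<'_>,
        ) -> Result<Vec<ManifestFile>> {
            // ...and keep only the parent manifests that survive the overwrite.
            Ok(vec![])
        }
    }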
+struct SnapshotProduceAction<'a> {
     tx: Transaction<'a>,
     parent_snapshot_id: Option<i64>,
     snapshot_id: i64,
-    schema: Schema,
     schema_id: i32,
     format_version: FormatVersion,
-    partition_spec: PartitionSpec,
+    partition_spec: Arc<PartitionSpec>,
+    schema: Schema,
     key_metadata: Vec<u8>,
     commit_uuid: String,
-    manifest_id: i64,
-    appended_data_files: Vec<DataFile>,
+    snapshot_properties: HashMap<String, String>,
+    added_data_files: Vec<DataFile>,
+
+    // A counter used to generate unique manifest file names.
+    // It starts from 0 and increments for each new manifest file.
+    // Note: This counter is limited to the range of (0..u64::MAX).
+    manifest_counter: RangeFrom<u64>,
 }

-impl<'a> FastAppendAction<'a> {
-    #[allow(clippy::too_many_arguments)]
+impl<'a> SnapshotProduceAction<'a> {
     pub(crate) fn new(
         tx: Transaction<'a>,
-        parent_snapshot_id: Option<i64>,
         snapshot_id: i64,
-        schema: Schema,
+        parent_snapshot_id: Option<i64>,
         schema_id: i32,
         format_version: FormatVersion,
-        partition_spec: PartitionSpec,
+        partition_spec: Arc<PartitionSpec>,
+        schema: Schema,
         key_metadata: Vec<u8>,
         commit_uuid: String,
+        snapshot_properties: HashMap<String, String>,
     ) -> Result<Self> {
         Ok(Self {
             tx,
             parent_snapshot_id,
             snapshot_id,
-            schema,
             schema_id,
             format_version,
+            commit_uuid,
+            snapshot_properties,
+            added_data_files: vec![],
+            manifest_counter: (0..),
             partition_spec,
+            schema,
             key_metadata,
-            commit_uuid,
-            manifest_id: 0,
-            appended_data_files: vec![],
         })
     }

@@ -250,7 +377,13 @@ impl<'a> FastAppendAction<'a> {
             .fields()
             .iter()
             .zip(partition_type.fields())
-            .any(|(field, field_type)| !field_type.field_type.compatible(field))
+            .any(|(field_from_value, field_from_type)| {
+                !field_from_type
+                    .field_type
+                    .as_primitive_type()
+                    .unwrap()
+                    .compatible(&field_from_value.as_primitive_literal().unwrap())
+            })
         {
             return Err(Error::new(
                 ErrorKind::DataInvalid,
@@ -278,52 +411,26 @@ impl<'a> FastAppendAction<'a> {
                 &self.partition_spec.partition_type(&self.schema)?,
             )?;
         }
-        self.appended_data_files.extend(data_files);
+        self.added_data_files.extend(data_files);
         Ok(self)
     }

-    fn generate_manifest_file_path(&mut self) -> String {
-        let manifest_id = self.manifest_id;
-        self.manifest_id += 1;
-        format!(
+    fn new_manifest_output(&mut self) -> Result<OutputFile> {
+        let new_manifest_path = format!(
             "{}/{}/{}-m{}.{}",
             self.tx.table.metadata().location(),
             META_ROOT_PATH,
             &self.commit_uuid,
-            manifest_id,
+            self.manifest_counter.next().unwrap(),
             DataFileFormat::Avro
-        )
-    }
-
-    async fn manifest_from_parent_snapshot(&self) -> Result<Vec<ManifestFile>> {
-        if let Some(snapshot) = self.tx.table.metadata().current_snapshot() {
-            let manifest_list = snapshot
-                .load_manifest_list(self.tx.table.file_io(), &self.tx.table.metadata_ref())
-                .await?;
-            let mut manifest_files = Vec::with_capacity(manifest_list.entries().len());
-            for entry in manifest_list.entries() {
-                // From: https://github.com/apache/iceberg-python/blob/659a951d6397ab280cae80206fe6e8e4be2d3738/pyiceberg/table/__init__.py#L2921
-                // Why we need this?
-                if entry.added_snapshot_id == self.snapshot_id {
-                    continue;
-                }
-                let manifest = entry.load_manifest(self.tx.table.file_io()).await?;
-                // Skip manifest with all delete entries.
-                if manifest.entries().iter().all(|entry| !entry.is_alive()) {
-                    continue;
-                }
-                manifest_files.push(entry.clone());
-            }
-            Ok(manifest_files)
-        } else {
-            Ok(vec![])
-        }
+        );
+        self.tx.table.file_io().new_output(new_manifest_path)
     }

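Manifest names now come from the commit UUID plus the counter, so every manifest written by one commit attempt is unique yet traceable to that attempt. The scheme, with placeholder values:

    // {table_location}/metadata/{commit_uuid}-m{counter}.avro
    let path = format!("{}/{}/{}-m{}.avro", "s3://bucket/tbl", "metadata", "1a2b3c4d", 0);
    assert_eq!(path, "s3://bucket/tbl/metadata/1a2b3c4d-m0.avro");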
     // Write manifest file for added data files and return the ManifestFile for ManifestList.
-    async fn manifest_for_data_file(&mut self) -> Result<ManifestFile> {
-        let appended_data_files = std::mem::take(&mut self.appended_data_files);
-        let manifest_entries = appended_data_files
+    async fn write_added_manifest(&mut self) -> Result<ManifestFile> {
+        let added_data_files = std::mem::take(&mut self.added_data_files);
+        let manifest_entries = added_data_files
             .into_iter()
             .map(|data_file| {
                 let builder = ManifestEntry::builder()
@@ -342,124 +449,99 @@ impl<'a> FastAppendAction<'a> {
             .schema(self.schema.clone())
             .schema_id(self.schema_id)
             .format_version(self.format_version)
-            .partition_spec(self.partition_spec.clone())
+            .partition_spec(self.partition_spec.as_ref().clone())
             .content(crate::spec::ManifestContentType::Data)
             .build();
         let manifest = Manifest::new(manifest_meta, manifest_entries);
         let writer = ManifestWriter::new(
-            self.tx
-                .table
-                .file_io()
-                .new_output(self.generate_manifest_file_path())?,
+            self.new_manifest_output()?,
             self.snapshot_id,
             self.key_metadata.clone(),
         );
         writer.write(manifest).await
     }

-    // # TODO:
-    // Complete the summary.
-    fn summary(&self) -> Summary {
-        Summary {
-            operation: crate::spec::Operation::Append,
-            other: HashMap::new(),
-        }
-    }
-
-    /// Finished building the action and apply it to the transaction.
-    pub async fn apply(mut self) -> Result<Transaction<'a>> {
-        let summary = self.summary();
-        let manifest = self.manifest_for_data_file().await?;
-        let existing_manifest_files = self.manifest_from_parent_snapshot().await?;
+    async fn manifest_file<OP: SnapshotProduceOperation, MP: ManifestProcess>(
+        &mut self,
+        snapshot_produce_operation: &OP,
+        manifest_process: &MP,
+    ) -> Result<Vec<ManifestFile>> {
+        let added_manifest = self.write_added_manifest().await?;
+        let existing_manifests = snapshot_produce_operation.existing_manifest(self).await?;

-        let snapshot_produce_action = SnapshotProduceAction::new(
-            self.tx,
-            self.snapshot_id,
-            self.parent_snapshot_id,
-            self.schema_id,
-            self.format_version,
-            self.commit_uuid,
-        )?;
-
-        snapshot_produce_action
-            .apply(
-                vec![manifest]
-                    .into_iter()
-                    .chain(existing_manifest_files.into_iter()),
-                summary,
-            )
-            .await
+        let mut manifest_files = vec![added_manifest];
+        manifest_files.extend(existing_manifests);
+        let manifest_files = manifest_process.process_manifest(manifest_files);
+        Ok(manifest_files)
     }
-}
-
-struct SnapshotProduceAction<'a> {
-    tx: Transaction<'a>,
-
-    parent_snapshot_id: Option<i64>,
-    snapshot_id: i64,
-    schema_id: i32,
-    format_version: FormatVersion,
-    commit_uuid: String,
-}
-
-impl<'a> SnapshotProduceAction<'a> {
-    pub(crate) fn new(
-        tx: Transaction<'a>,
-        snapshot_id: i64,
-        parent_snapshot_id: Option<i64>,
-        schema_id: i32,
-        format_version: FormatVersion,
-        commit_uuid: String,
-    ) -> Result<Self> {
-        Ok(Self {
-            tx,
-            parent_snapshot_id,
-            snapshot_id,
-            schema_id,
-            format_version,
-            commit_uuid,
-        })
-    }

+    // TODO: complete the summary (added file counts, sizes, and other metrics).
+    fn summary<OP: SnapshotProduceOperation>(&self, snapshot_produce_operation: &OP) -> Summary {
+        Summary {
+            operation: snapshot_produce_operation.operation(),
+            additional_properties: self.snapshot_properties.clone(),
+        }
+    }

-    fn generate_manifest_list_file_path(&self, next_seq_num: i64) -> String {
+    fn generate_manifest_list_file_path(&self, attempt: i64) -> String {
         format!(
             "{}/{}/snap-{}-{}-{}.{}",
             self.tx.table.metadata().location(),
             META_ROOT_PATH,
             self.snapshot_id,
-            next_seq_num,
+            attempt,
             self.commit_uuid,
             DataFileFormat::Avro
         )
     }

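The `summary()` TODO above tracks the metrics other Iceberg implementations emit. The fixtures earlier in this patch (see the REST catalog and table-metadata tests) already show the target shape, so a completed summary would carry entries along these lines (counts invented):

    Summary {
        operation: Operation::Append,
        additional_properties: HashMap::from([
            ("added-data-files".to_string(), "4".to_string()),
            ("added-records".to_string(), "4".to_string()),
            ("added-files-size".to_string(), "6001".to_string()),
        ]),
    }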
     /// Finish building the action and apply it to the transaction.
-    pub async fn apply(
+    pub async fn apply<OP: SnapshotProduceOperation, MP: ManifestProcess>(
         mut self,
-        manifest_files: impl IntoIterator<Item = ManifestFile>,
-        summary: Summary,
+        snapshot_produce_operation: OP,
+        process: MP,
     ) -> Result<Transaction<'a>> {
-        let next_seq_num = if self.format_version as u8 > 1u8 {
-            self.tx.table.metadata().last_sequence_number() + 1
-        } else {
-            INITIAL_SEQUENCE_NUMBER
+        let new_manifests = self
+            .manifest_file(&snapshot_produce_operation, &process)
+            .await?;
+        let next_seq_num = self.tx.table.metadata().next_sequence_number();
+
+        let summary = self.summary(&snapshot_produce_operation);
+
+        let manifest_list_path = self.generate_manifest_list_file_path(0);
+
+        let mut manifest_list_writer = match self.tx.table.metadata().format_version() {
+            FormatVersion::V1 => ManifestListWriter::v1(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.parent_snapshot_id,
+            ),
+            FormatVersion::V2 => ManifestListWriter::v2(
+                self.tx
+                    .table
+                    .file_io()
+                    .new_output(manifest_list_path.clone())?,
+                self.snapshot_id,
+                self.parent_snapshot_id,
+                next_seq_num,
+            ),
         };
-        let commit_ts = chrono::Utc::now().timestamp_millis();
-        let manifest_list_path = self.generate_manifest_list_file_path(next_seq_num);
-
-        let mut manifest_list_writer = ManifestListWriter::v2(
-            self.tx
-                .table
-                .file_io()
-                .new_output(manifest_list_path.clone())?,
-            self.snapshot_id,
-            self.parent_snapshot_id,
-            next_seq_num,
-        );
-        manifest_list_writer.add_manifests(manifest_files.into_iter())?;
+        manifest_list_writer.add_manifests(new_manifests.into_iter())?;
         manifest_list_writer.close().await?;

+        let commit_ts = chrono::Utc::now().timestamp_millis();
         let new_snapshot = Snapshot::builder()
             .with_manifest_list(manifest_list_path)
             .with_snapshot_id(self.snapshot_id)
@@ -470,7 +552,6 @@ impl<'a> SnapshotProduceAction<'a> {
             .with_timestamp_ms(commit_ts)
             .build();

-        let new_snapshot_id = new_snapshot.snapshot_id();
         self.tx.append_updates(vec![
             TableUpdate::AddSnapshot {
                 snapshot: new_snapshot,
             },
             TableUpdate::SetSnapshotRef {
                 ref_name: MAIN_BRANCH.to_string(),
                 reference: SnapshotReference::new(
-                    new_snapshot_id,
+                    self.snapshot_id,
                     SnapshotRetention::branch(None, None, None),
                 ),
             },
@@ -577,10 +658,13 @@ mod tests {
     use std::fs::File;
     use std::io::BufReader;

-    use crate::io::FileIO;
-    use crate::spec::{FormatVersion, TableMetadata};
+    use crate::io::FileIOBuilder;
+    use crate::spec::{
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Struct,
+        TableMetadata,
+    };
     use crate::table::Table;
     use crate::transaction::{Transaction, MAIN_BRANCH};
     use crate::{TableIdent, TableRequirement, TableUpdate};

     fn make_v1_table() -> Table {
@@ -618,6 +702,7 @@ mod tests {
             .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
             .file_io(FileIOBuilder::new("memory").build().unwrap())
             .build()
+            .unwrap()
     }

     fn make_v2_minimal_table() -> Table {
@@ -740,7 +825,7 @@ mod tests {
     }

     #[tokio::test]
-    async fn test_merge_snapshot_action() {
+    async fn test_fast_append_action() {
         let table = make_v2_minimal_table();

         let tx = Transaction::new(&table);