Skip to content

Commit

Permalink
refactor append action of transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
ZENOTME committed Sep 26, 2024
1 parent fc6b8f2 commit 7229e59
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 253 deletions.
2 changes: 1 addition & 1 deletion crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1376,7 +1376,7 @@ mod tests {
.with_schema_id(0)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::from_iter([
additional_properties: HashMap::from_iter([
("spark.app.id", "local-1646787004168"),
("added-data-files", "1"),
("added-records", "1"),
Expand Down
22 changes: 10 additions & 12 deletions crates/e2e_test/tests/append_data_file_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

//! Integration tests for rest catalog.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
Expand All @@ -35,8 +38,6 @@ use iceberg_test_utils::{normalize_test_name, set_up};
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::file::properties::WriterProperties;
use port_scanner::scan_port_addr;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::sleep;

const REST_CATALOG_PORT: u16 = 8181;
Expand Down Expand Up @@ -80,7 +81,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
(S3_REGION.to_string(), "us-east-1".to_string()),
]))
.build();
let rest_catalog = RestCatalog::new(config).await.unwrap();
let rest_catalog = RestCatalog::new(config);

TestFixture {
_docker_compose: docker_compose,
Expand Down Expand Up @@ -145,7 +146,7 @@ async fn test_append_data_file() {
);
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
schema.clone(),
table.metadata().current_schema().clone(),
table.file_io().clone(),
location_generator.clone(),
file_name_generator.clone(),
Expand All @@ -158,14 +159,11 @@ async fn test_append_data_file() {
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(col1) as ArrayRef,
Arc::new(col2) as ArrayRef,
Arc::new(col3) as ArrayRef,
],
)
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(col1) as ArrayRef,
Arc::new(col2) as ArrayRef,
Arc::new(col3) as ArrayRef,
])
.unwrap();
data_file_writer.write(batch.clone()).await.unwrap();
let data_file = data_file_writer.close().await.unwrap();
Expand Down
22 changes: 10 additions & 12 deletions crates/e2e_test/tests/conflict_commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

//! Integration tests for rest catalog.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray};
use futures::TryStreamExt;
use iceberg::io::{S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY};
Expand All @@ -34,8 +37,6 @@ use iceberg_test_utils::docker::DockerCompose;
use iceberg_test_utils::{normalize_test_name, set_up};
use parquet::file::properties::WriterProperties;
use port_scanner::scan_port_addr;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::sleep;

const REST_CATALOG_PORT: u16 = 8181;
Expand Down Expand Up @@ -79,7 +80,7 @@ async fn set_test_fixture(func: &str) -> TestFixture {
(S3_REGION.to_string(), "us-east-1".to_string()),
]))
.build();
let rest_catalog = RestCatalog::new(config).await.unwrap();
let rest_catalog = RestCatalog::new(config);

TestFixture {
_docker_compose: docker_compose,
Expand Down Expand Up @@ -144,7 +145,7 @@ async fn test_append_data_file_conflict() {
);
let parquet_writer_builder = ParquetWriterBuilder::new(
WriterProperties::default(),
schema.clone(),
table.metadata().current_schema().clone(),
table.file_io().clone(),
location_generator.clone(),
file_name_generator.clone(),
Expand All @@ -157,14 +158,11 @@ async fn test_append_data_file_conflict() {
let col1 = StringArray::from(vec![Some("foo"), Some("bar"), None, Some("baz")]);
let col2 = Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4)]);
let col3 = BooleanArray::from(vec![Some(true), Some(false), None, Some(false)]);
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(col1) as ArrayRef,
Arc::new(col2) as ArrayRef,
Arc::new(col3) as ArrayRef,
],
)
let batch = RecordBatch::try_new(schema.clone(), vec![
Arc::new(col1) as ArrayRef,
Arc::new(col2) as ArrayRef,
Arc::new(col3) as ArrayRef,
])
.unwrap();
data_file_writer.write(batch.clone()).await.unwrap();
let data_file = data_file_writer.close().await.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ mod tests {
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::default(),
additional_properties: HashMap::default(),
})
.build(),
};
Expand Down
6 changes: 2 additions & 4 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ mod tests {
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
ManifestWriter, Struct, TableMetadata,
};
use crate::table::Table;
use crate::TableIdent;
Expand Down Expand Up @@ -293,9 +293,7 @@ mod tests {
.new_output(current_snapshot.manifest_list())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
.parent_snapshot_id()
.unwrap_or(EMPTY_SNAPSHOT_ID),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
);
manifest_list_write
Expand Down
51 changes: 0 additions & 51 deletions crates/iceberg/src/spec/datatypes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,23 +102,6 @@ impl fmt::Display for Type {
}

impl Type {
/// Reports whether `literal` can be a value of this type.
///
/// The type and the literal must be of the same kind (primitive, struct,
/// list or map); the decision is then delegated to the `compatible` check of
/// the nested type. Every other pairing yields `false`.
pub fn compatible(&self, literal: &Literal) -> bool {
    if let (Type::Primitive(ty), Literal::Primitive(value)) = (self, literal) {
        return ty.compatible(value);
    }
    if let (Type::Struct(ty), Literal::Struct(value)) = (self, literal) {
        return ty.compatible(value);
    }
    if let (Type::List(ty), Literal::List(value)) = (self, literal) {
        return ty.compatible(value);
    }
    if let (Type::Map(ty), Literal::Map(value)) = (self, literal) {
        return ty.compatible(value);
    }
    // Kinds differ (e.g. a struct type paired with a primitive literal).
    false
}

/// Whether the type is primitive type.
#[inline(always)]
pub fn is_primitive(&self) -> bool {
Expand Down Expand Up @@ -311,29 +294,6 @@ impl<'de> Deserialize<'de> for PrimitiveType {
}
}

impl PrimitiveType {
    /// Reports whether `literal` is the literal variant that pairs with this
    /// primitive type.
    ///
    /// Only the variant pairing is inspected. Payloads (the literal's value,
    /// the decimal's parameters, the fixed length) are ignored.
    pub fn compatible(&self, literal: &PrimitiveLiteral) -> bool {
        match (self, literal) {
            (Self::Boolean, PrimitiveLiteral::Boolean(_)) => true,
            (Self::Int, PrimitiveLiteral::Int(_)) => true,
            (Self::Long, PrimitiveLiteral::Long(_)) => true,
            (Self::Float, PrimitiveLiteral::Float(_)) => true,
            (Self::Double, PrimitiveLiteral::Double(_)) => true,
            (Self::Decimal { .. }, PrimitiveLiteral::Decimal(_)) => true,
            (Self::Date, PrimitiveLiteral::Date(_)) => true,
            (Self::Time, PrimitiveLiteral::Time(_)) => true,
            (Self::Timestamp, PrimitiveLiteral::Timestamp(_)) => true,
            (Self::Timestamptz, PrimitiveLiteral::TimestampTZ(_)) => true,
            (Self::String, PrimitiveLiteral::String(_)) => true,
            (Self::Uuid, PrimitiveLiteral::UUID(_)) => true,
            (Self::Fixed(_), PrimitiveLiteral::Fixed(_)) => true,
            (Self::Binary, PrimitiveLiteral::Binary(_)) => true,
            // Any pairing not listed above is a type/literal mismatch.
            _ => false,
        }
    }
}

impl Serialize for PrimitiveType {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where S: Serializer {
Expand Down Expand Up @@ -521,17 +481,6 @@ impl StructType {
pub fn fields(&self) -> &[NestedFieldRef] {
&self.fields
}

/// Reports whether `struct_literal` can be a value of this struct type.
///
/// The literal must have exactly as many fields as the type, and each
/// literal field must be compatible with the type of the field at the same
/// position.
pub fn compatible(&self, struct_literal: &Struct) -> bool {
    let expected = self.fields();
    let actual = struct_literal.fields();
    // Compare lengths first: `zip` would otherwise silently stop at the shorter side.
    expected.len() == actual.len()
        && expected
            .iter()
            .zip(actual)
            .all(|(field, value)| field.field_type.compatible(value))
}
}

impl PartialEq for StructType {
Expand Down
30 changes: 22 additions & 8 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,17 @@ impl std::fmt::Debug for ManifestListWriter {

impl ManifestListWriter {
/// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`].
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: i64) -> Self {
let metadata = HashMap::from_iter([
pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option<i64>) -> Self {
let mut metadata = HashMap::from_iter([
("snapshot-id".to_string(), snapshot_id.to_string()),
(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
),
("format-version".to_string(), "1".to_string()),
]);
if let Some(parent_snapshot_id) = parent_snapshot_id {
metadata.insert(
"parent-snapshot-id".to_string(),
parent_snapshot_id.to_string(),
);
}
Self::new(FormatVersion::V1, output_file, metadata, 0, snapshot_id)
}

Expand Down Expand Up @@ -582,6 +584,18 @@ pub struct ManifestFile {
pub key_metadata: Vec<u8>,
}

impl ManifestFile {
    /// Checks whether the manifest file may contain added files.
    ///
    /// Returns `true` when the added-files count is greater than zero, and
    /// also when the count is absent (`None`): an unknown count is treated
    /// conservatively as "may have added files".
    pub fn has_added_files(&self) -> bool {
        self.added_files_count.map_or(true, |count| count > 0)
    }

    /// Checks whether the manifest file may contain existing files.
    ///
    /// Returns `true` when the existing-files count is greater than zero, and
    /// also when the count is absent (`None`): an unknown count is treated
    /// conservatively as "may have existing files".
    pub fn has_existing_files(&self) -> bool {
        self.existing_files_count.map_or(true, |count| count > 0)
    }
}

/// The type of files tracked by the manifest, either data or delete files; Data(0) for all v1 manifests
#[derive(Debug, PartialEq, Clone, Eq)]
pub enum ManifestContentType {
Expand Down Expand Up @@ -1148,7 +1162,7 @@ mod test {
let mut writer = ManifestListWriter::v1(
file_io.new_output(full_path.clone()).unwrap(),
1646658105718557341,
1646658105718557341,
Some(1646658105718557341),
);

writer
Expand Down Expand Up @@ -1337,7 +1351,7 @@ mod test {
let io = FileIOBuilder::new_fs_io().build().unwrap();
let output_file = io.new_output(path.to_str().unwrap()).unwrap();

let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, 0);
let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0));
writer
.add_manifests(expected_manifest_list.entries.clone().into_iter())
.unwrap();
Expand Down
11 changes: 3 additions & 8 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct Summary {
pub operation: Operation,
/// Other summary data.
#[serde(flatten)]
pub other: HashMap<String, String>,
pub additional_properties: HashMap<String, String>,
}

impl Default for Operation {
Expand Down Expand Up @@ -291,7 +291,7 @@ pub(super) mod _serde {
},
summary: v1.summary.unwrap_or(Summary {
operation: Operation::default(),
other: HashMap::new(),
additional_properties: HashMap::new(),
}),
schema_id: v1.schema_id,
})
Expand Down Expand Up @@ -378,11 +378,6 @@ impl SnapshotRetention {
max_ref_age_ms,
}
}

/// Create a new tag retention policy
pub fn tag(max_ref_age_ms: i64) -> Self {
SnapshotRetention::Tag { max_ref_age_ms }
}
}

#[cfg(test)]
Expand Down Expand Up @@ -421,7 +416,7 @@ mod tests {
assert_eq!(
Summary {
operation: Operation::Append,
other: HashMap::new()
additional_properties: HashMap::new()
},
*result.summary()
);
Expand Down
18 changes: 15 additions & 3 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ impl TableMetadata {
self.last_sequence_number
}

/// Computes the sequence number to use for the table's next change.
///
/// Format version 1 tables always get `INITIAL_SEQUENCE_NUMBER`; every other
/// format version gets the last sequence number plus one.
#[inline]
pub fn next_sequence_number(&self) -> i64 {
    if matches!(self.format_version, FormatVersion::V1) {
        INITIAL_SEQUENCE_NUMBER
    } else {
        self.last_sequence_number + 1
    }
}

/// Returns last updated time.
#[inline]
pub fn last_updated_timestamp(&self) -> Result<DateTime<Utc>> {
Expand Down Expand Up @@ -1463,7 +1475,7 @@ mod tests {
.with_sequence_number(0)
.with_schema_id(0)
.with_manifest_list("/home/iceberg/warehouse/nyc/taxis/metadata/snap-638933773299822130-1-7e6760f0-4f6c-4b23-b907-0a5a174e3863.avro")
.with_summary(Summary { operation: Operation::Append, other: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.with_summary(Summary { operation: Operation::Append, additional_properties: HashMap::from_iter(vec![("spark.app.id".to_string(), "local-1662532784305".to_string()), ("added-data-files".to_string(), "4".to_string()), ("added-records".to_string(), "4".to_string()), ("added-files-size".to_string(), "6001".to_string())]) })
.build();

let expected = TableMetadata {
Expand Down Expand Up @@ -1864,7 +1876,7 @@ mod tests {
.with_manifest_list("s3://a/b/1.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand All @@ -1877,7 +1889,7 @@ mod tests {
.with_manifest_list("s3://a/b/2.avro")
.with_summary(Summary {
operation: Operation::Append,
other: HashMap::new(),
additional_properties: HashMap::new(),
})
.build();

Expand Down
10 changes: 10 additions & 0 deletions crates/iceberg/src/spec/values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,14 @@ impl Literal {
})?;
Ok(Self::decimal(decimal.mantissa()))
}

/// Extracts the primitive value held by this literal, if there is one.
///
/// Returns a clone of the inner [`PrimitiveLiteral`] for `Literal::Primitive`
/// and `None` for every other kind of literal.
pub fn as_primitive_literal(&self) -> Option<PrimitiveLiteral> {
    if let Literal::Primitive(inner) = self {
        Some(inner.clone())
    } else {
        None
    }
}
}

/// The partition struct stores the tuple of partition values for each file.
Expand Down Expand Up @@ -1575,6 +1583,8 @@ impl Struct {
/// Returns true if the field at position `index` is null.
///
/// The answer is read straight from the struct's `null_bitmap` at `index`.
///
/// NOTE(review): `null_bitmap` is indexed directly, so an `index` outside the
/// bitmap presumably panics — confirm against the bitmap type and callers.
pub fn is_null_at_index(&self, index: usize) -> bool {
self.null_bitmap[index]
}

/// Return fields in the struct.
pub fn fields(&self) -> &[Literal] {
&self.fields
Expand Down
Loading

0 comments on commit 7229e59

Please sign in to comment.