diff --git a/Cargo.toml b/Cargo.toml index a9886b03a..e7918d9ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,5 +66,4 @@ tokio = { version = "1", features = ["macros"] } typed-builder = "^0.18" url = "2" urlencoding = "2" -uuid = "1.6.1" - +uuid = "1.6.1" \ No newline at end of file diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index 085ac1e28..e5ca45cdd 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -63,4 +63,5 @@ uuid = { workspace = true } [dev-dependencies] pretty_assertions = { workspace = true } tempfile = { workspace = true } +tera = { workspace = true } tokio = { workspace = true } diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 55c010b09..c85140222 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -325,6 +325,8 @@ define_from_err!( "Failed to convert decimal literal to rust decimal" ); +define_from_err!(std::io::Error, ErrorKind::Unexpected, "IO Operation failed"); + /// Helper macro to check arguments. /// /// diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs index 1ac65f872..7d652d8b0 100644 --- a/crates/iceberg/src/lib.rs +++ b/crates/iceberg/src/lib.rs @@ -45,6 +45,8 @@ mod avro; pub mod io; pub mod spec; +mod scan; + #[allow(dead_code)] pub mod expr; pub mod transaction; diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs new file mode 100644 index 000000000..a94351fdd --- /dev/null +++ b/crates/iceberg/src/scan.rs @@ -0,0 +1,448 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table scan api. + +use crate::io::FileIO; +use crate::spec::{DataContentType, ManifestEntryRef, SchemaRef, SnapshotRef, TableMetadataRef}; +use crate::table::Table; +use crate::{Error, ErrorKind}; +use arrow_array::RecordBatch; +use futures::stream::{iter, BoxStream}; +use futures::StreamExt; + +/// Builder to create table scan. +pub struct TableScanBuilder<'a> { + table: &'a Table, + // Empty column names means to select all columns + column_names: Vec, + snapshot_id: Option, +} + +impl<'a> TableScanBuilder<'a> { + pub fn new(table: &'a Table) -> Self { + Self { + table, + column_names: vec![], + snapshot_id: None, + } + } + + /// Select all columns. + pub fn select_all(mut self) -> Self { + self.column_names.clear(); + self + } + + /// Select some columns of the table. + pub fn select(mut self, column_names: impl IntoIterator) -> Self { + self.column_names = column_names + .into_iter() + .map(|item| item.to_string()) + .collect(); + self + } + + /// Set the snapshot to scan. When not set, it uses current snapshot. + pub fn snapshot_id(mut self, snapshot_id: i64) -> Self { + self.snapshot_id = Some(snapshot_id); + self + } + + /// Build the table scan. + pub fn build(self) -> crate::Result { + let snapshot = match self.snapshot_id { + Some(snapshot_id) => self + .table + .metadata() + .snapshot_by_id(snapshot_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Snapshot with id {} not found", snapshot_id), + ) + })? + .clone(), + None => self + .table + .metadata() + .current_snapshot() + .ok_or_else(|| { + Error::new( + ErrorKind::FeatureUnsupported, + "Can't scan table without snapshots", + ) + })? + .clone(), + }; + + let schema = snapshot.schema(self.table.metadata())?; + + // Check that all column names exist in the schema. + if !self.column_names.is_empty() { + for column_name in &self.column_names { + if schema.field_by_name(column_name).is_none() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Column {} not found in table.", column_name), + )); + } + } + } + + Ok(TableScan { + snapshot, + file_io: self.table.file_io().clone(), + table_metadata: self.table.metadata_ref(), + column_names: self.column_names, + schema, + }) + } +} + +/// Table scan. +#[derive(Debug)] +#[allow(dead_code)] +pub struct TableScan { + snapshot: SnapshotRef, + table_metadata: TableMetadataRef, + file_io: FileIO, + column_names: Vec, + schema: SchemaRef, +} + +/// A stream of [`FileScanTask`]. +pub type FileScanTaskStream = BoxStream<'static, crate::Result>; + +impl TableScan { + /// Returns a stream of file scan tasks. + pub async fn plan_files(&self) -> crate::Result { + let manifest_list = self + .snapshot + .load_manifest_list(&self.file_io, &self.table_metadata) + .await?; + + // Generate data file stream + let mut file_scan_tasks = Vec::with_capacity(manifest_list.entries().len()); + for manifest_list_entry in manifest_list.entries().iter() { + // Data file + let manifest = manifest_list_entry.load_manifest(&self.file_io).await?; + + for manifest_entry in manifest.entries().iter().filter(|e| e.is_alive()) { + match manifest_entry.content_type() { + DataContentType::EqualityDeletes | DataContentType::PositionDeletes => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "Delete files are not supported yet.", + )); + } + DataContentType::Data => { + file_scan_tasks.push(Ok(FileScanTask { + data_file: manifest_entry.clone(), + start: 0, + length: manifest_entry.file_size_in_bytes(), + })); + } + } + } + } + + Ok(iter(file_scan_tasks).boxed()) + } +} + +/// A task to scan part of file. +#[derive(Debug)] +#[allow(dead_code)] +pub struct FileScanTask { + data_file: ManifestEntryRef, + start: u64, + length: u64, +} + +/// A stream of arrow record batches. +pub type ArrowRecordBatchStream = BoxStream<'static, crate::Result>; + +impl FileScanTask { + /// Returns a stream of arrow record batches. + pub async fn execute(&self) -> crate::Result { + todo!() + } +} + +#[cfg(test)] +mod tests { + use crate::io::{FileIO, OutputFile}; + use crate::spec::{ + DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest, + ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus, + ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID, + }; + use crate::table::Table; + use crate::TableIdent; + use futures::TryStreamExt; + use std::fs; + use tempfile::TempDir; + use tera::{Context, Tera}; + use uuid::Uuid; + + struct TableTestFixture { + table_location: String, + table: Table, + } + + impl TableTestFixture { + fn new() -> Self { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().join("table1"); + let manifest_list1_location = table_location.join("metadata/manifests_list_1.avro"); + let manifest_list2_location = table_location.join("metadata/manifests_list_2.avro"); + let table_metadata1_location = table_location.join("metadata/v1.json"); + + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + let table_metadata = { + let template_json_str = fs::read_to_string(format!( + "{}/testdata/example_table_metadata_v2.json", + env!("CARGO_MANIFEST_DIR") + )) + .unwrap(); + let mut context = Context::new(); + context.insert("table_location", &table_location); + context.insert("manifest_list_1_location", &manifest_list1_location); + context.insert("manifest_list_2_location", &manifest_list2_location); + context.insert("table_metadata_1_location", &table_metadata1_location); + + let metadata_json = Tera::one_off(&template_json_str, &context, false).unwrap(); + serde_json::from_str::(&metadata_json).unwrap() + }; + + let table = Table::builder() + .metadata(table_metadata) + .identifier(TableIdent::from_strs(["db", "table1"]).unwrap()) + .file_io(file_io) + .metadata_location(table_metadata1_location.as_os_str().to_str().unwrap()) + .build(); + + Self { + table_location: table_location.to_str().unwrap().to_string(), + table, + } + } + + fn next_manifest_file(&self) -> OutputFile { + self.table + .file_io() + .new_output(format!( + "{}/metadata/manifest_{}.avro", + self.table_location, + Uuid::new_v4() + )) + .unwrap() + } + } + + #[test] + fn test_table_scan_columns() { + let table = TableTestFixture::new().table; + + let table_scan = table.scan().select(["x", "y"]).build().unwrap(); + assert_eq!(vec!["x", "y"], table_scan.column_names); + + let table_scan = table + .scan() + .select(["x", "y"]) + .select(["z"]) + .build() + .unwrap(); + assert_eq!(vec!["z"], table_scan.column_names); + } + + #[test] + fn test_select_all() { + let table = TableTestFixture::new().table; + + let table_scan = table.scan().select_all().build().unwrap(); + assert!(table_scan.column_names.is_empty()); + } + + #[test] + fn test_select_no_exist_column() { + let table = TableTestFixture::new().table; + + let table_scan = table.scan().select(["x", "y", "z", "a"]).build(); + assert!(table_scan.is_err()); + } + + #[test] + fn test_table_scan_default_snapshot_id() { + let table = TableTestFixture::new().table; + + let table_scan = table.scan().build().unwrap(); + assert_eq!( + table.metadata().current_snapshot().unwrap().snapshot_id(), + table_scan.snapshot.snapshot_id() + ); + } + + #[test] + fn test_table_scan_non_exist_snapshot_id() { + let table = TableTestFixture::new().table; + + let table_scan = table.scan().snapshot_id(1024).build(); + assert!(table_scan.is_err()); + } + + #[test] + fn test_table_scan_with_snapshot_id() { + let table = TableTestFixture::new().table; + + let table_scan = table + .scan() + .snapshot_id(3051729675574597004) + .build() + .unwrap(); + assert_eq!(table_scan.snapshot.snapshot_id(), 3051729675574597004); + } + + #[tokio::test] + async fn test_plan_files_no_deletions() { + let fixture = TableTestFixture::new(); + + let current_snapshot = fixture.table.metadata().current_snapshot().unwrap(); + let parent_snapshot = current_snapshot + .parent_snapshot(fixture.table.metadata()) + .unwrap(); + let current_schema = current_snapshot.schema(fixture.table.metadata()).unwrap(); + let current_partition_spec = fixture.table.metadata().default_partition_spec().unwrap(); + + // Write data files + let data_file_manifest = ManifestWriter::new( + fixture.next_manifest_file(), + current_snapshot.snapshot_id(), + vec![], + ) + .write(Manifest::new( + ManifestMetadata::builder() + .schema((*current_schema).clone()) + .content(ManifestContentType::Data) + .format_version(FormatVersion::V2) + .partition_spec((**current_partition_spec).clone()) + .schema_id(current_schema.schema_id()) + .build(), + vec![ + ManifestEntry::builder() + .status(ManifestStatus::Added) + .data_file( + DataFile::builder() + .content(DataContentType::Data) + .file_path(format!("{}/1.parquet", &fixture.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(100))])) + .build(), + ) + .build(), + ManifestEntry::builder() + .status(ManifestStatus::Deleted) + .snapshot_id(parent_snapshot.snapshot_id()) + .sequence_number(parent_snapshot.sequence_number()) + .file_sequence_number(parent_snapshot.sequence_number()) + .data_file( + DataFile::builder() + .content(DataContentType::Data) + .file_path(format!("{}/2.parquet", &fixture.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(200))])) + .build(), + ) + .build(), + ManifestEntry::builder() + .status(ManifestStatus::Existing) + .snapshot_id(parent_snapshot.snapshot_id()) + .sequence_number(parent_snapshot.sequence_number()) + .file_sequence_number(parent_snapshot.sequence_number()) + .data_file( + DataFile::builder() + .content(DataContentType::Data) + .file_path(format!("{}/3.parquet", &fixture.table_location)) + .file_format(DataFileFormat::Parquet) + .file_size_in_bytes(100) + .record_count(1) + .partition(Struct::from_iter([Some(Literal::long(300))])) + .build(), + ) + .build(), + ], + )) + .await + .unwrap(); + + // Write to manifest list + let mut manifest_list_write = ManifestListWriter::v2( + fixture + .table + .file_io() + .new_output(current_snapshot.manifest_list_file_path().unwrap()) + .unwrap(), + current_snapshot.snapshot_id(), + current_snapshot + .parent_snapshot_id() + .unwrap_or(EMPTY_SNAPSHOT_ID), + current_snapshot.sequence_number(), + ); + manifest_list_write + .add_manifest_entries(vec![data_file_manifest].into_iter()) + .unwrap(); + manifest_list_write.close().await.unwrap(); + + // Create table scan for current snapshot and plan files + let table_scan = fixture.table.scan().build().unwrap(); + let mut tasks = table_scan + .plan_files() + .await + .unwrap() + .try_fold(vec![], |mut acc, task| async move { + acc.push(task); + Ok(acc) + }) + .await + .unwrap(); + + assert_eq!(tasks.len(), 2); + + tasks.sort_by_key(|t| t.data_file.file_path().to_string()); + + // Check first task is added data file + assert_eq!( + tasks[0].data_file.file_path(), + format!("{}/1.parquet", &fixture.table_location) + ); + + // Check second task is existing data file + assert_eq!( + tasks[1].data_file.file_path(), + format!("{}/3.parquet", &fixture.table_location) + ); + } +} diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs index d412a2070..bdd0d0a56 100644 --- a/crates/iceberg/src/spec/manifest.rs +++ b/crates/iceberg/src/spec/manifest.rs @@ -20,9 +20,10 @@ use self::_const_schema::{manifest_schema_v1, manifest_schema_v2}; use super::{ FieldSummary, FormatVersion, ManifestContentType, ManifestListEntry, PartitionSpec, Schema, - Struct, + SchemaId, Struct, INITIAL_SEQUENCE_NUMBER, }; use super::{Literal, UNASSIGNED_SEQUENCE_NUMBER}; +use crate::error::Result; use crate::io::OutputFile; use crate::spec::PartitionField; use crate::{Error, ErrorKind}; @@ -32,17 +33,19 @@ use serde_json::to_vec; use std::cmp::min; use std::collections::HashMap; use std::str::FromStr; +use std::sync::Arc; +use typed_builder::TypedBuilder; /// A manifest contains metadata and a list of entries. #[derive(Debug, PartialEq, Eq, Clone)] pub struct Manifest { metadata: ManifestMetadata, - entries: Vec, + entries: Vec, } impl Manifest { - /// Parse manifest from bytes of avro file. - pub fn parse_avro(bs: &[u8]) -> Result { + /// Parse manifest metadata and entries from bytes of avro file. + pub(crate) fn try_from_avro_bytes(bs: &[u8]) -> Result<(ManifestMetadata, Vec)> { let reader = AvroReader::new(bs)?; // Parse manifest metadata @@ -62,7 +65,7 @@ impl Manifest { from_value::<_serde::ManifestEntryV1>(&value?)? .try_into(&partition_type, &metadata.schema) }) - .collect::, Error>>()? + .collect::>>()? } FormatVersion::V2 => { let schema = manifest_schema_v2(partition_type.clone())?; @@ -73,11 +76,30 @@ impl Manifest { from_value::<_serde::ManifestEntryV2>(&value?)? .try_into(&partition_type, &metadata.schema) }) - .collect::, Error>>()? + .collect::>>()? } }; - Ok(Manifest { metadata, entries }) + Ok((metadata, entries)) + } + + /// Parse manifest from bytes of avro file. + pub fn parse_avro(bs: &[u8]) -> Result { + let (metadata, entries) = Self::try_from_avro_bytes(bs)?; + Ok(Self::new(metadata, entries)) + } + + /// Entries slice. + pub fn entries(&self) -> &[ManifestEntryRef] { + &self.entries + } + + /// Constructor from [`ManifestMetadata`] and [`ManifestEntry`]s. + pub fn new(metadata: ManifestMetadata, entries: Vec) -> Self { + Self { + metadata, + entries: entries.into_iter().map(Arc::new).collect(), + } } } @@ -174,7 +196,7 @@ impl ManifestWriter { } /// Write a manifest entry. - pub async fn write(mut self, manifest: Manifest) -> Result { + pub async fn write(mut self, manifest: Manifest) -> Result { // Create the avro writer let partition_type = manifest .metadata @@ -252,14 +274,16 @@ impl ManifestWriter { self.update_field_summary(&entry); let value = match manifest.metadata.format_version { - FormatVersion::V1 => { - to_value(_serde::ManifestEntryV1::try_from(entry, &partition_type)?)? - .resolve(&avro_schema)? - } - FormatVersion::V2 => { - to_value(_serde::ManifestEntryV2::try_from(entry, &partition_type)?)? - .resolve(&avro_schema)? - } + FormatVersion::V1 => to_value(_serde::ManifestEntryV1::try_from( + (*entry).clone(), + &partition_type, + )?)? + .resolve(&avro_schema)?, + FormatVersion::V2 => to_value(_serde::ManifestEntryV2::try_from( + (*entry).clone(), + &partition_type, + )?)? + .resolve(&avro_schema)?, }; avro_writer.append(value)?; @@ -677,13 +701,13 @@ mod _const_schema { } /// Meta data of a manifest that is stored in the key-value metadata of the Avro file -#[derive(Debug, PartialEq, Clone, Eq)] +#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)] pub struct ManifestMetadata { /// The table schema at the time the manifest /// was written schema: Schema, /// ID of the schema used to write the manifest as a string - schema_id: i32, + schema_id: SchemaId, /// The partition spec used to write the manifest partition_spec: PartitionSpec, /// Table format version number of the manifest as a string @@ -694,7 +718,7 @@ pub struct ManifestMetadata { impl ManifestMetadata { /// Parse from metadata in avro file. - pub fn parse(meta: &HashMap>) -> Result { + pub fn parse(meta: &HashMap>) -> Result { let schema = { let bs = meta.get("schema").ok_or_else(|| { Error::new( @@ -781,10 +805,13 @@ impl ManifestMetadata { } } +/// Reference to [`ManifestEntry`]. +pub type ManifestEntryRef = Arc; + /// A manifest is an immutable Avro file that lists data files or delete /// files, along with each file’s partition data tuple, metrics, and tracking /// information. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, PartialEq, Eq, Clone, TypedBuilder)] pub struct ManifestEntry { /// field: 0 /// @@ -794,16 +821,19 @@ pub struct ManifestEntry { /// /// Snapshot id where the file was added, or deleted if status is 2. /// Inherited when null. + #[builder(default, setter(strip_option))] snapshot_id: Option, /// field id: 3 /// /// Data sequence number of the file. /// Inherited when null and status is 1 (added). + #[builder(default, setter(strip_option))] sequence_number: Option, /// field id: 4 /// /// File sequence number indicating when the file was added. /// Inherited when null and status is 1 (added). + #[builder(default, setter(strip_option))] file_sequence_number: Option, /// field id: 2 /// @@ -819,6 +849,49 @@ impl ManifestEntry { ManifestStatus::Added | ManifestStatus::Existing ) } + + /// Content type of this manifest entry. + pub fn content_type(&self) -> DataContentType { + self.data_file.content + } + + /// Data file path of this manifest entry. + pub fn file_path(&self) -> &str { + &self.data_file.file_path + } + + /// Inherit data from manifest list, such as snapshot id, sequence number. + pub(crate) fn inherit_data(&mut self, snapshot_entry: &ManifestListEntry) { + if self.snapshot_id.is_none() { + self.snapshot_id = Some(snapshot_entry.added_snapshot_id); + } + + if self.sequence_number.is_none() + && (self.status == ManifestStatus::Added + || snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER) + { + self.sequence_number = Some(snapshot_entry.sequence_number); + } + + if self.file_sequence_number.is_none() + && (self.status == ManifestStatus::Added + || snapshot_entry.sequence_number == INITIAL_SEQUENCE_NUMBER) + { + self.file_sequence_number = Some(snapshot_entry.sequence_number); + } + } + + /// Data sequence number. + #[inline] + pub fn sequence_number(&self) -> Option { + self.sequence_number + } + + /// File size in bytes. + #[inline] + pub fn file_size_in_bytes(&self) -> u64 { + self.data_file.file_size_in_bytes + } } /// Used to track additions and deletions in ManifestEntry. @@ -837,7 +910,7 @@ pub enum ManifestStatus { impl TryFrom for ManifestStatus { type Error = Error; - fn try_from(v: i32) -> Result { + fn try_from(v: i32) -> Result { match v { 0 => Ok(ManifestStatus::Existing), 1 => Ok(ManifestStatus::Added), @@ -851,7 +924,7 @@ impl TryFrom for ManifestStatus { } /// Data file carries data file path, partition tuple, metrics, … -#[derive(Debug, PartialEq, Clone, Eq)] +#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)] pub struct DataFile { /// field id: 134 /// @@ -886,6 +959,7 @@ pub struct DataFile { /// Map from column id to the total size on disk of all regions that /// store the column. Does not include bytes necessary to read other /// columns, like footers. Leave null for row-oriented formats (Avro) + #[builder(default)] column_sizes: HashMap, /// field id: 109 /// key field id: 119 @@ -893,18 +967,21 @@ pub struct DataFile { /// /// Map from column id to number of values in the column (including null /// and NaN values) + #[builder(default)] value_counts: HashMap, /// field id: 110 /// key field id: 121 /// value field id: 122 /// /// Map from column id to number of null values in the column + #[builder(default)] null_value_counts: HashMap, /// field id: 137 /// key field id: 138 /// value field id: 139 /// /// Map from column id to number of NaN values in the column + #[builder(default)] nan_value_counts: HashMap, /// field id: 125 /// key field id: 126 @@ -917,6 +994,7 @@ pub struct DataFile { /// Reference: /// /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) + #[builder(default)] lower_bounds: HashMap, /// field id: 128 /// key field id: 129 @@ -929,16 +1007,19 @@ pub struct DataFile { /// Reference: /// /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization) + #[builder(default)] upper_bounds: HashMap, /// field id: 131 /// /// Implementation-specific key metadata for encryption + #[builder(default)] key_metadata: Vec, /// field id: 132 /// element field id: 133 /// /// Split offsets for the data file. For example, all row group offsets /// in a Parquet file. Must be sorted ascending + #[builder(default)] split_offsets: Vec, /// field id: 135 /// element field id: 136 @@ -947,6 +1028,7 @@ pub struct DataFile { /// Required when content is EqualityDeletes and should be null /// otherwise. Fields with ids listed in this column must be present /// in the delete file + #[builder(default)] equality_ids: Vec, /// field id: 140 /// @@ -958,6 +1040,7 @@ pub struct DataFile { /// sorted by file and position, not a table order, and should set sort /// order id to null. Readers must ignore sort order id for position /// delete files. + #[builder(default, setter(strip_option))] sort_order_id: Option, } @@ -976,7 +1059,7 @@ pub enum DataContentType { impl TryFrom for DataContentType { type Error = Error; - fn try_from(v: i32) -> Result { + fn try_from(v: i32) -> Result { match v { 0 => Ok(DataContentType::Data), 1 => Ok(DataContentType::PositionDeletes), @@ -1003,7 +1086,7 @@ pub enum DataFileFormat { impl FromStr for DataFileFormat { type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { "avro" => Ok(Self::Avro), "orc" => Ok(Self::Orc), @@ -1388,7 +1471,7 @@ mod tests { format_version: FormatVersion::V2, }, entries: vec![ - ManifestEntry { + Arc::new(ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, sequence_number: None, @@ -1411,7 +1494,7 @@ mod tests { equality_ids: Vec::new(), sort_order_id: None, } - } + }) ] }; @@ -1508,7 +1591,7 @@ mod tests { content: ManifestContentType::Data, format_version: FormatVersion::V2, }, - entries: vec![ManifestEntry { + entries: vec![Arc::new(ManifestEntry { status: ManifestStatus::Added, snapshot_id: None, sequence_number: None, @@ -1519,8 +1602,8 @@ mod tests { file_path: "s3a://icebergdata/demo/s1/t1/data/00000-0-378b56f5-5c52-4102-a2c2-f05f8a7cbe4a-00000.parquet".to_string(), partition: Struct::from_iter( vec![ - (1000, Some(Literal::int(1)), "v_int".to_string()), - (1001, Some(Literal::long(1000)), "v_long".to_string()) + Some(Literal::int(1)), + Some(Literal::long(1000)), ] .into_iter() ), @@ -1573,7 +1656,7 @@ mod tests { equality_ids: vec![], sort_order_id: None, }, - }], + })], }; let writer = |output_file: OutputFile| ManifestWriter::new(output_file, 1, vec![]); @@ -1617,7 +1700,7 @@ mod tests { content: ManifestContentType::Data, format_version: FormatVersion::V1, }, - entries: vec![ManifestEntry { + entries: vec![Arc::new(ManifestEntry { status: ManifestStatus::Added, snapshot_id: Some(0), sequence_number: Some(0), @@ -1640,7 +1723,7 @@ mod tests { equality_ids: vec![], sort_order_id: Some(0), } - }], + })], }; let writer = @@ -1687,7 +1770,7 @@ mod tests { format_version: FormatVersion::V1, }, entries: vec![ - ManifestEntry { + Arc::new(ManifestEntry { status: ManifestStatus::Added, snapshot_id: Some(0), sequence_number: Some(0), @@ -1697,14 +1780,12 @@ mod tests { file_path: "s3://testbucket/prod/db/sample/data/category=x/00010-1-d5c93668-1e52-41ac-92a6-bba590cbf249-00001.parquet".to_string(), file_format: DataFileFormat::Parquet, partition: Struct::from_iter( - vec![( - 1000, + vec![ Some( Literal::try_from_bytes(&[120], &Type::Primitive(PrimitiveType::String)) .unwrap() ), - "category".to_string() - )] + ] .into_iter() ), record_count: 1, @@ -1728,7 +1809,7 @@ mod tests { equality_ids: vec![], sort_order_id: Some(0), }, - } + }) ] }; diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index 76b8b53dd..a3bf0c489 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -19,16 +19,18 @@ use std::{collections::HashMap, str::FromStr}; +use crate::io::FileIO; use crate::{io::OutputFile, spec::Literal, Error, ErrorKind}; use apache_avro::{from_value, types::Value, Reader, Writer}; -use futures::AsyncWriteExt; +use futures::{AsyncReadExt, AsyncWriteExt}; use self::{ _const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}, _serde::{ManifestListEntryV1, ManifestListEntryV2}, }; -use super::{FormatVersion, StructType}; +use super::{FormatVersion, Manifest, StructType}; +use crate::error::Result; /// Placeholder for sequence number. The field with this value must be replaced with the actual sequence number before it write. pub const UNASSIGNED_SEQUENCE_NUMBER: i64 = -1; @@ -57,18 +59,18 @@ impl ManifestList { pub fn parse_with_version( bs: &[u8], version: FormatVersion, - partition_types: &HashMap, - ) -> Result { + partition_type_provider: impl Fn(i32) -> Result>, + ) -> Result { match version { FormatVersion::V1 => { let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V1, bs)?; - let values = Value::Array(reader.collect::, _>>()?); - from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_types) + let values = Value::Array(reader.collect::, _>>()?); + from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type_provider) } FormatVersion::V2 => { let reader = Reader::with_schema(&MANIFEST_LIST_AVRO_SCHEMA_V2, bs)?; - let values = Value::Array(reader.collect::, _>>()?); - from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_types) + let values = Value::Array(reader.collect::, _>>()?); + from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type_provider) } } } @@ -167,7 +169,7 @@ impl ManifestListWriter { pub fn add_manifest_entries( &mut self, manifest_entries: impl Iterator, - ) -> Result<(), Error> { + ) -> Result<()> { match self.format_version { FormatVersion::V1 => { for manifest_entry in manifest_entries { @@ -210,7 +212,7 @@ impl ManifestListWriter { } /// Write the manifest list to the output file. - pub async fn close(self) -> Result<(), Error> { + pub async fn close(self) -> Result<()> { let data = self.avro_writer.into_inner()?; let mut writer = self.output_file.writer().await?; writer.write_all(&data).await.unwrap(); @@ -589,7 +591,7 @@ pub enum ManifestContentType { impl FromStr for ManifestContentType { type Err = Error; - fn from_str(s: &str) -> Result { + fn from_str(s: &str) -> Result { match s { "data" => Ok(ManifestContentType::Data), "deletes" => Ok(ManifestContentType::Deletes), @@ -613,7 +615,7 @@ impl ToString for ManifestContentType { impl TryFrom for ManifestContentType { type Error = Error; - fn try_from(value: i32) -> Result { + fn try_from(value: i32) -> std::result::Result { match value { 0 => Ok(ManifestContentType::Data), 1 => Ok(ManifestContentType::Deletes), @@ -628,6 +630,30 @@ impl TryFrom for ManifestContentType { } } +impl ManifestListEntry { + /// Load [`Manifest`]. + /// + /// This method will also initialize inherited values of [`ManifestEntry`], such as `sequence_number`. + pub async fn load_manifest(&self, file_io: &FileIO) -> Result { + let mut avro = Vec::new(); + file_io + .new_input(&self.manifest_path)? + .reader() + .await? + .read_to_end(&mut avro) + .await?; + + let (metadata, mut entries) = Manifest::try_from_avro_bytes(&avro)?; + + // Let entries inherit values from the manifest list entry. + for entry in &mut entries { + entry.inherit_data(self); + } + + Ok(Manifest::new(metadata, entries)) + } +} + /// Field summary for partition field in the spec. /// /// Each field in the list corresponds to a field in the manifest file’s partition spec. @@ -657,17 +683,15 @@ pub struct FieldSummary { /// and then converted into the [ManifestListEntry] struct. Serialization works the other way around. /// [ManifestListEntryV1] and [ManifestListEntryV2] are internal struct that are only used for serialization and deserialization. pub(super) mod _serde { - use std::collections::HashMap; - - pub use serde_bytes::ByteBuf; - use serde_derive::{Deserialize, Serialize}; - use crate::{ spec::{Literal, StructType, Type}, Error, }; + pub use serde_bytes::ByteBuf; + use serde_derive::{Deserialize, Serialize}; use super::ManifestListEntry; + use crate::error::Result; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(transparent)] @@ -686,8 +710,8 @@ pub(super) mod _serde { /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. pub fn try_into( self, - partition_types: &HashMap, - ) -> Result { + partition_type_provider: impl Fn(i32) -> Result>, + ) -> Result { Ok(super::ManifestList { entries: self .entries @@ -695,7 +719,7 @@ pub(super) mod _serde { .map(|v| { let partition_spec_id = v.partition_spec_id; let manifest_path = v.manifest_path.clone(); - v.try_into(partition_types.get(&partition_spec_id)) + v.try_into(partition_type_provider(partition_spec_id)?.as_ref()) .map_err(|err| { err.with_context("manifest file path", manifest_path) .with_context( @@ -704,7 +728,7 @@ pub(super) mod _serde { ) }) }) - .collect::, _>>()?, + .collect::>>()?, }) } } @@ -712,13 +736,13 @@ pub(super) mod _serde { impl TryFrom for ManifestListV2 { type Error = Error; - fn try_from(value: super::ManifestList) -> Result { + fn try_from(value: super::ManifestList) -> std::result::Result { Ok(Self { entries: value .entries .into_iter() .map(TryInto::try_into) - .collect::, _>>()?, + .collect::, _>>()?, }) } } @@ -728,8 +752,8 @@ pub(super) mod _serde { /// The convert of [entries] need the partition_type info so use this function instead of [std::TryFrom] trait. pub fn try_into( self, - partition_types: &HashMap, - ) -> Result { + partition_type_provider: impl Fn(i32) -> Result>, + ) -> Result { Ok(super::ManifestList { entries: self .entries @@ -737,7 +761,7 @@ pub(super) mod _serde { .map(|v| { let partition_spec_id = v.partition_spec_id; let manifest_path = v.manifest_path.clone(); - v.try_into(partition_types.get(&partition_spec_id)) + v.try_into(partition_type_provider(partition_spec_id)?.as_ref()) .map_err(|err| { err.with_context("manifest file path", manifest_path) .with_context( @@ -746,7 +770,7 @@ pub(super) mod _serde { ) }) }) - .collect::, _>>()?, + .collect::>>()?, }) } } @@ -754,13 +778,13 @@ pub(super) mod _serde { impl TryFrom for ManifestListV1 { type Error = Error; - fn try_from(value: super::ManifestList) -> Result { + fn try_from(value: super::ManifestList) -> std::result::Result { Ok(Self { entries: value .entries .into_iter() .map(TryInto::try_into) - .collect::, _>>()?, + .collect::, _>>()?, }) } } @@ -812,7 +836,7 @@ pub(super) mod _serde { /// Converts the [FieldSummary] into a [super::FieldSummary]. /// [lower_bound] and [upper_bound] are converted into [Literal]s need the type info so use /// this function instead of [std::TryFrom] trait. - pub(crate) fn try_into(self, r#type: &Type) -> Result { + pub(crate) fn try_into(self, r#type: &Type) -> Result { Ok(super::FieldSummary { contains_null: self.contains_null, contains_nan: self.contains_nan, @@ -831,7 +855,7 @@ pub(super) mod _serde { fn try_convert_to_field_summary( partitions: Option>, partition_type: Option<&StructType>, - ) -> Result, Error> { + ) -> Result> { if let Some(partitions) = partitions { if let Some(partition_type) = partition_type { let partition_types = partition_type.fields(); @@ -849,7 +873,7 @@ pub(super) mod _serde { .into_iter() .zip(partition_types) .map(|(v, field)| v.try_into(&field.field_type)) - .collect::, _>>() + .collect::>>() } else { Err(Error::new( crate::ErrorKind::DataInvalid, @@ -864,10 +888,7 @@ pub(super) mod _serde { impl ManifestListEntryV2 { /// Converts the [ManifestListEntryV2] into a [ManifestListEntry]. /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into( - self, - partition_type: Option<&StructType>, - ) -> Result { + pub fn try_into(self, partition_type: Option<&StructType>) -> Result { let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; Ok(ManifestListEntry { manifest_path: self.manifest_path, @@ -892,10 +913,7 @@ pub(super) mod _serde { impl ManifestListEntryV1 { /// Converts the [ManifestListEntryV1] into a [ManifestListEntry]. /// The convert of [partitions] need the partition_type info so use this function instead of [std::TryFrom] trait. - pub fn try_into( - self, - partition_type: Option<&StructType>, - ) -> Result { + pub fn try_into(self, partition_type: Option<&StructType>) -> Result { let partitions = try_convert_to_field_summary(self.partitions, partition_type)?; Ok(ManifestListEntry { manifest_path: self.manifest_path, @@ -962,7 +980,7 @@ pub(super) mod _serde { impl TryFrom for ManifestListEntryV2 { type Error = Error; - fn try_from(value: ManifestListEntry) -> Result { + fn try_from(value: ManifestListEntry) -> std::result::Result { let partitions = convert_to_serde_field_summary(value.partitions); let key_metadata = convert_to_serde_key_metadata(value.key_metadata); Ok(Self { @@ -1036,7 +1054,7 @@ pub(super) mod _serde { impl TryFrom for ManifestListEntryV1 { type Error = Error; - fn try_from(value: ManifestListEntry) -> Result { + fn try_from(value: ManifestListEntry) -> std::result::Result { let partitions = convert_to_serde_field_summary(value.partitions); let key_metadata = convert_to_serde_key_metadata(value.key_metadata); Ok(Self { @@ -1133,7 +1151,7 @@ mod test { let bs = fs::read(full_path).expect("read_file must succeed"); let parsed_manifest_list = - ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, &HashMap::new()) + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, |_id| Ok(None)) .unwrap(); assert_eq!(manifest_list, parsed_manifest_list); @@ -1200,29 +1218,30 @@ mod test { let bs = fs::read(full_path).expect("read_file must succeed"); - let parsed_manifest_list = ManifestList::parse_with_version( - &bs, - crate::spec::FormatVersion::V2, - &HashMap::from([ - ( - 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - ), - ( - 2, - StructType::new(vec![Arc::new(NestedField::required( + let parsed_manifest_list = + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, |id| { + Ok(HashMap::from([ + ( 1, - "test", - Type::Primitive(PrimitiveType::Float), - ))]), - ), - ]), - ) - .unwrap(); + StructType::new(vec![Arc::new(NestedField::required( + 1, + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + ), + ( + 2, + StructType::new(vec![Arc::new(NestedField::required( + 1, + "test", + Type::Primitive(PrimitiveType::Float), + ))]), + ), + ]) + .get(&id) + .cloned()) + }) + .unwrap(); assert_eq!(manifest_list, parsed_manifest_list); } @@ -1317,19 +1336,21 @@ mod test { writer.close().await.unwrap(); let bs = fs::read(path).unwrap(); - let manifest_list = ManifestList::parse_with_version( - &bs, - crate::spec::FormatVersion::V1, - &HashMap::from([( + + let partition_types = HashMap::from([( + 1, + StructType::new(vec![Arc::new(NestedField::required( 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - )]), - ) - .unwrap(); + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + )]); + + let manifest_list = + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V1, move |id| { + Ok(partition_types.get(&id).cloned()) + }) + .unwrap(); assert_eq!(manifest_list, expected_manifest_list); temp_dir.close().unwrap(); @@ -1371,19 +1392,19 @@ mod test { writer.close().await.unwrap(); let bs = fs::read(path).unwrap(); - let manifest_list = ManifestList::parse_with_version( - &bs, - crate::spec::FormatVersion::V2, - &HashMap::from([( + let partition_types = HashMap::from([( + 1, + StructType::new(vec![Arc::new(NestedField::required( 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - )]), - ) - .unwrap(); + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + )]); + let manifest_list = + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, move |id| { + Ok(partition_types.get(&id).cloned()) + }) + .unwrap(); expected_manifest_list.entries[0].sequence_number = seq_num; expected_manifest_list.entries[0].min_sequence_number = seq_num; assert_eq!(manifest_list, expected_manifest_list); @@ -1425,19 +1446,21 @@ mod test { writer.close().await.unwrap(); let bs = fs::read(path).unwrap(); - let manifest_list = ManifestList::parse_with_version( - &bs, - crate::spec::FormatVersion::V2, - &HashMap::from([( + + let partition_types = HashMap::from([( + 1, + StructType::new(vec![Arc::new(NestedField::required( 1, - StructType::new(vec![Arc::new(NestedField::required( - 1, - "test", - Type::Primitive(PrimitiveType::Long), - ))]), - )]), - ) - .unwrap(); + "test", + Type::Primitive(PrimitiveType::Long), + ))]), + )]); + + let manifest_list = + ManifestList::parse_with_version(&bs, crate::spec::FormatVersion::V2, move |id| { + Ok(partition_types.get(&id).cloned()) + }) + .unwrap(); assert_eq!(manifest_list, expected_manifest_list); temp_dir.close().unwrap(); diff --git a/crates/iceberg/src/spec/schema.rs b/crates/iceberg/src/spec/schema.rs index 724498b45..6991d5296 100644 --- a/crates/iceberg/src/spec/schema.rs +++ b/crates/iceberg/src/spec/schema.rs @@ -32,16 +32,18 @@ use std::sync::Arc; use _serde::SchemaEnum; +/// Type alias for schema id. +pub type SchemaId = i32; /// Reference to [`Schema`]. pub type SchemaRef = Arc; -const DEFAULT_SCHEMA_ID: i32 = 0; +const DEFAULT_SCHEMA_ID: SchemaId = 0; /// Defines schema in iceberg. #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(try_from = "SchemaEnum", into = "SchemaEnum")] pub struct Schema { r#struct: StructType, - schema_id: i32, + schema_id: SchemaId, highest_field_id: i32, identifier_field_ids: HashSet, diff --git a/crates/iceberg/src/spec/snapshot.rs b/crates/iceberg/src/spec/snapshot.rs index c10e892bf..781b757fb 100644 --- a/crates/iceberg/src/spec/snapshot.rs +++ b/crates/iceberg/src/spec/snapshot.rs @@ -18,13 +18,18 @@ /*! * Snapshots */ +use crate::error::Result; use chrono::{DateTime, TimeZone, Utc}; +use futures::AsyncReadExt; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use typed_builder::TypedBuilder; use super::table_metadata::SnapshotLog; +use crate::io::FileIO; +use crate::spec::{ManifestList, SchemaId, SchemaRef, StructType, TableMetadata}; +use crate::{Error, ErrorKind}; use _serde::SnapshotV2; /// Reference to [`Snapshot`]. @@ -84,7 +89,7 @@ pub struct Snapshot { summary: Summary, /// ID of the table’s current schema when the snapshot was created. #[builder(setter(strip_option), default = None)] - schema_id: Option, + schema_id: Option, } /// Type to distinguish between a path to a manifestlist file or a vector of manifestfile locations @@ -103,6 +108,13 @@ impl Snapshot { pub fn snapshot_id(&self) -> i64 { self.snapshot_id } + + /// Get parent snapshot id. + #[inline] + pub fn parent_snapshot_id(&self) -> Option { + self.parent_snapshot_id + } + /// Get sequence_number of the snapshot. Is 0 for Iceberg V1 tables. #[inline] pub fn sequence_number(&self) -> i64 { @@ -113,6 +125,20 @@ impl Snapshot { pub fn manifest_list(&self) -> &ManifestListLocation { &self.manifest_list } + + /// Return the manifest list file path. + /// + /// It will return an error if the manifest list is not a file but a list of manifest file paths. + #[inline] + pub fn manifest_list_file_path(&self) -> Result<&str> { + match &self.manifest_list { + ManifestListLocation::ManifestListFile(s) => Ok(s), + _ => Err(Error::new( + ErrorKind::DataInvalid, + "Manifest list is not a file but a list of manifest files.", + )), + } + } /// Get summary of the snapshot #[inline] pub fn summary(&self) -> &Summary { @@ -124,6 +150,70 @@ impl Snapshot { Utc.timestamp_millis_opt(self.timestamp_ms).unwrap() } + /// Get the schema id of this snapshot. + #[inline] + pub fn schema_id(&self) -> Option { + self.schema_id + } + + /// Get the schema of this snapshot. + pub fn schema(&self, table_metadata: &TableMetadata) -> Result { + Ok(match self.schema_id() { + Some(schema_id) => table_metadata + .schema_by_id(schema_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Schema with id {} not found", schema_id), + ) + })? + .clone(), + None => table_metadata.current_schema().clone(), + }) + } + + /// Get parent snapshot. + #[cfg(test)] + pub(crate) fn parent_snapshot(&self, table_metadata: &TableMetadata) -> Option { + match self.parent_snapshot_id { + Some(id) => table_metadata.snapshot_by_id(id).cloned(), + None => None, + } + } + + /// Load manifest list. + pub async fn load_manifest_list( + &self, + file_io: &FileIO, + table_metadata: &TableMetadata, + ) -> Result { + match &self.manifest_list { + ManifestListLocation::ManifestListFile(file) => { + let mut manifest_list_content= Vec::new(); + file_io + .new_input(file)? + .reader().await? + .read_to_end(&mut manifest_list_content) + .await?; + + let schema = self.schema(table_metadata)?; + + let partition_type_provider = |partition_spec_id: i32| -> Result> { + table_metadata.partition_spec_by_id(partition_spec_id).map(|partition_spec| { + partition_spec.partition_type(&schema) + }).transpose() + }; + + ManifestList::parse_with_version(&manifest_list_content, table_metadata.format_version(), + partition_type_provider, ) + } + ManifestListLocation::ManifestFiles(_) => Err(Error::new( + ErrorKind::FeatureUnsupported, + "Loading manifests from `manifests` is currently not supported, we only support loading from `manifest-list` file, see https://iceberg.apache.org/spec/#snapshots for more information.", + )), + } + } + pub(crate) fn log(&self) -> SnapshotLog { SnapshotLog { timestamp_ms: self.timestamp_ms, @@ -141,6 +231,7 @@ pub(super) mod _serde { use serde::{Deserialize, Serialize}; + use crate::spec::SchemaId; use crate::{Error, ErrorKind}; use super::{ManifestListLocation, Operation, Snapshot, Summary}; @@ -157,7 +248,7 @@ pub(super) mod _serde { pub manifest_list: String, pub summary: Summary, #[serde(skip_serializing_if = "Option::is_none")] - pub schema_id: Option, + pub schema_id: Option, } #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] @@ -175,7 +266,7 @@ pub(super) mod _serde { #[serde(skip_serializing_if = "Option::is_none")] pub summary: Option, #[serde(skip_serializing_if = "Option::is_none")] - pub schema_id: Option, + pub schema_id: Option, } impl From for Snapshot { diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 905c82307..c7f5111e5 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -27,7 +27,7 @@ use uuid::Uuid; use super::{ snapshot::{Snapshot, SnapshotReference, SnapshotRetention}, - PartitionSpecRef, SchemaRef, SnapshotRef, SortOrderRef, + PartitionSpecRef, SchemaId, SchemaRef, SnapshotRef, SortOrderRef, }; use _serde::TableMetadataEnum; @@ -38,6 +38,12 @@ static MAIN_BRANCH: &str = "main"; static DEFAULT_SPEC_ID: i32 = 0; static DEFAULT_SORT_ORDER_ID: i64 = 0; +pub(crate) static EMPTY_SNAPSHOT_ID: i64 = -1; +pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0; + +/// Reference to [`TableMetadata`]. +pub type TableMetadataRef = Arc; + #[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)] #[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")] /// Fields for the version 2 of the table metadata. @@ -147,7 +153,7 @@ impl TableMetadata { /// Lookup schema by id. #[inline] - pub fn schema_by_id(&self, schema_id: i32) -> Option<&SchemaRef> { + pub fn schema_by_id(&self, schema_id: SchemaId) -> Option<&SchemaRef> { self.schemas.get(&schema_id) } @@ -280,7 +286,7 @@ pub(super) mod _serde { use serde::{Deserialize, Serialize}; use uuid::Uuid; - use crate::spec::Snapshot; + use crate::spec::{Snapshot, EMPTY_SNAPSHOT_ID}; use crate::{ spec::{ schema::_serde::{SchemaV1, SchemaV2}, @@ -558,8 +564,12 @@ pub(super) mod _serde { schemas, properties: value.properties.unwrap_or_default(), - current_snapshot_id: if let &Some(-1) = &value.current_snapshot_id { - None + current_snapshot_id: if let &Some(id) = &value.current_snapshot_id { + if id == EMPTY_SNAPSHOT_ID { + None + } else { + Some(id) + } } else { value.current_snapshot_id }, diff --git a/crates/iceberg/src/spec/values.rs b/crates/iceberg/src/spec/values.rs index 39f870602..a8a748d65 100644 --- a/crates/iceberg/src/spec/values.rs +++ b/crates/iceberg/src/spec/values.rs @@ -569,16 +569,12 @@ impl IntoIterator for Struct { } } -impl FromIterator<(i32, Option, String)> for Struct { - fn from_iter, String)>>(iter: I) -> Self { +impl FromIterator> for Struct { + fn from_iter>>(iter: I) -> Self { let mut fields = Vec::new(); - let mut field_ids = Vec::new(); - let mut field_names = Vec::new(); let mut null_bitmap = BitVec::new(); - for (id, value, name) in iter.into_iter() { - field_ids.push(id); - field_names.push(name); + for value in iter.into_iter() { match value { Some(value) => { fields.push(value); @@ -749,20 +745,16 @@ impl Literal { if let JsonValue::Object(mut object) = value { Ok(Some(Literal::Struct(Struct::from_iter( schema.fields().iter().map(|field| { - ( - field.id, - object.remove(&field.id.to_string()).and_then(|value| { - Literal::try_from_json(value, &field.field_type) - .and_then(|value| { - value.ok_or(Error::new( - ErrorKind::DataInvalid, - "Key of map cannot be null", - )) - }) - .ok() - }), - field.name.clone(), - ) + object.remove(&field.id.to_string()).and_then(|value| { + Literal::try_from_json(value, &field.field_type) + .and_then(|value| { + value.ok_or(Error::new( + ErrorKind::DataInvalid, + "Key of map cannot be null", + )) + }) + .ok() + }) }), )))) } else { @@ -1558,7 +1550,7 @@ mod _serde { optional: _, }) => match ty { Type::Struct(struct_ty) => { - let iters: Vec<(i32, Option, String)> = required + let iters: Vec> = required .into_iter() .map(|(field_name, value)| { let field = struct_ty @@ -1570,7 +1562,7 @@ mod _serde { ) })?; let value = value.try_into(&field.field_type)?; - Ok((field.id, value, field.name.clone())) + Ok(value) }) .collect::>()?; Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) @@ -1659,11 +1651,8 @@ mod tests { .unwrap(); let avro_schema = schema_to_avro_schema("test", &schema).unwrap(); let struct_type = Type::Struct(StructType::new(fields)); - let struct_literal = Literal::Struct(Struct::from_iter(vec![( - 1, - Some(expected_literal.clone()), - "col".to_string(), - )])); + let struct_literal = + Literal::Struct(Struct::from_iter(vec![Some(expected_literal.clone())])); let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new()); let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap(); @@ -1688,11 +1677,7 @@ mod tests { .unwrap(); let avro_schema = schema_to_avro_schema("test", &schema).unwrap(); let struct_type = Type::Struct(StructType::new(fields)); - let struct_literal = Literal::Struct(Struct::from_iter(vec![( - 1, - Some(literal.clone()), - "col".to_string(), - )])); + let struct_literal = Literal::Struct(Struct::from_iter(vec![Some(literal.clone())])); let mut writer = apache_avro::Writer::new(&avro_schema, Vec::new()); let raw_literal = RawLiteral::try_from(struct_literal.clone(), &struct_type).unwrap(); let value = to_value(raw_literal) @@ -1849,19 +1834,11 @@ mod tests { check_json_serde( record, Literal::Struct(Struct::from_iter(vec![ - ( - 1, - Some(Literal::Primitive(PrimitiveLiteral::Int(1))), - "id".to_string(), - ), - ( - 2, - Some(Literal::Primitive(PrimitiveLiteral::String( - "bar".to_string(), - ))), - "name".to_string(), - ), - (3, None, "address".to_string()), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::String( + "bar".to_string(), + ))), + None, ])), &Type::Struct(StructType::new(vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), @@ -2216,19 +2193,11 @@ mod tests { fn avro_convert_test_record() { check_convert_with_avro( Literal::Struct(Struct::from_iter(vec![ - ( - 1, - Some(Literal::Primitive(PrimitiveLiteral::Int(1))), - "id".to_string(), - ), - ( - 2, - Some(Literal::Primitive(PrimitiveLiteral::String( - "bar".to_string(), - ))), - "name".to_string(), - ), - (3, None, "address".to_string()), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::String( + "bar".to_string(), + ))), + None, ])), &Type::Struct(StructType::new(vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index fad91394c..e3260a83a 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -16,9 +16,9 @@ // under the License. //! Table API for Apache Iceberg - use crate::io::FileIO; -use crate::spec::TableMetadata; +use crate::scan::TableScanBuilder; +use crate::spec::{TableMetadata, TableMetadataRef}; use crate::TableIdent; use typed_builder::TypedBuilder; @@ -26,9 +26,10 @@ use typed_builder::TypedBuilder; #[derive(TypedBuilder, Debug)] pub struct Table { file_io: FileIO, - #[builder(default, setter(strip_option))] + #[builder(default, setter(strip_option, into))] metadata_location: Option, - metadata: TableMetadata, + #[builder(setter(into))] + metadata: TableMetadataRef, identifier: TableIdent, } @@ -42,8 +43,23 @@ impl Table { &self.metadata } + /// Returns current metadata ref. + pub fn metadata_ref(&self) -> TableMetadataRef { + self.metadata.clone() + } + /// Returns current metadata location. pub fn metadata_location(&self) -> Option<&str> { self.metadata_location.as_deref() } + + /// Returns file io used in this table. + pub fn file_io(&self) -> &FileIO { + &self.file_io + } + + /// Creates a table scan. + pub fn scan(&self) -> TableScanBuilder<'_> { + TableScanBuilder::new(self) + } } diff --git a/crates/iceberg/testdata/example_table_metadata_v2.json b/crates/iceberg/testdata/example_table_metadata_v2.json new file mode 100644 index 000000000..809c35587 --- /dev/null +++ b/crates/iceberg/testdata/example_table_metadata_v2.json @@ -0,0 +1,61 @@ +{ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "{{ table_location }}", + "last-sequence-number": 34, + "last-updated-ms": 1602638573590, + "last-column-id": 3, + "current-schema-id": 1, + "schemas": [ + {"type": "struct", "schema-id": 0, "fields": [{"id": 1, "name": "x", "required": true, "type": "long"}]}, + { + "type": "struct", + "schema-id": 1, + "identifier-field-ids": [1, 2], + "fields": [ + {"id": 1, "name": "x", "required": true, "type": "long"}, + {"id": 2, "name": "y", "required": true, "type": "long", "doc": "comment"}, + {"id": 3, "name": "z", "required": true, "type": "long"} + ] + } + ], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": [{"name": "x", "transform": "identity", "source-id": 1, "field-id": 1000}]}], + "last-partition-id": 1000, + "default-sort-order-id": 3, + "sort-orders": [ + { + "order-id": 3, + "fields": [ + {"transform": "identity", "source-id": 2, "direction": "asc", "null-order": "nulls-first"}, + {"transform": "bucket[4]", "source-id": 3, "direction": "desc", "null-order": "nulls-last"} + ] + } + ], + "properties": {"read.split.target.size": "134217728"}, + "current-snapshot-id": 3055729675574597004, + "snapshots": [ + { + "snapshot-id": 3051729675574597004, + "timestamp-ms": 1515100955770, + "sequence-number": 0, + "summary": {"operation": "append"}, + "manifest-list": "{{ manifest_list_1_location }}" + }, + { + "snapshot-id": 3055729675574597004, + "parent-snapshot-id": 3051729675574597004, + "timestamp-ms": 1555100955770, + "sequence-number": 1, + "summary": {"operation": "append"}, + "manifest-list": "{{ manifest_list_2_location }}", + "schema-id": 1 + } + ], + "snapshot-log": [ + {"snapshot-id": 3051729675574597004, "timestamp-ms": 1515100955770}, + {"snapshot-id": 3055729675574597004, "timestamp-ms": 1555100955770} + ], + "metadata-log": [{"metadata-file": "{{ table_metadata_1_location }}", "timestamp-ms": 1515100}], + "refs": {"test": {"snapshot-id": 3051729675574597004, "type": "tag", "max-ref-age-ms": 10000000}} +} \ No newline at end of file