feat: add parquet writer #176

Merged · 6 commits · Mar 9, 2024
1 change: 0 additions & 1 deletion crates/iceberg/Cargo.toml
@@ -70,4 +70,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
tokio = { workspace = true }
11 changes: 7 additions & 4 deletions crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
use tokio::io::AsyncWrite as TokioAsyncWrite;
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;

@@ -244,9 +245,9 @@ impl InputFile {
}

/// Trait for writing file.
pub trait FileWrite: AsyncWrite {}
pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
Member:
It's quite weird that FileWrite is required to implement both AsyncWrite and TokioAsyncWrite. It increases the burden on downstream implementers.

Did I miss something here?

Contributor Author:
I added TokioAsyncWrite because the parquet writer accepts a tokio writer. Can we remove futures::AsyncWrite? For now, we don't need it.

Member (@Xuanwo), Feb 16, 2024:
> Can we remove futures::AsyncWrite? For now, we don't need it.

It's fine to remove futures::AsyncWrite here if it's not needed.

But I want to mention that using AsyncWrite is friendlier for users who don't want to depend on TokioAsyncWrite, and the async_compat crate can transform a futures::AsyncWrite into a TokioAsyncWrite.

Maybe we can add our own trait to support both of them? I believe that is a separate issue and should not be part of this PR.
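For illustration, a minimal sketch of the async_compat bridging mentioned above, under the assumption that its Compat adapter exposes the tokio traits over a futures-io writer; the function name and payload are hypothetical and not part of this PR:

```rust
// Illustrative sketch only: adapting a futures::io::AsyncWrite so it can be
// used where a tokio::io::AsyncWrite is expected, as suggested above.
use async_compat::Compat;
use tokio::io::AsyncWriteExt;

async fn write_through_compat(
    futures_writer: impl futures::io::AsyncWrite + Unpin,
) -> std::io::Result<()> {
    // Compat wraps the futures-io writer and exposes the tokio I/O traits
    // (tokio_util::compat::FuturesAsyncWriteCompatExt::compat_write is a
    // comparable alternative).
    let mut tokio_writer = Compat::new(futures_writer);
    tokio_writer.write_all(b"example bytes").await?;
    tokio_writer.shutdown().await?;
    Ok(())
}
```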


impl<T> FileWrite for T where T: AsyncWrite {}
impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}

/// Output file is used for writing to files..
#[derive(Debug)]
Expand Down Expand Up @@ -282,8 +283,10 @@ impl OutputFile {
}

/// Creates output file for writing.
pub async fn writer(&self) -> Result<impl FileWrite> {
Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
Ok(Box::new(
self.op.writer(&self.path[self.relative_path_pos..]).await?,
))
}
}

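As a hypothetical illustration of how downstream code might use the boxed writer (not part of this PR; the function name and payload are made up), tokio's AsyncWriteExt helpers apply because FileWrite now has TokioAsyncWrite as a supertrait:

```rust
// Hypothetical downstream usage of the new writer API; error handling is
// shortened with `expect` for brevity.
use iceberg::io::OutputFile;
use tokio::io::AsyncWriteExt;

async fn write_placeholder(output: &OutputFile) {
    // `writer()` now returns `Box<dyn FileWrite>` instead of `impl FileWrite`.
    let mut writer = output.writer().await.expect("create writer");
    // Because `FileWrite: TokioAsyncWrite + Send + Unpin`, tokio's
    // `AsyncWriteExt` helper methods can be called on the boxed writer.
    writer.write_all(b"placeholder contents").await.expect("write bytes");
    writer.shutdown().await.expect("close file");
}
```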
17 changes: 10 additions & 7 deletions crates/iceberg/src/scan.rs
@@ -211,7 +211,7 @@ impl FileScanTask {
mod tests {
use crate::io::{FileIO, OutputFile};
use crate::spec::{
DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
};
@@ -314,14 +314,15 @@ mod tests {
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
DataFile::builder()
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
.build(),
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
@@ -330,14 +331,15 @@
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFile::builder()
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
.build(),
.build()
.unwrap(),
)
.build(),
ManifestEntry::builder()
@@ -346,14 +348,15 @@
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
DataFile::builder()
DataFileBuilder::default()
.content(DataContentType::Data)
.file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
.build(),
.build()
.unwrap(),
)
.build(),
],
34 changes: 17 additions & 17 deletions crates/iceberg/src/spec/manifest.rs
@@ -932,34 +932,34 @@ impl TryFrom<i32> for ManifestStatus {
}

/// Data file carries data file path, partition tuple, metrics, …
#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
/// field id: 134
///
/// Type of content stored by the data file: data, equality deletes,
/// or position deletes (all v1 files are data files)
content: DataContentType,
pub(crate) content: DataContentType,
/// field id: 100
///
/// Full URI for the file with FS scheme
file_path: String,
pub(crate) file_path: String,
/// field id: 101
///
/// String file format name, avro, orc or parquet
file_format: DataFileFormat,
pub(crate) file_format: DataFileFormat,
/// field id: 102
///
/// Partition data tuple, schema based on the partition spec output using
/// partition field ids for the struct field ids
partition: Struct,
pub(crate) partition: Struct,
/// field id: 103
///
/// Number of records in this file
record_count: u64,
pub(crate) record_count: u64,
/// field id: 104
///
/// Total file size in bytes
file_size_in_bytes: u64,
pub(crate) file_size_in_bytes: u64,
/// field id: 108
/// key field id: 117
/// value field id: 118
@@ -968,29 +968,29 @@ pub struct DataFile {
/// store the column. Does not include bytes necessary to read other
/// columns, like footers. Leave null for row-oriented formats (Avro)
#[builder(default)]
column_sizes: HashMap<i32, u64>,
pub(crate) column_sizes: HashMap<i32, u64>,
/// field id: 109
/// key field id: 119
/// value field id: 120
///
/// Map from column id to number of values in the column (including null
/// and NaN values)
#[builder(default)]
value_counts: HashMap<i32, u64>,
pub(crate) value_counts: HashMap<i32, u64>,
/// field id: 110
/// key field id: 121
/// value field id: 122
///
/// Map from column id to number of null values in the column
#[builder(default)]
null_value_counts: HashMap<i32, u64>,
pub(crate) null_value_counts: HashMap<i32, u64>,
/// field id: 137
/// key field id: 138
/// value field id: 139
///
/// Map from column id to number of NaN values in the column
#[builder(default)]
nan_value_counts: HashMap<i32, u64>,
pub(crate) nan_value_counts: HashMap<i32, u64>,
/// field id: 125
/// key field id: 126
/// value field id: 127
@@ -1003,7 +1003,7 @@
///
/// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
#[builder(default)]
lower_bounds: HashMap<i32, Literal>,
pub(crate) lower_bounds: HashMap<i32, Literal>,
/// field id: 128
/// key field id: 129
/// value field id: 130
@@ -1016,19 +1016,19 @@
///
/// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
#[builder(default)]
upper_bounds: HashMap<i32, Literal>,
pub(crate) upper_bounds: HashMap<i32, Literal>,
/// field id: 131
///
/// Implementation-specific key metadata for encryption
#[builder(default)]
key_metadata: Vec<u8>,
pub(crate) key_metadata: Vec<u8>,
/// field id: 132
/// element field id: 133
///
/// Split offsets for the data file. For example, all row group offsets
/// in a Parquet file. Must be sorted ascending
#[builder(default)]
split_offsets: Vec<i64>,
pub(crate) split_offsets: Vec<i64>,
/// field id: 135
/// element field id: 136
///
Expand All @@ -1037,7 +1037,7 @@ pub struct DataFile {
/// otherwise. Fields with ids listed in this column must be present
/// in the delete file
#[builder(default)]
equality_ids: Vec<i32>,
pub(crate) equality_ids: Vec<i32>,
/// field id: 140
///
/// ID representing sort order for this file.
@@ -1049,7 +1049,7 @@
/// order id to null. Readers must ignore sort order id for position
/// delete files.
#[builder(default, setter(strip_option))]
sort_order_id: Option<i32>,
pub(crate) sort_order_id: Option<i32>,
}

/// Type of content stored by the data file: data, equality deletes, or
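A side effect of switching DataFile from TypedBuilder to derive_builder's Builder is that a missing required field now surfaces as a runtime error from build() rather than a compile error, which is why the updated call sites end with .build().unwrap(). A crate-internal sketch mirroring the updated scan.rs tests (the path and values below are made up):

```rust
// Illustrative only: constructing a DataFile with the derived builder, as the
// updated tests do.
use crate::spec::{
    DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
};

fn example_data_file() -> DataFile {
    DataFileBuilder::default()
        .content(DataContentType::Data)
        .file_path("s3://bucket/table/data/1.parquet".to_string())
        .file_format(DataFileFormat::Parquet)
        .file_size_in_bytes(100)
        .record_count(1)
        .partition(Struct::from_iter([Some(Literal::long(100))]))
        // With derive_builder, omitting a required field (e.g. `record_count`)
        // makes `build()` return an Err instead of failing to compile.
        .build()
        .expect("all required fields were set")
}
```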
38 changes: 19 additions & 19 deletions crates/iceberg/src/spec/table_metadata.rs
@@ -52,66 +52,66 @@ pub type TableMetadataRef = Arc<TableMetadata>;
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
/// Integer Version for the format.
format_version: FormatVersion,
pub(crate) format_version: FormatVersion,
/// A UUID that identifies the table
table_uuid: Uuid,
pub(crate) table_uuid: Uuid,
/// Location tables base location
location: String,
pub(crate) location: String,
/// The tables highest sequence number
last_sequence_number: i64,
pub(crate) last_sequence_number: i64,
/// Timestamp in milliseconds from the unix epoch when the table was last updated.
last_updated_ms: i64,
pub(crate) last_updated_ms: i64,
/// An integer; the highest assigned column ID for the table.
last_column_id: i32,
pub(crate) last_column_id: i32,
/// A list of schemas, stored as objects with schema-id.
schemas: HashMap<i32, SchemaRef>,
pub(crate) schemas: HashMap<i32, SchemaRef>,
/// ID of the table’s current schema.
current_schema_id: i32,
pub(crate) current_schema_id: i32,
/// A list of partition specs, stored as full partition spec objects.
partition_specs: HashMap<i32, PartitionSpecRef>,
pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
/// ID of the “current” spec that writers should use by default.
default_spec_id: i32,
pub(crate) default_spec_id: i32,
/// An integer; the highest assigned partition field ID across all partition specs for the table.
last_partition_id: i32,
pub(crate) last_partition_id: i32,
///A string to string map of table properties. This is used to control settings that
/// affect reading and writing and is not intended to be used for arbitrary metadata.
/// For example, commit.retry.num-retries is used to control the number of commit retries.
properties: HashMap<String, String>,
pub(crate) properties: HashMap<String, String>,
/// long ID of the current table snapshot; must be the same as the current
/// ID of the main branch in refs.
current_snapshot_id: Option<i64>,
pub(crate) current_snapshot_id: Option<i64>,
///A list of valid snapshots. Valid snapshots are snapshots for which all
/// data files exist in the file system. A data file must not be deleted
/// from the file system until the last snapshot in which it was listed is
/// garbage collected.
snapshots: HashMap<i64, SnapshotRef>,
pub(crate) snapshots: HashMap<i64, SnapshotRef>,
/// A list (optional) of timestamp and snapshot ID pairs that encodes changes
/// to the current snapshot for the table. Each time the current-snapshot-id
/// is changed, a new entry should be added with the last-updated-ms
/// and the new current-snapshot-id. When snapshots are expired from
/// the list of valid snapshots, all entries before a snapshot that has
/// expired should be removed.
snapshot_log: Vec<SnapshotLog>,
pub(crate) snapshot_log: Vec<SnapshotLog>,

/// A list (optional) of timestamp and metadata file location pairs
/// that encodes changes to the previous metadata files for the table.
/// Each time a new metadata file is created, a new entry of the
/// previous metadata file location should be added to the list.
/// Tables can be configured to remove oldest metadata log entries and
/// keep a fixed-size log of the most recent entries after a commit.
metadata_log: Vec<MetadataLog>,
pub(crate) metadata_log: Vec<MetadataLog>,

/// A list of sort orders, stored as full sort order objects.
sort_orders: HashMap<i64, SortOrderRef>,
pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
/// Default sort order id of the table. Note that this could be used by
/// writers, but is not used when reading because reads use the specs
/// stored in manifest files.
default_sort_order_id: i64,
pub(crate) default_sort_order_id: i64,
///A map of snapshot references. The map keys are the unique snapshot reference
/// names in the table, and the map values are snapshot reference objects.
/// There is always a main branch reference pointing to the current-snapshot-id
/// even if the refs map is null.
refs: HashMap<String, SnapshotReference>,
pub(crate) refs: HashMap<String, SnapshotReference>,
}

impl TableMetadata {