[FEAT] Streaming Catalog Writes #2966

Draft: wants to merge 10 commits into main
1 change: 1 addition & 0 deletions Cargo.lock

(Generated lockfile; diff not rendered by default.)

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -20,6 +20,7 @@ daft-local-execution = {path = "src/daft-local-execution", default-features = fa
daft-micropartition = {path = "src/daft-micropartition", default-features = false}
daft-minhash = {path = "src/daft-minhash", default-features = false}
daft-parquet = {path = "src/daft-parquet", default-features = false}
daft-physical-plan = {path = "src/daft-physical-plan", default-features = false}
daft-plan = {path = "src/daft-plan", default-features = false}
daft-scan = {path = "src/daft-scan", default-features = false}
daft-scheduler = {path = "src/daft-scheduler", default-features = false}
@@ -47,6 +48,7 @@ python = [
"daft-json/python",
"daft-micropartition/python",
"daft-parquet/python",
"daft-physical-plan/python",
"daft-plan/python",
"daft-scan/python",
"daft-scheduler/python",
214 changes: 214 additions & 0 deletions daft/io/writer.py
@@ -0,0 +1,214 @@
import uuid
from typing import TYPE_CHECKING, Optional, Union

from daft.daft import IOConfig, PyMicroPartition
from daft.dependencies import pa, pacsv, pq
from daft.filesystem import (
_resolve_paths_and_filesystem,
canonicalize_protocol,
get_protocol_from_path,
)
from daft.iceberg.iceberg_write import (
add_missing_columns,
coerce_pyarrow_table_to_schema,
to_partition_representation,
)
from daft.table.micropartition import MicroPartition

if TYPE_CHECKING:
from pyiceberg.partitioning import PartitionSpec as IcebergPartitionSpec
from pyiceberg.schema import Schema as IcebergSchema
from pyiceberg.table import TableProperties as IcebergTableProperties


class FileWriterBase:
def __init__(
self,
root_dir: str,
file_idx: int,
file_format: str,
compression: Optional[str] = None,
io_config: Optional[IOConfig] = None,
):
[self.resolved_path], self.fs = _resolve_paths_and_filesystem(
root_dir, io_config=io_config
)
protocol = get_protocol_from_path(root_dir)
canonicalized_protocol = canonicalize_protocol(protocol)
is_local_fs = canonicalized_protocol == "file"
if is_local_fs:
self.fs.create_dir(self.resolved_path, recursive=True)

self.file_name = f"{uuid.uuid4()}-{file_idx}.{file_format}"
self.full_path = f"{self.resolved_path}/{self.file_name}"
self.compression = compression if compression is not None else "none"
self.current_writer: Optional[Union[pq.ParquetWriter, pacsv.CSVWriter]] = None

def _create_writer(self, schema: pa.Schema):
raise NotImplementedError("Subclasses must implement this method.")

def write(self, table: MicroPartition):
if self.current_writer is None:
self.current_writer = self._create_writer(
table.schema().to_pyarrow_schema()
)
self.current_writer.write_table(table.to_arrow())

def close(self) -> Optional[str]:
if self.current_writer is None:
return None
self.current_writer.close()
return self.full_path
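
Note on the base-class contract (a reviewer-style illustration, not part of the diff): the underlying pyarrow writer is only created on the first `write()` call, so calling `close()` on a writer that never received data returns `None` instead of a file path. A minimal sketch, assuming a local placeholder root directory and the `ParquetFileWriter` defined below:

```python
from daft.io.writer import ParquetFileWriter  # module added in this diff

# No write() was issued, so no pyarrow writer (and no file) was ever created.
writer = ParquetFileWriter(root_dir="/tmp/daft-writer-demo", file_idx=0)
assert writer.close() is None
```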


class ParquetFileWriter(FileWriterBase):
def __init__(
self,
root_dir: str,
file_idx: int,
compression: str = "none",
io_config: Optional[IOConfig] = None,
):
super().__init__(root_dir, file_idx, "parquet", compression, io_config)

def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
return pq.ParquetWriter(
self.full_path,
schema,
compression=self.compression,
use_compliant_nested_type=False,
filesystem=self.fs,
)


class CSVFileWriter(FileWriterBase):
def __init__(
self, root_dir: str, file_idx: int, io_config: Optional[IOConfig] = None
):
super().__init__(root_dir, file_idx, "csv", None, io_config)

    def _create_writer(self, schema: pa.Schema) -> pacsv.CSVWriter:
        return pacsv.CSVWriter(
            self.full_path,
            schema,
        )
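
For context, a sketch of how one of these writers might be driven end to end (illustration only; the root directory is a placeholder and `MicroPartition.from_pydict` is the same constructor used later in this file):

```python
from daft.io.writer import ParquetFileWriter
from daft.table.micropartition import MicroPartition

writer = ParquetFileWriter(root_dir="/tmp/daft-writer-demo", file_idx=0, compression="snappy")
for batch in ({"a": [1, 2]}, {"a": [3, 4]}):
    # The first write lazily creates the pyarrow ParquetWriter with this schema.
    writer.write(MicroPartition.from_pydict(batch))
path = writer.close()  # full path of the single file written by this writer
```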


class IcebergFileWriter(FileWriterBase):
def __init__(
self,
root_dir: str,
file_idx: int,
schema: "IcebergSchema",
properties: "IcebergTableProperties",
partition_spec: "IcebergPartitionSpec",
partition_values: Optional[MicroPartition] = None,
compression: str = "zstd",
io_config: Optional[IOConfig] = None,
):
from pyiceberg.typedef import Record as IcebergRecord
from pyiceberg.io.pyarrow import schema_to_pyarrow
print("IcebergFileWriter root_dir:", root_dir)
super().__init__(root_dir, file_idx, "parquet", compression, io_config)
if partition_values is None:
self.part_record = IcebergRecord()
else:
part_vals = partition_values.to_pylist()
iceberg_part_vals = {
k: to_partition_representation(v) for k, v in part_vals.items()
}
self.part_record = IcebergRecord(**iceberg_part_vals)
print("IcebergFileWriter iceberg schema:", schema)
self.iceberg_schema = schema
self.file_schema = schema_to_pyarrow(schema)
print("IcebergFileWriter file schema:", self.file_schema)
self.metadata_collector = []
self.partition_spec = partition_spec
self.properties = properties

def _create_writer(self, schema: pa.Schema) -> pq.ParquetWriter:
return pq.ParquetWriter(
self.full_path,
schema,
compression=self.compression,
use_compliant_nested_type=False,
filesystem=self.fs,
metadata_collector=self.metadata_collector,
)
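
The `metadata_collector` list is the standard pyarrow mechanism for recovering the written file's `FileMetaData`: pyarrow appends one entry when the writer is closed, and `close()` below reads row-group sizes and column statistics from it. A standalone illustration of that pyarrow behavior (the file path is a placeholder):

```python
import pyarrow as pa
import pyarrow.parquet as pq

collector: list = []
with pq.ParquetWriter("/tmp/demo.parquet", pa.schema([("x", pa.int64())]),
                      metadata_collector=collector) as w:
    w.write_table(pa.table({"x": [1, 2, 3]}))
meta = collector[0]  # pyarrow FileMetaData, populated on close
print(meta.num_row_groups, meta.serialized_size)
```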

    def write(self, table: MicroPartition):
        if self.current_writer is None:
            self.current_writer = self._create_writer(self.file_schema)
        table = add_missing_columns(table, self.file_schema)
        casted = coerce_pyarrow_table_to_schema(table.to_arrow(), self.file_schema)
        self.current_writer.write_table(casted)

def close(self) -> PyMicroPartition:
import pyiceberg
from packaging.version import parse
from pyiceberg.io.pyarrow import (
compute_statistics_plan,
parquet_path_to_id_mapping,
)
from pyiceberg.manifest import DataFile, DataFileContent
from pyiceberg.manifest import FileFormat as IcebergFileFormat

file_path = super().close()
print("IcebergFileWriter close file_path:", file_path)
metadata = self.metadata_collector[0]
print("IcebergFileWriter close metadata:", metadata)
row_groups_size = sum(metadata.row_group(i).total_byte_size for i in range(metadata.num_row_groups))
metadata_size = metadata.serialized_size

total_file_size_bytes = row_groups_size + metadata_size
kwargs = {
"content": DataFileContent.DATA,
"file_path": file_path,
"file_format": IcebergFileFormat.PARQUET,
"partition": self.part_record,
"file_size_in_bytes": total_file_size_bytes,
# After this has been fixed:
# https://github.com/apache/iceberg-python/issues/271
# "sort_order_id": task.sort_order_id,
"sort_order_id": None,
# Just copy these from the table for now
"spec_id": self.partition_spec.spec_id,
"equality_ids": None,
"key_metadata": None,
}

if parse(pyiceberg.__version__) >= parse("0.7.0"):
from pyiceberg.io.pyarrow import data_file_statistics_from_parquet_metadata
print("IcebergFileWriter close before statistics")
statistics = data_file_statistics_from_parquet_metadata(
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(
self.iceberg_schema, self.properties
),
parquet_column_mapping=parquet_path_to_id_mapping(self.iceberg_schema),
)
print("IcebergFileWriter close statistics:", statistics)
data_file = DataFile(
**{
**kwargs,
**statistics.to_serialized_dict(),
}
)
else:
from pyiceberg.io.pyarrow import fill_parquet_file_metadata

data_file = DataFile(**kwargs)

fill_parquet_file_metadata(
data_file=data_file,
parquet_metadata=metadata,
stats_columns=compute_statistics_plan(
self.iceberg_schema, self.properties
),
parquet_column_mapping=parquet_path_to_id_mapping(self.iceberg_schema),
)
print("IcebergFileWriter close data_file:", data_file)
return MicroPartition.from_pydict({"data_file": [data_file]})._micropartition
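
Putting the Iceberg writer together, the expected flow is: construct it from a pyiceberg table's schema, partition spec, and properties; feed it micropartitions; and collect the resulting `DataFile` record from the micropartition returned by `close()`. A hedged sketch (the catalog and table names are placeholders; `schema()`, `spec()`, `properties`, and `location()` are standard pyiceberg `Table` accessors):

```python
from pyiceberg.catalog import load_catalog

from daft.io.writer import IcebergFileWriter
from daft.table.micropartition import MicroPartition

tbl = load_catalog("default").load_table("demo.events")  # placeholder catalog/table
writer = IcebergFileWriter(
    root_dir=f"{tbl.location()}/data",  # data files live under the table location
    file_idx=0,
    schema=tbl.schema(),
    properties=tbl.properties,
    partition_spec=tbl.spec(),
    partition_values=None,  # unpartitioned table in this sketch
)
writer.write(MicroPartition.from_pydict({"id": [1, 2, 3]}))
data_file_mp = writer.close()  # PyMicroPartition with a single "data_file" column
```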
4 changes: 2 additions & 2 deletions daft/table/table_io.py
@@ -5,7 +5,7 @@
import pathlib
import random
import time
from typing import IO, TYPE_CHECKING, Any, Iterator, Union
from typing import IO, TYPE_CHECKING, Any, Iterable, Iterator, Union
from uuid import uuid4

from daft.context import get_context
@@ -536,7 +536,7 @@ def write_iceberg(
partition_field_to_expr,
partitioned_table_to_iceberg_iter,
)

print("write_iceberg base_path:", base_path)
[resolved_path], fs = _resolve_paths_and_filesystem(base_path, io_config=io_config)
if isinstance(base_path, pathlib.Path):
path_str = str(base_path)
89 changes: 89 additions & 0 deletions src/daft-local-execution/src/buffer.rs
@@ -0,0 +1,89 @@
use std::{cmp::Ordering::*, collections::VecDeque, sync::Arc};

use common_error::DaftResult;
use daft_micropartition::MicroPartition;

/// Buffers incoming micropartitions by row count and emits chunks of
/// `threshold` rows once enough rows have accumulated.
pub struct RowBasedBuffer {
pub buffer: VecDeque<Arc<MicroPartition>>,
pub curr_len: usize,
pub threshold: usize,
}

impl RowBasedBuffer {
pub fn new(threshold: usize) -> Self {
assert!(threshold > 0);
Self {
buffer: VecDeque::new(),
curr_len: 0,
threshold,
}
}

pub fn push(&mut self, part: Arc<MicroPartition>) {
self.curr_len += part.len();
self.buffer.push_back(part);
}

    /// If at least `threshold` rows are buffered, drain the buffer and return
    /// micropartitions of exactly `threshold` rows each, keeping any remainder
    /// buffered. Returns `None` while the buffer is still below the threshold.
    pub fn pop_enough(&mut self) -> DaftResult<Option<Vec<Arc<MicroPartition>>>> {
match self.curr_len.cmp(&self.threshold) {
Less => Ok(None),
Equal => {
if self.buffer.len() == 1 {
let part = self.buffer.pop_front().unwrap();
self.curr_len = 0;
Ok(Some(vec![part]))
} else {
let chunk = MicroPartition::concat(
&std::mem::take(&mut self.buffer)
.iter()
.map(|x| x.as_ref())
.collect::<Vec<_>>(),
)?;
self.curr_len = 0;
Ok(Some(vec![Arc::new(chunk)]))
}
}
Greater => {
let num_ready_chunks = self.curr_len / self.threshold;
let concated = MicroPartition::concat(
&std::mem::take(&mut self.buffer)
.iter()
.map(|x| x.as_ref())
.collect::<Vec<_>>(),
)?;
let mut start = 0;
let mut parts_to_return = Vec::with_capacity(num_ready_chunks);
for _ in 0..num_ready_chunks {
let end = start + self.threshold;
let part = Arc::new(concated.slice(start, end)?);
parts_to_return.push(part);
start = end;
}
if start < concated.len() {
let part = Arc::new(concated.slice(start, concated.len())?);
self.curr_len = part.len();
self.buffer.push_back(part);
} else {
self.curr_len = 0;
}
Ok(Some(parts_to_return))
}
}
}

    /// Drain and concatenate whatever remains in the buffer (always fewer than
    /// `threshold` rows). Returns `None` if the buffer is empty.
    pub fn pop_all(&mut self) -> DaftResult<Option<Arc<MicroPartition>>> {
assert!(self.curr_len < self.threshold);
if self.buffer.is_empty() {
Ok(None)
} else {
let concated = MicroPartition::concat(
&std::mem::take(&mut self.buffer)
.iter()
.map(|x| x.as_ref())
.collect::<Vec<_>>(),
)?;
self.curr_len = 0;
Ok(Some(Arc::new(concated)))
}
}
}
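
For readers following the Python side of this PR, here is a rough Python sketch of the same row-threshold chunking semantics (illustration only; the Rust above is authoritative, and rows are modeled here as plain lists):

```python
from collections import deque
from typing import Optional

class RowBasedBufferSketch:
    """Mirrors RowBasedBuffer: accumulate row batches, emit threshold-sized chunks."""

    def __init__(self, threshold: int):
        assert threshold > 0
        self.buffer: deque[list] = deque()
        self.curr_len = 0
        self.threshold = threshold

    def push(self, rows: list) -> None:
        self.curr_len += len(rows)
        self.buffer.append(rows)

    def pop_enough(self) -> Optional[list[list]]:
        # Below the threshold: nothing to emit yet.
        if self.curr_len < self.threshold:
            return None
        flat = [row for part in self.buffer for row in part]
        self.buffer.clear()
        cutoff = (self.curr_len // self.threshold) * self.threshold
        chunks = [flat[i:i + self.threshold] for i in range(0, cutoff, self.threshold)]
        remainder = flat[cutoff:]
        self.curr_len = len(remainder)
        if remainder:  # keep the partial chunk buffered for the next call
            self.buffer.append(remainder)
        return chunks

    def pop_all(self) -> Optional[list]:
        # Final flush: whatever is left is, by construction, below the threshold.
        assert self.curr_len < self.threshold
        if not self.buffer:
            return None
        flat = [row for part in self.buffer for row in part]
        self.buffer.clear()
        self.curr_len = 0
        return flat
```

For example, with `threshold=3`, pushing three 2-row batches and then calling `pop_enough()` yields two 3-row chunks and leaves the buffer empty, mirroring the `Greater` branch above.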