From 1aaf3cf15b7a54473167da182557c4decd6c9966 Mon Sep 17 00:00:00 2001 From: Kev Wang Date: Tue, 15 Oct 2024 17:11:13 -0700 Subject: [PATCH] [BUG] Fix write_deltalake add action file path prefix (#3053) --- daft/table/table_io.py | 5 ++++- src/daft-parquet/src/lib.rs | 2 +- tests/io/delta_lake/test_table_write.py | 10 ++++++++++ 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/daft/table/table_io.py b/daft/table/table_io.py index 0f892534d9..f540d7ced6 100644 --- a/daft/table/table_io.py +++ b/daft/table/table_io.py @@ -672,9 +672,12 @@ def __call__(self, written_file): else: size = 0 + # remove leading slash + path = written_file.path[1:] if written_file.path.startswith("/") else written_file.path + self.parent.add_actions.append( AddAction( - written_file.path, + path, size, self.partition_values, int(datetime.now().timestamp() * 1000), diff --git a/src/daft-parquet/src/lib.rs b/src/daft-parquet/src/lib.rs index 039124e4a2..76fe734ee0 100644 --- a/src/daft-parquet/src/lib.rs +++ b/src/daft-parquet/src/lib.rs @@ -27,7 +27,7 @@ pub enum Error { #[snafu(display("Parquet reader timed out while trying to read: {path} with a time budget of {duration_ms} ms"))] FileReadTimeout { path: String, duration_ms: i64 }, - #[snafu(display("Internal IO Error when Opening: {path}:\nDetails:\n{source}"))] + #[snafu(display("Internal IO Error when opening: {path}:\nDetails:\n{source}"))] InternalIOError { path: String, source: std::io::Error, diff --git a/tests/io/delta_lake/test_table_write.py b/tests/io/delta_lake/test_table_write.py index 7a65d835cb..03d84571b1 100644 --- a/tests/io/delta_lake/test_table_write.py +++ b/tests/io/delta_lake/test_table_write.py @@ -300,3 +300,13 @@ def test_deltalake_write_partitioned_existing_table(tmp_path): assert result["rows"] == [1, 1] check_equal_both_daft_and_delta_rs(df1.concat(df2), path, [("int", "ascending"), ("string", "ascending")]) + + +def test_deltalake_write_roundtrip(tmp_path): + path = tmp_path / "some_table" + df = daft.from_pydict({"a": [1, 2, 3, 4]}) + df.write_deltalake(str(path)) + + read_df = daft.read_deltalake(str(path)) + assert df.schema() == read_df.schema() + assert df.to_arrow() == read_df.to_arrow()