Skip to content

Commit

Permalink
Integrate new parquet APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jay Chia committed Jul 27, 2023
1 parent 1e5309e commit 0b76eee
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 13 deletions.
10 changes: 6 additions & 4 deletions daft/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)

from daft.datasources import ParquetSourceInfo, SourceInfo
from daft.table import Table

_CACHED_FSES: dict[str, FileSystem] = {}

Expand Down Expand Up @@ -329,10 +330,11 @@ def glob_path_with_stats(
# Set number of rows if available.
if isinstance(source_info, ParquetSourceInfo):
if source_info.use_native_downloader:
# TODO(sammy): [RUST-PARQUET]
# file_metadata = get_parquet_metadata(list(filepaths_to_infos.keys()), io_config=source_info.io_config)
# ... (for now we only need `file_metadata[i].num_rows` to be valid)
raise NotImplementedError("[RUST-PARQUET] Implement batch read of metadata")
parquet_statistics = Table.read_parquet_statistics(

Check warning on line 333 in daft/filesystem.py

View check run for this annotation

Codecov / codecov/patch

daft/filesystem.py#L333

Added line #L333 was not covered by tests
list(filepaths_to_infos.keys()), source_info.io_config
).to_pydict()
for path, num_rows in zip(parquet_statistics["uris"], parquet_statistics["row_count"]):
filepaths_to_infos[path]["rows"] = num_rows

Check warning on line 337 in daft/filesystem.py

View check run for this annotation

Codecov / codecov/patch

daft/filesystem.py#L336-L337

Added lines #L336 - L337 were not covered by tests
else:
parquet_metadatas = ThreadPoolExecutor().map(_get_parquet_metadata_single, filepaths_to_infos.keys())
for path, parquet_metadata in zip(filepaths_to_infos.keys(), parquet_metadatas):
Expand Down
6 changes: 1 addition & 5 deletions daft/table/schema_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,7 @@ def from_parquet(
"""Infers a Schema from a Parquet file"""
if use_native_downloader:
assert isinstance(file, (str, pathlib.Path))
# TODO(sammy): [RUST-PARQUET] Implement getting a schema from a Parquet file
# return get_parquet_metadata([file], io_config=io_config)[0].get_daft_schema()
raise NotImplementedError(
"Not implemented: use Rust native downloader to retrieve a Daft Schema from a Parquet file"
)
return Schema.from_parquet(str(file), io_config=io_config)

Check warning on line 84 in daft/table/schema_inference.py

View check run for this annotation

Codecov / codecov/patch

daft/table/schema_inference.py#L83-L84

Added lines #L83 - L84 were not covered by tests

if not isinstance(file, (str, pathlib.Path)):
# BytesIO path.
Expand Down
4 changes: 2 additions & 2 deletions daft/table/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,6 @@ def read_parquet_statistics(
io_config: IOConfig | None = None,
) -> Table:
if not isinstance(paths, Series):
paths = Series.from_pylist(paths)

paths = Series.from_pylist(paths, name="uris")
assert paths.name() == "uris", f"Expected input series to have name 'uris', but found: {paths.name()}"

Check warning on line 368 in daft/table/table.py

View check run for this annotation

Codecov / codecov/patch

daft/table/table.py#L367-L368

Added lines #L367 - L368 were not covered by tests
return Table._from_pytable(_read_parquet_statistics(uris=paths._series, io_config=io_config))
3 changes: 1 addition & 2 deletions daft/table/table_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,7 @@ def read_parquet(
return Table.read_parquet(

Check warning on line 116 in daft/table/table_io.py

View check run for this annotation

Codecov / codecov/patch

daft/table/table_io.py#L115-L116

Added lines #L115 - L116 were not covered by tests
str(file),
columns=read_options.column_names,
# TODO(sammy): [RUST-PARQUET] Add API to limit number of rows read here, instead of rowgroups
# num_rows=read_options.num_rows,
num_rows=read_options.num_rows,
io_config=io_config,
)

Expand Down

0 comments on commit 0b76eee

Please sign in to comment.