diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index e93fd0e892..247f56fb50 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -10,13 +10,25 @@ else: from typing import Protocol -import daft +from daft.datasources import ( + CSVSourceInfo, + JSONSourceInfo, + ParquetSourceInfo, + StorageType, +) from daft.expressions import Expression, ExpressionsProjection, col from daft.logical import logical_plan +from daft.logical.logical_plan import FileWrite, TabularFilesScan from daft.logical.map_partition_ops import MapPartitionOp +from daft.logical.schema import Schema from daft.resource_request import ResourceRequest -from daft.runners.partitioning import PartialPartitionMetadata, PartitionMetadata -from daft.table import Table +from daft.runners.partitioning import ( + PartialPartitionMetadata, + PartitionMetadata, + TableParseCSVOptions, + TableReadOptions, +) +from daft.table import Table, table_io PartitionT = TypeVar("PartitionT") ID_GEN = itertools.count() @@ -305,7 +317,7 @@ def run(self, inputs: list[Table]) -> list[Table]: def _read_file(self, inputs: list[Table]) -> list[Table]: assert len(inputs) == 1 [filepaths_partition] = inputs - partition = daft.runners.pyrunner.LocalLogicalPartitionOpRunner()._handle_tabular_files_scan( + partition = self._handle_tabular_files_scan( inputs={self.logplan._filepaths_child.id(): filepaths_partition}, scan=self.logplan, index=self.index, @@ -327,6 +339,84 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) ) ] + def _handle_tabular_files_scan( + self, inputs: dict[int, Table], scan: TabularFilesScan, index: int | None = None + ) -> Table: + child_id = scan._children()[0].id() + prev_partition = inputs[child_id] + data = prev_partition.to_pydict() + assert ( + scan._filepaths_column_name in data + ), f"TabularFilesScan should be ran on vPartitions with '{scan._filepaths_column_name}' column" + filepaths = data[scan._filepaths_column_name] + + if index is not None: + filepaths = [filepaths[index]] + + # Common options for reading vPartition + fs = scan._fs + schema = scan._schema + read_options = TableReadOptions( + num_rows=scan._limit_rows, + column_names=scan._column_names, # read only specified columns + ) + + if scan._source_info.scan_type() == StorageType.CSV: + assert isinstance(scan._source_info, CSVSourceInfo) + table = Table.concat( + [ + table_io.read_csv( + file=fp, + schema=schema, + fs=fs, + csv_options=TableParseCSVOptions( + delimiter=scan._source_info.delimiter, + header_index=0 if scan._source_info.has_headers else None, + ), + read_options=read_options, + ) + for fp in filepaths + ] + ) + elif scan._source_info.scan_type() == StorageType.JSON: + assert isinstance(scan._source_info, JSONSourceInfo) + table = Table.concat( + [ + table_io.read_json( + file=fp, + schema=schema, + fs=fs, + read_options=read_options, + ) + for fp in filepaths + ] + ) + elif scan._source_info.scan_type() == StorageType.PARQUET: + assert isinstance(scan._source_info, ParquetSourceInfo) + table = Table.concat( + [ + table_io.read_parquet( + file=fp, + schema=schema, + fs=fs, + read_options=read_options, + ) + for fp in filepaths + ] + ) + else: + raise NotImplementedError(f"PyRunner has not implemented scan: {scan._source_info.scan_type()}") + + expected_schema = ( + Schema._from_fields([schema[name] for name in read_options.column_names]) + if read_options.column_names is not None + else schema + ) + assert ( + table.schema() == expected_schema + ), 
f"Expected table to have schema:\n{expected_schema}\n\nReceived instead:\n{table.schema()}" + return table + @dataclass(frozen=True) class WriteFile(SingleOutputInstruction): @@ -338,7 +428,7 @@ def run(self, inputs: list[Table]) -> list[Table]: def _write_file(self, inputs: list[Table]) -> list[Table]: [input] = inputs - partition = daft.runners.pyrunner.LocalLogicalPartitionOpRunner()._handle_file_write( + partition = self._handle_file_write( inputs={self.logplan._children()[0].id(): input}, file_write=self.logplan, ) @@ -353,6 +443,32 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata]) ) ] + def _handle_file_write(self, inputs: dict[int, Table], file_write: FileWrite) -> Table: + child_id = file_write._children()[0].id() + assert file_write._storage_type == StorageType.PARQUET or file_write._storage_type == StorageType.CSV + if file_write._storage_type == StorageType.PARQUET: + file_names = table_io.write_parquet( + inputs[child_id], + path=file_write._root_dir, + compression=file_write._compression, + partition_cols=file_write._partition_cols, + ) + else: + file_names = table_io.write_csv( + inputs[child_id], + path=file_write._root_dir, + compression=file_write._compression, + partition_cols=file_write._partition_cols, + ) + + output_schema = file_write.schema() + assert len(output_schema) == 1 + return Table.from_pydict( + { + output_schema.column_names()[0]: file_names, + } + ) + @dataclass(frozen=True) class Filter(SingleOutputInstruction): diff --git a/daft/execution/logical_op_runners.py b/daft/execution/logical_op_runners.py deleted file mode 100644 index d571336874..0000000000 --- a/daft/execution/logical_op_runners.py +++ /dev/null @@ -1,120 +0,0 @@ -from __future__ import annotations - -from daft.datasources import ( - CSVSourceInfo, - JSONSourceInfo, - ParquetSourceInfo, - StorageType, -) -from daft.logical.logical_plan import FileWrite, TabularFilesScan -from daft.logical.schema import Schema -from daft.runners.partitioning import TableParseCSVOptions, TableReadOptions -from daft.table import Table, table_io - - -class LogicalPartitionOpRunner: - # TODO(charles): move to ExecutionStep - - def _handle_tabular_files_scan( - self, inputs: dict[int, Table], scan: TabularFilesScan, index: int | None = None - ) -> Table: - child_id = scan._children()[0].id() - prev_partition = inputs[child_id] - data = prev_partition.to_pydict() - assert ( - scan._filepaths_column_name in data - ), f"TabularFilesScan should be ran on vPartitions with '{scan._filepaths_column_name}' column" - filepaths = data[scan._filepaths_column_name] - - if index is not None: - filepaths = [filepaths[index]] - - # Common options for reading vPartition - fs = scan._fs - schema = scan._schema - read_options = TableReadOptions( - num_rows=scan._limit_rows, - column_names=scan._column_names, # read only specified columns - ) - - if scan._source_info.scan_type() == StorageType.CSV: - assert isinstance(scan._source_info, CSVSourceInfo) - table = Table.concat( - [ - table_io.read_csv( - file=fp, - schema=schema, - fs=fs, - csv_options=TableParseCSVOptions( - delimiter=scan._source_info.delimiter, - header_index=0 if scan._source_info.has_headers else None, - ), - read_options=read_options, - ) - for fp in filepaths - ] - ) - elif scan._source_info.scan_type() == StorageType.JSON: - assert isinstance(scan._source_info, JSONSourceInfo) - table = Table.concat( - [ - table_io.read_json( - file=fp, - schema=schema, - fs=fs, - read_options=read_options, - ) - for fp in filepaths - ] - ) - 
elif scan._source_info.scan_type() == StorageType.PARQUET: - assert isinstance(scan._source_info, ParquetSourceInfo) - table = Table.concat( - [ - table_io.read_parquet( - file=fp, - schema=schema, - fs=fs, - read_options=read_options, - ) - for fp in filepaths - ] - ) - else: - raise NotImplementedError(f"PyRunner has not implemented scan: {scan._source_info.scan_type()}") - - expected_schema = ( - Schema._from_fields([schema[name] for name in read_options.column_names]) - if read_options.column_names is not None - else schema - ) - assert ( - table.schema() == expected_schema - ), f"Expected table to have schema:\n{expected_schema}\n\nReceived instead:\n{table.schema()}" - return table - - def _handle_file_write(self, inputs: dict[int, Table], file_write: FileWrite) -> Table: - child_id = file_write._children()[0].id() - assert file_write._storage_type == StorageType.PARQUET or file_write._storage_type == StorageType.CSV - if file_write._storage_type == StorageType.PARQUET: - file_names = table_io.write_parquet( - inputs[child_id], - path=file_write._root_dir, - compression=file_write._compression, - partition_cols=file_write._partition_cols, - ) - else: - file_names = table_io.write_csv( - inputs[child_id], - path=file_write._root_dir, - compression=file_write._compression, - partition_cols=file_write._partition_cols, - ) - - output_schema = file_write.schema() - assert len(output_schema) == 1 - return Table.from_pydict( - { - output_schema.column_names()[0]: file_names, - } - ) diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index abcf3c8f1c..b1062500b4 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -13,7 +13,6 @@ from daft.datasources import SourceInfo from daft.execution import physical_plan, physical_plan_factory from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask -from daft.execution.logical_op_runners import LogicalPartitionOpRunner from daft.filesystem import get_filesystem_from_path, glob_path_with_stats from daft.internal.gpu import cuda_device_count from daft.internal.rule_runner import FixedPointPolicy, Once, RuleBatch, RuleRunner @@ -136,10 +135,6 @@ def get_schema_from_first_filepath( return runner_io.sample_schema(first_filepath, source_info, fs) -class LocalLogicalPartitionOpRunner(LogicalPartitionOpRunner): - ... - - class PyRunner(Runner[Table]): def __init__(self, use_thread_pool: bool | None) -> None: super().__init__()
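
The structural change in this diff is that ReadFile._read_file and WriteFile._write_file no longer construct a daft.runners.pyrunner.LocalLogicalPartitionOpRunner: _handle_tabular_files_scan and _handle_file_write become methods on the instructions in execution_step.py, logical_op_runners.py is deleted, and pyrunner.py drops both the LogicalPartitionOpRunner import and the LocalLogicalPartitionOpRunner subclass. Below is a minimal sketch of that shape, using made-up stand-in classes (FakeScan, FakeReadFile) rather than the real daft types.

# Sketch only: FakeScan / FakeReadFile are hypothetical stand-ins, not daft classes.
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class FakeScan:
    filepaths: list[str]


@dataclass(frozen=True)
class FakeReadFile:
    index: int
    logplan: FakeScan

    def run(self) -> list[str]:
        # Before this diff, the equivalent call was routed through
        # daft.runners.pyrunner.LocalLogicalPartitionOpRunner(); now the
        # instruction owns the handler directly.
        return self._handle_tabular_files_scan(self.logplan, self.index)

    def _handle_tabular_files_scan(self, scan: FakeScan, index: int | None) -> list[str]:
        filepaths = scan.filepaths if index is None else [scan.filepaths[index]]
        return [f"table read from {fp}" for fp in filepaths]


print(FakeReadFile(index=0, logplan=FakeScan(["s3://bucket/a.parquet"])).run())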
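
The moved _handle_tabular_files_scan body is unchanged from logical_op_runners.py: it pulls the file paths out of the upstream filepaths partition, optionally narrows to a single file via index, dispatches on the scan's StorageType to table_io.read_csv / read_json / read_parquet, concatenates the per-file tables with Table.concat, and asserts the result matches the expected schema. A self-contained sketch of that dispatch-and-concatenate pattern follows; the StorageType enum here is local to the sketch and the read_*_stub helpers are hypothetical stand-ins for the table_io readers.

# Sketch only: simplified dispatch on storage type; plain lists of dicts stand in
# for Table, and the real code also narrows columns via TableReadOptions and
# validates the resulting schema.
from __future__ import annotations

from enum import Enum, auto


class StorageType(Enum):
    CSV = auto()
    JSON = auto()
    PARQUET = auto()


def read_csv_stub(path: str) -> list[dict]:
    return [{"path": path, "format": "csv"}]


def read_json_stub(path: str) -> list[dict]:
    return [{"path": path, "format": "json"}]


def read_parquet_stub(path: str) -> list[dict]:
    return [{"path": path, "format": "parquet"}]


def handle_tabular_files_scan(
    filepaths: list[str], scan_type: StorageType, index: int | None = None
) -> list[dict]:
    # A task may be pinned to a single file via `index`, as in the diff.
    if index is not None:
        filepaths = [filepaths[index]]

    # Dispatch on the storage type, read every file, then concatenate.
    if scan_type == StorageType.CSV:
        per_file = [read_csv_stub(fp) for fp in filepaths]
    elif scan_type == StorageType.JSON:
        per_file = [read_json_stub(fp) for fp in filepaths]
    elif scan_type == StorageType.PARQUET:
        per_file = [read_parquet_stub(fp) for fp in filepaths]
    else:
        raise NotImplementedError(f"scan not implemented: {scan_type}")

    return [row for table in per_file for row in table]


print(handle_tabular_files_scan(["a.csv", "b.csv"], StorageType.CSV))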
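
_handle_file_write is likewise moved as-is: it asserts the sink is Parquet or CSV, calls the matching table_io writer with the root directory, compression, and partition columns from the FileWrite node, and returns a single-column Table whose rows are the written file paths. A rough sketch under the same simplifications (hypothetical write_*_stub helpers, a plain dict in place of Table):

# Sketch only: write_parquet_stub / write_csv_stub are hypothetical stand-ins for
# table_io.write_parquet / table_io.write_csv.
from __future__ import annotations

from enum import Enum, auto


class StorageType(Enum):
    CSV = auto()
    PARQUET = auto()


def write_parquet_stub(rows: list[dict], root_dir: str) -> list[str]:
    return [f"{root_dir}/part-00000.parquet"]


def write_csv_stub(rows: list[dict], root_dir: str) -> list[str]:
    return [f"{root_dir}/part-00000.csv"]


def handle_file_write(rows: list[dict], storage_type: StorageType, root_dir: str) -> dict[str, list[str]]:
    # Only Parquet and CSV sinks are handled, matching the assert in the diff.
    assert storage_type in (StorageType.PARQUET, StorageType.CSV)
    if storage_type == StorageType.PARQUET:
        file_names = write_parquet_stub(rows, root_dir)
    else:
        file_names = write_csv_stub(rows, root_dir)

    # The real code returns Table.from_pydict({<only output column>: file_names}).
    return {"file_paths": file_names}


print(handle_file_write([{"x": 1}], StorageType.CSV, "/tmp/out"))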