
Commit

Finally remove logical_op_runners.py
Xiayue Charles Lin committed Jun 8, 2023
1 parent 122b8ed commit 930cce1
Showing 2 changed files with 112 additions and 119 deletions.
117 changes: 112 additions & 5 deletions daft/execution/execution_step.py
@@ -10,13 +10,25 @@
else:
    from typing import Protocol

import daft
from daft.datasources import (
    CSVSourceInfo,
    JSONSourceInfo,
    ParquetSourceInfo,
    StorageType,
)
from daft.expressions import Expression, ExpressionsProjection, col
from daft.logical import logical_plan
from daft.logical.logical_plan import FileWrite, TabularFilesScan
from daft.logical.map_partition_ops import MapPartitionOp
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartialPartitionMetadata, PartitionMetadata
from daft.table import Table
from daft.runners.partitioning import (
    PartialPartitionMetadata,
    PartitionMetadata,
    vPartitionParseCSVOptions,
    vPartitionReadOptions,
    vPartitionSchemaInferenceOptions,
)
from daft.table import Table, table_io

PartitionT = TypeVar("PartitionT")
ID_GEN = itertools.count()
@@ -305,7 +317,7 @@ def run(self, inputs: list[Table]) -> list[Table]:
    def _read_file(self, inputs: list[Table]) -> list[Table]:
        assert len(inputs) == 1
        [filepaths_partition] = inputs
        partition = daft.runners.pyrunner.LocalLogicalPartitionOpRunner()._handle_tabular_files_scan(
        partition = self._handle_tabular_files_scan(
            inputs={self.logplan._filepaths_child.id(): filepaths_partition},
            scan=self.logplan,
            index=self.index,
@@ -327,6 +339,75 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
            )
        ]

    def _handle_tabular_files_scan(
        self, inputs: dict[int, Table], scan: TabularFilesScan, index: int | None = None
    ) -> Table:
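        # Pull the filepaths column out of the upstream (child) partition, then read each file
        # with the table_io reader that matches the scan's storage type.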
        child_id = scan._children()[0].id()
        prev_partition = inputs[child_id]
        data = prev_partition.to_pydict()
        assert (
            scan._filepaths_column_name in data
        ), f"TabularFilesScan should be run on vPartitions with '{scan._filepaths_column_name}' column"
        filepaths = data[scan._filepaths_column_name]

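        # When an index is provided, this instruction reads only the single filepath at that position.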
        if index is not None:
            filepaths = [filepaths[index]]

        # Common options for reading vPartition
        fs = scan._fs
        schema = scan._schema
        schema_options = vPartitionSchemaInferenceOptions(schema=schema)
        read_options = vPartitionReadOptions(
            num_rows=scan._limit_rows,
            column_names=scan._column_names,  # read only specified columns
        )

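        # Dispatch on the source's storage type; each branch reads every filepath and
        # concatenates the per-file tables into a single Table.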
        if scan._source_info.scan_type() == StorageType.CSV:
            assert isinstance(scan._source_info, CSVSourceInfo)
            return Table.concat(
                [
                    table_io.read_csv(
                        file=fp,
                        fs=fs,
                        csv_options=vPartitionParseCSVOptions(
                            delimiter=scan._source_info.delimiter,
                            has_headers=scan._source_info.has_headers,
                            skip_rows_before_header=0,
                            skip_rows_after_header=0,
                        ),
                        schema_options=schema_options,
                        read_options=read_options,
                    )
                    for fp in filepaths
                ]
            )
        elif scan._source_info.scan_type() == StorageType.JSON:
            assert isinstance(scan._source_info, JSONSourceInfo)
            return Table.concat(
                [
                    table_io.read_json(
                        file=fp,
                        fs=fs,
                        read_options=read_options,
                    )
                    for fp in filepaths
                ]
            )
        elif scan._source_info.scan_type() == StorageType.PARQUET:
            assert isinstance(scan._source_info, ParquetSourceInfo)
            return Table.concat(
                [
                    table_io.read_parquet(
                        file=fp,
                        fs=fs,
                        read_options=read_options,
                    )
                    for fp in filepaths
                ]
            )
        else:
            raise NotImplementedError(f"Scan is not implemented for: {scan._source_info.scan_type()}")


@dataclass(frozen=True)
class WriteFile(SingleOutputInstruction):
@@ -338,7 +419,7 @@ def run(self, inputs: list[Table]) -> list[Table]:

    def _write_file(self, inputs: list[Table]) -> list[Table]:
        [input] = inputs
        partition = daft.runners.pyrunner.LocalLogicalPartitionOpRunner()._handle_file_write(
        partition = self._handle_file_write(
            inputs={self.logplan._children()[0].id(): input},
            file_write=self.logplan,
        )
@@ -353,6 +434,32 @@ def run_partial_metadata(self, input_metadatas: list[PartialPartitionMetadata])
            )
        ]

    def _handle_file_write(self, inputs: dict[int, Table], file_write: FileWrite) -> Table:
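        # Write the child partition in the requested storage format and return a single-column
        # Table listing the file paths that were written.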
        child_id = file_write._children()[0].id()
        assert file_write._storage_type == StorageType.PARQUET or file_write._storage_type == StorageType.CSV
        if file_write._storage_type == StorageType.PARQUET:
            file_names = table_io.write_parquet(
                inputs[child_id],
                path=file_write._root_dir,
                compression=file_write._compression,
                partition_cols=file_write._partition_cols,
            )
        else:
            file_names = table_io.write_csv(
                inputs[child_id],
                path=file_write._root_dir,
                compression=file_write._compression,
                partition_cols=file_write._partition_cols,
            )

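        # The FileWrite output schema has exactly one column; populate it with the written file names.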
        output_schema = file_write.schema()
        assert len(output_schema) == 1
        return Table.from_pydict(
            {
                output_schema.column_names()[0]: file_names,
            }
        )


@dataclass(frozen=True)
class Filter(SingleOutputInstruction):
114 changes: 0 additions & 114 deletions daft/execution/logical_op_runners.py

This file was deleted.
