Skip to content

Commit

Permalink
[FEAT] [New Query Planner] Support for Ray runner in new query planner. (#1265)
Browse files Browse the repository at this point in the history

This PR enables the Ray runner in the new query planner. This is
accomplished by encapsulating the Rust-side `PhysicalPlan` in a
Python-facing `PhysicalPlanScheduler`, which we then make picklable.

## TODOs

- [x] Add support for `ResourceRequest`s
  • Loading branch information
clarkzinzow committed Aug 14, 2023
1 parent 5a2dc7a commit c43c76c
Show file tree
Hide file tree
Showing 51 changed files with 440 additions and 227 deletions.
3 changes: 2 additions & 1 deletion daft/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class daft:
from_pylist,
from_ray_dataset,
)
from daft.daft import ImageFormat
from daft.daft import ImageFormat, ResourceRequest
from daft.dataframe import DataFrame
from daft.datatype import DataType, ImageMode, TimeUnit
from daft.expressions import col, lit
Expand Down Expand Up @@ -106,4 +106,5 @@ class daft:
"TimeUnit",
"register_viz_hook",
"udf",
"ResourceRequest",
]
9 changes: 7 additions & 2 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,18 @@
from daft.api_annotations import DataframePublicAPI
from daft.context import get_context
from daft.convert import InputListType
from daft.daft import FileFormat, JoinType, PartitionScheme, PartitionSpec
from daft.daft import (
FileFormat,
JoinType,
PartitionScheme,
PartitionSpec,
ResourceRequest,
)
from daft.dataframe.preview import DataFramePreview
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
from daft.expressions import Expression, ExpressionsProjection, col, lit
from daft.logical.builder import LogicalPlanBuilder
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry, PartitionSet
from daft.runners.pyrunner import LocalPartitionSet
from daft.table import Table
Expand Down
2 changes: 1 addition & 1 deletion daft/execution/execution_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
JoinType,
JsonSourceConfig,
ParquetSourceConfig,
ResourceRequest,
)
from daft.expressions import Expression, ExpressionsProjection, col
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import (
PartialPartitionMetadata,
PartitionMetadata,
Expand Down
3 changes: 1 addition & 2 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from loguru import logger

from daft.daft import FileFormat, FileFormatConfig, JoinType
from daft.daft import FileFormat, FileFormatConfig, JoinType, ResourceRequest
from daft.execution import execution_step
from daft.execution.execution_step import (
Instruction,
Expand All @@ -35,7 +35,6 @@
)
from daft.expressions import ExpressionsProjection
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartialPartitionMetadata

PartitionT = TypeVar("PartitionT")
Expand Down
34 changes: 25 additions & 9 deletions daft/execution/rust_physical_plan_shim.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
from typing import Iterator, TypeVar, cast

from daft.context import get_context
from daft.daft import FileFormat, FileFormatConfig, JoinType, PyExpr, PySchema, PyTable
from daft.daft import (
FileFormat,
FileFormatConfig,
JoinType,
PyExpr,
PySchema,
PyTable,
ResourceRequest,
)
from daft.execution import execution_step, physical_plan
from daft.expressions import Expression, ExpressionsProjection
from daft.logical.map_partition_ops import MapPartitionOp
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.table import Table

PartitionT = TypeVar("PartitionT")
Expand All @@ -27,29 +34,38 @@ def local_aggregate(
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=aggregation_step,
resource_request=ResourceRequest(), # TODO use real resource request
resource_request=ResourceRequest(),
)


def tabular_scan(
schema: PySchema, file_info_table: PyTable, file_format_config: FileFormatConfig, limit: int
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
parts = cast(Iterator[PartitionT], [Table._from_pytable(file_info_table)])
file_info_iter = physical_plan.partition_read(iter(parts))
# TODO(Clark): Fix this Ray runner hack.
part = Table._from_pytable(file_info_table)
if get_context().is_ray_runner:
import ray

parts = [ray.put(part)]
else:
parts = [part]
parts_t = cast(Iterator[PartitionT], parts)

file_info_iter = physical_plan.partition_read(iter(parts_t))
filepaths_column_name = get_context().runner().runner_io().FS_LISTING_PATH_COLUMN_NAME
return physical_plan.file_read(
file_info_iter, limit, Schema._from_pyschema(schema), None, None, file_format_config, filepaths_column_name
)


def project(
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr]
input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr], resource_request: ResourceRequest
) -> physical_plan.InProgressPhysicalPlan[PartitionT]:
expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection])
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=execution_step.Project(expr_projection),
resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest.
resource_request=resource_request,
)


Expand All @@ -74,7 +90,7 @@ def explode(
return physical_plan.pipeline_instruction(
child_plan=input,
pipeable_instruction=execution_step.MapPartition(explode_op),
resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest.
resource_request=ResourceRequest(),
)


Expand Down Expand Up @@ -106,7 +122,7 @@ def split_by_hash(
return physical_plan.pipeline_instruction(
input,
fanout_instruction,
ResourceRequest(), # TODO(Clark): Propagate resource request.
ResourceRequest(),
)


Expand Down
18 changes: 6 additions & 12 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
JoinType,
PartitionScheme,
PartitionSpec,
ResourceRequest,
)
from daft.expressions.expressions import Expression, ExpressionsProjection
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry

if TYPE_CHECKING:
from daft.planner import QueryPlanner
from daft.planner import PhysicalPlanScheduler


class LogicalPlanBuilder(ABC):
Expand All @@ -28,12 +28,12 @@ class LogicalPlanBuilder(ABC):
"""

@abstractmethod
def to_planner(self) -> QueryPlanner:
def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler:
"""
Convert this logical plan builder to a query planner, which is used to translate
a logical plan to a physical plan and to generate executable tasks for the physical plan.
Convert the underlying logical plan to a physical plan scheduler, which is
used to generate executable tasks for the physical plan.
This should be called after trigger optimization with self.optimize().
This should be called after triggering optimization with self.optimize().
"""

@abstractmethod
Expand All @@ -54,12 +54,6 @@ def num_partitions(self) -> int:
"""
return self.partition_spec().num_partitions

@abstractmethod
def resource_request(self) -> ResourceRequest:
"""
Returns a custom ResourceRequest if one has been attached to this logical plan.
"""

@abstractmethod
def pretty_print(self) -> str:
"""
Expand Down
13 changes: 5 additions & 8 deletions daft/logical/logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
JoinType,
PartitionScheme,
PartitionSpec,
ResourceRequest,
)
from daft.datatype import DataType
from daft.errors import ExpressionTypeError
Expand All @@ -26,12 +27,11 @@
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.map_partition_ops import ExplodeOp, MapPartitionOp
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry
from daft.table import Table

if TYPE_CHECKING:
from daft.planner.py_planner import PyQueryPlanner
from daft.planner.py_planner import PyPhysicalPlanScheduler


class OpLevel(IntEnum):
Expand All @@ -47,20 +47,17 @@ def __init__(self, plan: LogicalPlan):
def __repr__(self) -> str:
return self._plan.pretty_print()

def to_planner(self) -> PyQueryPlanner:
from daft.planner.py_planner import PyQueryPlanner
def to_physical_plan_scheduler(self) -> PyPhysicalPlanScheduler:
from daft.planner.py_planner import PyPhysicalPlanScheduler

return PyQueryPlanner(self._plan)
return PyPhysicalPlanScheduler(self._plan)

def schema(self) -> Schema:
return self._plan.schema()

def partition_spec(self) -> PartitionSpec:
return self._plan.partition_spec()

def resource_request(self) -> ResourceRequest:
return self._plan.resource_request()

def pretty_print(self) -> str:
return self._plan.pretty_print()

Expand Down
5 changes: 2 additions & 3 deletions daft/logical/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

from loguru import logger

from daft import resource_request
from daft.daft import PartitionScheme
from daft.daft import PartitionScheme, ResourceRequest
from daft.expressions import ExpressionsProjection, col
from daft.internal.rule import Rule
from daft.logical.logical_plan import (
Expand Down Expand Up @@ -404,7 +403,7 @@ def _drop_double_projection(self, parent: Projection, child: Projection) -> Logi
return Projection(
grandchild,
ExpressionsProjection(new_exprs),
custom_resource_request=resource_request.ResourceRequest.max_resources(
custom_resource_request=ResourceRequest.max_resources(
[parent.resource_request(), child.resource_request()]
),
)
Expand Down
19 changes: 6 additions & 13 deletions daft/logical/rust_logical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,15 @@
from daft.context import get_context
from daft.daft import FileFormat, FileFormatConfig, JoinType
from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import PartitionScheme, PartitionSpec
from daft.daft import PartitionScheme, PartitionSpec, ResourceRequest
from daft.errors import ExpressionTypeError
from daft.expressions.expressions import Expression, ExpressionsProjection
from daft.logical.builder import LogicalPlanBuilder
from daft.logical.schema import Schema
from daft.resource_request import ResourceRequest
from daft.runners.partitioning import PartitionCacheEntry

if TYPE_CHECKING:
from daft.planner.rust_planner import RustQueryPlanner
from daft.planner.rust_planner import RustPhysicalPlanScheduler


class RustLogicalPlanBuilder(LogicalPlanBuilder):
Expand All @@ -27,10 +26,10 @@ class RustLogicalPlanBuilder(LogicalPlanBuilder):
def __init__(self, builder: _LogicalPlanBuilder) -> None:
self._builder = builder

def to_planner(self) -> RustQueryPlanner:
from daft.planner.rust_planner import RustQueryPlanner
def to_physical_plan_scheduler(self) -> RustPhysicalPlanScheduler:
from daft.planner.rust_planner import RustPhysicalPlanScheduler

return RustQueryPlanner(self._builder)
return RustPhysicalPlanScheduler(self._builder.to_physical_plan_scheduler())

def schema(self) -> Schema:
pyschema = self._builder.schema()
Expand All @@ -40,10 +39,6 @@ def partition_spec(self) -> PartitionSpec:
# TODO(Clark): Push PartitionSpec into planner.
return self._builder.partition_spec()

def resource_request(self) -> ResourceRequest:
# TODO(Clark): Expose resource request via builder, or push it into the planner.
return ResourceRequest()

def pretty_print(self) -> str:
return repr(self)

Expand Down Expand Up @@ -95,11 +90,9 @@ def project(
projection: ExpressionsProjection,
custom_resource_request: ResourceRequest = ResourceRequest(),
) -> RustLogicalPlanBuilder:
if custom_resource_request != ResourceRequest():
raise NotImplementedError("ResourceRequests not supported for new query planner")
schema = projection.resolve_schema(self.schema())
exprs = [expr._expr for expr in projection]
builder = self._builder.project(exprs, schema._schema)
builder = self._builder.project(exprs, schema._schema, custom_resource_request)
return RustLogicalPlanBuilder(builder)

def filter(self, predicate: Expression) -> RustLogicalPlanBuilder:
Expand Down
4 changes: 2 additions & 2 deletions daft/planner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from __future__ import annotations

from daft.planner.planner import QueryPlanner
from daft.planner.planner import PhysicalPlanScheduler

__all__ = ["QueryPlanner"]
__all__ = ["PhysicalPlanScheduler"]
7 changes: 3 additions & 4 deletions daft/planner/planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
from daft.runners.partitioning import PartitionT


class QueryPlanner(ABC):
class PhysicalPlanScheduler(ABC):
"""
An interface for translating a logical plan to a physical plan, and generating
executable tasks for the latter.
An interface for generating executable tasks for an underlying physical plan.
"""

@abstractmethod
def plan(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
pass
6 changes: 3 additions & 3 deletions daft/planner/py_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

from daft.execution import physical_plan, physical_plan_factory
from daft.logical import logical_plan
from daft.planner.planner import PartitionT, QueryPlanner
from daft.planner.planner import PartitionT, PhysicalPlanScheduler


class PyQueryPlanner(QueryPlanner):
class PyPhysicalPlanScheduler(PhysicalPlanScheduler):
def __init__(self, plan: logical_plan.LogicalPlan):
self._plan = plan

def plan(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(physical_plan_factory._get_physical_plan(self._plan, psets))
14 changes: 7 additions & 7 deletions daft/planner/rust_planner.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from __future__ import annotations

from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder
from daft.daft import PhysicalPlanScheduler as _PhysicalPlanScheduler
from daft.execution import physical_plan
from daft.planner.planner import PartitionT, QueryPlanner
from daft.planner.planner import PartitionT, PhysicalPlanScheduler


class RustQueryPlanner(QueryPlanner):
def __init__(self, builder: _LogicalPlanBuilder):
self._builder = builder
class RustPhysicalPlanScheduler(PhysicalPlanScheduler):
def __init__(self, scheduler: _PhysicalPlanScheduler):
self._scheduler = scheduler

def plan(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._builder.to_partition_tasks(psets))
def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan:
return physical_plan.materialize(self._scheduler.to_partition_tasks(psets))
Loading

0 comments on commit c43c76c

Please sign in to comment.