From c43c76cade503b2662e906918067b608d47d66c3 Mon Sep 17 00:00:00 2001 From: Clark Zinzow Date: Mon, 14 Aug 2023 13:43:26 -0700 Subject: [PATCH] [FEAT] [New Query Planner] Support for Ray runner in new query planner. (#1265) This PR enables the Ray runner in the new query planner. This is accomplished by encapsulating the Rust-side `PhysicalPlan` in a Python-facing `PhysicalPlanScheduler`, which we then make pickleable. ## TODOs - [x] Add support for `ResourceRequest`s --- daft/__init__.py | 3 +- daft/dataframe/dataframe.py | 9 +- daft/execution/execution_step.py | 2 +- daft/execution/physical_plan.py | 3 +- daft/execution/rust_physical_plan_shim.py | 34 +++++-- daft/logical/builder.py | 18 ++-- daft/logical/logical_plan.py | 13 +-- daft/logical/optimizer.py | 5 +- daft/logical/rust_logical_plan.py | 19 ++-- daft/planner/__init__.py | 4 +- daft/planner/planner.py | 7 +- daft/planner/py_planner.py | 6 +- daft/planner/rust_planner.py | 14 +-- daft/resource_request.py | 52 ---------- daft/runners/pyrunner.py | 11 +-- daft/runners/ray_runner.py | 25 +++-- daft/runners/runner.py | 4 - docs/source/learn/user_guides/udf.rst | 2 +- src/daft-core/src/schema.rs | 6 ++ src/daft-plan/src/builder.rs | 11 ++- src/daft-plan/src/lib.rs | 5 + src/daft-plan/src/logical_plan.rs | 35 +++---- src/daft-plan/src/ops/project.rs | 5 +- src/daft-plan/src/partitioning.rs | 10 ++ src/daft-plan/src/physical_ops/agg.rs | 3 +- src/daft-plan/src/physical_ops/coalesce.rs | 3 +- src/daft-plan/src/physical_ops/concat.rs | 3 +- src/daft-plan/src/physical_ops/csv.rs | 5 +- src/daft-plan/src/physical_ops/explode.rs | 3 +- src/daft-plan/src/physical_ops/fanout.rs | 7 +- src/daft-plan/src/physical_ops/filter.rs | 3 +- src/daft-plan/src/physical_ops/flatten.rs | 3 +- src/daft-plan/src/physical_ops/in_memory.rs | 3 +- src/daft-plan/src/physical_ops/join.rs | 3 +- src/daft-plan/src/physical_ops/json.rs | 5 +- src/daft-plan/src/physical_ops/limit.rs | 3 +- src/daft-plan/src/physical_ops/parquet.rs | 5 +- src/daft-plan/src/physical_ops/project.rs | 18 +++- src/daft-plan/src/physical_ops/reduce.rs | 3 +- src/daft-plan/src/physical_ops/sort.rs | 3 +- src/daft-plan/src/physical_ops/split.rs | 3 +- src/daft-plan/src/physical_plan.rs | 62 +++++++++++- src/daft-plan/src/planner.rs | 6 +- src/daft-plan/src/resource_request.rs | 95 +++++++++++++++++++ src/daft-plan/src/sink_info.rs | 3 +- src/daft-plan/src/source_info.rs | 78 +++++++++++++-- tests/conftest.py | 2 - tests/optimizer/test_fold_projections.py | 2 +- .../test_resource_requests.py | 38 ++++---- tests_legacy/__init__.py | 0 .../text_to_image_generation.ipynb | 2 +- 51 files changed, 440 insertions(+), 227 deletions(-) delete mode 100644 daft/resource_request.py create mode 100644 src/daft-plan/src/resource_request.rs rename {tests_legacy => tests}/test_resource_requests.py (81%) delete mode 100644 tests_legacy/__init__.py diff --git a/daft/__init__.py b/daft/__init__.py index 41eb3c312f..99ff9618ae 100644 --- a/daft/__init__.py +++ b/daft/__init__.py @@ -76,7 +76,7 @@ class daft: from_pylist, from_ray_dataset, ) -from daft.daft import ImageFormat +from daft.daft import ImageFormat, ResourceRequest from daft.dataframe import DataFrame from daft.datatype import DataType, ImageMode, TimeUnit from daft.expressions import col, lit @@ -106,4 +106,5 @@ class daft: "TimeUnit", "register_viz_hook", "udf", + "ResourceRequest", ] diff --git a/daft/dataframe/dataframe.py b/daft/dataframe/dataframe.py index 384a486e9c..6d20c25324 100644 --- a/daft/dataframe/dataframe.py +++ 
b/daft/dataframe/dataframe.py @@ -24,13 +24,18 @@ from daft.api_annotations import DataframePublicAPI from daft.context import get_context from daft.convert import InputListType -from daft.daft import FileFormat, JoinType, PartitionScheme, PartitionSpec +from daft.daft import ( + FileFormat, + JoinType, + PartitionScheme, + PartitionSpec, + ResourceRequest, +) from daft.dataframe.preview import DataFramePreview from daft.datatype import DataType from daft.errors import ExpressionTypeError from daft.expressions import Expression, ExpressionsProjection, col, lit from daft.logical.builder import LogicalPlanBuilder -from daft.resource_request import ResourceRequest from daft.runners.partitioning import PartitionCacheEntry, PartitionSet from daft.runners.pyrunner import LocalPartitionSet from daft.table import Table diff --git a/daft/execution/execution_step.py b/daft/execution/execution_step.py index deadc561f7..701d82e441 100644 --- a/daft/execution/execution_step.py +++ b/daft/execution/execution_step.py @@ -21,11 +21,11 @@ JoinType, JsonSourceConfig, ParquetSourceConfig, + ResourceRequest, ) from daft.expressions import Expression, ExpressionsProjection, col from daft.logical.map_partition_ops import MapPartitionOp from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.runners.partitioning import ( PartialPartitionMetadata, PartitionMetadata, diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index e90985af26..cf6bc01cee 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -23,7 +23,7 @@ from loguru import logger -from daft.daft import FileFormat, FileFormatConfig, JoinType +from daft.daft import FileFormat, FileFormatConfig, JoinType, ResourceRequest from daft.execution import execution_step from daft.execution.execution_step import ( Instruction, @@ -35,7 +35,6 @@ ) from daft.expressions import ExpressionsProjection from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.runners.partitioning import PartialPartitionMetadata PartitionT = TypeVar("PartitionT") diff --git a/daft/execution/rust_physical_plan_shim.py b/daft/execution/rust_physical_plan_shim.py index 690117802f..92db0888cc 100644 --- a/daft/execution/rust_physical_plan_shim.py +++ b/daft/execution/rust_physical_plan_shim.py @@ -3,12 +3,19 @@ from typing import Iterator, TypeVar, cast from daft.context import get_context -from daft.daft import FileFormat, FileFormatConfig, JoinType, PyExpr, PySchema, PyTable +from daft.daft import ( + FileFormat, + FileFormatConfig, + JoinType, + PyExpr, + PySchema, + PyTable, + ResourceRequest, +) from daft.execution import execution_step, physical_plan from daft.expressions import Expression, ExpressionsProjection from daft.logical.map_partition_ops import MapPartitionOp from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.table import Table PartitionT = TypeVar("PartitionT") @@ -27,15 +34,24 @@ def local_aggregate( return physical_plan.pipeline_instruction( child_plan=input, pipeable_instruction=aggregation_step, - resource_request=ResourceRequest(), # TODO use real resource request + resource_request=ResourceRequest(), ) def tabular_scan( schema: PySchema, file_info_table: PyTable, file_format_config: FileFormatConfig, limit: int ) -> physical_plan.InProgressPhysicalPlan[PartitionT]: - parts = cast(Iterator[PartitionT], [Table._from_pytable(file_info_table)]) - file_info_iter = 
physical_plan.partition_read(iter(parts)) + # TODO(Clark): Fix this Ray runner hack. + part = Table._from_pytable(file_info_table) + if get_context().is_ray_runner: + import ray + + parts = [ray.put(part)] + else: + parts = [part] + parts_t = cast(Iterator[PartitionT], parts) + + file_info_iter = physical_plan.partition_read(iter(parts_t)) filepaths_column_name = get_context().runner().runner_io().FS_LISTING_PATH_COLUMN_NAME return physical_plan.file_read( file_info_iter, limit, Schema._from_pyschema(schema), None, None, file_format_config, filepaths_column_name @@ -43,13 +59,13 @@ def tabular_scan( def project( - input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr] + input: physical_plan.InProgressPhysicalPlan[PartitionT], projection: list[PyExpr], resource_request: ResourceRequest ) -> physical_plan.InProgressPhysicalPlan[PartitionT]: expr_projection = ExpressionsProjection([Expression._from_pyexpr(expr) for expr in projection]) return physical_plan.pipeline_instruction( child_plan=input, pipeable_instruction=execution_step.Project(expr_projection), - resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest. + resource_request=resource_request, ) @@ -74,7 +90,7 @@ def explode( return physical_plan.pipeline_instruction( child_plan=input, pipeable_instruction=execution_step.MapPartition(explode_op), - resource_request=ResourceRequest(), # TODO(Clark): Use real ResourceRequest. + resource_request=ResourceRequest(), ) @@ -106,7 +122,7 @@ def split_by_hash( return physical_plan.pipeline_instruction( input, fanout_instruction, - ResourceRequest(), # TODO(Clark): Propagate resource request. + ResourceRequest(), ) diff --git a/daft/logical/builder.py b/daft/logical/builder.py index f1e085de28..6f8179e2b7 100644 --- a/daft/logical/builder.py +++ b/daft/logical/builder.py @@ -12,14 +12,14 @@ JoinType, PartitionScheme, PartitionSpec, + ResourceRequest, ) from daft.expressions.expressions import Expression, ExpressionsProjection from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.runners.partitioning import PartitionCacheEntry if TYPE_CHECKING: - from daft.planner import QueryPlanner + from daft.planner import PhysicalPlanScheduler class LogicalPlanBuilder(ABC): @@ -28,12 +28,12 @@ class LogicalPlanBuilder(ABC): """ @abstractmethod - def to_planner(self) -> QueryPlanner: + def to_physical_plan_scheduler(self) -> PhysicalPlanScheduler: """ - Convert this logical plan builder to a query planner, which is used to translate - a logical plan to a physical plan and to generate executable tasks for the physical plan. + Convert the underlying logical plan to a physical plan scheduler, which is + used to generate executable tasks for the physical plan. - This should be called after trigger optimization with self.optimize(). + This should be called after triggering optimization with self.optimize(). """ @abstractmethod @@ -54,12 +54,6 @@ def num_partitions(self) -> int: """ return self.partition_spec().num_partitions - @abstractmethod - def resource_request(self) -> ResourceRequest: - """ - Returns a custom ResourceRequest if one has been attached to this logical plan. 
- """ - @abstractmethod def pretty_print(self) -> str: """ diff --git a/daft/logical/logical_plan.py b/daft/logical/logical_plan.py index 9f2fcc4616..3c7d80392f 100644 --- a/daft/logical/logical_plan.py +++ b/daft/logical/logical_plan.py @@ -16,6 +16,7 @@ JoinType, PartitionScheme, PartitionSpec, + ResourceRequest, ) from daft.datatype import DataType from daft.errors import ExpressionTypeError @@ -26,12 +27,11 @@ from daft.logical.builder import LogicalPlanBuilder from daft.logical.map_partition_ops import ExplodeOp, MapPartitionOp from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.runners.partitioning import PartitionCacheEntry from daft.table import Table if TYPE_CHECKING: - from daft.planner.py_planner import PyQueryPlanner + from daft.planner.py_planner import PyPhysicalPlanScheduler class OpLevel(IntEnum): @@ -47,10 +47,10 @@ def __init__(self, plan: LogicalPlan): def __repr__(self) -> str: return self._plan.pretty_print() - def to_planner(self) -> PyQueryPlanner: - from daft.planner.py_planner import PyQueryPlanner + def to_physical_plan_scheduler(self) -> PyPhysicalPlanScheduler: + from daft.planner.py_planner import PyPhysicalPlanScheduler - return PyQueryPlanner(self._plan) + return PyPhysicalPlanScheduler(self._plan) def schema(self) -> Schema: return self._plan.schema() @@ -58,9 +58,6 @@ def schema(self) -> Schema: def partition_spec(self) -> PartitionSpec: return self._plan.partition_spec() - def resource_request(self) -> ResourceRequest: - return self._plan.resource_request() - def pretty_print(self) -> str: return self._plan.pretty_print() diff --git a/daft/logical/optimizer.py b/daft/logical/optimizer.py index 8e0ad2ae9b..20bdce62e4 100644 --- a/daft/logical/optimizer.py +++ b/daft/logical/optimizer.py @@ -2,8 +2,7 @@ from loguru import logger -from daft import resource_request -from daft.daft import PartitionScheme +from daft.daft import PartitionScheme, ResourceRequest from daft.expressions import ExpressionsProjection, col from daft.internal.rule import Rule from daft.logical.logical_plan import ( @@ -404,7 +403,7 @@ def _drop_double_projection(self, parent: Projection, child: Projection) -> Logi return Projection( grandchild, ExpressionsProjection(new_exprs), - custom_resource_request=resource_request.ResourceRequest.max_resources( + custom_resource_request=ResourceRequest.max_resources( [parent.resource_request(), child.resource_request()] ), ) diff --git a/daft/logical/rust_logical_plan.py b/daft/logical/rust_logical_plan.py index f48db728d1..6bc7bfd334 100644 --- a/daft/logical/rust_logical_plan.py +++ b/daft/logical/rust_logical_plan.py @@ -9,16 +9,15 @@ from daft.context import get_context from daft.daft import FileFormat, FileFormatConfig, JoinType from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder -from daft.daft import PartitionScheme, PartitionSpec +from daft.daft import PartitionScheme, PartitionSpec, ResourceRequest from daft.errors import ExpressionTypeError from daft.expressions.expressions import Expression, ExpressionsProjection from daft.logical.builder import LogicalPlanBuilder from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.runners.partitioning import PartitionCacheEntry if TYPE_CHECKING: - from daft.planner.rust_planner import RustQueryPlanner + from daft.planner.rust_planner import RustPhysicalPlanScheduler class RustLogicalPlanBuilder(LogicalPlanBuilder): @@ -27,10 +26,10 @@ class RustLogicalPlanBuilder(LogicalPlanBuilder): def 
__init__(self, builder: _LogicalPlanBuilder) -> None: self._builder = builder - def to_planner(self) -> RustQueryPlanner: - from daft.planner.rust_planner import RustQueryPlanner + def to_physical_plan_scheduler(self) -> RustPhysicalPlanScheduler: + from daft.planner.rust_planner import RustPhysicalPlanScheduler - return RustQueryPlanner(self._builder) + return RustPhysicalPlanScheduler(self._builder.to_physical_plan_scheduler()) def schema(self) -> Schema: pyschema = self._builder.schema() @@ -40,10 +39,6 @@ def partition_spec(self) -> PartitionSpec: # TODO(Clark): Push PartitionSpec into planner. return self._builder.partition_spec() - def resource_request(self) -> ResourceRequest: - # TODO(Clark): Expose resource request via builder, or push it into the planner. - return ResourceRequest() - def pretty_print(self) -> str: return repr(self) @@ -95,11 +90,9 @@ def project( projection: ExpressionsProjection, custom_resource_request: ResourceRequest = ResourceRequest(), ) -> RustLogicalPlanBuilder: - if custom_resource_request != ResourceRequest(): - raise NotImplementedError("ResourceRequests not supported for new query planner") schema = projection.resolve_schema(self.schema()) exprs = [expr._expr for expr in projection] - builder = self._builder.project(exprs, schema._schema) + builder = self._builder.project(exprs, schema._schema, custom_resource_request) return RustLogicalPlanBuilder(builder) def filter(self, predicate: Expression) -> RustLogicalPlanBuilder: diff --git a/daft/planner/__init__.py b/daft/planner/__init__.py index 059660e248..6ddcae6303 100644 --- a/daft/planner/__init__.py +++ b/daft/planner/__init__.py @@ -1,5 +1,5 @@ from __future__ import annotations -from daft.planner.planner import QueryPlanner +from daft.planner.planner import PhysicalPlanScheduler -__all__ = ["QueryPlanner"] +__all__ = ["PhysicalPlanScheduler"] diff --git a/daft/planner/planner.py b/daft/planner/planner.py index 95ad664422..5ee5a66346 100644 --- a/daft/planner/planner.py +++ b/daft/planner/planner.py @@ -6,12 +6,11 @@ from daft.runners.partitioning import PartitionT -class QueryPlanner(ABC): +class PhysicalPlanScheduler(ABC): """ - An interface for translating a logical plan to a physical plan, and generating - executable tasks for the latter. + An interface for generating executable tasks for an underlying physical plan. 
""" @abstractmethod - def plan(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: + def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: pass diff --git a/daft/planner/py_planner.py b/daft/planner/py_planner.py index f659509c09..bf5321a7bb 100644 --- a/daft/planner/py_planner.py +++ b/daft/planner/py_planner.py @@ -2,12 +2,12 @@ from daft.execution import physical_plan, physical_plan_factory from daft.logical import logical_plan -from daft.planner.planner import PartitionT, QueryPlanner +from daft.planner.planner import PartitionT, PhysicalPlanScheduler -class PyQueryPlanner(QueryPlanner): +class PyPhysicalPlanScheduler(PhysicalPlanScheduler): def __init__(self, plan: logical_plan.LogicalPlan): self._plan = plan - def plan(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: + def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: return physical_plan.materialize(physical_plan_factory._get_physical_plan(self._plan, psets)) diff --git a/daft/planner/rust_planner.py b/daft/planner/rust_planner.py index cb520774c4..cd1c00fd83 100644 --- a/daft/planner/rust_planner.py +++ b/daft/planner/rust_planner.py @@ -1,13 +1,13 @@ from __future__ import annotations -from daft.daft import LogicalPlanBuilder as _LogicalPlanBuilder +from daft.daft import PhysicalPlanScheduler as _PhysicalPlanScheduler from daft.execution import physical_plan -from daft.planner.planner import PartitionT, QueryPlanner +from daft.planner.planner import PartitionT, PhysicalPlanScheduler -class RustQueryPlanner(QueryPlanner): - def __init__(self, builder: _LogicalPlanBuilder): - self._builder = builder +class RustPhysicalPlanScheduler(PhysicalPlanScheduler): + def __init__(self, scheduler: _PhysicalPlanScheduler): + self._scheduler = scheduler - def plan(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: - return physical_plan.materialize(self._builder.to_partition_tasks(psets)) + def to_partition_tasks(self, psets: dict[str, list[PartitionT]]) -> physical_plan.MaterializedPhysicalPlan: + return physical_plan.materialize(self._scheduler.to_partition_tasks(psets)) diff --git a/daft/resource_request.py b/daft/resource_request.py deleted file mode 100644 index 28740c23f8..0000000000 --- a/daft/resource_request.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations - -import dataclasses -import functools - - -@dataclasses.dataclass(frozen=True) -class ResourceRequest: - - num_cpus: int | float | None = None - num_gpus: int | float | None = None - memory_bytes: int | float | None = None - - @staticmethod - def max_resources(resource_requests: list[ResourceRequest]) -> ResourceRequest: - """Gets the maximum of all resources in a list of ResourceRequests as a new ResourceRequest""" - return functools.reduce( - lambda acc, req: acc._max_for_each_resource(req), - resource_requests, - ResourceRequest(num_cpus=None, num_gpus=None, memory_bytes=None), - ) - - def _max_for_each_resource(self, other: ResourceRequest) -> ResourceRequest: - """Get a new ResourceRequest that consists of the maximum requests for each resource""" - resource_names = [f.name for f in dataclasses.fields(ResourceRequest)] - max_resources = {} - for name in resource_names: - if getattr(self, name) is None: - max_resources[name] = getattr(other, name) - elif getattr(other, name) is None: - max_resources[name] = getattr(self, name) - else: - 
max_resources[name] = max(getattr(self, name), getattr(other, name)) - return ResourceRequest(**max_resources) - - def __add__(self, other: ResourceRequest) -> ResourceRequest: - return ResourceRequest( - num_cpus=add_optional_numeric(self.num_cpus, other.num_cpus), - num_gpus=add_optional_numeric(self.num_gpus, other.num_gpus), - memory_bytes=add_optional_numeric(self.memory_bytes, other.memory_bytes), - ) - - -def add_optional_numeric(a: int | float | None, b: int | float | None) -> int | float | None: - """ - Add a and b together, treating None as 0. - If a and b are both None, then returns None (i.e. preserves None if all inputs are None). - """ - if a is None and b is None: - return None - - return (a or 0) + (b or 0) diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 5c429f3867..c2b86bffae 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -10,14 +10,13 @@ import pyarrow as pa from loguru import logger -from daft.daft import FileFormatConfig +from daft.daft import FileFormatConfig, ResourceRequest from daft.execution import physical_plan from daft.execution.execution_step import Instruction, MaterializedResult, PartitionTask from daft.filesystem import get_filesystem_from_path, glob_path_with_stats from daft.internal.gpu import cuda_device_count from daft.logical.builder import LogicalPlanBuilder from daft.logical.schema import Schema -from daft.resource_request import ResourceRequest from daft.runners import runner_io from daft.runners.partitioning import ( PartID, @@ -154,16 +153,16 @@ def run(self, builder: LogicalPlanBuilder) -> PartitionCacheEntry: def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[Table]: # Optimize the logical plan. builder = builder.optimize() - # Finalize the logical plan and get a query planner for translating the - # logical plan to executable tasks. - planner = builder.to_planner() + # Finalize the logical plan and get a physical plan scheduler for translating the + # physical plan to executable tasks. + plan_scheduler = builder.to_physical_plan_scheduler() psets = { key: entry.value.values() for key, entry in self._part_set_cache._uuid_to_partition_set.items() if entry.value is not None } # Get executable tasks from planner. 
- tasks = planner.plan(psets) + tasks = plan_scheduler.to_partition_tasks(psets) with profiler("profile_PyRunner.run_{datetime.now().isoformat()}.json"): partitions_gen = self._physical_plan_to_partitions(tasks) diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index ac557c474e..cfd552c53b 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -13,7 +13,7 @@ from loguru import logger from daft.logical.builder import LogicalPlanBuilder -from daft.planner.planner import QueryPlanner +from daft.planner import PhysicalPlanScheduler try: import ray @@ -23,7 +23,7 @@ ) raise -from daft.daft import FileFormatConfig +from daft.daft import FileFormatConfig, ResourceRequest from daft.datatype import DataType from daft.execution.execution_step import ( FanoutInstruction, @@ -35,7 +35,6 @@ SingleOutputPartitionTask, ) from daft.filesystem import get_filesystem_from_path, glob_path_with_stats -from daft.resource_request import ResourceRequest from daft.runners import runner_io from daft.runners.partitioning import ( PartID, @@ -417,7 +416,7 @@ def next(self, result_uuid: str) -> ray.ObjectRef | StopIteration: def run_plan( self, - planner: QueryPlanner, + plan_scheduler: PhysicalPlanScheduler, psets: dict[str, ray.ObjectRef], result_uuid: str, ) -> None: @@ -425,7 +424,7 @@ def run_plan( target=self._run_plan, name=result_uuid, kwargs={ - "planner": planner, + "plan_scheduler": plan_scheduler, "psets": psets, "result_uuid": result_uuid, }, @@ -435,14 +434,14 @@ def run_plan( def _run_plan( self, - planner: QueryPlanner, + plan_scheduler: PhysicalPlanScheduler, psets: dict[str, ray.ObjectRef], result_uuid: str, ) -> None: from loguru import logger - # Get executable tasks from planner. - tasks = planner.plan(psets) + # Get executable tasks from plan scheduler. + tasks = plan_scheduler.to_partition_tasks(psets) # Note: For autoscaling clusters, we will probably want to query cores dynamically. # Keep in mind this call takes about 0.3ms. @@ -618,9 +617,9 @@ def __init__( def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]: # Optimize the logical plan. builder = builder.optimize() - # Finalize the logical plan and get a query planner for translating the - # logical plan to executable tasks. - planner = builder.to_planner() + # Finalize the logical plan and get a physical plan scheduler for translating the + # physical plan to executable tasks. 
+ plan_scheduler = builder.to_physical_plan_scheduler() psets = { key: entry.value.values() @@ -631,7 +630,7 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]: if isinstance(self.ray_context, ray.client_builder.ClientContext): ray.get( self.scheduler_actor.run_plan.remote( - planner=planner, + plan_scheduler=plan_scheduler, psets=psets, result_uuid=result_uuid, ) @@ -639,7 +638,7 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[ray.ObjectRef]: else: self.scheduler.run_plan( - planner=planner, + plan_scheduler=plan_scheduler, psets=psets, result_uuid=result_uuid, ) diff --git a/daft/runners/runner.py b/daft/runners/runner.py index afdf9ba5da..24128a9840 100644 --- a/daft/runners/runner.py +++ b/daft/runners/runner.py @@ -4,7 +4,6 @@ from typing import Generic, Iterator, TypeVar from daft.logical.builder import LogicalPlanBuilder -from daft.logical.logical_plan import LogicalPlan from daft.runners.partitioning import ( PartitionCacheEntry, PartitionSet, @@ -43,6 +42,3 @@ def run_iter(self, builder: LogicalPlanBuilder) -> Iterator[PartitionT]: def run_iter_tables(self, builder: LogicalPlanBuilder) -> Iterator[Table]: """Similar to run_iter(), but always dereference and yield Table objects.""" ... - - def optimize(self, plan: LogicalPlan) -> LogicalPlan: - return plan diff --git a/docs/source/learn/user_guides/udf.rst b/docs/source/learn/user_guides/udf.rst index de7de0898c..16b9961cdf 100644 --- a/docs/source/learn/user_guides/udf.rst +++ b/docs/source/learn/user_guides/udf.rst @@ -186,7 +186,7 @@ Custom resources can be requested when you call :meth:`df.with_column() Self { + Self::empty() + } +} + impl Display for Schema { // Produces an ASCII table. fn fmt(&self, f: &mut Formatter) -> Result { diff --git a/src/daft-plan/src/builder.rs b/src/daft-plan/src/builder.rs index 9e756ad3a2..06b6a6c14b 100644 --- a/src/daft-plan/src/builder.rs +++ b/src/daft-plan/src/builder.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use crate::{logical_plan::LogicalPlan, JoinType}; +use crate::{logical_plan::LogicalPlan, ResourceRequest}; #[cfg(feature = "python")] use { @@ -12,12 +12,11 @@ use { ExternalInfo as ExternalSourceInfo, FileInfo as InputFileInfo, InMemoryInfo, PyFileFormatConfig, SourceInfo, }, - FileFormat, PartitionScheme, PartitionSpec, + FileFormat, JoinType, PartitionScheme, PartitionSpec, PhysicalPlanScheduler, }, daft_core::{datatypes::Field, python::schema::PySchema, schema::Schema, DataType}, daft_dsl::{python::PyExpr, Expr}, pyo3::{exceptions::PyValueError, prelude::*}, - std::collections::HashMap, }; #[cfg_attr(feature = "python", pyclass)] @@ -83,6 +82,7 @@ impl LogicalPlanBuilder { &self, projection: Vec, projected_schema: &PySchema, + resource_request: ResourceRequest, ) -> PyResult { let projection_exprs = projection .iter() @@ -91,6 +91,7 @@ impl LogicalPlanBuilder { let logical_plan: LogicalPlan = ops::Project::new( projection_exprs, projected_schema.clone().into(), + resource_request, self.plan.clone(), ) .into(); @@ -277,9 +278,9 @@ impl LogicalPlanBuilder { Ok(self.plan.partition_spec().as_ref().clone()) } - pub fn to_partition_tasks(&self, psets: HashMap>) -> PyResult { + pub fn to_physical_plan_scheduler(&self) -> PyResult { let physical_plan = plan(self.plan.as_ref())?; - Python::with_gil(|py| physical_plan.to_partition_tasks(py, &psets)) + Ok(Arc::new(physical_plan).into()) } pub fn repr_ascii(&self) -> PyResult { diff --git a/src/daft-plan/src/lib.rs b/src/daft-plan/src/lib.rs index aa1bbc1603..6126fb5f00 100644 --- 
a/src/daft-plan/src/lib.rs +++ b/src/daft-plan/src/lib.rs @@ -7,6 +7,7 @@ mod partitioning; mod physical_ops; mod physical_plan; mod planner; +mod resource_request; mod sink_info; mod source_info; @@ -14,6 +15,8 @@ pub use builder::LogicalPlanBuilder; pub use join::JoinType; pub use logical_plan::LogicalPlan; pub use partitioning::{PartitionScheme, PartitionSpec}; +pub use physical_plan::PhysicalPlanScheduler; +pub use resource_request::ResourceRequest; pub use source_info::{ CsvSourceConfig, FileFormat, JsonSourceConfig, ParquetSourceConfig, PyFileFormatConfig, }; @@ -32,6 +35,8 @@ pub fn register_modules(_py: Python, parent: &PyModule) -> PyResult<()> { parent.add_class::()?; parent.add_class::()?; parent.add_class::()?; + parent.add_class::()?; + parent.add_class::()?; Ok(()) } diff --git a/src/daft-plan/src/logical_plan.rs b/src/daft-plan/src/logical_plan.rs index ff2ebddfb2..3b18ba59d0 100644 --- a/src/daft-plan/src/logical_plan.rs +++ b/src/daft-plan/src/logical_plan.rs @@ -84,22 +84,25 @@ impl LogicalPlan { right, left_on, .. - }) => match max( - input.partition_spec().num_partitions, - right.partition_spec().num_partitions, - ) { - // NOTE: This duplicates the repartitioning logic in the planner, where we - // conditionally repartition the left and right tables. - // TODO(Clark): Consolidate this logic with the planner logic when we push the partition spec - // to be an entirely planner-side concept. - 1 => input.partition_spec(), - num_partitions => PartitionSpec::new_internal( - PartitionScheme::Hash, - num_partitions, - Some(left_on.clone()), - ) - .into(), - }, + }) => { + let input_partition_spec = input.partition_spec(); + match max( + input_partition_spec.num_partitions, + right.partition_spec().num_partitions, + ) { + // NOTE: This duplicates the repartitioning logic in the planner, where we + // conditionally repartition the left and right tables. + // TODO(Clark): Consolidate this logic with the planner logic when we push the partition spec + // to be an entirely planner-side concept. + 1 => input_partition_spec, + num_partitions => PartitionSpec::new_internal( + PartitionScheme::Hash, + num_partitions, + Some(left_on.clone()), + ) + .into(), + } + } Self::Sink(Sink { input, .. }) => input.partition_spec(), } } diff --git a/src/daft-plan/src/ops/project.rs b/src/daft-plan/src/ops/project.rs index eab9306bea..fbd74c7d65 100644 --- a/src/daft-plan/src/ops/project.rs +++ b/src/daft-plan/src/ops/project.rs @@ -3,12 +3,13 @@ use std::sync::Arc; use daft_core::schema::SchemaRef; use daft_dsl::Expr; -use crate::LogicalPlan; +use crate::{LogicalPlan, ResourceRequest}; #[derive(Clone, Debug)] pub struct Project { pub projection: Vec, pub projected_schema: SchemaRef, + pub resource_request: ResourceRequest, // Upstream node. 
pub input: Arc, } @@ -17,11 +18,13 @@ impl Project { pub(crate) fn new( projection: Vec, projected_schema: SchemaRef, + resource_request: ResourceRequest, input: Arc, ) -> Self { Self { projection, projected_schema, + resource_request, input, } } diff --git a/src/daft-plan/src/partitioning.rs b/src/daft-plan/src/partitioning.rs index d52a449653..def141d74c 100644 --- a/src/daft-plan/src/partitioning.rs +++ b/src/daft-plan/src/partitioning.rs @@ -63,6 +63,16 @@ impl PartitionSpec { } } +impl Default for PartitionSpec { + fn default() -> Self { + Self { + scheme: PartitionScheme::Unknown, + num_partitions: 1, + by: None, + } + } +} + #[cfg(feature = "python")] #[pymethods] impl PartitionSpec { diff --git a/src/daft-plan/src/physical_ops/agg.rs b/src/daft-plan/src/physical_ops/agg.rs index 7ab5c5ac7f..4d948ab489 100644 --- a/src/daft-plan/src/physical_ops/agg.rs +++ b/src/daft-plan/src/physical_ops/agg.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use daft_dsl::{AggExpr, Expr}; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Aggregate { /// Aggregations to apply. pub aggregations: Vec, diff --git a/src/daft-plan/src/physical_ops/coalesce.rs b/src/daft-plan/src/physical_ops/coalesce.rs index b9ff95932e..e5b70a3ca7 100644 --- a/src/daft-plan/src/physical_ops/coalesce.rs +++ b/src/daft-plan/src/physical_ops/coalesce.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Coalesce { // Upstream node. pub input: Arc, diff --git a/src/daft-plan/src/physical_ops/concat.rs b/src/daft-plan/src/physical_ops/concat.rs index 679a2bb599..6372f2d277 100644 --- a/src/daft-plan/src/physical_ops/concat.rs +++ b/src/daft-plan/src/physical_ops/concat.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Concat { pub other: Arc, // Upstream node. diff --git a/src/daft-plan/src/physical_ops/csv.rs b/src/daft-plan/src/physical_ops/csv.rs index cb33f06843..12fd42e4b4 100644 --- a/src/daft-plan/src/physical_ops/csv.rs +++ b/src/daft-plan/src/physical_ops/csv.rs @@ -7,8 +7,9 @@ use crate::{ physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo, PartitionSpec, }; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] +#[derive(Debug, Deserialize, Serialize)] pub struct TabularScanCsv { pub schema: SchemaRef, pub external_info: ExternalInfo, @@ -35,7 +36,7 @@ impl TabularScanCsv { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct TabularWriteCsv { pub schema: SchemaRef, pub file_info: OutputFileInfo, diff --git a/src/daft-plan/src/physical_ops/explode.rs b/src/daft-plan/src/physical_ops/explode.rs index 5e75bc5559..7d8ad5c6c0 100644 --- a/src/daft-plan/src/physical_ops/explode.rs +++ b/src/daft-plan/src/physical_ops/explode.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use daft_dsl::Expr; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Explode { pub explode_exprs: Vec, // Upstream node. 
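The `Serialize`/`Deserialize` derives being threaded through these `physical_ops` modules make every node of the `PhysicalPlan` tree serializable; `impl_bincode_py_state_serialization!` then builds on that to make the Python-facing `PhysicalPlanScheduler` pickleable, which is what lets the Ray runner ship plans to its remote scheduler actor. A minimal sketch of the resulting round trip, under stated assumptions (the `_builder` accessor on `DataFrame` is illustrative and the exact internal name may differ):

```python
import pickle

import daft
from daft.context import set_new_planner

set_new_planner()  # the pickle path below targets the new (Rust) planner

df = daft.from_pydict({"x": [1, 2, 3]})

# Assumption: `_builder` is the DataFrame's internal LogicalPlanBuilder
# handle; the accessor name here is illustrative, not the documented API.
builder = df._builder.optimize()
scheduler = builder.to_physical_plan_scheduler()

# bincode serializes the Rust-side plan tree, and any cached in-memory
# partitions round-trip via ray.cloudpickle (falling back to pickle).
restored = pickle.loads(pickle.dumps(scheduler))

# Runners then call restored.to_partition_tasks(psets) with the materialized
# partition sets (keyed by cache key) to get executable tasks.
```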
diff --git a/src/daft-plan/src/physical_ops/fanout.rs b/src/daft-plan/src/physical_ops/fanout.rs index 5f0e43f587..b9b5c213a7 100644 --- a/src/daft-plan/src/physical_ops/fanout.rs +++ b/src/daft-plan/src/physical_ops/fanout.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use daft_dsl::Expr; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct FanoutRandom { pub num_partitions: usize, // Upstream node. @@ -20,7 +21,7 @@ impl FanoutRandom { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct FanoutByHash { pub num_partitions: usize, pub partition_by: Vec, @@ -42,7 +43,7 @@ impl FanoutByHash { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct FanoutByRange { pub num_partitions: usize, pub sort_by: Vec, diff --git a/src/daft-plan/src/physical_ops/filter.rs b/src/daft-plan/src/physical_ops/filter.rs index 4273c8b912..6102bcb700 100644 --- a/src/daft-plan/src/physical_ops/filter.rs +++ b/src/daft-plan/src/physical_ops/filter.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use daft_dsl::Expr; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Filter { // The Boolean expression to filter on. pub predicate: Expr, diff --git a/src/daft-plan/src/physical_ops/flatten.rs b/src/daft-plan/src/physical_ops/flatten.rs index 0e228c0772..c46b6f887f 100644 --- a/src/daft-plan/src/physical_ops/flatten.rs +++ b/src/daft-plan/src/physical_ops/flatten.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Flatten { // Upstream node. 
pub input: Arc, diff --git a/src/daft-plan/src/physical_ops/in_memory.rs b/src/daft-plan/src/physical_ops/in_memory.rs index a0e3ac74da..95387bb46f 100644 --- a/src/daft-plan/src/physical_ops/in_memory.rs +++ b/src/daft-plan/src/physical_ops/in_memory.rs @@ -1,8 +1,9 @@ use crate::{source_info::InMemoryInfo, PartitionSpec}; use daft_core::schema::SchemaRef; +use serde::{Deserialize, Serialize}; use std::sync::Arc; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct InMemoryScan { pub schema: SchemaRef, pub in_memory_info: InMemoryInfo, diff --git a/src/daft-plan/src/physical_ops/join.rs b/src/daft-plan/src/physical_ops/join.rs index 2e4896244e..aaa36a8d1c 100644 --- a/src/daft-plan/src/physical_ops/join.rs +++ b/src/daft-plan/src/physical_ops/join.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use daft_dsl::Expr; use crate::{physical_plan::PhysicalPlan, JoinType}; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Join { pub right: Arc, pub left_on: Vec, diff --git a/src/daft-plan/src/physical_ops/json.rs b/src/daft-plan/src/physical_ops/json.rs index e9de300bf9..a042770361 100644 --- a/src/daft-plan/src/physical_ops/json.rs +++ b/src/daft-plan/src/physical_ops/json.rs @@ -7,8 +7,9 @@ use crate::{ physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo, PartitionSpec, }; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct TabularScanJson { pub schema: SchemaRef, pub external_info: ExternalInfo, @@ -35,7 +36,7 @@ impl TabularScanJson { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct TabularWriteJson { pub schema: SchemaRef, pub file_info: OutputFileInfo, diff --git a/src/daft-plan/src/physical_ops/limit.rs b/src/daft-plan/src/physical_ops/limit.rs index 1a0972412e..c50f1d3734 100644 --- a/src/daft-plan/src/physical_ops/limit.rs +++ b/src/daft-plan/src/physical_ops/limit.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Limit { pub limit: i64, pub num_partitions: usize, diff --git a/src/daft-plan/src/physical_ops/parquet.rs b/src/daft-plan/src/physical_ops/parquet.rs index 6e713ab0ad..1a42a7fbc5 100644 --- a/src/daft-plan/src/physical_ops/parquet.rs +++ b/src/daft-plan/src/physical_ops/parquet.rs @@ -7,8 +7,9 @@ use crate::{ physical_plan::PhysicalPlan, sink_info::OutputFileInfo, source_info::ExternalInfo as ExternalSourceInfo, PartitionSpec, }; +use serde::{Deserialize, Serialize}; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct TabularScanParquet { pub schema: SchemaRef, pub external_info: ExternalSourceInfo, @@ -35,7 +36,7 @@ impl TabularScanParquet { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct TabularWriteParquet { pub schema: SchemaRef, pub file_info: OutputFileInfo, diff --git a/src/daft-plan/src/physical_ops/project.rs b/src/daft-plan/src/physical_ops/project.rs index d488139472..148da57e63 100644 --- a/src/daft-plan/src/physical_ops/project.rs +++ b/src/daft-plan/src/physical_ops/project.rs @@ -2,17 +2,27 @@ use std::sync::Arc; use daft_dsl::Expr; -use crate::physical_plan::PhysicalPlan; +use crate::{physical_plan::PhysicalPlan, ResourceRequest}; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Project { pub projection: 
Vec, + pub resource_request: ResourceRequest, // Upstream node. pub input: Arc, } impl Project { - pub(crate) fn new(projection: Vec, input: Arc) -> Self { - Self { projection, input } + pub(crate) fn new( + projection: Vec, + resource_request: ResourceRequest, + input: Arc, + ) -> Self { + Self { + projection, + resource_request, + input, + } } } diff --git a/src/daft-plan/src/physical_ops/reduce.rs b/src/daft-plan/src/physical_ops/reduce.rs index 61746a023b..0fdcade742 100644 --- a/src/daft-plan/src/physical_ops/reduce.rs +++ b/src/daft-plan/src/physical_ops/reduce.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct ReduceMerge { // Upstream node. pub input: Arc, diff --git a/src/daft-plan/src/physical_ops/sort.rs b/src/daft-plan/src/physical_ops/sort.rs index bc6a02fc41..cb65926240 100644 --- a/src/daft-plan/src/physical_ops/sort.rs +++ b/src/daft-plan/src/physical_ops/sort.rs @@ -3,8 +3,9 @@ use std::sync::Arc; use daft_dsl::Expr; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Sort { pub sort_by: Vec, pub descending: Vec, diff --git a/src/daft-plan/src/physical_ops/split.rs b/src/daft-plan/src/physical_ops/split.rs index f8dc3ab6bf..d2e5e78643 100644 --- a/src/daft-plan/src/physical_ops/split.rs +++ b/src/daft-plan/src/physical_ops/split.rs @@ -1,8 +1,9 @@ use std::sync::Arc; use crate::physical_plan::PhysicalPlan; +use serde::{Deserialize, Serialize}; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Split { pub input_num_partitions: usize, pub output_num_partitions: usize, diff --git a/src/daft-plan/src/physical_plan.rs b/src/daft-plan/src/physical_plan.rs index 2310d8bd34..efbfb603ac 100644 --- a/src/daft-plan/src/physical_plan.rs +++ b/src/daft-plan/src/physical_plan.rs @@ -11,14 +11,22 @@ use { daft_dsl::python::PyExpr, daft_dsl::Expr, daft_table::python::PyTable, - pyo3::{pyclass, pymethods, PyObject, PyRef, PyRefMut, PyResult, Python}, + pyo3::{ + exceptions::PyValueError, + pyclass, pymethods, + types::{PyBytes, PyTuple}, + PyObject, PyRef, PyRefMut, PyResult, Python, + }, std::collections::HashMap, - std::sync::Arc, }; +use daft_core::impl_bincode_py_state_serialization; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + use crate::physical_ops::*; -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub enum PhysicalPlan { #[cfg(feature = "python")] InMemoryScan(InMemoryScan), @@ -46,6 +54,46 @@ pub enum PhysicalPlan { TabularWriteCsv(TabularWriteCsv), } +#[cfg_attr(feature = "python", pyclass)] +#[derive(Debug, Serialize, Deserialize)] +pub struct PhysicalPlanScheduler { + plan: Arc, +} + +#[cfg(feature = "python")] +#[pymethods] +impl PhysicalPlanScheduler { + #[new] + #[pyo3(signature = (*args))] + pub fn new(args: &PyTuple) -> PyResult { + match args.len() { + // Create dummy inner PhysicalPlan, to be overridden by __setstate__. 
+ 0 => Ok(Arc::new(PhysicalPlan::InMemoryScan(InMemoryScan::new( + Default::default(), + InMemoryInfo::new("".to_string(), args.py().None()), + Default::default(), + ))) + .into()), + _ => Err(PyValueError::new_err(format!( + "expected no arguments to make new PhysicalPlanScheduler, got : {}", + args.len() + ))), + } + } + + pub fn to_partition_tasks(&self, psets: HashMap>) -> PyResult { + Python::with_gil(|py| self.plan.to_partition_tasks(py, &psets)) + } +} + +impl_bincode_py_state_serialization!(PhysicalPlanScheduler); + +impl From> for PhysicalPlanScheduler { + fn from(plan: Arc) -> Self { + Self { plan } + } +} + #[cfg(feature = "python")] #[pyclass] struct PartitionIterator { @@ -171,7 +219,11 @@ impl PhysicalPlan { limit, .. }) => tabular_scan(py, schema, file_info, file_format_config, limit), - PhysicalPlan::Project(Project { input, projection }) => { + PhysicalPlan::Project(Project { + input, + projection, + resource_request, + }) => { let upstream_iter = input.to_partition_tasks(py, psets)?; let projection_pyexprs: Vec = projection .iter() @@ -180,7 +232,7 @@ impl PhysicalPlan { let py_iter = py .import(pyo3::intern!(py, "daft.execution.rust_physical_plan_shim"))? .getattr(pyo3::intern!(py, "project"))? - .call1((upstream_iter, projection_pyexprs))?; + .call1((upstream_iter, projection_pyexprs, resource_request.clone()))?; Ok(py_iter.into()) } PhysicalPlan::Filter(Filter { input, predicate }) => { diff --git a/src/daft-plan/src/planner.rs b/src/daft-plan/src/planner.rs index bb2bb10d8c..854310ff7b 100644 --- a/src/daft-plan/src/planner.rs +++ b/src/daft-plan/src/planner.rs @@ -66,11 +66,15 @@ pub fn plan(logical_plan: &LogicalPlan) -> DaftResult { )), }, LogicalPlan::Project(LogicalProject { - input, projection, .. + input, + projection, + resource_request, + .. 
}) => { let input_physical = plan(input)?; Ok(PhysicalPlan::Project(Project::new( projection.clone(), + resource_request.clone(), input_physical.into(), ))) } diff --git a/src/daft-plan/src/resource_request.rs b/src/daft-plan/src/resource_request.rs new file mode 100644 index 0000000000..41aa5f9d6f --- /dev/null +++ b/src/daft-plan/src/resource_request.rs @@ -0,0 +1,95 @@ +use daft_core::impl_bincode_py_state_serialization; +#[cfg(feature = "python")] +use { + pyo3::{pyclass, pyclass::CompareOp, pymethods, types::PyBytes, PyResult, Python}, + std::{cmp::max, ops::Add}, +}; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] +#[cfg_attr(feature = "python", pyclass(module = "daft.daft"))] +pub struct ResourceRequest { + pub num_cpus: Option, + pub num_gpus: Option, + pub memory_bytes: Option, +} + +impl ResourceRequest { + pub fn new_internal( + num_cpus: Option, + num_gpus: Option, + memory_bytes: Option, + ) -> Self { + Self { + num_cpus, + num_gpus, + memory_bytes, + } + } +} + +fn lift T>(f: F, left: Option, right: Option) -> Option { + match (left, right) { + (None, right) => right, + (left, None) => left, + (Some(left), Some(right)) => Some(f(left, right)), + } +} + +fn float_max(left: f64, right: f64) -> f64 { + left.max(right) +} + +#[cfg(feature = "python")] +#[pymethods] +impl ResourceRequest { + #[new] + #[pyo3(signature = (num_cpus=None, num_gpus=None, memory_bytes=None))] + pub fn new(num_cpus: Option, num_gpus: Option, memory_bytes: Option) -> Self { + Self::new_internal(num_cpus, num_gpus, memory_bytes) + } + + #[staticmethod] + pub fn max_resources(resource_requests: Vec) -> Self { + resource_requests.iter().fold(Default::default(), |acc, e| { + let max_num_cpus = lift(float_max, acc.num_cpus, e.num_cpus); + let max_num_gpus = lift(float_max, acc.num_gpus, e.num_gpus); + let max_memory_bytes = lift(max, acc.memory_bytes, e.memory_bytes); + Self::new_internal(max_num_cpus, max_num_gpus, max_memory_bytes) + }) + } + + #[getter] + pub fn get_num_cpus(&self) -> PyResult> { + Ok(self.num_cpus) + } + + #[getter] + pub fn get_num_gpus(&self) -> PyResult> { + Ok(self.num_gpus) + } + + #[getter] + pub fn get_memory_bytes(&self) -> PyResult> { + Ok(self.memory_bytes) + } + + fn __add__(&self, other: &Self) -> Self { + Self::new_internal( + lift(Add::add, self.num_cpus, other.num_cpus), + lift(Add::add, self.num_gpus, other.num_gpus), + lift(Add::add, self.memory_bytes, other.memory_bytes), + ) + } + + fn __richcmp__(&self, other: &Self, op: CompareOp) -> bool { + match op { + CompareOp::Eq => self == other, + CompareOp::Ne => !self.__richcmp__(other, CompareOp::Eq), + _ => unimplemented!("not implemented"), + } + } +} + +impl_bincode_py_state_serialization!(ResourceRequest); diff --git a/src/daft-plan/src/sink_info.rs b/src/daft-plan/src/sink_info.rs index 94e0315b64..733736adb9 100644 --- a/src/daft-plan/src/sink_info.rs +++ b/src/daft-plan/src/sink_info.rs @@ -1,13 +1,14 @@ use daft_dsl::Expr; use crate::FileFormat; +use serde::{Deserialize, Serialize}; #[derive(Debug)] pub enum SinkInfo { OutputFileInfo(OutputFileInfo), } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct OutputFileInfo { pub root_dir: String, pub file_format: FileFormat, diff --git a/src/daft-plan/src/source_info.rs b/src/daft-plan/src/source_info.rs index 6b714edb23..9150a08186 100644 --- a/src/daft-plan/src/source_info.rs +++ b/src/daft-plan/src/source_info.rs @@ -19,12 +19,13 @@ use { pyclass::CompareOp, pymethods, 
types::{PyBytes, PyTuple}, - IntoPy, PyObject, PyResult, Python, + IntoPy, PyObject, PyResult, Python, ToPyObject, }, }; -use serde::Deserialize; -use serde::Serialize; +use serde::de::{Error as DeError, Visitor}; +use serde::{ser::Error as SerError, Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; #[derive(Debug)] pub enum SourceInfo { @@ -34,9 +35,74 @@ pub enum SourceInfo { } #[cfg(feature = "python")] -#[derive(Debug, Clone)] +fn serialize_py_object(obj: &PyObject, s: S) -> Result +where + S: Serializer, +{ + let bytes = Python::with_gil(|py| { + py.import(pyo3::intern!(py, "ray.cloudpickle")) + .or_else(|_| py.import(pyo3::intern!(py, "pickle"))) + .and_then(|m| m.getattr(pyo3::intern!(py, "dumps"))) + .and_then(|f| f.call1((obj,))) + .and_then(|b| b.extract::>()) + .map_err(|e| SerError::custom(e.to_string())) + })?; + s.serialize_bytes(bytes.as_slice()) +} + +struct PyObjectVisitor; + +#[cfg(feature = "python")] +impl<'de> Visitor<'de> for PyObjectVisitor { + type Value = PyObject; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a byte array containing the pickled partition bytes") + } + + fn visit_bytes(self, v: &[u8]) -> Result + where + E: DeError, + { + Python::with_gil(|py| { + py.import(pyo3::intern!(py, "ray.cloudpickle")) + .or_else(|_| py.import(pyo3::intern!(py, "pickle"))) + .and_then(|m| m.getattr(pyo3::intern!(py, "loads"))) + .and_then(|f| Ok(f.call1((v,))?.to_object(py))) + .map_err(|e| DeError::custom(e.to_string())) + }) + } + + fn visit_byte_buf(self, v: Vec) -> Result + where + E: DeError, + { + Python::with_gil(|py| { + py.import(pyo3::intern!(py, "ray.cloudpickle")) + .or_else(|_| py.import(pyo3::intern!(py, "pickle"))) + .and_then(|m| m.getattr(pyo3::intern!(py, "loads"))) + .and_then(|f| Ok(f.call1((v,))?.to_object(py))) + .map_err(|e| DeError::custom(e.to_string())) + }) + } +} + +#[cfg(feature = "python")] +fn deserialize_py_object<'de, D>(d: D) -> Result +where + D: Deserializer<'de>, +{ + d.deserialize_bytes(PyObjectVisitor) +} + +#[cfg(feature = "python")] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct InMemoryInfo { pub cache_key: String, + #[serde( + serialize_with = "serialize_py_object", + deserialize_with = "deserialize_py_object" + )] pub cache_entry: PyObject, } @@ -50,7 +116,7 @@ impl InMemoryInfo { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct ExternalInfo { pub schema: SchemaRef, pub file_info: Arc, @@ -71,7 +137,7 @@ impl ExternalInfo { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct FileInfo { pub file_paths: Vec, pub file_sizes: Option>, diff --git a/tests/conftest.py b/tests/conftest.py index 1883aefc1f..5e73f7e0eb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -47,8 +47,6 @@ def uuid_ext_type() -> UuidType: @pytest.fixture(params=[False, True]) def use_new_planner(request) -> DaftContext: old_ctx = get_context() - if request.param and old_ctx.is_ray_runner: - pytest.skip() yield set_new_planner() if request.param else set_old_planner() _set_context(old_ctx) diff --git a/tests/optimizer/test_fold_projections.py b/tests/optimizer/test_fold_projections.py index a9c40ac4b0..4a55f809c6 100644 --- a/tests/optimizer/test_fold_projections.py +++ b/tests/optimizer/test_fold_projections.py @@ -4,10 +4,10 @@ import daft from daft import col +from daft.daft import ResourceRequest from daft.internal.rule_runner import Once, RuleBatch, RuleRunner from daft.logical.logical_plan import LogicalPlan 
from daft.logical.optimizer import FoldProjections -from daft.resource_request import ResourceRequest from tests.optimizer.conftest import assert_plan_eq diff --git a/tests_legacy/test_resource_requests.py b/tests/test_resource_requests.py similarity index 81% rename from tests_legacy/test_resource_requests.py rename to tests/test_resource_requests.py index f6a95bcb87..80311a4d66 100644 --- a/tests_legacy/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -8,7 +8,7 @@ import ray import daft -from daft import resource_request, udf +from daft import ResourceRequest, udf from daft.context import get_context from daft.expressions import col from daft.internal.gpu import cuda_device_count @@ -21,7 +21,7 @@ def no_gpu_available() -> bool: DATA = {"id": [i for i in range(100)]} -@udf(return_dtype=int, input_columns={"c": list}) +@udf(return_dtype=daft.DataType.int64()) def my_udf(c): return [1] * len(c) @@ -35,13 +35,13 @@ def my_udf(c): @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -def test_requesting_too_many_cpus(): +def test_requesting_too_many_cpus(use_new_planner): df = daft.from_pydict(DATA) df = df.with_column( "foo", my_udf(col("id")), - resource_request=resource_request.ResourceRequest(num_cpus=multiprocessing.cpu_count() + 1), + resource_request=ResourceRequest(num_cpus=multiprocessing.cpu_count() + 1), ) with pytest.raises(RuntimeError): @@ -49,11 +49,9 @@ def test_requesting_too_many_cpus(): @pytest.mark.skipif(get_context().runner_config.name not in {"py"}, reason="requires PyRunner to be in use") -def test_requesting_too_many_gpus(): +def test_requesting_too_many_gpus(use_new_planner): df = daft.from_pydict(DATA) - df = df.with_column( - "foo", my_udf(col("id")), resource_request=resource_request.ResourceRequest(num_gpus=cuda_device_count() + 1) - ) + df = df.with_column("foo", my_udf(col("id")), resource_request=ResourceRequest(num_gpus=cuda_device_count() + 1)) with pytest.raises(RuntimeError): df.collect() @@ -66,7 +64,7 @@ def test_requesting_too_much_memory(): df = df.with_column( "foo", my_udf(col("id")), - resource_request=resource_request.ResourceRequest(memory_bytes=psutil.virtual_memory().total + 1), + resource_request=ResourceRequest(memory_bytes=psutil.virtual_memory().total + 1), ) with pytest.raises(RuntimeError): @@ -78,7 +76,7 @@ def test_requesting_too_much_memory(): ### -@udf(return_dtype=int, input_columns={"c": list}) +@udf(return_dtype=daft.DataType.int64()) def assert_resources(c, num_cpus=None, num_gpus=None, memory=None): assigned_resources = ray.get_runtime_context().get_assigned_resources() @@ -99,13 +97,13 @@ def assert_resources(c, num_cpus=None, num_gpus=None, memory=None): RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0" ) @pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") -def test_with_column_rayrunner(): +def test_with_column_rayrunner(use_new_planner): df = daft.from_pydict(DATA).repartition(2) df = df.with_column( "resources_ok", assert_resources(col("id"), num_cpus=1, num_gpus=None, memory=1_000_000), - resource_request=resource_request.ResourceRequest(num_cpus=1, memory_bytes=1_000_000, num_gpus=None), + resource_request=ResourceRequest(num_cpus=1, memory_bytes=1_000_000, num_gpus=None), ) df.collect() @@ -115,7 +113,7 @@ def test_with_column_rayrunner(): RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only 
added in Ray >= 2.0" ) @pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") -def test_with_column_folded_rayrunner(): +def test_with_column_folded_rayrunner(use_new_planner): df = daft.from_pydict(DATA).repartition(2) # Because of Projection Folding optimizations, the expected resource request is the max of the three .with_column requests @@ -127,12 +125,12 @@ def test_with_column_folded_rayrunner(): df = df.with_column( "more_memory_request", assert_resources(col("id"), **expected), - resource_request=resource_request.ResourceRequest(num_cpus=1, memory_bytes=5_000_000, num_gpus=None), + resource_request=ResourceRequest(num_cpus=1, memory_bytes=5_000_000, num_gpus=None), ) df = df.with_column( "more_cpu_request", assert_resources(col("id"), **expected), - resource_request=resource_request.ResourceRequest(num_cpus=1, memory_bytes=None, num_gpus=None), + resource_request=ResourceRequest(num_cpus=1, memory_bytes=None, num_gpus=None), ) df.collect() @@ -142,7 +140,7 @@ def test_with_column_folded_rayrunner(): ### -@udf(return_dtype=int, input_columns={"c": list}) +@udf(return_dtype=daft.DataType.int64()) def assert_num_cuda_visible_devices(c, num_gpus: int = 0): cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES") # Env var not set: program is free to use any number of GPUs @@ -167,7 +165,7 @@ def test_with_column_pyrunner_gpu(): df = df.with_column( "foo", assert_num_cuda_visible_devices(col("id"), num_gpus=cuda_device_count()), - resource_request=resource_request.ResourceRequest(num_gpus=1), + resource_request=ResourceRequest(num_gpus=1), ) df.collect() @@ -182,7 +180,7 @@ def test_with_column_rayrunner_gpu(num_gpus): df = df.with_column( "num_cuda_visible_devices", assert_num_cuda_visible_devices(col("id"), num_gpus=num_gpus if num_gpus is not None else 0), - resource_request=resource_request.ResourceRequest(num_gpus=num_gpus), + resource_request=ResourceRequest(num_gpus=num_gpus), ) df.collect() @@ -197,12 +195,12 @@ def test_with_column_max_resources_rayrunner_gpu(): df = df.with_column( "0_gpu_col", assert_num_cuda_visible_devices(col("id"), num_gpus=1), - resource_request=resource_request.ResourceRequest(num_gpus=0), + resource_request=ResourceRequest(num_gpus=0), ) df = df.with_column( "1_gpu_col", assert_num_cuda_visible_devices(col("id"), num_gpus=1), - resource_request=resource_request.ResourceRequest(num_gpus=1), + resource_request=ResourceRequest(num_gpus=1), ) df.collect() diff --git a/tests_legacy/__init__.py b/tests_legacy/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/tutorials/text_to_image/text_to_image_generation.ipynb b/tutorials/text_to_image/text_to_image_generation.ipynb index 49f8b3306d..160afd320c 100644 --- a/tutorials/text_to_image/text_to_image_generation.ipynb +++ b/tutorials/text_to_image/text_to_image_generation.ipynb @@ -237,7 +237,7 @@ "import torch\n", "from min_dalle import MinDalle\n", "\n", - "from daft.resource_request import ResourceRequest\n", + "from daft import ResourceRequest\n", "\n", "\n", "@daft.udf(return_dtype=daft.DataType.python())\n",
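With `ResourceRequest` now exported from the top-level `daft` namespace and supported by the new planner, the migrated tests above boil down to the following usage pattern. A minimal self-contained sketch, assuming the local PyRunner with at least one CPU available:

```python
import daft
from daft import ResourceRequest, udf


@udf(return_dtype=daft.DataType.int64())
def my_udf(c):
    return [1] * len(c)


df = daft.from_pydict({"id": list(range(100))})
df = df.with_column(
    "foo",
    my_udf(daft.col("id")),
    # Ask the scheduler for 1 CPU and ~1 MB of memory for this projection.
    resource_request=ResourceRequest(num_cpus=1, memory_bytes=1_000_000),
)
df.collect()
```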
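The `max_resources` merging behavior defined in `resource_request.rs` (where the `lift` helper treats `None` as "unspecified") is what the `FoldProjections` optimizer rule relies on when it collapses stacked `.with_column()` calls into one projection; a quick illustration of the semantics:

```python
from daft import ResourceRequest

a = ResourceRequest(num_cpus=2, memory_bytes=5_000_000)
b = ResourceRequest(num_cpus=1, num_gpus=1)

# Per-resource maximum; a None on one side defers to the other side's value.
merged = ResourceRequest.max_resources([a, b])
assert merged.num_cpus == 2
assert merged.num_gpus == 1
assert merged.memory_bytes == 5_000_000
```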