diff --git a/daft/execution/physical_plan.py b/daft/execution/physical_plan.py index 91be85e02a..03c88b4779 100644 --- a/daft/execution/physical_plan.py +++ b/daft/execution/physical_plan.py @@ -241,6 +241,7 @@ def actor_pool_project( with get_context().runner().actor_pool_context( actor_pool_name, actor_resource_request, + task_resource_request, num_actors, projection, ) as actor_pool_id: diff --git a/daft/runners/pyrunner.py b/daft/runners/pyrunner.py index 31c56c3ad4..d861e2e060 100644 --- a/daft/runners/pyrunner.py +++ b/daft/runners/pyrunner.py @@ -337,20 +337,27 @@ def run_iter_tables( @contextlib.contextmanager def actor_pool_context( - self, name: str, resource_request: ResourceRequest, num_actors: int, projection: ExpressionsProjection + self, + name: str, + actor_resource_request: ResourceRequest, + _task_resource_request: ResourceRequest, + num_actors: int, + projection: ExpressionsProjection, ) -> Iterator[str]: actor_pool_id = f"py_actor_pool-{name}" - total_resource_request = resource_request * num_actors + total_resource_request = actor_resource_request * num_actors admitted = self._attempt_admit_task(total_resource_request) if not admitted: raise RuntimeError( - f"Not enough resources available to admit {num_actors} actors, each with resource request: {resource_request}" + f"Not enough resources available to admit {num_actors} actors, each with resource request: {actor_resource_request}" ) try: - self._actor_pools[actor_pool_id] = PyActorPool(actor_pool_id, num_actors, resource_request, projection) + self._actor_pools[actor_pool_id] = PyActorPool( + actor_pool_id, num_actors, actor_resource_request, projection + ) self._actor_pools[actor_pool_id].setup() logger.debug("Created actor pool %s with resources: %s", actor_pool_id, total_resource_request) yield actor_pool_id diff --git a/daft/runners/ray_runner.py b/daft/runners/ray_runner.py index d29a15c9f2..c0579e11e5 100644 --- a/daft/runners/ray_runner.py +++ b/daft/runners/ray_runner.py @@ -986,8 +986,12 @@ def __init__( self._projection = projection def setup(self) -> None: + ray_options = _get_ray_task_options(self._resource_request_per_actor) + self._actors = [ - DaftRayActor.options(name=f"rank={rank}-{self._id}").remote(self._execution_config, self._projection) # type: ignore + DaftRayActor.options(name=f"rank={rank}-{self._id}", **ray_options).remote( # type: ignore + self._execution_config, self._projection + ) for rank in range(self._num_actors) ] @@ -1155,8 +1159,16 @@ def run_iter_tables( @contextlib.contextmanager def actor_pool_context( - self, name: str, resource_request: ResourceRequest, num_actors: PartID, projection: ExpressionsProjection + self, + name: str, + actor_resource_request: ResourceRequest, + task_resource_request: ResourceRequest, + num_actors: PartID, + projection: ExpressionsProjection, ) -> Iterator[str]: + # Ray runs actor methods serially, so the resource request for an actor should be both the actor's resources and the task's resources + resource_request = actor_resource_request + task_resource_request + execution_config = get_context().daft_execution_config if self.ray_client_mode: try: diff --git a/daft/runners/runner.py b/daft/runners/runner.py index c1dd30f64e..730f5e1a4a 100644 --- a/daft/runners/runner.py +++ b/daft/runners/runner.py @@ -67,7 +67,8 @@ def run_iter_tables( def actor_pool_context( self, name: str, - resource_request: ResourceRequest, + actor_resource_request: ResourceRequest, + task_resource_request: ResourceRequest, num_actors: int, projection: ExpressionsProjection, ) -> Iterator[str]: diff --git a/tests/actor_pool/test_pyactor_pool.py b/tests/actor_pool/test_pyactor_pool.py index e95feec9ed..f34d91bd7b 100644 --- a/tests/actor_pool/test_pyactor_pool.py +++ b/tests/actor_pool/test_pyactor_pool.py @@ -69,5 +69,7 @@ def test_pyactor_pool_not_enough_resources(): assert isinstance(runner, PyRunner) with pytest.raises(RuntimeError, match=f"Requested {float(cpu_count + 1)} CPUs but found only"): - with runner.actor_pool_context("my-pool", ResourceRequest(num_cpus=1), cpu_count + 1, projection) as _: + with runner.actor_pool_context( + "my-pool", ResourceRequest(num_cpus=1), ResourceRequest(), cpu_count + 1, projection + ) as _: pass diff --git a/tests/test_resource_requests.py b/tests/test_resource_requests.py index 20a2aadf21..ec867aada7 100644 --- a/tests/test_resource_requests.py +++ b/tests/test_resource_requests.py @@ -8,7 +8,7 @@ import daft from daft import context, udf -from daft.context import get_context +from daft.context import get_context, set_planning_config from daft.daft import SystemInfo from daft.expressions import col from daft.internal.gpu import cuda_device_count @@ -127,6 +127,19 @@ def test_requesting_too_much_memory(): ### +@pytest.fixture(scope="function", params=[True]) +def enable_actor_pool(): + try: + original_config = get_context().daft_planning_config + + set_planning_config( + config=get_context().daft_planning_config.with_config_values(enable_actor_pool_projections=True) + ) + yield + finally: + set_planning_config(config=original_config) + + @udf(return_dtype=daft.DataType.int64()) def assert_resources(c, num_cpus=None, num_gpus=None, memory=None): assigned_resources = ray.get_runtime_context().get_assigned_resources() @@ -141,6 +154,24 @@ def assert_resources(c, num_cpus=None, num_gpus=None, memory=None): return c +@udf(return_dtype=daft.DataType.int64()) +class AssertResourcesStateful: + def __init__(self): + pass + + def __call__(self, c, num_cpus=None, num_gpus=None, memory=None): + assigned_resources = ray.get_runtime_context().get_assigned_resources() + + for resource, ray_resource_key in [(num_cpus, "CPU"), (num_gpus, "GPU"), (memory, "memory")]: + if resource is None: + assert ray_resource_key not in assigned_resources or assigned_resources[ray_resource_key] is None + else: + assert ray_resource_key in assigned_resources + assert assigned_resources[ray_resource_key] == resource + + return c + + RAY_VERSION_LT_2 = int(ray.__version__.split(".")[0]) < 2 @@ -187,6 +218,51 @@ def test_with_column_folded_rayrunner(): df.collect() +@pytest.mark.skipif( + RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0" +) +@pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") +def test_with_column_rayrunner_class(enable_actor_pool): + assert_resources = AssertResourcesStateful.with_concurrency(1) + + df = daft.from_pydict(DATA).repartition(2) + + assert_resources_parametrized = assert_resources.override_options(num_cpus=1, memory_bytes=1_000_000, num_gpus=None) + df = df.with_column( + "resources_ok", + assert_resources_parametrized(col("id"), num_cpus=1, num_gpus=None, memory=1_000_000), + ) + + df.collect() + + +@pytest.mark.skipif( + RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0" +) +@pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use") +def test_with_column_folded_rayrunner_class(enable_actor_pool): + assert_resources = AssertResourcesStateful.with_concurrency(1) + + df = daft.from_pydict(DATA).repartition(2) + + df = df.with_column( + "no_requests", + assert_resources(col("id"), num_cpus=1), # UDFs have 1 CPU by default + ) + + assert_resources_1 = assert_resources.override_options(num_cpus=1, memory_bytes=5_000_000) + df = df.with_column( + "more_memory_request", + assert_resources_1(col("id"), num_cpus=1, memory=5_000_000), + ) + assert_resources_2 = assert_resources.override_options(num_cpus=1, memory_bytes=None) + df = df.with_column( + "more_cpu_request", + assert_resources_2(col("id"), num_cpus=1), + ) + df.collect() + + ### # GPU tests - can only run if machine has a GPU ###