Skip to content

Commit

Permalink
[BUG] Add resources to Ray stateful UDF actor (#2987)
Browse files Browse the repository at this point in the history
Fix for #2878
  • Loading branch information
kevinzwang authored Oct 4, 2024
1 parent 62d0581 commit a62d276
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 9 deletions.
1 change: 1 addition & 0 deletions daft/execution/physical_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def actor_pool_project(
with get_context().runner().actor_pool_context(
actor_pool_name,
actor_resource_request,
task_resource_request,
num_actors,
projection,
) as actor_pool_id:
Expand Down
15 changes: 11 additions & 4 deletions daft/runners/pyrunner.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,20 +337,27 @@ def run_iter_tables(

@contextlib.contextmanager
def actor_pool_context(
self, name: str, resource_request: ResourceRequest, num_actors: int, projection: ExpressionsProjection
self,
name: str,
actor_resource_request: ResourceRequest,
_task_resource_request: ResourceRequest,
num_actors: int,
projection: ExpressionsProjection,
) -> Iterator[str]:
actor_pool_id = f"py_actor_pool-{name}"

total_resource_request = resource_request * num_actors
total_resource_request = actor_resource_request * num_actors
admitted = self._attempt_admit_task(total_resource_request)

if not admitted:
raise RuntimeError(
f"Not enough resources available to admit {num_actors} actors, each with resource request: {resource_request}"
f"Not enough resources available to admit {num_actors} actors, each with resource request: {actor_resource_request}"
)

try:
self._actor_pools[actor_pool_id] = PyActorPool(actor_pool_id, num_actors, resource_request, projection)
self._actor_pools[actor_pool_id] = PyActorPool(
actor_pool_id, num_actors, actor_resource_request, projection
)
self._actor_pools[actor_pool_id].setup()
logger.debug("Created actor pool %s with resources: %s", actor_pool_id, total_resource_request)
yield actor_pool_id
Expand Down
16 changes: 14 additions & 2 deletions daft/runners/ray_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -986,8 +986,12 @@ def __init__(
self._projection = projection

def setup(self) -> None:
ray_options = _get_ray_task_options(self._resource_request_per_actor)

self._actors = [
DaftRayActor.options(name=f"rank={rank}-{self._id}").remote(self._execution_config, self._projection) # type: ignore
DaftRayActor.options(name=f"rank={rank}-{self._id}", **ray_options).remote( # type: ignore
self._execution_config, self._projection
)
for rank in range(self._num_actors)
]

Expand Down Expand Up @@ -1155,8 +1159,16 @@ def run_iter_tables(

@contextlib.contextmanager
def actor_pool_context(
self, name: str, resource_request: ResourceRequest, num_actors: PartID, projection: ExpressionsProjection
self,
name: str,
actor_resource_request: ResourceRequest,
task_resource_request: ResourceRequest,
num_actors: PartID,
projection: ExpressionsProjection,
) -> Iterator[str]:
# Ray runs actor methods serially, so the resource request for an actor should be both the actor's resources and the task's resources
resource_request = actor_resource_request + task_resource_request

execution_config = get_context().daft_execution_config
if self.ray_client_mode:
try:
Expand Down
3 changes: 2 additions & 1 deletion daft/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def run_iter_tables(
def actor_pool_context(
self,
name: str,
resource_request: ResourceRequest,
actor_resource_request: ResourceRequest,
task_resource_request: ResourceRequest,
num_actors: int,
projection: ExpressionsProjection,
) -> Iterator[str]:
Expand Down
4 changes: 3 additions & 1 deletion tests/actor_pool/test_pyactor_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,7 @@ def test_pyactor_pool_not_enough_resources():
assert isinstance(runner, PyRunner)

with pytest.raises(RuntimeError, match=f"Requested {float(cpu_count + 1)} CPUs but found only"):
with runner.actor_pool_context("my-pool", ResourceRequest(num_cpus=1), cpu_count + 1, projection) as _:
with runner.actor_pool_context(
"my-pool", ResourceRequest(num_cpus=1), ResourceRequest(), cpu_count + 1, projection
) as _:
pass
78 changes: 77 additions & 1 deletion tests/test_resource_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import daft
from daft import context, udf
from daft.context import get_context
from daft.context import get_context, set_planning_config
from daft.daft import SystemInfo
from daft.expressions import col
from daft.internal.gpu import cuda_device_count
Expand Down Expand Up @@ -127,6 +127,19 @@ def test_requesting_too_much_memory():
###


@pytest.fixture(scope="function", params=[True])
def enable_actor_pool():
    """Enable actor-pool projections for one test, then restore the previous planning config."""
    # Snapshot the config before entering the try block: if this lookup fails,
    # the finally clause must not run against an unbound `original_config`
    # (which would mask the real error with an UnboundLocalError).
    original_config = get_context().daft_planning_config
    try:
        set_planning_config(config=original_config.with_config_values(enable_actor_pool_projections=True))
        yield
    finally:
        # Always put the original config back, even when the test body raises.
        set_planning_config(config=original_config)


@udf(return_dtype=daft.DataType.int64())
def assert_resources(c, num_cpus=None, num_gpus=None, memory=None):
assigned_resources = ray.get_runtime_context().get_assigned_resources()
Expand All @@ -141,6 +154,24 @@ def assert_resources(c, num_cpus=None, num_gpus=None, memory=None):
return c


@udf(return_dtype=daft.DataType.int64())
class AssertResourcesStateful:
    """Class-based UDF that checks the resources reported by Ray's runtime context.

    Each call compares the expected CPU / GPU / memory amounts against
    ``ray.get_runtime_context().get_assigned_resources()`` and returns the
    input column unchanged when every assertion holds.
    """

    def __init__(self):
        pass

    def __call__(self, c, num_cpus=None, num_gpus=None, memory=None):
        granted = ray.get_runtime_context().get_assigned_resources()
        expectations = (("CPU", num_cpus), ("GPU", num_gpus), ("memory", memory))

        for key, expected in expectations:
            if expected is not None:
                # A concrete expectation: the key must be present with exactly that amount.
                assert key in granted
                assert granted[key] == expected
                continue
            # Nothing expected: the key must be absent or explicitly None.
            assert key not in granted or granted[key] is None

        return c


# True when the installed Ray's major version is below 2. Used as a skip condition for
# tests that call ray.get_runtime_context().get_assigned_resources().
RAY_VERSION_LT_2 = int(ray.__version__.split(".")[0]) < 2


Expand Down Expand Up @@ -187,6 +218,51 @@ def test_with_column_folded_rayrunner():
df.collect()


@pytest.mark.skipif(
    RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0"
)
@pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use")
def test_with_column_rayrunner_class(enable_actor_pool):
    """Run a class-based UDF with explicit CPU and memory overrides and assert on them inside the UDF."""
    stateful_udf = AssertResourcesStateful.with_concurrency(1)
    udf_with_requests = stateful_udf.override_options(num_cpus=1, memory_bytes=1_000_000, num_gpus=None)

    # The keyword arguments passed to the UDF call are the values it asserts against.
    checked = udf_with_requests(col("id"), num_cpus=1, num_gpus=None, memory=1_000_000)
    df = daft.from_pydict(DATA).repartition(2).with_column("resources_ok", checked)

    df.collect()


@pytest.mark.skipif(
    RAY_VERSION_LT_2, reason="The ray.get_runtime_context().get_assigned_resources() was only added in Ray >= 2.0"
)
@pytest.mark.skipif(get_context().runner_config.name not in {"ray"}, reason="requires RayRunner to be in use")
def test_with_column_folded_rayrunner_class(enable_actor_pool):
    """Chain three class-based UDF columns with differing resource overrides and collect the result."""
    base_udf = AssertResourcesStateful.with_concurrency(1)
    udf_more_memory = base_udf.override_options(num_cpus=1, memory_bytes=5_000_000)
    udf_no_memory = base_udf.override_options(num_cpus=1, memory_bytes=None)

    df = (
        daft.from_pydict(DATA)
        .repartition(2)
        # UDFs have 1 CPU by default
        .with_column("no_requests", base_udf(col("id"), num_cpus=1))
        .with_column("more_memory_request", udf_more_memory(col("id"), num_cpus=1, memory=5_000_000))
        .with_column("more_cpu_request", udf_no_memory(col("id"), num_cpus=1))
    )
    df.collect()


###
# GPU tests - can only run if machine has a GPU
###
Expand Down

0 comments on commit a62d276

Please sign in to comment.