
Addressed review comments
Signed-off-by: Srinath Krishnamachari <[email protected]>
srinathk10 committed Oct 17, 2024
1 parent 175662c commit 1085e80
Showing 4 changed files with 64 additions and 137 deletions.
@@ -289,22 +289,22 @@ def base_resource_usage(self) -> ExecutionResources:
)

def current_processor_usage(self) -> ExecutionResources:
# Only alive actors count towards our current resource usage.
num_alive_workers = self._actor_pool.num_alive_actors()
# Both pending and running actors count towards our current resource usage.
num_active_workers = self._actor_pool.current_size()
return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0) * num_alive_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_alive_workers,
cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
)

def pending_processor_usage(self) -> ExecutionResources:
# Both pending and restarting actors count towards pending processor usage
num_pending_proc_workers = (
num_pending_workers = (
self._actor_pool.num_pending_actors()
+ self._actor_pool.num_restarting_actors()
)
return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0) * num_pending_proc_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_proc_workers,
cpu=self._ray_remote_args.get("num_cpus", 0) * num_pending_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
)

def num_active_actors(self) -> int:
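
For a sense of the arithmetic this hunk changes, here is a minimal standalone sketch (plain Python with illustrative numbers, not Ray's classes): current usage is now scaled by the pool's full size, while pending usage is scaled by pending plus restarting actors.

# Illustrative sketch of the usage computation above; all values are made up.
ray_remote_args = {"num_cpus": 1, "num_gpus": 0}

current_size = 3             # pending + running actors, as in current_size()
num_pending_workers = 1      # pending + restarting actors

current_cpu = ray_remote_args.get("num_cpus", 0) * current_size         # -> 3
pending_cpu = ray_remote_args.get("num_cpus", 0) * num_pending_workers  # -> 1
print(current_cpu, pending_cpu)
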
@@ -362,25 +362,14 @@ def _apply_default_remote_args(ray_remote_args: Dict[str, Any]) -> Dict[str, Any
def get_autoscaling_actor_pools(self) -> List[AutoscalingActorPool]:
return [self._actor_pool]

def _manage_actor_restarting_state(self, actor):
actor_state = actor._get_local_state()
if actor_state is None:
# actor._get_local_state can return None if the state is Unknown
return
elif actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
# If an actor is not ALIVE, it's a candidate to be marked as a
# restarting actor.
assert actor_state is gcs_pb2.ActorTableData.ActorState.RESTARTING
self._actor_pool.mark_running_actor_as_restarting(actor)
else:
# If an actor is ALIVE, it's a candidate to be marked as an
# alive actor, if not already the case.
self._actor_pool.mark_actor_as_alive(actor)

def update_resource_usage(self) -> None:
"""Updates resources usage."""
for actor in self._actor_pool.get_running_actor_refs():
self._manage_actor_restarting_state(actor)
actor_state = actor._get_local_state()
if actor_state is None:
# actor._get_local_state can return None if the state is Unknown
continue
self._actor_pool.update_running_actor_state(actor, actor_state)


class _MapWorker:
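
A rough standalone sketch of the polling flow above, using throwaway stubs in place of Ray's actor handles and pool (string states stand in for the gcs_pb2 enum): each call reads the locally cached GCS state of every running actor and pushes it into the pool, skipping actors whose state is unknown.

ALIVE, RESTARTING = "ALIVE", "RESTARTING"   # stand-ins for gcs_pb2 ActorState values

class StubActor:
    def __init__(self, state):
        self._state = state
    def _get_local_state(self):
        return self._state                  # None when the state is unknown

class StubPool:
    def __init__(self, actors):
        self._states = {a: ALIVE for a in actors}
    def get_running_actor_refs(self):
        return list(self._states)
    def update_running_actor_state(self, actor, state):
        self._states[actor] = state
    def num_restarting_actors(self):
        return sum(s != ALIVE for s in self._states.values())

pool = StubPool([StubActor(ALIVE), StubActor(RESTARTING), StubActor(None)])
for actor in pool.get_running_actor_refs():
    state = actor._get_local_state()
    if state is None:
        continue                            # unknown: keep the last known state
    pool.update_running_actor_state(actor, state)
print(pool.num_restarting_actors())         # -> 1
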
@@ -419,17 +408,17 @@ def __repr__(self):


@dataclass
class _ActorRunningState:
"""Actor running state"""
class _ActorState:
"""Actor state"""

# Number of tasks in flight per actor
num_tasks_in_flight: int

# Node id of each ready actor
actor_location: str

# Is Actor in restarting state
is_restarting: bool
# Actor state
actor_state: gcs_pb2.ActorTableData.ActorState
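
A tiny stand-in for the dataclass above (strings instead of the gcs_pb2 enum so it runs without Ray), just to show what the pool now tracks per running actor:

from dataclasses import dataclass

@dataclass
class ActorStateSketch:
    num_tasks_in_flight: int    # tasks currently submitted to this actor
    actor_location: str         # node id reported when the actor became ready
    actor_state: str            # GCS-reported state, e.g. "ALIVE" or "RESTARTING"

record = ActorStateSketch(num_tasks_in_flight=0, actor_location="node-1", actor_state="ALIVE")
record.actor_state = "RESTARTING"   # flipped when the state poll sees a restart
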


class _ActorPool(AutoscalingActorPool):
@@ -458,7 +447,7 @@ def __init__(
assert self._create_actor_fn is not None

# Actors that have started running, including alive and restarting actors.
self._running_actors: Dict[ray.actor.ActorHandle, _ActorRunningState] = {}
self._running_actors: Dict[ray.actor.ActorHandle, _ActorState] = {}
# Actors that are not yet ready (still pending creation).
self._pending_actors: Dict[ObjectRef, ray.actor.ActorHandle] = {}
# Whether actors that become idle should be eagerly killed. This is False until
@@ -483,28 +472,23 @@ def num_running_actors(self) -> int:
return len(self._running_actors)

def num_restarting_actors(self) -> int:
"""Restarting actors are all the running actors not in ALIVE state."""
return sum(
running_actor_state.is_restarting
running_actor_state.actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE
for running_actor_state in self._running_actors.values()
)

def num_active_actors(self) -> int:
"""Active actors are all the running actors with inflight tasks."""
return sum(
1 if running_actor_state.num_tasks_in_flight > 0 else 0
for running_actor_state in self._running_actors.values()
)

def num_alive_actors(self) -> int:
"""Alive actors must have inflight tasks and each of those should be in ALIVE
state.
"""
"""Alive actors are all the running actors in ALIVE state."""
return sum(
1
if (
running_actor_state.num_tasks_in_flight > 0
and running_actor_state.is_restarting is False
)
else 0
running_actor_state.actor_state == gcs_pb2.ActorTableData.ActorState.ALIVE
for running_actor_state in self._running_actors.values()
)

@@ -535,23 +519,19 @@ def scale_down(self, num_actors: int) -> int:

# === End of overriding methods of AutoscalingActorPool ===

def mark_running_actor_as_restarting(self, actor: ray.actor.ActorHandle):
"""Mark the running actor as restarting.
Args:
actor: The running actor to be marked as restarting.
"""
assert actor in self._running_actors
self._running_actors[actor].is_restarting = True

def mark_actor_as_alive(self, actor: ray.actor.ActorHandle):
"""Mark the running actor as alive.
def update_running_actor_state(
self,
actor: ray.actor.ActorHandle,
actor_state: gcs_pb2.ActorTableData.ActorState,
):
"""Update running actor state.
Args:
actor: The running actor to be marked as alive.
actor: The running actor that needs state update.
actor_state: Updated actor state for the running actor.
"""
assert actor in self._running_actors
self._running_actors[actor].is_restarting = False
self._running_actors[actor].actor_state = actor_state

def add_pending_actor(self, actor: ray.actor.ActorHandle, ready_ref: ray.ObjectRef):
"""Adds a pending actor to the pool.
@@ -583,10 +563,10 @@ def pending_to_running(self, ready_ref: ray.ObjectRef) -> bool:
# The actor has been removed from the pool before becoming running.
return False
actor = self._pending_actors.pop(ready_ref)
self._running_actors[actor] = _ActorRunningState(
self._running_actors[actor] = _ActorState(
num_tasks_in_flight=0,
actor_location=ray.get(ready_ref),
is_restarting=False,
actor_state=gcs_pb2.ActorTableData.ActorState.ALIVE,
)
return True

@@ -613,25 +593,33 @@ def pick_actor(
def penalty_key(actor):
"""Returns the key that should be minimized for the best actor.
We prioritize valid actors, those with argument locality, and those that
are not busy, in that order.
We prioritize actors with argument locality, and those that are not busy,
in that order.
"""
busyness = self._running_actors[actor].num_tasks_in_flight
is_restarting = self._running_actors[actor].is_restarting
invalid = busyness >= self._max_tasks_in_flight or is_restarting
requires_remote_fetch = (
self._running_actors[actor].actor_location != preferred_loc
)
return invalid, requires_remote_fetch, busyness
return requires_remote_fetch, busyness

actor = min(self._running_actors.keys(), key=penalty_key)
if (
self._running_actors[actor].num_tasks_in_flight >= self._max_tasks_in_flight
or self._running_actors[actor].is_restarting
):
# All actors are at capacity or restarting.
# Filter out actors that are invalid, i.e. actors with tasks in flight
# exceeding _max_tasks_in_flight or actor_state is not ALIVE.
valid_actors = [
actor
for actor in self._running_actors
if self._running_actors[actor].num_tasks_in_flight
< self._max_tasks_in_flight
and self._running_actors[actor].actor_state
== gcs_pb2.ActorTableData.ActorState.ALIVE
]

if not valid_actors:
# All actors are at capacity or actor state is not ALIVE.
return None

# Pick the best valid actor based on the penalty key
actor = min(valid_actors, key=penalty_key)

if locality_hint:
if self._running_actors[actor].actor_location == preferred_loc:
self._locality_hits += 1
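
A standalone sketch of the reworked selection (hypothetical data, plain Python, not Ray's pool): invalid actors — at capacity or not ALIVE — are filtered out first, and the penalty key only breaks ties among the rest by locality and busyness.

MAX_TASKS_IN_FLIGHT = 2
preferred_loc = "node-1"
running = {
    "a": {"num_tasks_in_flight": 2, "actor_state": "ALIVE", "actor_location": "node-1"},
    "b": {"num_tasks_in_flight": 1, "actor_state": "RESTARTING", "actor_location": "node-1"},
    "c": {"num_tasks_in_flight": 1, "actor_state": "ALIVE", "actor_location": "node-2"},
}

valid_actors = [
    name
    for name, s in running.items()
    if s["num_tasks_in_flight"] < MAX_TASKS_IN_FLIGHT and s["actor_state"] == "ALIVE"
]

def penalty_key(name):
    s = running[name]
    requires_remote_fetch = s["actor_location"] != preferred_loc
    return requires_remote_fetch, s["num_tasks_in_flight"]

picked = min(valid_actors, key=penalty_key) if valid_actors else None
print(picked)   # -> "c": "a" is at capacity and "b" is restarting
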
@@ -341,15 +341,6 @@ def mark_finished(self, exception: Optional[Exception] = None):
else:
self._exception = exception

def update_resource_usage(self) -> None:
"""Updates resource usage of this operator at runtime.
This method will be called at runtime in each StreamingExecutor iteration.
Subclasses can override it to account for dynamic resource usage updates due to
restarting actors, retrying tasks, lost objects, etc.
"""
self.op.update_resource_usage()


def build_streaming_topology(
dag: PhysicalOperator, options: ExecutionOptions
64 changes: 6 additions & 58 deletions python/ray/data/tests/test_actor_pool_map_operator.py
@@ -9,6 +9,7 @@
import ray
from ray._private.test_utils import wait_for_condition
from ray.actor import ActorHandle
from ray.core.generated import gcs_pb2
from ray.data._internal.compute import ActorPoolStrategy
from ray.data._internal.execution.operators.actor_pool_map_operator import _ActorPool
from ray.data._internal.execution.util import make_ref_bundles
@@ -129,12 +130,14 @@ def test_pending_to_running(self):
assert pool.num_free_slots() == 3

def test_restarting_to_alive(self):
# Test that actor is correctly transitioned from restarting to running.
# Test that actor is correctly transitioned from restarting to alive.
pool = self._create_actor_pool(max_tasks_in_flight=1)
actor = self._add_ready_actor(pool)

# Mark the actor as restarting and test pick_actor fails
pool.mark_running_actor_as_restarting(actor)
pool.update_running_actor_state(
actor, gcs_pb2.ActorTableData.ActorState.RESTARTING
)
assert pool.pick_actor() is None
assert pool.current_size() == 1
assert pool.num_pending_actors() == 0
Expand All @@ -146,7 +149,7 @@ def test_restarting_to_alive(self):
assert pool.num_free_slots() == 1

# Mark the actor as alive and test pick_actor succeeds
pool.mark_actor_as_alive(actor)
pool.update_running_actor_state(actor, gcs_pb2.ActorTableData.ActorState.ALIVE)
picked_actor = pool.pick_actor()
assert picked_actor == actor
assert pool.current_size() == 1
@@ -190,30 +193,6 @@ def test_return_actor(self):
assert pool.num_idle_actors() == 1 # Actor should now be idle.
assert pool.num_free_slots() == 999

def test_returned_actor_to_running(self):
# Test that we can return the actor and it will be marked as running and clear
# restarting flag.
pool = self._create_actor_pool(max_tasks_in_flight=999)
self._add_ready_actor(pool)
# Pick the actor
picked_actor = pool.pick_actor()
assert pool.num_restarting_actors() == 0
assert pool.num_alive_actors() == 1
pool.mark_running_actor_as_restarting(picked_actor)
assert pool.num_restarting_actors() == 1
assert pool.num_alive_actors() == 0
# Return the actor
pool.mark_actor_as_alive(picked_actor)
pool.return_actor(picked_actor)
assert pool.num_restarting_actors() == 0
# Check that the per-state pool sizes are as expected.
assert pool.current_size() == 1
assert pool.num_pending_actors() == 0
assert pool.num_running_actors() == 1
assert pool.num_active_actors() == 0
assert pool.num_idle_actors() == 1 # Actor should now be idle.
assert pool.num_free_slots() == 999

def test_pick_max_tasks_in_flight(self):
# Test that we can't pick an actor beyond the max_tasks_in_flight cap.
pool = self._create_actor_pool(max_tasks_in_flight=2)
@@ -270,35 +249,6 @@ def test_pick_all_max_tasks_in_flight(self):
# Check that the next pick doesn't return an actor.
assert pool.pick_actor() is None

def test_pick_ordering_restarting(self):
# Test that pick ordering is honored by restarting actors
pool = self._create_actor_pool(max_tasks_in_flight=2)
# Add 4 actors to the pool.
actors = [self._add_ready_actor(pool) for _ in range(4)]

# Pick actors
for _ in range(4):
picked_actor = pool.pick_actor()
assert pool._running_actors[picked_actor].num_tasks_in_flight == 1

# Mark actor[0] as restarting
pool.mark_running_actor_as_restarting(actors[0])

# Verify clearing restarting makes the actor pickable
for _ in range(4):
picked_actor = pool.pick_actor()
if picked_actor is not None:
assert pool._running_actors[picked_actor].num_tasks_in_flight == 2
else:
picked_actor = actors[0]
assert pool._running_actors[picked_actor].num_tasks_in_flight == 1
pool.mark_actor_as_alive(picked_actor)
picked_actor = pool.pick_actor()
picked_actor = actors[0]
assert pool._running_actors[picked_actor].num_tasks_in_flight == 2
# Check that the next pick doesn't return an actor.
assert pool.pick_actor() is None

def test_pick_ordering_with_returns(self):
# Test that pick ordering works with returns.
pool = self._create_actor_pool()
@@ -625,8 +575,6 @@ def __call__(self, x):

from ray.exceptions import GetTimeoutError

ray.shutdown()
ray.init()
ray.data.DataContext.get_current().wait_for_min_actors_s = 1

with pytest.raises(
12 changes: 6 additions & 6 deletions python/ray/data/tests/test_executor_resource_management.py
@@ -292,7 +292,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
assert op.incremental_resource_usage() == ExecutionResources(
cpu=0, gpu=0, object_store_memory=inc_obj_store_mem
)
assert op.current_processor_usage() == ExecutionResources(cpu=0, gpu=0)
assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0)
assert op.metrics.obj_store_mem_internal_inqueue == 0
assert op.metrics.obj_store_mem_internal_outqueue == 0
assert op.metrics.obj_store_mem_pending_task_inputs == 0
@@ -304,7 +304,7 @@ def test_actor_pool_resource_reporting(ray_start_10_cpus_shared, restore_data_co
cpu=0, gpu=0, object_store_memory=inc_obj_store_mem
)
op.add_input(input_op.get_next(), 0)
assert op.current_processor_usage() == ExecutionResources(cpu=0, gpu=0)
assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0)
assert op.metrics.obj_store_mem_internal_inqueue == pytest.approx(
(i + 1) * 800, rel=0.5
)
@@ -384,7 +384,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
assert op.incremental_resource_usage() == ExecutionResources(
cpu=0, gpu=0, object_store_memory=inc_obj_store_mem
)
assert op.current_processor_usage() == ExecutionResources(cpu=0, gpu=0)
assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0)
assert op.metrics.obj_store_mem_internal_inqueue == 0
assert op.metrics.obj_store_mem_internal_outqueue == 0
assert op.metrics.obj_store_mem_pending_task_inputs == 0
@@ -396,7 +396,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
cpu=0, gpu=0, object_store_memory=inc_obj_store_mem
)
op.add_input(input_op.get_next(), 0)
assert op.current_processor_usage() == ExecutionResources(cpu=0, gpu=0)
assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0)
assert op.metrics.obj_store_mem_internal_inqueue == pytest.approx(
(i + 1) * 800, rel=0.5
)
@@ -405,7 +405,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
assert op.metrics.obj_store_mem_pending_task_outputs == 0

# Pool is still idle while waiting for actors to start.
assert op.current_processor_usage() == ExecutionResources(cpu=0, gpu=0)
assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0)
assert op.metrics.obj_store_mem_internal_inqueue == pytest.approx(3200, rel=0.5)
assert op.metrics.obj_store_mem_internal_outqueue == 0
assert op.metrics.obj_store_mem_pending_task_inputs == 0
@@ -417,7 +417,7 @@ def test_actor_pool_resource_reporting_with_bundling(ray_start_10_cpus_shared):
run_op_tasks_sync(op, only_existing=True)

# Actors have now started and the pool is actively running tasks.
assert op.current_processor_usage() == ExecutionResources(cpu=1, gpu=0)
assert op.current_processor_usage() == ExecutionResources(cpu=2, gpu=0)

# Indicate that no more inputs will arrive.
op.all_inputs_done()
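
Quick check of the updated expectations, assuming (as these tests appear to) a pool whose current size is two actors, each requesting one CPU: once pending actors count toward current usage, the operator reports cpu=2 even before the actors finish starting.

num_cpus_per_actor = 1   # assumed remote arg in the test setup
pool_current_size = 2    # pending + running actors in the pool
print(num_cpus_per_actor * pool_current_size)   # -> 2, i.e. ExecutionResources(cpu=2)
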
