Mark restarting actors as pending actors #47946
Conversation
    in self._actor_pool._restarting_actors
)
# Move the actor from restarting to running state.
self._actor_pool.restarting_to_running(actor_to_return)
We need to return the actor in this case as well; otherwise, the actor will no longer be usable.
BTW, let's add a unit test in test_actor_pool_map_operator.py to cover this case.
@@ -221,7 +231,7 @@ def _task_done_callback(actor_to_return):
    self._submit_data_task(
        gen,
        bundle,
-       lambda: _task_done_callback(actor_to_return),
+       lambda: _task_done_callback(actor_to_return),  # noqa: B023
this change seems unrelated?
Yes, this was suppressing a warning flagged by ./scripts/format.sh.
actors = list(self._actor_pool._num_tasks_in_flight.keys())
for actor in actors:
    actor_state = actor._get_local_state()
    if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
Let's add an assertion here to make sure we're only handling the RESTARTING state.
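For concreteness, the assertion might look like this, mirroring the snippet above (how other non-ALIVE states should be handled is an assumption):

if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
    # We only expect RESTARTING here; fail loudly on anything unexpected
    # (e.g. DEAD) so surprising transitions are surfaced early.
    assert (
        actor_state == gcs_pb2.ActorTableData.ActorState.RESTARTING
    ), actor_state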
if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
    # If an actor is not ALIVE, it's a candidate to be marked as a
    # restarting actor.
    self._actor_pool.running_to_restarting(actor, actor.get_location)
-    self._actor_pool.running_to_restarting(actor, actor.get_location)
+    if self._actor_pool.is_actor_running(actor):
+        self._actor_pool.running_to_restarting(actor, actor.get_location.remote())
- Moving the running check here would be cleaner. And more importantly, we should only send get_location when the actor switches from running to restarting.
- .remote() was missing after get_location. It probably didn't error out because actor locality is disabled by default right now.
Ah ok, get_location is valid only when the state is ALIVE.
else:
    # If an actor is ALIVE, it's a candidate to be marked as a
    # running actor, if not already the case.
    self._actor_pool.restarting_to_running(actor.get_location)
We should use the actor handle as the key; actor.get_location is a method, not an object ref.
Similar to the above comment, let's add some unit tests to cover the state transitions.
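As a rough illustration of what such a test could assert, here is a self-contained toy; ToyPool and its methods are hypothetical stand-ins for _ActorPool's bookkeeping, not Ray's actual API:

class ToyPool:
    """Toy stand-in for the running/restarting bookkeeping under test."""

    def __init__(self):
        self.running = {}      # actor -> num_tasks_in_flight
        self.restarting = {}   # actor -> saved num_tasks_in_flight

    def running_to_restarting(self, actor):
        if actor in self.running:
            self.restarting[actor] = self.running.pop(actor)

    def restarting_to_running(self, actor):
        if actor in self.restarting:
            self.running[actor] = self.restarting.pop(actor)


def test_state_transitions():
    pool = ToyPool()
    pool.running["a"] = 2
    pool.running_to_restarting("a")
    assert "a" in pool.restarting and "a" not in pool.running
    pool.restarting_to_running("a")
    # The in-flight count survives the round trip (see the
    # _num_tasks_in_flight discussion further down).
    assert pool.running["a"] == 2


test_state_transitions()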
from ray.tests.conftest import *  # noqa


def test_removed_nodes_and_added_back(ray_start_cluster):
- Let's also test that pending_processor_usage reports the correct usage during the different stages.
- Maybe just move this test to test_actor_pool_map_operator.py.
    # The actor has been removed from the pool before becoming running.
    return False
actor = self._restarting_actors.pop(ready_ref)
self._num_tasks_in_flight[actor] = 0
We need to keep the old _num_tasks_in_flight value and restore it here.
Ah I see, makes sense
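A rough sketch of that fix, reusing the names from the snippet above (the exact shape of the _restarting_actors entries is an assumption):

# On running -> restarting: remember the actor's in-flight count instead of
# discarding it.
self._restarting_actors[ready_ref] = (actor, self._num_tasks_in_flight.pop(actor, 0))

# On restarting -> running: restore the saved count rather than resetting to 0.
actor, saved_in_flight = self._restarting_actors.pop(ready_ref)
self._num_tasks_in_flight[actor] = saved_in_flight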
return True

# Next, prioritize killing a restarting actor.
killed = self._maybe_kill_restarting_actor()
Let's only keep restarting actors with in_flight_tasks == 0 here.
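Something along these lines, assuming the pool still tracks in-flight counts for restarting actors (attribute names taken from the snippets above):

# Only restarting actors with no tasks in flight are safe kill candidates;
# killing one mid-task would discard work that may still complete.
kill_candidates = [
    actor
    for actor in self._restarting_actors
    if self._num_tasks_in_flight.get(actor, 0) == 0
]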
High-level structure looks good. Left some comments.
else:
    # If an actor is ALIVE, it's a candidate to be marked as a
    # running actor, if not already the case.
    self._actor_pool.clear_restarting_from_running_actor(actor)
Nit: the method names sound a bit too verbose. Maybe just mark_actor_as_alive/restarting?
@@ -309,6 +309,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
    i += 1
    if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
        self._refresh_progress_bars(topology)
+   topology[op].update_resource_usage()
Just op.update_resource_usage(), so we don't need the extra indirection in OpState.
self._num_tasks_in_flight[actor] -= 1
if self._should_kill_idle_actors and self._num_tasks_in_flight[actor] == 0:
    # Mark restarting as false, now that the actor is running.
    self._running_actors[actor]._is_restarting = False
On second thought, I think it'd be slightly clearer to remove this and let the next update_resource_usage handle the state transition.
def num_alive_actors(self) -> int:
    return sum(
        1
        if (
            running_actor_state.num_tasks_in_flight > 0
            and running_actor_state.is_restarting is False
        )
        else 0
        for running_actor_state in self._running_actors.values()
    )
Does this mean we don't count an actor as alive if it doesn't have any tasks in flight? If so, what are the implications of that (if any)?
For scheduling tasks, we invoke pick_actors(). Earlier it did not cover the restarting case, but now it excludes restarting actors even when they have few in-flight tasks.
Also, the resource accounting APIs current_processor_usage() and pending_processor_usage() now account for restarting actors.
actor_state = actor._get_local_state()
if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
What happens if _get_local_state returns None? Looks like we assume that the actor is restarting -- do we need to worry about this edge case?
If raylet.pyx:4371 get_local_actor_state() returns None, then actor.py:1561 _get_local_state() can return None.
I think it's defensive to check for None here, given that I'm not sure about the interface guarantee of get_local_actor_state().
Good catch! Let me fix the code.
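A defensive sketch of the fix, keeping the snippet's shape (whether skipping is the right fallback for None is an assumption):

actor_state = actor._get_local_state()
if actor_state is None:
    # No local view of the actor's state yet; don't assume it's restarting.
    continue
if actor_state != gcs_pb2.ActorTableData.ActorState.ALIVE:
    ...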
def update_resource_usage(self) -> None:
    """Updates resource usage."""
    for actor in self._actor_pool._running_actors.keys():
Nit: should we add a method to _ActorPool that provides a list of actor handles, so that we don't access the internal _running_actors attribute?
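For example, a small accessor on _ActorPool could look like this (the name get_running_actor_refs is hypothetical, and typing.List is assumed to be imported):

def get_running_actor_refs(self) -> List["ray.actor.ActorHandle"]:
    """Return the handles of all running actors, so callers don't need
    to reach into the private _running_actors dict."""
    return list(self._running_actors.keys())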
def update_resource_usage(self) -> None:
    """Updates resource usage."""
    self.op.update_resource_usage()
Where is OpState.update_resource_usage called?
This is invoked by _scheduling_loop_step in streaming_executor.py. Will add a comment here.
I thought that calls PhysicalOperator.update_resource_usage and not OpState.update_resource_usage? AFAIK, OpState.update_resource_usage doesn't have any references.
Ah, never mind, I saw you removed OpState.update_resource_usage.
    self._running_actors[actor].num_tasks_in_flight >= self._max_tasks_in_flight
    or self._running_actors[actor].is_restarting
):
    # All actors are at capacity or restarting.
Nit (not a new issue in this PR): I think it'd be clearer if we filtered the running actors by validity and then found the min.
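A sketch of that refactor, reusing the field names from the snippet above:

# Filter to schedulable actors first, then pick the least-loaded one.
valid_actors = [
    actor
    for actor, state in self._running_actors.items()
    if state.num_tasks_in_flight < self._max_tasks_in_flight
    and not state.is_restarting
]
if not valid_actors:
    return None  # All actors are at capacity or restarting.
return min(
    valid_actors,
    key=lambda a: self._running_actors[a].num_tasks_in_flight,
)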
def test_actor_pool_fault_tolerance_e2e(ray_start_cluster):
    """Test that a dataset with actor pools can finish, when
    all nodes in the cluster are removed and added back."""
    ray.shutdown()
this shutdown shouldn't be needed
Without this shutdown, the ray.init() call at line 608 throws:
E RuntimeError: Maybe you called ray.init twice by accident? This error can be suppressed by passing in 'ignore_reinit_error=True' or by calling 'ray.shutdown()' prior to 'ray.init()'.
I think it's because the previous test didn't shut down the cluster. We can change ray_start_regular_shared to ray_start_regular.
LGTM. A few final small comments.
actor_str += f", (pending: {pending})"
desc += actor_str
# Actors info
desc += self.actor_info_progress_str()
Oh, sorry, I meant just adding this actor_info_progress_str in PhysicalOperator and getting rid of the num_xxx_actors methods, because it seems a bit overkill to have so many methods and indirections.
@@ -309,6 +309,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
    i += 1
    if i % PROGRESS_BAR_UPDATE_INTERVAL == 0:
        self._refresh_progress_bars(topology)
+   op.update_resource_usage()
Let's move this call inside ResourceManager.update_usages(), because the updated info will be used in that function.
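Roughly like this; the body of ResourceManager.update_usages() is sketched from context, not the actual source, and _update_usage_for_op is a hypothetical placeholder for the existing per-op logic:

def update_usages(self):
    for op in self._topology:
        # Refresh actor states first so the usage numbers computed below
        # reflect up-to-date running/restarting counts.
        op.update_resource_usage()
        self._update_usage_for_op(op)  # hypothetical existing per-op update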
Why are these changes needed?
In ActorPoolMapOperator, which executes tasks on an actor pool, pick_actor is invoked to schedule each incoming task. pick_actor is a simple bin-packing algorithm that picks the running actor with the fewest in-flight tasks. When an actor is restarting, though, pick_actor needs to exclude it from task scheduling.
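To illustrate the scheduling rule, here is a runnable toy sketch (not Ray's actual implementation; all names are stand-ins):

from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ActorSlot:
    num_tasks_in_flight: int
    is_restarting: bool


def pick_actor(pool: Dict[str, ActorSlot]) -> Optional[str]:
    # Bin packing: choose the non-restarting actor with the fewest
    # in-flight tasks; restarting actors are excluded entirely.
    candidates = [name for name, slot in pool.items() if not slot.is_restarting]
    if not candidates:
        return None  # Nothing schedulable; the task stays queued.
    return min(candidates, key=lambda name: pool[name].num_tasks_in_flight)


pool = {
    "a": ActorSlot(num_tasks_in_flight=2, is_restarting=False),
    "b": ActorSlot(num_tasks_in_flight=0, is_restarting=True),  # excluded
    "c": ActorSlot(num_tasks_in_flight=1, is_restarting=False),
}
assert pick_actor(pool) == "c"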
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.