
PERF-#239: Improve task scheduler for MPI backend #245

Open · arunjose696 wants to merge 15 commits into master from scheduling_MPI

Conversation

@arunjose696 (Contributor) commented Feb 15, 2023

What do these changes do?

Improves the task scheduler for the MPI backend: the round-robin algorithm for submitting tasks is replaced with selecting the worker rank that holds the maximum share of the task's input data. Data is pushed to workers in the put method. Data IDs and data sizes are stored in the object store.
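A rough sketch of the scheduling idea described above (all names here are illustrative assumptions, not the PR's actual API): track how many bytes of a task's arguments each worker already holds, then schedule the task on the worker with the largest share.

import collections

def choose_destination_rank_sketch(data_ids, data_owners, data_sizes, worker_ranks):
    # Sum the bytes of the task's arguments already resident on each worker.
    shares = collections.defaultdict(int)
    for data_id in data_ids:
        shares[data_owners[data_id]] += data_sizes[data_id]
    # Pick the worker holding the largest share of the input data.
    return max(worker_ranks, key=lambda rank: shares[rank])

# Example: rank 2 holds 100 bytes of the arguments and rank 3 holds 40,
# so the task is scheduled on rank 2.
print(choose_destination_rank_sketch(
    ["a", "b"], {"a": 2, "b": 3}, {"a": 100, "b": 40}, [1, 2, 3]
))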

@arunjose696 force-pushed the scheduling_MPI branch 3 times, most recently from d4c6ecb to b619245 on February 27, 2023 16:58
else:
    data_id = object_store.generate_data_id(garbage_collector)
    dest_rank = RoundRobin.get_instance().schedule_rank()
    object_store.put(data_id, data)
Collaborator:

Why do we need to put data into the object store of the master process? We want to push data to a worker and save the owner of the data into the object store of the main process. To achieve this, we should probably rework the push_data method.

Contributor (author):

I have added an additional function (push_data_directly_to_worker) to push data directly without storing it in the object store. I did not modify the push_data method because the new method also needs the actual data as an argument, while push_data relies on the object_store.
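A minimal sketch of what such a helper could look like, assuming an mpi4py-style communicator and a matching recv on the worker side (the actual signature in the PR may differ):

from mpi4py import MPI

def push_data_directly_to_worker(dest_rank, data_id, data):
    # Send the raw data straight to the worker; unlike push_data,
    # nothing is cached in the master's object store.
    MPI.COMM_WORLD.send((data_id, data), dest=dest_rank)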

# dest_rank = RoundRobin.get_instance().schedule_rank()
dest_rank = choose_destination_rank(collected_data_ids)
print(dest_rank, collected_data_ids)
push_data_owners(dest_rank, collected_data_ids)
Collaborator:

As far as I understand, there is a case where we push the data to, for instance, rank 2 during ref = unidist.put(data) and then push the data owner (the same worker with rank 2) to that very worker during foo.remote(ref). We should probably avoid such cases, i.e., not push the data owner if the worker is the data owner itself.

# simple example
import unidist

unidist.init()

o_ref = unidist.put(1)

@unidist.remote
def foo(obj):
    return obj

foo.remote(o_ref) # here we push the owner (rank 2) to the worker with rank 2
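One way to implement the suggested guard (a sketch with assumed names and an mpi4py-style send; the PR's push_data_owners may look different):

from mpi4py import MPI

def push_data_owners(dest_rank, data_ids, data_owners):
    comm = MPI.COMM_WORLD
    for data_id in data_ids:
        owner_rank = data_owners[data_id]
        # Skip the push when the destination is the owner itself,
        # which avoids the self-push case described above.
        if owner_rank == dest_rank:
            continue
        comm.send((data_id, owner_rank), dest=dest_rank)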

Comment on lines +323 to +357
if isinstance(value, (list, tuple)):
    for v in value:
        collect_all_data_id_from_args(v, collected_data_ids)
elif isinstance(value, dict):
    for v in value.values():
        collect_all_data_id_from_args(v, collected_data_ids)
elif is_data_id(value):
Collaborator:

Is it possible that we don't get into any of the if branches? You could raise an exception at the end of the function to check this when testing with Modin.

Contributor (author):

Here the plan is just to collect data_ids recursively, so if the args are not data_ids (e.g., functions, plain Python objects, etc.), it is expected that none of the if branches is entered. I will add an exception for the case where the object store does not contain an owner.
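For reference, a self-contained version of the traversal being discussed (DataID and is_data_id are stand-ins for the helpers used in the diff above):

class DataID:
    # Stand-in for unidist's internal DataID type.
    pass

def is_data_id(value):
    # Stand-in for the helper used in the diff above.
    return isinstance(value, DataID)

def collect_all_data_id_from_args(value, collected_data_ids):
    # Recursively gather DataIDs from a remote call's arguments.
    if isinstance(value, (list, tuple)):
        for v in value:
            collect_all_data_id_from_args(v, collected_data_ids)
    elif isinstance(value, dict):
        for v in value.values():
            collect_all_data_id_from_args(v, collected_data_ids)
    elif is_data_id(value):
        collected_data_ids.append(value)
    # Any other value (a function, a plain Python object, ...) is a
    # regular argument and deliberately falls through all branches.

ids = []
collect_all_data_id_from_args([DataID(), {"k": DataID()}, 42], ids)
print(len(ids))  # 2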

int
    A rank number.
"""
if not data_ids:
Collaborator:

When is data_ids an empty list?

Contributor (author):

This is for the case where the remote function has no arguments. data_ids would then be empty and no worker would hold any data share, so such tasks are submitted using round robin, e.g.:

@unidist.remote
def foo():
    return 1

o = foo.remote()
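In code, the described fallback could look roughly like this (a sketch; RoundRobin and choose_destination_rank are the names visible in the diff above):

def choose_destination_rank(data_ids):
    if not data_ids:
        # The remote function has no arguments, so no worker holds
        # any relevant data; fall back to round-robin scheduling.
        return RoundRobin.get_instance().schedule_rank()
    # Otherwise pick the rank with the maximum data share, as
    # described in the PR summary above.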

Collaborator:

Put a comment describing this into the code.

# Data sizes
self._data_sizes = defaultdict(int)
# Mapping from python identity id() to DataID {id() : DataID}
self._identity_data_id_map = defaultdict(lambda: None)
Collaborator:

Why do we need this map?

Contributor (author):

This is no longer required; I will remove it.

The original purpose of this map was to ensure the same Python object is not sent to workers multiple times, since put was planned to be used in the submit method for all arguments, but this was later changed.
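The deduplication described here could have looked roughly like this (a sketch reconstructed from the comment above, not the actual removed code; object_store and garbage_collector mirror the names in the diff):

_identity_data_id_map = {}

def put_with_dedup(data, object_store, garbage_collector):
    # Reuse the existing DataID if this exact Python object
    # (compared by identity via id()) was already put.
    data_id = _identity_data_id_map.get(id(data))
    if data_id is None:
        data_id = object_store.generate_data_id(garbage_collector)
        _identity_data_id_map[id(data)] = data_id
        object_store.put(data_id, data)
    return data_id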

@arunjose696 force-pushed the scheduling_MPI branch 2 times, most recently from 9edb5b7 to 569e88b on March 6, 2023 15:03
Successfully merging this pull request may close these issues: [MPI] Improve task scheduler for MPI backend