PERF-#239: Improve task scheduler for MPI backend #245
base: master
Conversation
else:
    data_id = object_store.generate_data_id(garbage_collector)
    dest_rank = RoundRobin.get_instance().schedule_rank()
    object_store.put(data_id, data)
Why do we need to put data into the object store of the master process? We want to push data to a worker and save the owner of the data into the object store of the main process. In order to achieve this we should probably rework push_data method.
I have added an additional function (push_data_directly_to_worker) to push data directly without storing it in the object store. I did not modify the push_data method because the new method also needs the actual data as an argument, whereas push_data relies on the object store.
# dest_rank = RoundRobin.get_instance().schedule_rank()
dest_rank = choose_destination_rank(collected_data_ids)
print(dest_rank, collected_data_ids)
push_data_owners(dest_rank, collected_data_ids)
As far as I understand, there is a case where we push the data to, for instance, rank 2 during ref = unidist.put(data), and then we push the data owner (the same worker with rank 2) to it itself during foo.remote(ref). We should probably avoid such cases, i.e., don't push the data owner if the worker is the data owner itself.
# simple example
import unidist
unidist.init()
o_ref = unidist.put(1)
@unidist.remote
def foo(obj):
return obj
foo.remote(o_ref) # here we push the owner (rank 2) to the worker with rank 2
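The guard suggested above could look like this. A minimal sketch only: the signature of push_data_owners and the owners mapping are illustrative assumptions, not the actual unidist code, and the MPI send is replaced by returning the filtered mapping.

```python
def push_data_owners(dest_rank, data_ids, owners):
    """Return the owner info to send to dest_rank, skipping any data ID
    that dest_rank itself owns (no point telling a worker about itself).
    owners: mapping DataID -> owner rank (illustrative structure)."""
    return {
        data_id: owner
        for data_id, owner in owners.items()
        if data_id in data_ids and owner != dest_rank
    }

# Rank 2 owns "id1"; pushing owners to rank 2 should skip "id1".
owners = {"id1": 2, "id2": 0}
print(push_data_owners(2, ["id1", "id2"], owners))  # {'id2': 0}
```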
if isinstance(value, (list, tuple)):
    for v in value:
        collect_all_data_id_from_args(v, collected_data_ids)
elif isinstance(value, dict):
    for v in value.values():
        collect_all_data_id_from_args(v, collected_data_ids)
elif is_data_id(value):
Is it possible that we don't get into any of the if branches? You could add raising an exception at the end of the function to check this when testing with Modin.
Since the plan here is just to collect data_ids recursively, if the args are not data_ids (e.g. functions, plain Python objects, etc.) it is expected not to enter any of the if branches. I will add an exception for the case where the object store does not contain an owner.
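The intended behavior of the recursion can be sketched in a self-contained form. DataID and is_data_id below are minimal stand-ins for unidist's internals, used only so the example runs.

```python
class DataID:
    """Minimal stand-in for unidist's DataID."""
    def __init__(self, value):
        self.value = value

def is_data_id(value):
    return isinstance(value, DataID)

def collect_all_data_id_from_args(value, collected_data_ids):
    # Recurse into containers; anything that is neither a container
    # nor a DataID (functions, plain Python objects, ...) is ignored
    # on purpose: only data IDs matter for scheduling.
    if isinstance(value, (list, tuple)):
        for v in value:
            collect_all_data_id_from_args(v, collected_data_ids)
    elif isinstance(value, dict):
        for v in value.values():
            collect_all_data_id_from_args(v, collected_data_ids)
    elif is_data_id(value):
        collected_data_ids.append(value)

ids = []
collect_all_data_id_from_args([DataID(1), {"k": DataID(2)}, "plain"], ids)
print(len(ids))  # 2
```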
int
    A rank number.
"""
if not data_ids:
When is data_ids an empty list?
This is for the case where the remote function has no arguments. The data_ids would then be empty and none of the workers would have a data share, so tasks are submitted using round robin.
e.g.
@unidist.remote
def foo():
    return 1

o = foo.remote()
Put a comment describing this into the code.
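The fallback being discussed can be sketched as below. Everything here is illustrative: the data_location and data_sizes mappings, the module-level round-robin cycle, and the max-share selection are assumptions standing in for the object store and RoundRobin scheduler.

```python
import itertools

_round_robin = itertools.cycle([1, 2, 3])  # worker ranks (rank 0 is master)

def choose_destination_rank(data_ids, data_location, data_sizes):
    # No arguments -> no worker holds a share of the task's data,
    # so fall back to plain round robin.
    if not data_ids:
        return next(_round_robin)
    # Otherwise pick the rank holding the largest total data share.
    share = {}
    for data_id in data_ids:
        rank = data_location[data_id]
        share[rank] = share.get(rank, 0) + data_sizes[data_id]
    return max(share, key=share.get)

location = {"a": 1, "b": 2, "c": 2}
sizes = {"a": 10, "b": 4, "c": 8}
r_share = choose_destination_rank(["a", "b", "c"], location, sizes)
r_empty = choose_destination_rank([], location, sizes)
print(r_share, r_empty)  # 2 1
```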
# Data sizes
self._data_sizes = defaultdict(int)
# Mapping from python identity id() to DataID {id() : DataID}
self._identity_data_id_map = defaultdict(lambda: None)
Why do we need this map?
This is no longer required; I will remove it.
The original purpose of this map was to avoid sending the same Python object to workers multiple times, as put was planned to be used in the submit method for all arguments, but this was later changed.
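For reference, the original (now abandoned) idea behind the identity map can be sketched like this. The put function and its behavior are illustrative assumptions; the real implementation is being removed from the PR.

```python
import itertools
from collections import defaultdict

# Cache the DataID assigned to a Python object by its id(), so a
# repeated put() of the same object reuses one DataID instead of
# allocating a new one and resending the data.
_identity_data_id_map = defaultdict(lambda: None)
_next_id = itertools.count()

def put(data):
    data_id = _identity_data_id_map[id(data)]
    if data_id is None:
        data_id = next(_next_id)  # stand-in for generate_data_id()
        _identity_data_id_map[id(data)] = data_id
        # ...the real code would push the data to a worker here...
    return data_id

obj = [1, 2, 3]
print(put(obj) == put(obj))  # True: same object reuses one DataID
```

Note that keying on id() is fragile: ids can be reused once an object is garbage collected, which is one reason this approach was dropped.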
What do these changes do?
Improve the task scheduler for the MPI backend: the round-robin algorithm for submitting tasks is replaced with selecting the worker rank that holds the maximum share of the task's data. Data is pushed to workers in the put method.
Data IDs and data sizes are stored in the object store.
- passes flake8 .
- passes black .
- signed commit with git commit -s
- docs/developer/architecture.rst is up-to-date