-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PERF-#239: Improve task scheduler for MPI backend #245
base: master
Are you sure you want to change the base?
Changes from all commits
b0bc015
1b94726
c496f8f
71cc00d
93204b2
47b59cb
8abb067
3d093cb
186e090
7045e94
73fb416
a3302e4
02f405d
e60fe96
e963e3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
import unidist.core.backends.mpi.core.communication as communication | ||
from unidist.core.backends.mpi.core.async_operations import AsyncOperations | ||
from unidist.core.backends.mpi.core.controller.object_store import object_store | ||
from collections import defaultdict | ||
|
||
logger = common.get_logger("common", "common.log") | ||
|
||
|
@@ -166,7 +167,6 @@ def request_worker_data(data_id): | |
def _push_local_data(dest_rank, data_id): | ||
""" | ||
Send local data associated with passed ID to target rank. | ||
|
||
Parameters | ||
---------- | ||
dest_rank : int | ||
|
@@ -209,6 +209,36 @@ def _push_local_data(dest_rank, data_id): | |
object_store.cache_send_info(data_id, dest_rank) | ||
|
||
|
||
def push_data_directly_to_worker(dest_rank, data_id, value): | ||
""" | ||
Send local data_id and associated value to target rank. | ||
|
||
Parameters | ||
---------- | ||
dest_rank : int | ||
Target rank. | ||
data_id : unidist.core.backends.mpi.core.common.MasterDataID | ||
An ID to data. | ||
value : value of the provided data_id | ||
|
||
""" | ||
async_operations = AsyncOperations.get_instance() | ||
mpi_state = communication.MPIState.get_instance() | ||
operation_type = common.Operation.PUT_DATA | ||
operation_data = { | ||
"id": data_id, | ||
"data": value, | ||
} | ||
h_list, _ = communication.isend_complex_operation( | ||
mpi_state.comm, | ||
operation_type, | ||
operation_data, | ||
dest_rank, | ||
is_serialized=False, | ||
) | ||
async_operations.extend(h_list) | ||
|
||
|
||
def _push_data_owner(dest_rank, data_id): | ||
""" | ||
Send data location associated with data ID to target rank. | ||
|
@@ -262,3 +292,80 @@ def push_data(dest_rank, value): | |
_push_data_owner(dest_rank, value) | ||
else: | ||
raise ValueError("Unknown DataID!") | ||
|
||
|
||
def push_data_owners(dest_rank, data_ids): | ||
""" | ||
Parse and send data owners for all values to destination rank. | ||
|
||
Send ID associated location to the target rank for all ranks that are not data owners. | ||
|
||
Parameters | ||
---------- | ||
dest_rank : int | ||
Rank where task data is needed. | ||
data_ids : list | ||
List of all data_ids required by the worker | ||
""" | ||
for data_id in data_ids: | ||
if object_store.contains_data_owner(data_id): | ||
if object_store.get_data_owner(data_id) != dest_rank: | ||
_push_data_owner(dest_rank, data_id) | ||
else: | ||
raise ValueError("Data owner not known") | ||
|
||
|
||
def choose_destination_rank(data_ids): | ||
""" | ||
Choose destination rank considering which worker has the maximum share of data. | ||
|
||
Get the data shares in each worker process and choose rank with the most data share. | ||
If data_ids are empty choses the rank using the round robin approach. | ||
|
||
Parameters | ||
---------- | ||
data_ids : list | ||
List of all data_ids required by the worker | ||
|
||
Returns | ||
------- | ||
int | ||
A rank number. | ||
""" | ||
# If none of the workers have any data the next rank is chosen by following a round robin. | ||
if not data_ids: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When is data_ids an empty list? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would for the case where there are no arguments for the remote function. So the data_ids would be empty and none of workers will have data shares so submitting them using round robin. eg.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put a comment describing this into the code. |
||
chosen_rank = RoundRobin.get_instance().schedule_rank() | ||
return chosen_rank | ||
data_share = defaultdict(lambda: 0) | ||
for data_id in data_ids: | ||
data_share[object_store.get_data_owner(data_id)] += object_store.get_data_size( | ||
data_id | ||
) | ||
chosen_rank = max(data_share, key=data_share.get) | ||
return chosen_rank | ||
|
||
|
||
def collect_all_data_id_from_args(value, collected_data_ids=[]): | ||
""" | ||
Collect all data ids | ||
|
||
Collect all data ids from the given value and save them to collected_data_ids list. | ||
|
||
Parameters | ||
---------- | ||
value : iterable or dict or object | ||
Arguments to be sent. | ||
collected_data_ids : list to store all collected data_ids | ||
|
||
""" | ||
if isinstance(value, (list, tuple)): | ||
for v in value: | ||
collect_all_data_id_from_args(v, collected_data_ids) | ||
elif isinstance(value, dict): | ||
for v in value.values(): | ||
collect_all_data_id_from_args(v, collected_data_ids) | ||
elif is_data_id(value): | ||
Comment on lines
+361
to
+367
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible if we don't get into any of if branches? You can add raise an exception at the end of the function to check this when testing with Modin. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here as the plan is to just collect data_ids, recursively so if the args are not data_ids, eg functions, python objects etc it is expected not to get into any if branches. Will add an exception if object store does not contain an owner. |
||
if object_store.contains_data_owner(value): | ||
collected_data_ids.append(value) | ||
else: | ||
raise ValueError("Unknown DataID!") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I understand, there is a case when we push the data to, for instance, rank 2 during
ref = unidist.put(data)
and then we push the data owner (the same worker with rank 2) to it itself duringfoo.remote(ref)
. We should probably avoid such cases, i.e., don't push the data owner if the worker is the data owner itself.