Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#239: Improve task scheduler for MPI backend #245

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions unidist/core/backends/mpi/core/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ def __del__(self):
def __getstate__(self):
    """Remove a reference to garbage collector for correct `pickle` serialization."""
    attributes = self.__dict__.copy()
    # Use ``pop`` with a default instead of ``hasattr`` + ``del``:
    # ``hasattr(self, "_gc")`` may succeed via a class-level attribute even
    # when ``"_gc"`` is not in the instance ``__dict__`` copy, in which case
    # ``del attributes["_gc"]`` would raise KeyError.
    attributes.pop("_gc", None)
    return attributes

def base_data_id(self):
Expand All @@ -133,7 +134,7 @@ def base_data_id(self):
return DataID(self._id)


def get_logger(logger_name, file_name, activate=False):
def get_logger(logger_name, file_name, activate=True):
"""
Configure logger and get its instance.

Expand Down
34 changes: 23 additions & 11 deletions unidist/core/backends/mpi/core/controller/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
)
from unidist.core.backends.mpi.core.controller.common import (
request_worker_data,
push_data,
push_data_owners,
RoundRobin,
choose_destination_rank,
collect_all_data_id_from_args,
push_data_directly_to_worker,
)
import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
Expand All @@ -36,6 +39,7 @@
ValueSource,
MpiPickleThreshold,
)
from unidist.core.backends.common.data_id import is_data_id


# TODO: Find a way to move this after all imports
Expand Down Expand Up @@ -227,8 +231,15 @@ def put(data):
unidist.core.backends.mpi.core.common.MasterDataID
An ID of an object in object storage.
"""
data_id = object_store.generate_data_id(garbage_collector)
object_store.put(data_id, data)
if is_data_id(data):
return data
else:
data_id = object_store.generate_data_id(garbage_collector)
dest_rank = RoundRobin.get_instance().schedule_rank()
object_store.put_data_owner(data_id, dest_rank)
object_store.put_data_size(data_id, data)
base_data_id = data_id.base_data_id()
push_data_directly_to_worker(dest_rank, base_data_id, data)

logger.debug("PUT {} id".format(data_id._id))

Expand Down Expand Up @@ -366,9 +377,17 @@ def submit(task, *args, num_returns=1, **kwargs):
# Initiate reference count based cleanup
# if all the tasks were completed
garbage_collector.regular_cleanup()
unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}

dest_rank = RoundRobin.get_instance().schedule_rank()
collected_data_ids = []
collect_all_data_id_from_args(unwrapped_args, collected_data_ids)
collect_all_data_id_from_args(unwrapped_kwargs, collected_data_ids)

# dest_rank = RoundRobin.get_instance().schedule_rank()
dest_rank = choose_destination_rank(collected_data_ids)
print(dest_rank, collected_data_ids)
push_data_owners(dest_rank, collected_data_ids)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I understand, there is a case when we push the data to, for instance, rank 2 during ref = unidist.put(data) and then we push the data owner (the same worker with rank 2) to it itself during foo.remote(ref). We should probably avoid such cases, i.e., don't push the data owner if the worker is the data owner itself.

# simple example
import unidist

unidist.init()

o_ref = unidist.put(1)

@unidist.remote
def foo(obj):
    return obj

foo.remote(o_ref) # here we push the owner (rank 2) to the worker with rank 2

output_ids = object_store.generate_output_data_id(
dest_rank, garbage_collector, num_returns
)
Expand All @@ -384,13 +403,6 @@ def submit(task, *args, num_returns=1, **kwargs):
dest_rank, common.unwrapped_data_ids_list(output_ids)
)
)

unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}

push_data(dest_rank, unwrapped_args)
push_data(dest_rank, unwrapped_kwargs)

operation_type = common.Operation.EXECUTE
operation_data = {
"task": task,
Expand Down
109 changes: 108 additions & 1 deletion unidist/core/backends/mpi/core/controller/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import unidist.core.backends.mpi.core.communication as communication
from unidist.core.backends.mpi.core.async_operations import AsyncOperations
from unidist.core.backends.mpi.core.controller.object_store import object_store
from collections import defaultdict

logger = common.get_logger("common", "common.log")

Expand Down Expand Up @@ -166,7 +167,6 @@ def request_worker_data(data_id):
def _push_local_data(dest_rank, data_id):
"""
Send local data associated with passed ID to target rank.

Parameters
----------
dest_rank : int
Expand Down Expand Up @@ -209,6 +209,36 @@ def _push_local_data(dest_rank, data_id):
object_store.cache_send_info(data_id, dest_rank)


def push_data_directly_to_worker(dest_rank, data_id, value):
    """
    Send a data ID together with its associated value to the target rank.

    Issues a non-blocking ``PUT_DATA`` operation and registers the resulting
    send handles with the global async-operations tracker.

    Parameters
    ----------
    dest_rank : int
        Target rank.
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    value : object
        The data associated with ``data_id``.
    """
    payload = {
        "id": data_id,
        "data": value,
    }
    # The payload is not yet serialized; the communication layer handles it.
    handles, _ = communication.isend_complex_operation(
        communication.MPIState.get_instance().comm,
        common.Operation.PUT_DATA,
        payload,
        dest_rank,
        is_serialized=False,
    )
    AsyncOperations.get_instance().extend(handles)


def _push_data_owner(dest_rank, data_id):
"""
Send data location associated with data ID to target rank.
Expand Down Expand Up @@ -262,3 +292,80 @@ def push_data(dest_rank, value):
_push_data_owner(dest_rank, value)
else:
raise ValueError("Unknown DataID!")


def push_data_owners(dest_rank, data_ids):
    """
    Send data-owner information for all passed data IDs to the destination rank.

    The owner location is sent only for IDs whose owner is a rank other than
    ``dest_rank`` itself.

    Parameters
    ----------
    dest_rank : int
        Rank where task data is needed.
    data_ids : list
        List of all data IDs required by the worker.

    Raises
    ------
    ValueError
        If the owner of some data ID is unknown.
    """
    for data_id in data_ids:
        # Guard clause: an ID without a known owner is a scheduling bug.
        if not object_store.contains_data_owner(data_id):
            raise ValueError("Data owner not known")
        # No need to tell a worker that it owns the data itself.
        if object_store.get_data_owner(data_id) != dest_rank:
            _push_data_owner(dest_rank, data_id)


def choose_destination_rank(data_ids):
    """
    Choose a destination rank considering which worker has the maximum share of data.

    Sum the known data sizes per owner rank and pick the rank holding the
    largest total, minimizing data movement. If ``data_ids`` is empty,
    choose the rank using the round-robin approach.

    Parameters
    ----------
    data_ids : list
        List of all data IDs required by the worker.

    Returns
    -------
    int
        A rank number.
    """
    # If there are no arguments (e.g. a zero-argument remote task), none of
    # the workers holds any relevant data, so fall back to round robin.
    if not data_ids:
        return RoundRobin.get_instance().schedule_rank()
    # Aggregate the size of data each owner rank already holds.
    # ``defaultdict(int)`` is the idiomatic form of ``defaultdict(lambda: 0)``.
    data_share = defaultdict(int)
    for data_id in data_ids:
        owner_rank = object_store.get_data_owner(data_id)
        data_share[owner_rank] += object_store.get_data_size(data_id)
    # The rank with the largest share of the required data wins.
    return max(data_share, key=data_share.get)


def collect_all_data_id_from_args(value, collected_data_ids=None):
    """
    Recursively collect all data IDs contained in ``value``.

    Walks lists, tuples and dict values; every data ID found is appended to
    ``collected_data_ids``. Non-container, non-ID values (functions, plain
    Python objects, ...) are intentionally ignored.

    Parameters
    ----------
    value : iterable or dict or object
        Arguments to be scanned.
    collected_data_ids : list, optional
        List the collected data IDs are appended to. A fresh list is created
        when not provided.

    Returns
    -------
    list
        The list the collected data IDs were appended to.

    Raises
    ------
    ValueError
        If a data ID with no known owner is encountered.
    """
    # BUG FIX: the previous signature used a mutable default argument
    # (``collected_data_ids=[]``), which is shared across calls and would
    # leak IDs from one invocation into the next. Default to None instead.
    if collected_data_ids is None:
        collected_data_ids = []
    if isinstance(value, (list, tuple)):
        for item in value:
            collect_all_data_id_from_args(item, collected_data_ids)
    elif isinstance(value, dict):
        for item in value.values():
            collect_all_data_id_from_args(item, collected_data_ids)
    elif is_data_id(value):
        if object_store.contains_data_owner(value):
            collected_data_ids.append(value)
        else:
            raise ValueError("Unknown DataID!")
    return collected_data_ids
43 changes: 42 additions & 1 deletion unidist/core/backends/mpi/core/controller/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import weakref
from collections import defaultdict
import sys

import unidist.core.backends.mpi.core.common as common
import unidist.core.backends.mpi.core.communication as communication
Expand Down Expand Up @@ -33,6 +34,8 @@ def __init__(self):
self._data_id_counter = 0
# Data serialized cache
self._serialization_cache = {}
# Data sizes
self._data_sizes = defaultdict(int)

@classmethod
def get_instance(cls):
Expand All @@ -49,7 +52,7 @@ def get_instance(cls):

def put(self, data_id, data):
    """
    Put data into the internal dictionary and record its size.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    data : object
        Data to be put.
    """
    self._data_map[data_id] = data
    # Delegate size bookkeeping to ``put_data_size`` so pandas DataFrame/Series
    # objects are measured consistently no matter how data enters the store
    # (previously ``put`` used a bare ``sys.getsizeof`` while ``put_data_size``
    # special-cased pandas types).
    self.put_data_size(data_id, data)

def put_data_size(self, data_id, data):
    """
    Compute the size of ``data`` and store it in the internal size dictionary.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.
    data : object
        Data whose size should be recorded.
    """
    # The string-based type checks are a temporary fix as pandas `sizeof`
    # has an issue: https://github.com/pandas-dev/pandas/issues/51858
    # NOTE(review): matching on the type's string representation also catches
    # any non-pandas class whose name contains DataFrame/Series — confirm
    # this is acceptable.
    if "DataFrame" in str(type(data)):
        size = data.memory_usage().sum()
    elif "Series" in str(type(data)):
        size = data.memory_usage()
    else:
        # Fallback: shallow size only; does not follow contained references.
        size = sys.getsizeof(data)
    self._data_sizes[data_id] = size

def put_data_owner(self, data_id, rank):
"""
Expand Down Expand Up @@ -105,6 +130,22 @@ def get_data_owner(self, data_id):
"""
return self._data_owner_map[data_id]

def get_data_size(self, data_id):
    """
    Get the size of the data associated with ``data_id``.

    Parameters
    ----------
    data_id : unidist.core.backends.mpi.core.common.MasterDataID
        An ID to data.

    Returns
    -------
    int
        Size of the data in bytes. Returns 0 when no size was recorded,
        since sizes are kept in a ``defaultdict(int)``.
    """
    return self._data_sizes[data_id]

def contains(self, data_id):
"""
Check if the data associated with `data_id` exists in a local dictionary.
Expand Down
2 changes: 2 additions & 0 deletions unidist/core/backends/mpi/core/worker/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ async def worker_loop():
# Check pending actor requests also.
task_store.check_pending_actor_tasks()

RequestStore.get_instance().check_pending_get_requests(request["id"])

elif operation_type == common.Operation.PUT_OWNER:
request = communication.recv_simple_operation(mpi_state.comm, source_rank)
object_store.put_data_owner(request["id"], request["owner"])
Expand Down
3 changes: 3 additions & 0 deletions unidist/core/backends/mpi/core/worker/request_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ def check_pending_get_requests(self, data_ids):
"""

def check_request(data_id):
# Dont process the get request if data ID still not available.
if not ObjectStore.get_instance().contains(data_id):
return
# Check non-blocking data requests for one of the workers
if data_id in self._data_request:
ranks_with_get_request = self._data_request[data_id]
Expand Down
8 changes: 7 additions & 1 deletion unidist/core/backends/mpi/core/worker/task_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ def unwrap_local_data_id(self, arg):
self.request_worker_data(owner_rank, arg)
return arg, True
else:
raise ValueError("DataID is missing!")
raise ValueError("DataID is missing! {}".format(arg))
else:
return arg, False

Expand Down Expand Up @@ -357,6 +357,12 @@ def process_task_request(self, request):
w_logger.debug("Is pending - {}".format(is_pending))

if is_pending or is_kw_pending:
current_rank = communication.MPIState.get_instance().rank
if isinstance(output_ids, (list, tuple)):
for output_id in output_ids:
ObjectStore.get_instance().put_data_owner(output_id, current_rank)
else:
ObjectStore.get_instance().put_data_owner(output_ids, current_rank)
request["args"] = args
request["kwargs"] = kwargs
return request
Expand Down