Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#7004: use generators when returning from _deploy_ray_func remote function. #7005

Merged
merged 5 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from modin.core.dataframe.base.partitioning.axis_partition import (
BaseDataframeAxisPartition,
)
from modin.core.storage_formats.pandas.utils import split_result_of_axis_func_pandas
from modin.core.storage_formats.pandas.utils import (
generate_result_of_axis_func_pandas,
split_result_of_axis_func_pandas,
)

from .partition import PandasDataframePartition

Expand Down Expand Up @@ -388,6 +391,7 @@ def deploy_axis_func(
*partitions,
lengths=None,
manual_partition=False,
return_generator=False,
):
"""
Deploy a function along a full axis.
Expand All @@ -413,11 +417,14 @@ def deploy_axis_func(
The list of lengths to shuffle the object.
manual_partition : bool, default: False
If True, partition the result with `lengths`.
return_generator : bool, default: False
Return a generator from the function, set to `True` for Ray backend
as Ray remote functions can return Generators.

Returns
-------
list
A list of pandas DataFrames.
list | Generator
A list or generator of pandas DataFrames.
"""
dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
with warnings.catch_warnings():
Expand Down Expand Up @@ -451,7 +458,12 @@ def deploy_axis_func(
lengths = [len(part.columns) for part in partitions]
if sum(lengths) != len(result.columns):
lengths = None
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
if return_generator:
return generate_result_of_axis_func_pandas(
axis, num_splits, result, lengths
)
else:
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
Fixed Show fixed Hide fixed

@classmethod
def deploy_func_between_two_axis_partitions(
Expand All @@ -464,6 +476,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
return_generator=False,
):
"""
Deploy a function along a full axis between two data sets.
Expand All @@ -487,11 +500,14 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
return_generator : bool, default: False
Return a generator from the function, set to `True` for Ray backend
as Ray remote functions can return Generators.

Returns
-------
list
A list of pandas DataFrames.
list | Generator
A list or generator of pandas DataFrames.
"""
lt_frame = pandas.concat(partitions[:len_of_left], axis=axis, copy=False)

Expand All @@ -510,7 +526,18 @@ def deploy_func_between_two_axis_partitions(
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
return split_result_of_axis_func_pandas(axis, num_splits, result)
if return_generator:
return generate_result_of_axis_func_pandas(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return generate_result_of_axis_func_pandas(
yield from generate_result_of_axis_func_pandas(

?

Copy link
Collaborator Author

@arunjose696 arunjose696 Mar 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldn't work because using `yield` in a function would turn it into a generator.

We do not require generators but lists for some branches of the `if`. For backends such as Dask we try to return a list of partitions, but since there is a `yield` statement in the function, a generator would still be returned and thus the partitions would be empty when materialized.

https://stackoverflow.com/questions/26595895/return-and-yield-in-the-same-function

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean not just yield but yield from. Would it work?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked with `yield from` as well; the function still returns a generator when called, so the code fails for Dask.

axis,
num_splits,
result,
)
else:
return split_result_of_axis_func_pandas(
axis,
num_splits,
result,
)
Fixed Show fixed Hide fixed

@classmethod
def drain(cls, df: pandas.DataFrame, call_queue: list):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ def deploy_axis_func(
f_kwargs=f_kwargs,
manual_partition=manual_partition,
lengths=lengths,
return_generator=True,
)

@classmethod
Expand Down Expand Up @@ -244,6 +245,7 @@ def deploy_func_between_two_axis_partitions(
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
return_generator=True,
)

def wait(self):
Expand Down Expand Up @@ -320,12 +322,16 @@ def _deploy_ray_func(
f_args = positional_args[:f_len_args]
deploy_args = positional_args[f_len_args:]
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs)

if not extract_metadata:
return result
ip = get_node_ip_address()
if isinstance(result, pandas.DataFrame):
return result, len(result), len(result.columns), ip
elif all(isinstance(r, pandas.DataFrame) for r in result):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we check if all parts of result are dataframes. What are the scenarios where the result would be heterogeneous( composed of dataframes and non dataframes)?
One possibility I can think of is that the results could contain errors; in this scenario I think it would be sufficient to send [r, None, None, ip] for the errors and send [r, len(r), len(r.columns), ip] for the results that are dataframes. Would this suffice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would suffice.

return [i for r in result for i in [r, len(r), len(r.columns), ip]]
for item in result:
yield item
else:
return [i for r in result for i in [r, None, None, ip]]
ip = get_node_ip_address()
for r in result:
if isinstance(r, pandas.DataFrame):
for item in [r, len(r), len(r.columns), ip]:
yield item
else:
for item in [r, None, None, ip]:
yield item
55 changes: 55 additions & 0 deletions modin/core/storage_formats/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,61 @@ def split_result_of_axis_func_pandas(
]


def generate_result_of_axis_func_pandas(
    axis, num_splits, result, length_list=None, min_block_size=None
):
    """
    Lazily split a pandas DataFrame evenly based on the provided number of splits.

    Parameters
    ----------
    axis : {0, 1}
        Axis to split across. 0 means index axis when 1 means column axis.
    num_splits : int
        Number of splits to separate the DataFrame into.
        This parameter is ignored if `length_list` is specified.
    result : pandas.DataFrame
        DataFrame to split.
    length_list : list of ints, optional
        List of slice lengths to split DataFrame into. This is used to
        return the DataFrame to its original partitioning schema.
    min_block_size : int, optional
        Minimum number of rows/columns in a single split.
        If not specified, the value is assumed equal to ``MinPartitionSize``.

    Yields
    ------
    pandas.DataFrame
        Generates 'num_splits' dataframes as a result of axis function.
    """
    if num_splits == 1:
        # Nothing to split — hand back the whole frame as the single chunk.
        yield result
        return

    if length_list is None:
        length_list = get_length_list(result.shape[axis], num_splits, min_block_size)
    # Prepend a zero so the cumulative sums form [start, stop) slice boundaries.
    bounds = np.cumsum(np.insert(length_list, obj=0, values=[0]))
    # A Series can only be sliced along axis 0.
    axis = 0 if isinstance(result, pandas.Series) else axis

    for start, stop in zip(bounds[:-1], bounds[1:]):
        # Slice positionally along `axis` to restore block partitioning.
        if axis == 0:
            chunk = result.iloc[start:stop]
        else:
            chunk = result.iloc[:, start:stop]

        # A sliced MultiIndex still stores all encoded values of the original
        # index; explicitly drop unused values in order to save memory.
        sliced_axis = chunk.axes[axis]
        if isinstance(sliced_axis, pandas.MultiIndex):
            chunk = chunk.set_axis(
                sliced_axis.remove_unused_levels(), axis=axis, copy=False
            )
        yield chunk


def get_length_list(axis_len: int, num_splits: int, min_block_size=None) -> list:
"""
Compute partitions lengths along the axis with the specified number of splits.
Expand Down
Loading