calling split_result_of_axis_func_pandas only for ray
Signed-off-by: arunjose696 <[email protected]>
arunjose696 committed Mar 11, 2024
1 parent 5fb4465 commit 004e6e6
Showing 6 changed files with 103 additions and 11 deletions.
41 changes: 34 additions & 7 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -21,7 +21,10 @@
from modin.core.dataframe.base.partitioning.axis_partition import (
BaseDataframeAxisPartition,
)
from modin.core.storage_formats.pandas.utils import (
generate_result_of_axis_func_pandas,
split_result_of_axis_func_pandas,
)

from .partition import PandasDataframePartition

@@ -388,6 +391,7 @@ def deploy_axis_func(
*partitions,
lengths=None,
manual_partition=False,
return_generator=False,
):
"""
Deploy a function along a full axis.
@@ -413,11 +417,14 @@
The list of lengths to shuffle the object.
manual_partition : bool, default: False
If True, partition the result with `lengths`.
return_generator : bool, default: False
Return a generator from the function instead of a list. Set to `True` for the Ray backend, since Ray remote functions can return generators.

Returns
-------
list | Generator
A list or generator of pandas DataFrames.
"""
dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
with warnings.catch_warnings():
@@ -451,7 +458,12 @@
lengths = [len(part.columns) for part in partitions]
if sum(lengths) != len(result.columns):
lengths = None
if return_generator:
return generate_result_of_axis_func_pandas(
axis, num_splits, result, lengths
)
else:
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
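For orientation, here is a minimal standalone sketch (assumed helper names, not Modin's actual API) of the dispatch pattern introduced above: the same row split is produced either eagerly as a list or lazily as a generator, which is what `return_generator=True` requests for the Ray backend.

import pandas

def _split_rows(df, num_splits):
    # Yield roughly equal row chunks of ``df``.
    chunk = max(1, len(df) // num_splits)
    for start in range(0, len(df), chunk):
        yield df.iloc[start : start + chunk]

def deploy(df, num_splits, return_generator=False):
    # Ray remote functions may return generators, so the Ray backend asks for
    # the lazy form; other backends keep receiving a materialized list.
    gen = _split_rows(df, num_splits)
    return gen if return_generator else list(gen)

df = pandas.DataFrame({"a": range(10)})
parts = deploy(df, 2)                               # list of two DataFrames
lazy_parts = deploy(df, 2, return_generator=True)   # generator of DataFrames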

@classmethod
def deploy_func_between_two_axis_partitions(
@@ -464,6 +476,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
return_generator=False,
):
"""
Deploy a function along a full axis between two data sets.
@@ -487,11 +500,14 @@
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
return_generator : bool, default: False
Return a generator from the function instead of a list. Set to `True` for the Ray backend, since Ray remote functions can return generators.

Returns
-------
list | Generator
A list or generator of pandas DataFrames.
"""
lt_frame = pandas.concat(partitions[:len_of_left], axis=axis, copy=False)

Expand All @@ -510,7 +526,18 @@ def deploy_func_between_two_axis_partitions(
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
if return_generator:
return generate_result_of_axis_func_pandas(
axis,
num_splits,
result,
)
else:
return split_result_of_axis_func_pandas(
axis,
num_splits,
result,
)

@classmethod
def drain(cls, df: pandas.DataFrame, call_queue: list):

@@ -125,7 +125,7 @@ def from_pandas(cls, df, return_dims=False):
num_splits = GpuCount.get()
put_func = cls._partition_class.put
# For now, we default to row partitioning
pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
keys = [
put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
for i in range(num_splits)

@@ -189,6 +189,7 @@ def deploy_axis_func(
f_kwargs=f_kwargs,
manual_partition=manual_partition,
lengths=lengths,
return_generator=True,
)

@classmethod
@@ -244,6 +245,7 @@ def deploy_func_between_two_axis_partitions(
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
return_generator=True,
)

def wait(self):
4 changes: 3 additions & 1 deletion modin/core/storage_formats/cudf/parser.py
@@ -39,7 +39,9 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
Returns:
A list of pandas DataFrames.
"""
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
if not isinstance(splits, list):
splits = [splits]
return splits


4 changes: 3 additions & 1 deletion modin/core/storage_formats/pandas/parsers.py
@@ -113,7 +113,9 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
list
A list of pandas DataFrames.
"""
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
if not isinstance(splits, list):
splits = [splits]
return splits
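As a quick aside, a tiny standalone sketch (not the parser's real call chain) of the defensive normalization above: if the splitter ever hands back a single DataFrame rather than a list, wrapping it keeps downstream reader code able to iterate uniformly.

import pandas

def normalize_splits(splits):
    # Always hand readers a list, even when only one DataFrame came back.
    return splits if isinstance(splits, list) else [splits]

single = pandas.DataFrame({"a": [1, 2, 3]})
assert len(normalize_splits(single)) == 1
assert len(normalize_splits([single, single])) == 2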


61 changes: 60 additions & 1 deletion modin/core/storage_formats/pandas/utils.py
@@ -84,11 +84,70 @@ def split_result_of_axis_func_pandas(
list of pandas.DataFrames
Split DataFrame represented by a list of frames.
"""
if num_splits == 1:
return [result]

if length_list is None:
length_list = get_length_list(result.shape[axis], num_splits, min_block_size)
# Inserting the first "zero" to properly compute cumsum indexing slices
length_list = np.insert(length_list, obj=0, values=[0])

sums = np.cumsum(length_list)
axis = 0 if isinstance(result, pandas.Series) else axis
# We do this to restore block partitioning
if axis == 0:
chunked = [result.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
else:
chunked = [result.iloc[:, sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]

return [
# Sliced MultiIndex still stores all encoded values of the original index, explicitly
# asking it to drop unused values in order to save memory.
(
chunk.set_axis(
chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
)
if isinstance(chunk.axes[axis], pandas.MultiIndex)
else chunk
)
for chunk in chunked
]
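To make the slicing arithmetic above concrete, here is a small self-contained example under simplifying assumptions (an explicit `length_list` and row axis only; `get_length_list` and ``MinPartitionSize`` are Modin internals and are not used here):

import numpy as np
import pandas

df = pandas.DataFrame({"a": range(7)})
length_list = [3, 2, 2]                            # desired split lengths
# Prepend a zero so cumulative sums give [start, stop) pairs for iloc.
sums = np.cumsum(np.insert(length_list, 0, 0))     # -> [0, 3, 5, 7]
chunks = [df.iloc[sums[i] : sums[i + 1]] for i in range(len(sums) - 1)]
assert [len(c) for c in chunks] == length_list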


def generate_result_of_axis_func_pandas(
axis, num_splits, result, length_list=None, min_block_size=None
):
"""
Generate pandas DataFrames by splitting `result` evenly based on the provided number of splits.

Parameters
----------
axis : {0, 1}
Axis to split across. 0 means index axis, 1 means column axis.
num_splits : int
Number of splits to separate the DataFrame into.
This parameter is ignored if `length_list` is specified.
result : pandas.DataFrame
DataFrame to split.
length_list : list of ints, optional
List of slice lengths to split DataFrame into. This is used to
return the DataFrame to its original partitioning schema.
min_block_size : int, optional
Minimum number of rows/columns in a single split.
If not specified, the value is assumed equal to ``MinPartitionSize``.

Yields
------
pandas.DataFrame
`num_splits` DataFrames produced one at a time by applying the axis function.
"""
if num_splits == 1:
yield result
else:
if length_list is None:
length_list = get_length_list(
result.shape[axis], num_splits, min_block_size
)
# Inserting the first "zero" to properly compute cumsum indexing slices
length_list = np.insert(length_list, obj=0, values=[0])
sums = np.cumsum(length_list)
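Finally, a brief illustrative sketch (assumed names, not the library's API) of why the generator form is useful: each split is produced lazily, so a consumer such as a Ray remote function can hand off one chunk at a time instead of materializing the full list first.

import pandas

def generate_row_splits(df, length_list):
    # Yield consecutive row slices whose lengths follow ``length_list``.
    start = 0
    for length in length_list:
        yield df.iloc[start : start + length]
        start += length

frame = pandas.DataFrame({"a": range(6)})
for chunk in generate_row_splits(frame, [2, 2, 2]):
    print(len(chunk))  # each chunk is handled as soon as it is produced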
