diff --git a/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py
index 038ff0c96c8..3a3cbc6d167 100644
--- a/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py
+++ b/modin/core/execution/ray/implementations/cudf_on_ray/partitioning/partition_manager.py
@@ -125,7 +125,7 @@ def from_pandas(cls, df, return_dims=False):
         num_splits = GpuCount.get()
         put_func = cls._partition_class.put
         # For now, we default to row partitioning
-        pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
+        pandas_dfs = list(split_result_of_axis_func_pandas(0, num_splits, df))
         keys = [
             put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
             for i in range(num_splits)
diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
index 12036abb76f..e04e6e29f1e 100644
--- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
+++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
@@ -324,13 +324,12 @@ def _deploy_ray_func(
     if not extract_metadata:
        for item in result:
            yield item
-        return
-
-    ip = get_node_ip_address()
-    for r in result:
-        if isinstance(r, pandas.DataFrame):
-            for item in [r, len(r), len(r.columns), ip]:
-                yield item
-        else:
-            for item in [r, None, None, ip]:
-                yield item
+    else:
+        ip = get_node_ip_address()
+        for r in result:
+            if isinstance(r, pandas.DataFrame):
+                for item in [r, len(r), len(r.columns), ip]:
+                    yield item
+            else:
+                for item in [r, None, None, ip]:
+                    yield item
diff --git a/modin/core/storage_formats/cudf/parser.py b/modin/core/storage_formats/cudf/parser.py
index ac206cfacb8..c0dcf207451 100644
--- a/modin/core/storage_formats/cudf/parser.py
+++ b/modin/core/storage_formats/cudf/parser.py
@@ -39,9 +39,7 @@ def _split_result_for_readers(axis, num_splits, df):  # pragma: no cover
     Returns:
         A list of pandas DataFrames.
     """
-    splits = split_result_of_axis_func_pandas(axis, num_splits, df)
-    if not isinstance(splits, list):
-        splits = [splits]
+    splits = list(split_result_of_axis_func_pandas(axis, num_splits, df))
     return splits
 
 
diff --git a/modin/core/storage_formats/pandas/parsers.py b/modin/core/storage_formats/pandas/parsers.py
index f76ef71d128..8f5b6087f16 100644
--- a/modin/core/storage_formats/pandas/parsers.py
+++ b/modin/core/storage_formats/pandas/parsers.py
@@ -113,9 +113,7 @@ def _split_result_for_readers(axis, num_splits, df):  # pragma: no cover
     list
         A list of pandas DataFrames.
     """
-    splits = split_result_of_axis_func_pandas(axis, num_splits, df)
-    if not isinstance(splits, list):
-        splits = [splits]
+    splits = list(split_result_of_axis_func_pandas(axis, num_splits, df))
     return splits
 
 
diff --git a/modin/core/storage_formats/pandas/utils.py b/modin/core/storage_formats/pandas/utils.py
index 553f8b78f1b..1cc38e8fbd9 100644
--- a/modin/core/storage_formats/pandas/utils.py
+++ b/modin/core/storage_formats/pandas/utils.py
@@ -81,30 +81,29 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None)
     """
     if num_splits == 1:
         yield result
-        return
-
-    if length_list is None:
-        length_list = get_length_list(result.shape[axis], num_splits)
-    # Inserting the first "zero" to properly compute cumsum indexing slices
-    length_list = np.insert(length_list, obj=0, values=[0])
-
-    sums = np.cumsum(length_list)
-    axis = 0 if isinstance(result, pandas.Series) else axis
-
-    for i in range(len(sums) - 1):
-        # We do this to restore block partitioning
-        if axis == 0:
-            chunk = result.iloc[sums[i] : sums[i + 1]]
-        else:
-            chunk = result.iloc[:, sums[i] : sums[i + 1]]
-
-        # Sliced MultiIndex still stores all encoded values of the original index, explicitly
-        # asking it to drop unused values in order to save memory.
-        if isinstance(chunk.axes[axis], pandas.MultiIndex):
-            chunk = chunk.set_axis(
-                chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
-            )
-        yield chunk
+    else:
+        if length_list is None:
+            length_list = get_length_list(result.shape[axis], num_splits)
+        # Inserting the first "zero" to properly compute cumsum indexing slices
+        length_list = np.insert(length_list, obj=0, values=[0])
+
+        sums = np.cumsum(length_list)
+        axis = 0 if isinstance(result, pandas.Series) else axis
+
+        for i in range(len(sums) - 1):
+            # We do this to restore block partitioning
+            if axis == 0:
+                chunk = result.iloc[sums[i] : sums[i + 1]]
+            else:
+                chunk = result.iloc[:, sums[i] : sums[i + 1]]
+
+            # Sliced MultiIndex still stores all encoded values of the original index, explicitly
+            # asking it to drop unused values in order to save memory.
+            if isinstance(chunk.axes[axis], pandas.MultiIndex):
+                chunk = chunk.set_axis(
+                    chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
+                )
+            yield chunk
 
 
 def get_length_list(axis_len: int, num_splits: int) -> list: