Skip to content

Commit

Permalink
pr comments
Browse files Browse the repository at this point in the history
Signed-off-by: arunjose696 <[email protected]>
  • Loading branch information
arunjose696 committed Mar 7, 2024
1 parent 7d4e663 commit 74e8a13
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def from_pandas(cls, df, return_dims=False):
num_splits = GpuCount.get()
put_func = cls._partition_class.put
# For now, we default to row partitioning
pandas_dfs = split_result_of_axis_func_pandas(0, num_splits, df)
pandas_dfs = list(split_result_of_axis_func_pandas(0, num_splits, df))
keys = [
put_func(cls._get_gpu_managers()[i], pandas_dfs[i])
for i in range(num_splits)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,12 @@ def _deploy_ray_func(
if not extract_metadata:
for item in result:
yield item
return

ip = get_node_ip_address()
for r in result:
if isinstance(r, pandas.DataFrame):
for item in [r, len(r), len(r.columns), ip]:
yield item
else:
for item in [r, None, None, ip]:
yield item
else:
ip = get_node_ip_address()
for r in result:
if isinstance(r, pandas.DataFrame):
for item in [r, len(r), len(r.columns), ip]:
yield item
else:
for item in [r, None, None, ip]:
yield item
4 changes: 1 addition & 3 deletions modin/core/storage_formats/cudf/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
Returns:
A list of pandas DataFrames.
"""
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
if not isinstance(splits, list):
splits = [splits]
splits = list(split_result_of_axis_func_pandas(axis, num_splits, df))
return splits


Expand Down
4 changes: 1 addition & 3 deletions modin/core/storage_formats/pandas/parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,7 @@ def _split_result_for_readers(axis, num_splits, df): # pragma: no cover
list
A list of pandas DataFrames.
"""
splits = split_result_of_axis_func_pandas(axis, num_splits, df)
if not isinstance(splits, list):
splits = [splits]
splits = list(split_result_of_axis_func_pandas(axis, num_splits, df))
return splits


Expand Down
47 changes: 23 additions & 24 deletions modin/core/storage_formats/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,30 +81,29 @@ def split_result_of_axis_func_pandas(axis, num_splits, result, length_list=None)
"""
if num_splits == 1:
yield result
return

if length_list is None:
length_list = get_length_list(result.shape[axis], num_splits)
# Inserting the first "zero" to properly compute cumsum indexing slices
length_list = np.insert(length_list, obj=0, values=[0])

sums = np.cumsum(length_list)
axis = 0 if isinstance(result, pandas.Series) else axis

for i in range(len(sums) - 1):
# We do this to restore block partitioning
if axis == 0:
chunk = result.iloc[sums[i] : sums[i + 1]]
else:
chunk = result.iloc[:, sums[i] : sums[i + 1]]

# Sliced MultiIndex still stores all encoded values of the original index, explicitly
# asking it to drop unused values in order to save memory.
if isinstance(chunk.axes[axis], pandas.MultiIndex):
chunk = chunk.set_axis(
chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
)
yield chunk
else:
if length_list is None:
length_list = get_length_list(result.shape[axis], num_splits)
# Inserting the first "zero" to properly compute cumsum indexing slices
length_list = np.insert(length_list, obj=0, values=[0])

sums = np.cumsum(length_list)
axis = 0 if isinstance(result, pandas.Series) else axis

for i in range(len(sums) - 1):
# We do this to restore block partitioning
if axis == 0:
chunk = result.iloc[sums[i] : sums[i + 1]]
else:
chunk = result.iloc[:, sums[i] : sums[i + 1]]

# Sliced MultiIndex still stores all encoded values of the original index, explicitly
# asking it to drop unused values in order to save memory.
if isinstance(chunk.axes[axis], pandas.MultiIndex):
chunk = chunk.set_axis(
chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False
)
yield chunk


def get_length_list(axis_len: int, num_splits: int) -> list:
Expand Down

0 comments on commit 74e8a13

Please sign in to comment.