Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#4445: Stop recomputing both indices for axis-wide applies. #4460

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/release_notes/release_notes-0.15.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Key Features and Updates
* FIX-#4449: Drain the call queue before waiting on result in benchmark mode (#4472)
* Performance enhancements
* FEAT-#4320: Add connectorx as an alternative engine for read_sql (#4346)
* FEAT-#4445: Stop recomputing both indices for axis-wide applies (#4460).
* Benchmarking enhancements
*
* Refactor Codebase
Expand Down
30 changes: 27 additions & 3 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,7 @@ def apply_full_axis(
new_index=None,
new_columns=None,
dtypes=None,
func_may_change_complementary_index_size=True,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we would pass new row lengths and new column widths from the QC?

Copy link
Collaborator

@YarShev YarShev May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The QC probably shouldn't know anything about this (partition lengths) though. However, what do you think on this?

cc @mvashishtha, @devin-petersohn

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func_may_change_complementary_index_size looks a bit clumsy.

):
"""
Perform a function across an entire axis.
Expand All @@ -1884,6 +1885,14 @@ def apply_full_axis(
The data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
func_may_change_complementary_index_size : bool, optional
Whether the per-axis function may change the complementary axis
width/length for each partition. e.g. for a function that applies
per column, this parameter should only be set to false if that
function is guaranteed not to change the number of columns. Setting
this parameter to true can improve performance because the
resulting frame doesn't have to recompute the complementary axis
lengths for its partitions.

Returns
-------
Expand All @@ -1894,13 +1903,22 @@ def apply_full_axis(
-----
The data shape may change as a result of the function.
"""
new_row_lengths = None
new_column_widths = None
if not func_may_change_complementary_index_size:
if axis == 0:
new_column_widths = self._column_widths
else:
new_row_lengths = self._row_lengths
return self.broadcast_apply_full_axis(
axis=axis,
func=func,
new_index=new_index,
new_columns=new_columns,
dtypes=dtypes,
other=None,
new_row_lengths=new_row_lengths,
new_column_widths=new_column_widths,
)

@lazy_metadata_decorator(apply_axis="both")
Expand Down Expand Up @@ -2267,6 +2285,8 @@ def broadcast_apply_full_axis(
apply_indices=None,
enumerate_partitions=False,
dtypes=None,
new_row_lengths=None,
new_column_widths=None,
):
"""
Broadcast partitions of `other` Modin DataFrame and apply a function along full axis.
Expand Down Expand Up @@ -2294,6 +2314,10 @@ def broadcast_apply_full_axis(
Data types of the result. This is an optimization
because there are functions that always result in a particular data
type, and allows us to avoid (re)computing it.
new_row_lengths : list, optional
The length of each partition in the rows.
new_column_widths : list, optional
The width of each partition in the columns.

Returns
-------
Expand Down Expand Up @@ -2336,9 +2360,9 @@ def broadcast_apply_full_axis(
result = self.__constructor__(
new_partitions,
*new_axes,
None,
None,
dtypes,
column_widths=new_column_widths,
row_lengths=new_row_lengths,
dtypes=dtypes,
)
if new_index is not None:
result.synchronize_labels(axis=0)
Expand Down
18 changes: 17 additions & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2434,6 +2434,7 @@ def _list_like_func(self, func, axis, *args, **kwargs):
lambda df: pandas.DataFrame(df.apply(func, axis, *args, **kwargs)),
new_index=new_index,
new_columns=new_columns,
func_may_change_complementary_index_size=False,
)
return self.__constructor__(new_modin_frame)

Expand Down Expand Up @@ -2462,8 +2463,23 @@ def _callable_func(self, func, axis, *args, **kwargs):
if callable(func):
func = wrap_udf_function(func)

if axis == 0:
new_index = None
new_columns = self._modin_frame.columns
else:
new_index = self._modin_frame.index
new_columns = None
new_modin_frame = self._modin_frame.apply_full_axis(
axis, lambda df: df.apply(func, axis=axis, *args, **kwargs)
axis,
lambda df: df.apply(
func,
axis=axis,
*args,
**kwargs,
),
new_index=new_index,
new_columns=new_columns,
func_may_change_complementary_index_size=False,
)
return self.__constructor__(new_modin_frame)

Expand Down