Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PERF-#5613: Optimize duplicated in case there is only one column partition #5640

Merged
merged 5 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 29 additions & 16 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1609,7 +1609,7 @@ def _tree_reduce_func(df, *args, **kwargs):

return _tree_reduce_func

def _compute_tree_reduce_metadata(self, axis, new_parts):
def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None):
"""
Compute the metadata for the result of reduce function.

Expand All @@ -1619,6 +1619,10 @@ def _compute_tree_reduce_metadata(self, axis, new_parts):
The axis on which reduce function was applied.
new_parts : NumPy 2D array
Partitions with the result of applied function.
dtypes : str, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.

Returns
-------
Expand All @@ -1633,12 +1637,18 @@ def _compute_tree_reduce_metadata(self, axis, new_parts):
new_axes_lengths[axis] = [1]
new_axes_lengths[axis ^ 1] = self._axes_lengths[axis ^ 1]

new_dtypes = None
if dtypes == "copy":
dtypes = self._dtypes
elif dtypes is not None:
dtypes = pandas.Series(
[np.dtype(dtypes)] * len(new_axes[1]), index=new_axes[1]
)

result = self.__constructor__(
new_parts,
*new_axes,
*new_axes_lengths,
new_dtypes,
dtypes,
)
return result

Expand Down Expand Up @@ -1677,7 +1687,7 @@ def reduce(
new_parts = self._partition_mgr_cls.map_axis_partitions(
axis.value, self._partitions, function
)
return self._compute_tree_reduce_metadata(axis.value, new_parts)
return self._compute_tree_reduce_metadata(axis.value, new_parts, dtypes=dtypes)

@lazy_metadata_decorator(apply_axis="opposite", axis_arg=0)
def tree_reduce(
Expand Down Expand Up @@ -2767,24 +2777,27 @@ def broadcast_apply_full_axis(
kw["row_lengths"] = get_length_list(
axis_len=len(new_index), num_splits=new_partitions.shape[0]
)
elif (
axis == 1
and self._row_lengths_cache is not None
and len(new_index) == sum(self._row_lengths_cache)
):
kw["row_lengths"] = self._row_lengths_cache
elif axis == 1:
if self._row_lengths_cache is not None and len(new_index) == sum(
self._row_lengths_cache
):
kw["row_lengths"] = self._row_lengths_cache
elif len(new_index) == 1 and new_partitions.shape[0] == 1:
kw["row_lengths"] = [1]
if kw["column_widths"] is None and new_columns is not None:
if axis == 1:
kw["column_widths"] = get_length_list(
axis_len=len(new_columns),
num_splits=new_partitions.shape[1],
)
elif (
axis == 0
and self._column_widths_cache is not None
and len(new_columns) == sum(self._column_widths_cache)
):
kw["column_widths"] = self._column_widths_cache
elif axis == 0:
if self._column_widths_cache is not None and len(
new_columns
) == sum(self._column_widths_cache):
kw["column_widths"] = self._column_widths_cache
elif len(new_columns) == 1 and new_partitions.shape[1] == 1:
kw["column_widths"] = [1]

result = self.__constructor__(
new_partitions, index=new_index, columns=new_columns, **kw
)
Expand Down
47 changes: 28 additions & 19 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2476,35 +2476,44 @@ def drop(self, index=None, columns=None, errors: str = "raise"):

def duplicated(self, **kwargs):
def _compute_hash(df):
return df.apply(
result = df.apply(
lambda s: hashlib.new("md5", str(tuple(s)).encode()).hexdigest(), axis=1
).to_frame()
)
if isinstance(result, pandas.Series):
result = result.to_frame(
result.name
if result.name is not None
else MODIN_UNNAMED_SERIES_LABEL
)
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
return result

def _compute_duplicated(df):
return df.duplicated(**kwargs).to_frame()
result = df.duplicated(**kwargs)
if isinstance(result, pandas.Series):
result = result.to_frame(
result.name
if result.name is not None
else MODIN_UNNAMED_SERIES_LABEL
)
return result

new_index = self._modin_frame._index_cache
new_columns = [MODIN_UNNAMED_SERIES_LABEL]
if len(self.columns) > 1:
# if the number of columns we are checking for duplicates is larger than 1,
# we must hash them to generate a single value that can be compared across rows.
hashed_modin_frame = self._modin_frame.apply_full_axis(
1,
_compute_hash,
new_index=new_index,
new_columns=new_columns,
keep_partitioning=False,
if self._modin_frame._partitions.shape[1] > 1:
# if the number of columns (or column partitions) we are checking for duplicates is larger than 1,
# we must first hash them to generate a single value that can be compared across rows.
hashed_modin_frame = self._modin_frame.reduce(
axis=1,
function=_compute_hash,
dtypes=np.dtype("O"),
)
else:
hashed_modin_frame = self._modin_frame
new_modin_frame = hashed_modin_frame.apply_full_axis(
0,
_compute_duplicated,
new_index=new_index,
new_columns=new_columns,
keep_partitioning=False,
axis=0,
func=_compute_duplicated,
new_index=self._modin_frame._index_cache,
new_columns=[MODIN_UNNAMED_SERIES_LABEL],
dtypes=np.bool_,
keep_partitioning=False,
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
)
return self.__constructor__(new_modin_frame, shape_hint="column")

Expand Down
2 changes: 0 additions & 2 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,6 @@ def duplicated(self, subset=None, keep="first"): # noqa: PR01, RT01, D200
df = self[subset] if subset is not None else self
new_qc = df._query_compiler.duplicated(keep=keep)
duplicates = self._reduce_dimension(new_qc)
# remove Series name which was assigned automatically by .apply in QC
duplicates.name = None
anmyachev marked this conversation as resolved.
Show resolved Hide resolved
return duplicates

@property
Expand Down