Skip to content

Commit

Permalink
Remove new params
Browse files Browse the repository at this point in the history
Signed-off-by: Igoshev, Iaroslav <[email protected]>
  • Loading branch information
YarShev committed Mar 1, 2023
1 parent 2c150ad commit cda0b9c
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 72 deletions.
87 changes: 39 additions & 48 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ def __init__(

self._validate_axes_lengths()
if all(obj is not None for obj in (index, columns, row_lengths, column_widths)):
# this hint allows to filter empty partitions out without triggering metadata computation
# in order to avoid useless computation over empty partitions
compute_metadata = True
else:
compute_metadata = False
Expand Down Expand Up @@ -1613,7 +1615,7 @@ def _tree_reduce_func(df, *args, **kwargs):

return _tree_reduce_func

def _compute_tree_reduce_metadata(self, axis, new_parts):
def _compute_tree_reduce_metadata(self, axis, new_parts, dtypes=None):
"""
Compute the metadata for the result of reduce function.
Expand All @@ -1623,6 +1625,10 @@ def _compute_tree_reduce_metadata(self, axis, new_parts):
The axis on which reduce function was applied.
new_parts : NumPy 2D array
Partitions with the result of applied function.
dtypes : str, optional
The data types for the result. This is an optimization
because there are functions that always result in a particular data
type, and this allows us to avoid (re)computing it.
Returns
-------
Expand All @@ -1637,12 +1643,18 @@ def _compute_tree_reduce_metadata(self, axis, new_parts):
new_axes_lengths[axis] = [1]
new_axes_lengths[axis ^ 1] = self._axes_lengths[axis ^ 1]

new_dtypes = None
if dtypes == "copy":
dtypes = self._dtypes
elif dtypes is not None:
dtypes = pandas.Series(
[np.dtype(dtypes)] * len(new_axes[1]), index=new_axes[1]
)

result = self.__constructor__(
new_parts,
*new_axes,
*new_axes_lengths,
new_dtypes,
dtypes,
)
return result

Expand Down Expand Up @@ -1681,7 +1693,7 @@ def reduce(
new_parts = self._partition_mgr_cls.map_axis_partitions(
axis.value, self._partitions, function
)
return self._compute_tree_reduce_metadata(axis.value, new_parts)
return self._compute_tree_reduce_metadata(axis.value, new_parts, dtypes=dtypes)

@lazy_metadata_decorator(apply_axis="opposite", axis_arg=0)
def tree_reduce(
Expand Down Expand Up @@ -2215,8 +2227,6 @@ def apply_full_axis(
func,
new_index=None,
new_columns=None,
new_row_lengths=None,
new_column_widths=None,
apply_indices=None,
enumerate_partitions: bool = False,
dtypes=None,
Expand All @@ -2239,12 +2249,6 @@ def apply_full_axis(
new_columns : list-like, optional
The columns of the result. We may know this in
advance, and if not provided it must be computed.
new_row_lengths : list, optional
The length of each partition in the rows. The "height" of
each of the block partitions. Is computed if not provided.
new_column_widths : list, optional
The width of each partition in the columns. The "width" of
each of the block partitions. Is computed if not provided.
apply_indices : list-like, default: None
Indices of `axis ^ 1` to apply function over.
enumerate_partitions : bool, default: False
Expand Down Expand Up @@ -2279,8 +2283,6 @@ def apply_full_axis(
func=func,
new_index=new_index,
new_columns=new_columns,
new_row_lengths=new_row_lengths,
new_column_widths=new_column_widths,
apply_indices=apply_indices,
enumerate_partitions=enumerate_partitions,
dtypes=dtypes,
Expand Down Expand Up @@ -2675,8 +2677,6 @@ def broadcast_apply_full_axis(
other,
new_index=None,
new_columns=None,
new_row_lengths=None,
new_column_widths=None,
apply_indices=None,
enumerate_partitions=False,
dtypes=None,
Expand All @@ -2701,12 +2701,6 @@ def broadcast_apply_full_axis(
new_columns : list-like, optional
Columns of the result. We may know this in
advance, and if not provided it must be computed.
new_row_lengths : list, optional
The length of each partition in the rows. The "height" of
each of the block partitions. Is computed if not provided.
new_column_widths : list, optional
The width of each partition in the columns. The "width" of
each of the block partitions. Is computed if not provided.
apply_indices : list-like, default: None
Indices of `axis ^ 1` to apply function over.
enumerate_partitions : bool, default: False
Expand Down Expand Up @@ -2770,51 +2764,48 @@ def broadcast_apply_full_axis(
keep_partitioning=keep_partitioning,
apply_func_args=apply_func_args,
)
new_dtypes = None
kw = {"row_lengths": None, "column_widths": None}
if dtypes == "copy":
new_dtypes = self._dtypes
kw["dtypes"] = self._dtypes
elif dtypes is not None:
if new_columns is None:
(
new_columns,
new_column_widths,
kw["column_widths"],
) = self._compute_axis_labels_and_lengths(1, new_partitions)
new_dtypes = pandas.Series(
kw["dtypes"] = pandas.Series(
[np.dtype(dtypes)] * len(new_columns), index=new_columns
)

if not keep_partitioning:
if new_row_lengths is None and new_index is not None:
if kw["row_lengths"] is None and new_index is not None:
if axis == 0:
new_row_lengths = get_length_list(
kw["row_lengths"] = get_length_list(
axis_len=len(new_index), num_splits=new_partitions.shape[0]
)
elif (
axis == 1
and self._row_lengths_cache is not None
and len(new_index) == sum(self._row_lengths_cache)
):
new_row_lengths = self._row_lengths_cache
if new_column_widths is None and new_columns is not None:
elif axis == 1:
if self._row_lengths_cache is not None and len(new_index) == sum(
self._row_lengths_cache
):
kw["row_lengths"] = self._row_lengths_cache
elif len(new_index) == 1:
kw["row_lengths"] = [1]
if kw["column_widths"] is None and new_columns is not None:
if axis == 1:
new_column_widths = get_length_list(
kw["column_widths"] = get_length_list(
axis_len=len(new_columns),
num_splits=new_partitions.shape[1],
)
elif (
axis == 0
and self._column_widths_cache is not None
and len(new_columns) == sum(self._column_widths_cache)
):
new_column_widths = self._column_widths_cache
elif axis == 0:
if self._column_widths_cache is not None and len(
new_columns
) == sum(self._column_widths_cache):
kw["column_widths"] = self._column_widths_cache
elif len(new_columns) == 1:
kw["column_widths"] = [1]

result = self.__constructor__(
new_partitions,
index=new_index,
columns=new_columns,
row_lengths=new_row_lengths,
column_widths=new_column_widths,
dtypes=new_dtypes,
new_partitions, index=new_index, columns=new_columns, **kw
)
if sync_labels and new_index is not None:
result.synchronize_labels(axis=0)
Expand Down
47 changes: 25 additions & 22 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2476,41 +2476,44 @@ def drop(self, index=None, columns=None, errors: str = "raise"):

def duplicated(self, **kwargs):
def _compute_hash(df):
return df.apply(
result = df.apply(
lambda s: hashlib.new("md5", str(tuple(s)).encode()).hexdigest(), axis=1
).to_frame()
)
if isinstance(result, pandas.Series):
result = result.to_frame(
result.name
if result.name is not None
else MODIN_UNNAMED_SERIES_LABEL
)
return result

def _compute_duplicated(df):
return df.duplicated(**kwargs).to_frame()
result = df.duplicated(**kwargs)
if isinstance(result, pandas.Series):
result = result.to_frame(
result.name
if result.name is not None
else MODIN_UNNAMED_SERIES_LABEL
)
return result

new_index = self._modin_frame._index_cache
new_columns = [MODIN_UNNAMED_SERIES_LABEL]
new_row_lengths = self._modin_frame._row_lengths_cache
new_column_widths = [1]
if len(self._modin_frame.column_widths) > 1:
# if the number of columns (or column partitions) we are checking for duplicates is larger than 1,
# we must first hash them to generate a single value that can be compared across rows.
hashed_modin_frame = self._modin_frame.apply_full_axis(
1,
_compute_hash,
new_index=new_index,
new_columns=new_columns,
new_row_lengths=new_row_lengths,
new_column_widths=new_column_widths,
keep_partitioning=False,
hashed_modin_frame = self._modin_frame.reduce(
axis=1,
function=_compute_hash,
dtypes=np.dtype("O"),
)
else:
hashed_modin_frame = self._modin_frame
new_modin_frame = hashed_modin_frame.apply_full_axis(
0,
_compute_duplicated,
new_index=new_index,
new_columns=new_columns,
new_row_lengths=new_row_lengths,
new_column_widths=new_column_widths,
keep_partitioning=False,
axis=0,
func=_compute_duplicated,
new_index=self._modin_frame._index_cache,
new_columns=[MODIN_UNNAMED_SERIES_LABEL],
dtypes=np.bool_,
keep_partitioning=False,
)
return self.__constructor__(new_modin_frame, shape_hint="column")

Expand Down
2 changes: 0 additions & 2 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,6 @@ def duplicated(self, subset=None, keep="first"): # noqa: PR01, RT01, D200
df = self[subset] if subset is not None else self
new_qc = df._query_compiler.duplicated(keep=keep)
duplicates = self._reduce_dimension(new_qc)
# remove Series name which was assigned automatically by .apply in QC
duplicates.name = None
return duplicates

@property
Expand Down

0 comments on commit cda0b9c

Please sign in to comment.