From a96639529a2e121f24b8c5462e1e76c243b85ede Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 1 Mar 2024 12:10:09 +0100 Subject: [PATCH] FEAT-#6965: Implement `.merge()` using range-partitioning implementation (#6966) Signed-off-by: Dmitry Chigarev --- .../actions/run-core-tests/group_2/action.yml | 2 + .github/workflows/ci.yml | 1 + .github/workflows/push-to-master.yml | 1 + docs/flow/modin/experimental/index.rst | 2 +- .../range_partitioning_groupby.rst | 6 + modin/config/__init__.py | 2 + modin/config/envvars.py | 12 + .../dataframe/pandas/dataframe/dataframe.py | 68 ++++ .../pandas/partitioning/partition_manager.py | 57 +++- modin/core/storage_formats/pandas/merge.py | 302 ++++++++++++++++++ .../storage_formats/pandas/query_compiler.py | 178 +---------- modin/pandas/test/dataframe/test_join_sort.py | 37 ++- 12 files changed, 484 insertions(+), 184 deletions(-) create mode 100644 modin/core/storage_formats/pandas/merge.py diff --git a/.github/actions/run-core-tests/group_2/action.yml b/.github/actions/run-core-tests/group_2/action.yml index d330e65061a..3022acf43a2 100644 --- a/.github/actions/run-core-tests/group_2/action.yml +++ b/.github/actions/run-core-tests/group_2/action.yml @@ -20,3 +20,5 @@ runs: modin/pandas/test/dataframe/test_pickle.py echo "::endgroup::" shell: bash -l {0} + - run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge" + shell: bash -l {0} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 37dc948aa7e..64c0e0e4a13 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -188,6 +188,7 @@ jobs: - run: python -m pytest modin/pandas/test/dataframe/test_binary.py - run: python -m pytest modin/pandas/test/dataframe/test_reduce.py - run: python -m pytest modin/pandas/test/dataframe/test_join_sort.py + - run: MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge" - run: python -m pytest modin/pandas/test/test_general.py - run: python -m pytest modin/pandas/test/dataframe/test_indexing.py - run: python -m pytest modin/pandas/test/test_series.py diff --git a/.github/workflows/push-to-master.yml b/.github/workflows/push-to-master.yml index 35e41bcec81..1355203f86a 100644 --- a/.github/workflows/push-to-master.yml +++ b/.github/workflows/push-to-master.yml @@ -46,6 +46,7 @@ jobs: python -m pytest modin/pandas/test/dataframe/test_indexing.py python -m pytest modin/pandas/test/dataframe/test_iter.py python -m pytest modin/pandas/test/dataframe/test_join_sort.py + MODIN_RANGE_PARTITIONING=1 python -m pytest modin/pandas/test/dataframe/test_join_sort.py -k "merge" python -m pytest modin/pandas/test/dataframe/test_map_metadata.py python -m pytest modin/pandas/test/dataframe/test_reduce.py python -m pytest modin/pandas/test/dataframe/test_udf.py diff --git a/docs/flow/modin/experimental/index.rst b/docs/flow/modin/experimental/index.rst index 6e5a1607a9d..90840e2c800 100644 --- a/docs/flow/modin/experimental/index.rst +++ b/docs/flow/modin/experimental/index.rst @@ -15,7 +15,7 @@ and provides a limited set of functionality: * :doc:`xgboost ` * :doc:`sklearn ` * :doc:`batch ` -* :doc:`Range-partitioning GroupBy implementation ` +* :doc:`Range-partitioning implementations ` .. toctree:: diff --git a/docs/flow/modin/experimental/range_partitioning_groupby.rst b/docs/flow/modin/experimental/range_partitioning_groupby.rst index ef82967ce15..b575badcc4f 100644 --- a/docs/flow/modin/experimental/range_partitioning_groupby.rst +++ b/docs/flow/modin/experimental/range_partitioning_groupby.rst @@ -72,3 +72,9 @@ implementation with the respective warning if it meets an unsupported case: ... # Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. ... # https://github.com/modin-project/modin/issues/5926 ... # Falling back to a TreeReduce implementation. + +Range-partitioning Merge +"""""""""""""""""""""""" + +It is recommended to use this implementation if the right dataframe in merge is as big as +the left dataframe. In this case, range-partitioning implementation works faster and consumes less RAM. diff --git a/modin/config/__init__.py b/modin/config/__init__.py index c0107f814d6..0450782b4f6 100644 --- a/modin/config/__init__.py +++ b/modin/config/__init__.py @@ -44,6 +44,7 @@ NPartitions, PersistentPickle, ProgressBar, + RangePartitioning, RangePartitioningGroupby, RayRedisAddress, RayRedisPassword, @@ -92,6 +93,7 @@ "ModinNumpy", "ExperimentalNumPyAPI", "RangePartitioningGroupby", + "RangePartitioning", "ExperimentalGroupbyImpl", "AsyncReadMode", "ReadSqlEngine", diff --git a/modin/config/envvars.py b/modin/config/envvars.py index 56d821baf3b..ca581c41315 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -770,6 +770,18 @@ def _sibling(cls) -> type[EnvWithSibilings]: ) +class RangePartitioning(EnvironmentVariable, type=bool): + """ + Set to true to use Modin's range-partitioning implementation where possible. + + Please refer to documentation for cases where enabling this options would be beneficial: + https://modin.readthedocs.io/en/stable/flow/modin/experimental/range_partitioning_groupby.html + """ + + varname = "MODIN_RANGE_PARTITIONING" + default = False + + class CIAWSSecretAccessKey(EnvironmentVariable, type=str): """Set to AWS_SECRET_ACCESS_KEY when running mock S3 tests for Modin in GitHub CI.""" diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 4235370497f..bf2963e5806 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -3881,6 +3881,74 @@ def _compute_new_widths(): new_partitions, new_index, new_columns, new_lengths, new_widths, new_dtypes ) + def _apply_func_to_range_partitioning_broadcast( + self, right, func, key, new_index=None, new_columns=None, new_dtypes=None + ): + """ + Apply `func` against two dataframes using range-partitioning implementation. + + The method first builds range-partitioning for both dataframes using the data from + `self[key]`, after that, it applies `func` row-wise to `self` frame and + broadcasts row-parts of `right` to `self`. + + Parameters + ---------- + right : PandasDataframe + func : callable(left : pandas.DataFrame, right : pandas.DataFrame) -> pandas.DataFrame + key : list of labels + Columns to use to build range-partitioning. Must present in both dataframes. + new_index : pandas.Index, optional + Index values to write to the result's cache. + new_columns : pandas.Index, optional + Column values to write to the result's cache. + new_dtypes : pandas.Series or ModinDtypes, optional + Dtype values to write to the result's cache. + + Returns + ------- + PandasDataframe + """ + if self._partitions.shape[0] == 1: + result = self.broadcast_apply_full_axis( + axis=1, + func=func, + new_columns=new_columns, + dtypes=new_dtypes, + other=right, + ) + return result + + if not isinstance(key, list): + key = [key] + + shuffling_functions = ShuffleSortFunctions( + self, + key, + ascending=True, + ideal_num_new_partitions=self._partitions.shape[0], + ) + + # here we want to get indices of those partitions that hold the key columns + key_indices = self.columns.get_indexer_for(key) + partition_indices = np.unique( + np.digitize(key_indices, np.cumsum(self.column_widths)) + ) + + new_partitions = self._partition_mgr_cls.shuffle_partitions( + self._partitions, + partition_indices, + shuffling_functions, + func, + right_partitions=right._partitions, + ) + + return self.__constructor__( + new_partitions, + index=new_index, + columns=new_columns, + dtypes=new_dtypes, + ) + @lazy_metadata_decorator(apply_axis="both") def groupby( self, diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index f9a02f7dcad..0f03dabcb4a 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -1722,6 +1722,7 @@ def shuffle_partitions( index, shuffle_functions: "ShuffleFunctions", final_shuffle_func, + right_partitions=None, ): """ Return shuffled partitions. @@ -1736,6 +1737,9 @@ def shuffle_partitions( An object implementing the functions that we will be using to perform this shuffle. final_shuffle_func : Callable(pandas.DataFrame) -> pandas.DataFrame Function that shuffles the data within each new partition. + right_partitions : np.ndarray, optional + Partitions to broadcast to `self` partitions. If specified, the method builds range-partitioning + for `right_partitions` basing on bins calculated for `partitions`, then performs broadcasting. Returns ------- @@ -1774,18 +1778,57 @@ def shuffle_partitions( for partition in row_partitions ] ).T - # We need to convert every partition that came from the splits into a full-axis column partition. - new_partitions = [ + + if right_partitions is None: + # We need to convert every partition that came from the splits into a column partition. + return np.array( + [ + [ + cls._column_partitions_class( + row_partition, full_axis=False + ).apply(final_shuffle_func) + ] + for row_partition in split_row_partitions + ] + ) + + right_row_parts = cls.row_partitions(right_partitions) + right_split_row_partitions = np.array( + [ + partition.split( + shuffle_functions.split_fn, + num_splits=num_bins, + extract_metadata=False, + ) + for partition in right_row_parts + ] + ).T + return np.array( [ cls._column_partitions_class(row_partition, full_axis=False).apply( - final_shuffle_func + final_shuffle_func, + other_axis_partition=cls._column_partitions_class( + right_row_partitions + ), + ) + for right_row_partitions, row_partition in zip( + right_split_row_partitions, split_row_partitions ) ] - for row_partition in split_row_partitions - ] - return np.array(new_partitions) + ) + else: # If there are not pivots we can simply apply the function row-wise + if right_partitions is None: + return np.array( + [row_part.apply(final_shuffle_func) for row_part in row_partitions] + ) + right_row_parts = cls.row_partitions(right_partitions) return np.array( - [row_part.apply(final_shuffle_func) for row_part in row_partitions] + [ + row_part.apply( + final_shuffle_func, other_axis_partition=right_row_part + ) + for right_row_part, row_part in zip(right_row_parts, row_partitions) + ] ) diff --git a/modin/core/storage_formats/pandas/merge.py b/modin/core/storage_formats/pandas/merge.py new file mode 100644 index 00000000000..9a3705ce3c1 --- /dev/null +++ b/modin/core/storage_formats/pandas/merge.py @@ -0,0 +1,302 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""Contains implementations for Merge/Join.""" + +import pandas +from pandas.core.dtypes.common import is_list_like +from pandas.errors import MergeError + +from modin.core.dataframe.base.dataframe.utils import join_columns +from modin.core.dataframe.pandas.metadata import ModinDtypes + +from .utils import merge_partitioning + + +# TODO: add methods for 'join' here +class MergeImpl: + """Provide implementations for merge/join.""" + + @classmethod + def range_partitioning_merge(cls, left, right, kwargs): + """ + Execute merge using range-partitioning implementation. + + Parameters + ---------- + left : PandasQueryCompiler + right : PandasQueryCompiler + kwargs : dict + Keyword arguments for ``pandas.merge()`` function. + + Returns + ------- + PandasQueryCompiler + """ + if ( + kwargs.get("left_index", False) + or kwargs.get("right_index", False) + or kwargs.get("left_on", None) is not None + or kwargs.get("left_on", None) is not None + or kwargs.get("how", "left") not in ("left", "inner") + ): + raise NotImplementedError( + f"The passed parameters are not yet supported by range-partitioning merge: {kwargs=}" + ) + + on = kwargs.get("on", None) + if on is not None and not isinstance(on, list): + on = [on] + if on is None or len(on) > 1: + raise NotImplementedError( + f"Merging on multiple columns is not yet supported by range-partitioning merge: {on=}" + ) + + if any(col not in left.columns or col not in right.columns for col in on): + raise NotImplementedError( + "Merging on an index level is not yet supported by range-partitioning merge." + ) + + def func(left, right): + return left.merge(right, **kwargs) + + new_columns, new_dtypes = cls._compute_result_metadata( + left, + right, + on, + left_on=None, + right_on=None, + suffixes=kwargs.get("suffixes", ("_x", "_y")), + ) + + return left.__constructor__( + left._modin_frame._apply_func_to_range_partitioning_broadcast( + right._modin_frame, + func=func, + key=on, + new_columns=new_columns, + new_dtypes=new_dtypes, + ) + # pandas resets the index of the result unless we were merging on an index level, + # the current implementation only supports merging on column names, so dropping + # the index unconditionally + ).reset_index(drop=True) + + @classmethod + def row_axis_merge(cls, left, right, kwargs): + """ + Execute merge using row-axis implementation. + + Parameters + ---------- + left : PandasQueryCompiler + right : PandasQueryCompiler + kwargs : dict + Keyword arguments for ``pandas.merge()`` function. + + Returns + ------- + PandasQueryCompiler + """ + how = kwargs.get("how", "inner") + on = kwargs.get("on", None) + left_on = kwargs.get("left_on", None) + right_on = kwargs.get("right_on", None) + left_index = kwargs.get("left_index", False) + right_index = kwargs.get("right_index", False) + sort = kwargs.get("sort", False) + right_to_broadcast = right._modin_frame.combine() + + if how in ["left", "inner"] and left_index is False and right_index is False: + kwargs["sort"] = False + + def should_keep_index(left, right): + keep_index = False + if left_on is not None and right_on is not None: + keep_index = any( + o in left.index.names + and o in right_on + and o in right.index.names + for o in left_on + ) + elif on is not None: + keep_index = any( + o in left.index.names and o in right.index.names for o in on + ) + return keep_index + + def map_func( + left, right, *axis_lengths, kwargs=kwargs, **service_kwargs + ): # pragma: no cover + df = pandas.merge(left, right, **kwargs) + + if kwargs["how"] == "left": + partition_idx = service_kwargs["partition_idx"] + if len(axis_lengths): + if not should_keep_index(left, right): + # Doesn't work for "inner" case, since the partition sizes of the + # left dataframe may change + start = sum(axis_lengths[:partition_idx]) + stop = sum(axis_lengths[: partition_idx + 1]) + + df.index = pandas.RangeIndex(start, stop) + + return df + + # Want to ensure that these are python lists + if left_on is not None and right_on is not None: + left_on = list(left_on) if is_list_like(left_on) else [left_on] + right_on = list(right_on) if is_list_like(right_on) else [right_on] + elif on is not None: + on = list(on) if is_list_like(on) else [on] + + new_columns, new_dtypes = cls._compute_result_metadata( + left, right, on, left_on, right_on, kwargs.get("suffixes", ("_x", "_y")) + ) + + new_left = left.__constructor__( + left._modin_frame.broadcast_apply_full_axis( + axis=1, + func=map_func, + enumerate_partitions=how == "left", + other=right_to_broadcast, + # We're going to explicitly change the shape across the 1-axis, + # so we want for partitioning to adapt as well + keep_partitioning=False, + num_splits=merge_partitioning( + left._modin_frame, right._modin_frame, axis=1 + ), + new_columns=new_columns, + sync_labels=False, + dtypes=new_dtypes, + pass_axis_lengths_to_partitions=how == "left", + ) + ) + + # Here we want to understand whether we're joining on a column or on an index level. + # It's cool if indexes are already materialized so we can easily check that, if not + # it's fine too, we can also decide that by columns, which tend to be already + # materialized quite often compared to the indexes. + keep_index = False + if left._modin_frame.has_materialized_index: + keep_index = should_keep_index(left, right) + else: + # Have to trigger columns materialization. Hope they're already available at this point. + if left_on is not None and right_on is not None: + keep_index = any( + o not in right.columns + and o in left_on + and o not in left.columns + for o in right_on + ) + elif on is not None: + keep_index = any( + o not in right.columns and o not in left.columns for o in on + ) + + if sort: + if left_on is not None and right_on is not None: + new_left = ( + new_left.sort_index(axis=0, level=left_on + right_on) + if keep_index + else new_left.sort_rows_by_column_values(left_on + right_on) + ) + elif on is not None: + new_left = ( + new_left.sort_index(axis=0, level=on) + if keep_index + else new_left.sort_rows_by_column_values(on) + ) + + return ( + new_left.reset_index(drop=True) + if not keep_index and (kwargs["how"] != "left" or sort) + else new_left + ) + else: + return left.default_to_pandas(pandas.DataFrame.merge, right, **kwargs) + + @classmethod + def _compute_result_metadata(cls, left, right, on, left_on, right_on, suffixes): + """ + Compute columns and dtypes metadata for the result of merge if possible. + + Parameters + ---------- + left : PandasQueryCompiler + right : PandasQueryCompiler + on : label, list of labels or None + `on` argument that was passed to ``pandas.merge()``. + left_on : label, list of labels or None + `left_on` argument that was passed to ``pandas.merge()``. + right_on : label, list of labels or None + `right_on` argument that was passed to ``pandas.merge()``. + suffixes : list of strings + `suffixes` argument that was passed to ``pandas.merge()``. + + Returns + ------- + new_columns : pandas.Index or None + Columns for the result of merge. ``None`` if not enought metadata to compute. + new_dtypes : ModinDtypes or None + Dtypes for the result of merge. ``None`` if not enought metadata to compute. + """ + new_columns = None + new_dtypes = None + + if not left._modin_frame.has_materialized_columns: + return new_columns, new_dtypes + + if left_on is None and right_on is None: + if on is None: + on = [c for c in left.columns if c in right.columns] + _left_on, _right_on = on, on + else: + if left_on is None or right_on is None: + raise MergeError( + "Must either pass only 'on' or 'left_on' and 'right_on', not combination of them." + ) + _left_on, _right_on = left_on, right_on + + try: + new_columns, left_renamer, right_renamer = join_columns( + left.columns, + right.columns, + _left_on, + _right_on, + suffixes, + ) + except NotImplementedError: + # This happens when one of the keys to join is an index level. Pandas behaviour + # is really complicated in this case, so we're not computing resulted columns for now. + pass + else: + # renamers may contain columns from 'index', so trying to merge index and column dtypes here + right_index_dtypes = ( + right.index.dtypes + if isinstance(right.index, pandas.MultiIndex) + else pandas.Series([right.index.dtype], index=[right.index.name]) + ) + right_dtypes = pandas.concat([right.dtypes, right_index_dtypes])[ + right_renamer.keys() + ].rename(right_renamer) + + left_index_dtypes = left._modin_frame._index_cache.maybe_get_dtypes() + left_dtypes = ( + ModinDtypes.concat([left._modin_frame._dtypes, left_index_dtypes]) + .lazy_get(left_renamer.keys()) + .set_index(list(left_renamer.values())) + ) + new_dtypes = ModinDtypes.concat([left_dtypes, right_dtypes]) + + return new_columns, new_dtypes diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index dffb1285eef..7beeca258dc 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -41,9 +41,9 @@ from pandas.core.groupby.base import transformation_kernels from pandas.core.indexes.api import ensure_index_from_sequences from pandas.core.indexing import check_bool_indexer -from pandas.errors import DataError, MergeError +from pandas.errors import DataError -from modin.config import CpuCount, RangePartitioningGroupby +from modin.config import CpuCount, RangePartitioning, RangePartitioningGroupby from modin.core.dataframe.algebra import ( Binary, Fold, @@ -57,7 +57,6 @@ GroupByDefault, SeriesGroupByDefault, ) -from modin.core.dataframe.base.dataframe.utils import join_columns from modin.core.dataframe.pandas.metadata import ( DtypesDescriptor, ModinDtypes, @@ -77,6 +76,7 @@ from .aggregations import CorrCovBuilder from .groupby import GroupbyReduceImpl +from .merge import MergeImpl from .utils import get_group_names, merge_partitioning @@ -513,170 +513,16 @@ def where_builder_series(df, cond): return self.__constructor__(new_modin_frame) def merge(self, right, **kwargs): - how = kwargs.get("how", "inner") - on = kwargs.get("on", None) - left_on = kwargs.get("left_on", None) - right_on = kwargs.get("right_on", None) - left_index = kwargs.get("left_index", False) - right_index = kwargs.get("right_index", False) - sort = kwargs.get("sort", False) - right_to_broadcast = right._modin_frame.combine() - - if how in ["left", "inner"] and left_index is False and right_index is False: - kwargs["sort"] = False - - def should_keep_index(left, right): - keep_index = False - if left_on is not None and right_on is not None: - keep_index = any( - o in left.index.names - and o in right_on - and o in right.index.names - for o in left_on - ) - elif on is not None: - keep_index = any( - o in left.index.names and o in right.index.names for o in on - ) - return keep_index - - def map_func( - left, right, *axis_lengths, kwargs=kwargs, **service_kwargs - ): # pragma: no cover - df = pandas.merge(left, right, **kwargs) - - if kwargs["how"] == "left": - partition_idx = service_kwargs["partition_idx"] - if len(axis_lengths): - if not should_keep_index(left, right): - # Doesn't work for "inner" case, since the partition sizes of the - # left dataframe may change - start = sum(axis_lengths[:partition_idx]) - stop = sum(axis_lengths[: partition_idx + 1]) - - df.index = pandas.RangeIndex(start, stop) - - return df - - # Want to ensure that these are python lists - if left_on is not None and right_on is not None: - left_on = list(left_on) if is_list_like(left_on) else [left_on] - right_on = list(right_on) if is_list_like(right_on) else [right_on] - elif on is not None: - on = list(on) if is_list_like(on) else [on] - - new_columns = None - new_dtypes = None - if self._modin_frame.has_materialized_columns: - if left_on is None and right_on is None: - if on is None: - on = [c for c in self.columns if c in right.columns] - _left_on, _right_on = on, on - else: - if left_on is None or right_on is None: - raise MergeError( - "Must either pass only 'on' or 'left_on' and 'right_on', not combination of them." - ) - _left_on, _right_on = left_on, right_on - - try: - new_columns, left_renamer, right_renamer = join_columns( - self.columns, - right.columns, - _left_on, - _right_on, - kwargs.get("suffixes", ("_x", "_y")), - ) - except NotImplementedError: - # This happens when one of the keys to join is an index level. Pandas behaviour - # is really complicated in this case, so we're not computing resulted columns for now. - pass - else: - # renamers may contain columns from 'index', so trying to merge index and column dtypes here - right_index_dtypes = ( - right.index.dtypes - if isinstance(right.index, pandas.MultiIndex) - else pandas.Series( - [right.index.dtype], index=[right.index.name] - ) - ) - right_dtypes = pandas.concat([right.dtypes, right_index_dtypes])[ - right_renamer.keys() - ].rename(right_renamer) - - left_index_dtypes = ( - self._modin_frame._index_cache.maybe_get_dtypes() - ) - left_dtypes = ( - ModinDtypes.concat( - [self._modin_frame._dtypes, left_index_dtypes] - ) - .lazy_get(left_renamer.keys()) - .set_index(list(left_renamer.values())) - ) - new_dtypes = ModinDtypes.concat([left_dtypes, right_dtypes]) - - new_self = self.__constructor__( - self._modin_frame.broadcast_apply_full_axis( - axis=1, - func=map_func, - enumerate_partitions=how == "left", - other=right_to_broadcast, - # We're going to explicitly change the shape across the 1-axis, - # so we want for partitioning to adapt as well - keep_partitioning=False, - num_splits=merge_partitioning( - self._modin_frame, right._modin_frame, axis=1 - ), - new_columns=new_columns, - sync_labels=False, - dtypes=new_dtypes, - pass_axis_lengths_to_partitions=how == "left", + if RangePartitioning.get(): + try: + return MergeImpl.range_partitioning_merge(self, right, kwargs) + except NotImplementedError as e: + message = ( + f"Can't use range-partitioning merge implementation because of: {e}" + + "\nFalling back to a row-axis implementation." ) - ) - - # Here we want to understand whether we're joining on a column or on an index level. - # It's cool if indexes are already materialized so we can easily check that, if not - # it's fine too, we can also decide that by columns, which tend to be already - # materialized quite often compared to the indexes. - keep_index = False - if self._modin_frame.has_materialized_index: - keep_index = should_keep_index(self, right) - else: - # Have to trigger columns materialization. Hope they're already available at this point. - if left_on is not None and right_on is not None: - keep_index = any( - o not in right.columns - and o in left_on - and o not in self.columns - for o in right_on - ) - elif on is not None: - keep_index = any( - o not in right.columns and o not in self.columns for o in on - ) - - if sort: - if left_on is not None and right_on is not None: - new_self = ( - new_self.sort_index(axis=0, level=left_on + right_on) - if keep_index - else new_self.sort_rows_by_column_values(left_on + right_on) - ) - elif on is not None: - new_self = ( - new_self.sort_index(axis=0, level=on) - if keep_index - else new_self.sort_rows_by_column_values(on) - ) - - return ( - new_self.reset_index(drop=True) - if not keep_index and (kwargs["how"] != "left" or sort) - else new_self - ) - else: - return self.default_to_pandas(pandas.DataFrame.merge, right, **kwargs) + get_logger().info(message) + return MergeImpl.row_axis_merge(self, right, kwargs) def join(self, right, **kwargs): on = kwargs.get("on", None) diff --git a/modin/pandas/test/dataframe/test_join_sort.py b/modin/pandas/test/dataframe/test_join_sort.py index 653e39bec0a..d2b44741544 100644 --- a/modin/pandas/test/dataframe/test_join_sort.py +++ b/modin/pandas/test/dataframe/test_join_sort.py @@ -19,7 +19,7 @@ import pytest import modin.pandas as pd -from modin.config import Engine, NPartitions, StorageFormat +from modin.config import Engine, NPartitions, RangePartitioning, StorageFormat from modin.pandas.io import to_pandas from modin.pandas.test.utils import ( arg_keys, @@ -54,6 +54,13 @@ pd.DataFrame() +def df_equals_and_sort(df1, df2): + """Sort dataframe's rows and run ``df_equals()`` for them.""" + df1 = df1.sort_values(by=df1.columns.tolist(), ignore_index=True) + df2 = df2.sort_values(by=df2.columns.tolist(), ignore_index=True) + df_equals(df1, df2) + + @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) def test_combine(data): pandas_df = pandas.DataFrame(data) @@ -214,6 +221,10 @@ def test_join_6602(): teams.set_index("league_abbreviation").join(abbreviations.rename("league_name")) +@pytest.mark.skipif( + RangePartitioning.get() and StorageFormat.get() == "Hdk", + reason="Doesn't make sense for HDK", +) @pytest.mark.parametrize( "test_data, test_data2", [ @@ -236,6 +247,10 @@ def test_join_6602(): ], ) def test_merge(test_data, test_data2): + # RangePartitioning merge always produces sorted result, so we have to sort + # pandas' result as well in order them to match + comparator = df_equals_and_sort if RangePartitioning.get() else df_equals + modin_df = pd.DataFrame( test_data, columns=["col{}".format(i) for i in range(test_data.shape[1])], @@ -268,7 +283,7 @@ def test_merge(test_data, test_data2): pandas_result = pandas_df.merge( pandas_df2, how=hows[i], on=ons[j], sort=sorts[j] ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) modin_result = modin_df.merge( modin_df2, @@ -284,7 +299,7 @@ def test_merge(test_data, test_data2): right_on="key", sort=sorts[j], ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # Test for issue #1771 modin_df = pd.DataFrame({"name": np.arange(40)}) @@ -293,7 +308,7 @@ def test_merge(test_data, test_data2): pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]}) modin_result = modin_df.merge(modin_df2, on="name", how="inner") pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner") - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) frame_data = { "col1": [0, 1, 2, 3], @@ -314,7 +329,7 @@ def test_merge(test_data, test_data2): # Defaults modin_result = modin_df.merge(modin_df2, how=how) pandas_result = pandas_df.merge(pandas_df2, how=how) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # left_on and right_index modin_result = modin_df.merge( @@ -323,7 +338,7 @@ def test_merge(test_data, test_data2): pandas_result = pandas_df.merge( pandas_df2, how=how, left_on="col1", right_index=True ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # left_index and right_on modin_result = modin_df.merge( @@ -332,7 +347,7 @@ def test_merge(test_data, test_data2): pandas_result = pandas_df.merge( pandas_df2, how=how, left_index=True, right_on="col1" ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # left_on and right_on col1 modin_result = modin_df.merge( @@ -341,7 +356,7 @@ def test_merge(test_data, test_data2): pandas_result = pandas_df.merge( pandas_df2, how=how, left_on="col1", right_on="col1" ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # left_on and right_on col2 modin_result = modin_df.merge( @@ -350,7 +365,7 @@ def test_merge(test_data, test_data2): pandas_result = pandas_df.merge( pandas_df2, how=how, left_on="col2", right_on="col2" ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # left_index and right_index modin_result = modin_df.merge( @@ -359,7 +374,7 @@ def test_merge(test_data, test_data2): pandas_result = pandas_df.merge( pandas_df2, how=how, left_index=True, right_index=True ) - df_equals(modin_result, pandas_result) + comparator(modin_result, pandas_result) # Cannot merge a Series without a name ps = pandas.Series(frame_data2.get("col1")) @@ -368,6 +383,7 @@ def test_merge(test_data, test_data2): modin_df, pandas_df, lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps), + comparator=comparator, ) # merge a Series with a name @@ -377,6 +393,7 @@ def test_merge(test_data, test_data2): modin_df, pandas_df, lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps), + comparator=comparator, ) with pytest.raises(TypeError):