From 92fe2f771587afd21671976003a2cde0ae9f2b6c Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Wed, 21 Feb 2024 17:28:41 +0100 Subject: [PATCH] FEAT-#6990: Implement lazy execution for the Ray virtual partitions. --- modin/config/envvars.py | 2 +- modin/core/execution/ray/common/__init__.py | 8 +- .../ray/common/deferred_execution.py | 213 ++++-- .../execution/ray/common/engine_wrapper.py | 6 +- .../partitioning/lazy_virtual_partition.py | 616 ++++++++++++++++++ .../pandas_on_ray/partitioning/partition.py | 36 +- .../partitioning/partition_manager.py | 98 ++- modin/tests/pandas/dataframe/test_binary.py | 6 +- modin/tests/pandas/test_groupby.py | 9 +- 9 files changed, 911 insertions(+), 83 deletions(-) create mode 100644 modin/core/execution/ray/implementations/pandas_on_ray/partitioning/lazy_virtual_partition.py diff --git a/modin/config/envvars.py b/modin/config/envvars.py index f1dc047adb9..9bb01da60f8 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -868,7 +868,7 @@ class LazyExecution(EnvironmentVariable, type=str): """ varname = "MODIN_LAZY_EXECUTION" - choices = ("Auto", "On", "Off") + choices = ("Auto", "On", "Off", "Axis") default = "Auto" diff --git a/modin/core/execution/ray/common/__init__.py b/modin/core/execution/ray/common/__init__.py index f0da582328f..6837e6a07e9 100644 --- a/modin/core/execution/ray/common/__init__.py +++ b/modin/core/execution/ray/common/__init__.py @@ -13,7 +13,12 @@ """Common utilities for Ray execution engine.""" -from .engine_wrapper import MaterializationHook, RayWrapper, SignalActor +from .engine_wrapper import ( + MaterializationHook, + RayObjectRefTypes, + RayWrapper, + SignalActor, +) from .utils import initialize_ray __all__ = [ @@ -21,4 +26,5 @@ "RayWrapper", "MaterializationHook", "SignalActor", + "RayObjectRefTypes", ] diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py index 0ace6c27b10..0d523f2565f 100644 --- 
a/modin/core/execution/ray/common/deferred_execution.py +++ b/modin/core/execution/ray/common/deferred_execution.py @@ -29,13 +29,20 @@ import pandas import ray +import ray.exceptions from ray._private.services import get_node_ip_address from ray.util.client.common import ClientObjectRef -from modin.core.execution.ray.common import MaterializationHook, RayWrapper +from modin.core.execution.ray.common import ( + MaterializationHook, + RayObjectRefTypes, + RayWrapper, +) +from modin.core.execution.utils import remote_function from modin.logging import get_logger +from modin.utils import _inherit_docstrings -ObjectRefType = Union[ray.ObjectRef, ClientObjectRef, None] +ObjectRefType = Union[ray.ObjectRef, ClientObjectRef] ObjectRefOrListType = Union[ObjectRefType, List[ObjectRefType]] ListOrTuple = (list, tuple) @@ -68,16 +75,18 @@ class DeferredExecution: Attributes ---------- - data : ObjectRefType or DeferredExecution + data : object The execution input. func : callable or ObjectRefType A function to be executed. - args : list or tuple + args : list or tuple, optional Additional positional arguments to be passed in `func`. - kwargs : dict + kwargs : dict, optional Additional keyword arguments to be passed in `func`. - num_returns : int + num_returns : int, default: 1 The number of the return values. + flat_data : bool + True means that the data is neither DeferredExecution nor list. flat_args : bool True means that there are no lists or DeferredExecution objects in `args`. 
In this case, no arguments processing is performed and `args` is passed @@ -88,26 +97,29 @@ class DeferredExecution: def __init__( self, - data: Union[ - ObjectRefType, - "DeferredExecution", - List[Union[ObjectRefType, "DeferredExecution"]], - ], + data: Any, func: Union[Callable, ObjectRefType], - args: Union[List[Any], Tuple[Any]], - kwargs: Dict[str, Any], + args: Optional[Union[List[Any], Tuple[Any]]] = None, + kwargs: Optional[Dict[str, Any]] = None, num_returns=1, ): - if isinstance(data, DeferredExecution): - data.subscribe() + self.flat_data = self._flat_args((data,)) self.data = data self.func = func - self.args = args - self.kwargs = kwargs self.num_returns = num_returns - self.flat_args = self._flat_args(args) - self.flat_kwargs = self._flat_args(kwargs.values()) self.subscribers = 0 + if args is not None: + self.args = args + self.flat_args = self._flat_args(args) + else: + self.args = () + self.flat_args = True + if kwargs is not None: + self.kwargs = kwargs + self.flat_kwargs = self._flat_args(kwargs.values()) + else: + self.kwargs = {} + self.flat_kwargs = True @classmethod def _flat_args(cls, args: Iterable): @@ -134,7 +146,7 @@ def _flat_args(cls, args: Iterable): def exec( self, - ) -> Tuple[ObjectRefOrListType, Union["MetaList", List], Union[int, List[int]]]: + ) -> Tuple[ObjectRefOrListType, "MetaList", Union[int, List[int]]]: """ Execute this task, if required. @@ -150,7 +162,7 @@ def exec( return self.data, self.meta, self.meta_offset if ( - not isinstance(self.data, DeferredExecution) + self.flat_data and self.flat_args and self.flat_kwargs and self.num_returns == 1 @@ -166,6 +178,7 @@ def exec( # it back. After the execution, the result is saved and the counter has no effect. self.subscribers += 2 consumers, output = self._deconstruct() + # The last result is the MetaList, so adding +1 here. 
num_returns = sum(c.num_returns for c in consumers) + 1 results = self._remote_exec_chain(num_returns, *output) @@ -173,12 +186,13 @@ def exec( meta_offset = 0 results = iter(results) for de in consumers: - if de.num_returns == 1: + num_returns = de.num_returns + if num_returns == 1: de._set_result(next(results), meta, meta_offset) meta_offset += 2 else: res = list(islice(results, num_returns)) - offsets = list(range(0, 2 * num_returns, 2)) + offsets = list(range(meta_offset, meta_offset + 2 * num_returns, 2)) de._set_result(res, meta, offsets) meta_offset += 2 * num_returns return self.data, self.meta, self.meta_offset @@ -303,7 +317,9 @@ def _deconstruct_chain( out_extend = output.extend while True: de.unsubscribe() - if (out_pos := getattr(de, "out_pos", None)) and not de.has_result: + if not (has_result := de.has_result) and ( + out_pos := getattr(de, "out_pos", None) + ): out_append(_Tag.REF) out_append(out_pos) output[out_pos] = out_pos @@ -318,12 +334,13 @@ def _deconstruct_chain( break elif not isinstance(data := de.data, DeferredExecution): if isinstance(data, ListOrTuple): + out_append(_Tag.LIST) yield cls._deconstruct_list( data, output, stack, result_consumers, out_append ) else: out_append(data) - if not de.has_result: + if not has_result: stack.append(de) break else: @@ -391,22 +408,24 @@ def _deconstruct_list( """ for obj in lst: if isinstance(obj, DeferredExecution): - if out_pos := getattr(obj, "out_pos", None): + if obj.has_result: + obj = obj.data + elif out_pos := getattr(obj, "out_pos", None): obj.unsubscribe() - if obj.has_result: - out_append(obj.data) - else: - out_append(_Tag.REF) - out_append(out_pos) - output[out_pos] = out_pos - if obj.subscribers == 0: - output[out_pos + 1] = 0 - result_consumers.remove(obj) + out_append(_Tag.REF) + out_append(out_pos) + output[out_pos] = out_pos + if obj.subscribers == 0: + output[out_pos + 1] = 0 + result_consumers.remove(obj) + continue else: out_append(_Tag.CHAIN) yield cls._deconstruct_chain(obj, 
output, stack, result_consumers) out_append(_Tag.END) - elif isinstance(obj, ListOrTuple): + continue + + if isinstance(obj, ListOrTuple): out_append(_Tag.LIST) yield cls._deconstruct_list( obj, output, stack, result_consumers, out_append @@ -432,20 +451,20 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]: list The execution results. The last element of this list is the ``MetaList``. """ - # Prefer _remote_exec_single_chain(). It has fewer arguments and - # does not require the num_returns to be specified in options. + # Prefer _remote_exec_single_chain(). It does not require the num_returns + # to be specified in options. if num_returns == 2: return _remote_exec_single_chain.remote(*args) else: return _remote_exec_multi_chain.options(num_returns=num_returns).remote( - num_returns, *args + *args ) def _set_result( self, result: ObjectRefOrListType, - meta: "MetaList", - meta_offset: Union[int, List[int]], + meta: Optional["MetaList"] = None, + meta_offset: Optional[Union[int, List[int]]] = 0, ): """ Set the execution result. @@ -453,12 +472,12 @@ def _set_result( Parameters ---------- result : ObjectRefOrListType - meta : MetaList - meta_offset : int or list of int + meta : MetaList, default: None + meta_offset : int or list of int, default: 0 """ - del self.func, self.args, self.kwargs, self.flat_args, self.flat_kwargs + del self.func, self.args, self.kwargs self.data = result - self.meta = meta + self.meta = MetaList([0]) if meta is None else meta self.meta_offset = meta_offset def __reduce__(self): @@ -466,6 +485,72 @@ def __reduce__(self): raise NotImplementedError("DeferredExecution is not serializable!") +ObjectRefOrDeType = Union[ObjectRefType, DeferredExecution] + + +class DeferredGetItem(DeferredExecution): + """ + Deferred execution task that returns an item at the specified index. + + Parameters + ---------- + data : ObjectRefOrDeType + The object to get the item from. + index : int + The item index. 
+ """ + + def __init__(self, data: ObjectRefOrDeType, index: int): + super().__init__(data, self._remote_fn, [index]) + self.index = index + + @property + @_inherit_docstrings(DeferredExecution.has_result) + def has_result(self): + if super().has_result: + return True + + if isinstance(self.data, RayObjectRefTypes): + # Set the result if the object is already in the local store. + try: + self._set_result(ray.get(self.data, timeout=0)[self.index]) + return True + except ray.exceptions.GetTimeoutError: + return False + + if ( + isinstance(self.data, DeferredExecution) + and self.data.has_result + and self.data.num_returns != 1 + ): + # If `data` is a `DeferredExecution`, that returns multiple results, we + # don't need to execute `_remote_fn`, but can get the result by index instead. + self._set_result( + self.data.data[self.index], + self.data.meta, + self.data.meta_offset[self.index], + ) + return True + + return False + + @remote_function + def _remote_fn(obj, index): # pragma: no cover + """ + Return the item by index. + + Parameters + ---------- + obj : collection + index : int + + Returns + ------- + object + """ + return obj[index] + + class MetaList: """ Meta information, containing the result lengths and the worker address. @@ -478,6 +563,11 @@ class MetaList: def __init__(self, obj: Union[ray.ObjectID, ClientObjectRef, List]): self._obj = obj + def materialize(self): + """Materialize the list, if required.""" + if not isinstance(self._obj, list): + self._obj = RayWrapper.materialize(self._obj) + def __getitem__(self, index): """ Get item at the specified index. @@ -508,7 +598,7 @@ def __setitem__(self, index, value): obj[index] = value -class MetaListHook(MaterializationHook): +class MetaListHook(MaterializationHook, DeferredGetItem): """ Used by MetaList.__getitem__() for lazy materialization and getting a single value from the list. 
@@ -516,13 +606,13 @@ class MetaListHook(MaterializationHook): ---------- meta : MetaList Non-materialized list to get the value from. - idx : int + index : int The value index in the list. """ - def __init__(self, meta: MetaList, idx: int): + def __init__(self, meta: MetaList, index: int): + super().__init__(meta._obj, index) self.meta = meta - self.idx = idx def pre_materialize(self): """ @@ -533,7 +623,7 @@ def pre_materialize(self): object """ obj = self.meta._obj - return obj[self.idx] if isinstance(obj, list) else obj + return obj[self.index] if isinstance(obj, list) else obj def post_materialize(self, materialized): """ @@ -548,7 +638,7 @@ def post_materialize(self, materialized): object """ self.meta._obj = materialized - return materialized[self.idx] + return materialized[self.index] class _Tag(Enum): # noqa: PR01 @@ -605,7 +695,7 @@ def exec_func(fn: Callable, obj: Any, args: Tuple, kwargs: Dict) -> Any: raise err @classmethod - def construct(cls, num_returns: int, args: Tuple): # pragma: no cover + def construct(cls, args: Tuple): # pragma: no cover """ Construct and execute the specified chain. 
@@ -615,7 +705,6 @@ def construct(cls, num_returns: int, args: Tuple): # pragma: no cover Parameters ---------- - num_returns : int args : tuple Yields @@ -687,7 +776,7 @@ def construct_chain( while chain: fn = pop() - if fn == tg_e: + if fn is tg_e: lst.append(obj) break @@ -717,10 +806,10 @@ def construct_chain( itr = iter([obj] if num_returns == 1 else obj) for _ in range(num_returns): - obj = next(itr) - meta.append(len(obj) if hasattr(obj, "__len__") else 0) - meta.append(len(obj.columns) if hasattr(obj, "columns") else 0) - yield obj + o = next(itr) + meta.append(len(o) if hasattr(o, "__len__") else 0) + meta.append(len(o.columns) if hasattr(o, "columns") else 0) + yield o @classmethod def construct_list( @@ -834,20 +923,18 @@ def _remote_exec_single_chain( ------- Generator """ - return remote_executor.construct(num_returns=2, args=args) + return remote_executor.construct(args=args) @ray.remote def _remote_exec_multi_chain( - num_returns: int, *args: Tuple, remote_executor=_REMOTE_EXEC + *args: Tuple, remote_executor=_REMOTE_EXEC ) -> Generator: # pragma: no cover """ Execute the deconstructed chain with a multiple return values in a worker process. Parameters ---------- - num_returns : int - The number of return values. *args : tuple A deconstructed chain to be executed. 
remote_executor : _RemoteExecutor, default: _REMOTE_EXEC @@ -857,4 +944,4 @@ def _remote_exec_multi_chain( ------- Generator """ - return remote_executor.construct(num_returns, args) + return remote_executor.construct(args) diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index fc5f8a643d2..178b25e1d03 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -20,7 +20,7 @@ import asyncio import os from types import FunctionType -from typing import Sequence +from typing import Iterable, Sequence import ray from ray.util.client.common import ClientObjectRef @@ -106,7 +106,7 @@ def materialize(cls, obj_id): Parameters ---------- - obj_id : ray.ObjectID + obj_id : ObjectRefTypes Ray object identifier to get the value by. Returns @@ -214,7 +214,7 @@ def wait(cls, obj_ids, num_returns=None): num_returns : int, optional """ if not isinstance(obj_ids, Sequence): - obj_ids = list(obj_ids) + obj_ids = list(obj_ids) if isinstance(obj_ids, Iterable) else [obj_ids] ids = set() for obj in obj_ids: diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/lazy_virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/lazy_virtual_partition.py new file mode 100644 index 00000000000..a4c9ea09229 --- /dev/null +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/lazy_virtual_partition.py @@ -0,0 +1,616 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under +# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF +# ANY KIND, either express or implied. See the License for the specific language +# governing permissions and limitations under the License. + +"""Module houses classes responsible for storing a virtual partition and applying a function to it.""" +import math +from typing import ( + Callable, + Collection, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) + +import pandas +import ray + +from modin.config import MinPartitionSize +from modin.core.dataframe.base.partitioning.axis_partition import ( + BaseDataframeAxisPartition, +) +from modin.core.dataframe.pandas.partitioning.axis_partition import ( + PandasDataframeAxisPartition, +) +from modin.core.execution.ray.common import RayWrapper +from modin.core.execution.ray.common.deferred_execution import ( + DeferredExecution, + DeferredGetItem, + MetaList, + ObjectRefOrDeType, + ObjectRefType, +) +from modin.core.execution.utils import remote_function +from modin.utils import _inherit_docstrings + +from .partition import PandasOnRayDataframePartition + + +class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition): + """ + The class implements the interface in ``PandasDataframeAxisPartition``. + + Parameters + ---------- + data : DeferredExecution or list of PandasOnRayDataframePartition + full_axis : bool, default: True + Whether or not the virtual partition encompasses the whole axis. + length : ray.ObjectRef or int, optional + Length, or reference to length, of wrapped ``pandas.DataFrame``. + width : ray.ObjectRef or int, optional + Width, or reference to width, of wrapped ``pandas.DataFrame``. + num_splits : int, optional + The number of chunks to split the results on. 
+ chunk_lengths : list of ints, optional + The chunk lengths. + """ + + partition_type = PandasOnRayDataframePartition + instance_type = ray.ObjectRef + axis = None + + def __init__( + self, + data: Union[ + DeferredExecution, + PandasOnRayDataframePartition, + List[PandasOnRayDataframePartition], + ], + full_axis: bool = True, + length: Union[int, ObjectRefType] = None, + width: Union[int, ObjectRefType] = None, + num_splits=None, + chunk_lengths=None, + ): + self.full_axis = full_axis + self._meta = MetaList([length, width, None]) + self._meta_offset = 0 + self._chunk_lengths_cache = chunk_lengths + + if isinstance(data, DeferredExecution): + self._set_data_ref(data) + self._num_splits = num_splits + self._list_of_block_partitions = None + return + + if not isinstance(data, Collection) or len(data) == 1: + if not isinstance(data, Collection): + data = [data] + self._set_data_ref(data[0]._data) + self._num_splits = 1 + self._list_of_block_partitions = data + return + + self._num_splits = len(data) + self._list_of_block_partitions = data + + non_split, lengths, full_concat = self.find_non_split_block(data) + if non_split is not None: + if lengths is not None: + self._chunk_lengths_cache = lengths + if full_concat: + self._set_data_ref(non_split) + return + # TODO: We have a subset of the same frame here and can just get a single chunk + # from the original frame instead of concatenating all these chunks. + + data = DeferredExecution([part._data for part in data], self._remote_concat) + self._set_data_ref(data) + + @staticmethod + def find_non_split_block( + partitions: Iterable[PandasOnRayDataframePartition], + ) -> Tuple[ + Union[ObjectRefOrDeType, None], + Union[List[Union[ObjectRefType, int]], None], + bool, + ]: + """ + Find a non-split block if there is one. + + The `apply()` method returns the following lazy execution tree: + + DeferredExecution (apply func) + | + | + _DeferredSplit (n) + / ... \ + / \ + _DeferredGetChunk (0) ... 
_DeferredGetChunk (n - 1) + + If we need to concatenate all the `partitions` and the `partitions` are the + complete sequence of `_DeferredGetChunk`, then we can just get the root of + this tree and avoid the concatenation. + + Parameters + ---------- + partitions : list of PandasOnRayDataframePartition + + Returns + ------- + ObjectRefOrDeType or None + The non-split block or None if not found. + list of lengths or None + Estimated chunk lengths, that could be different from the real ones. + bool + Whether the specified partitions represent the full block or just the + first part of this block. + """ + refs = [part._data_ref for part in partitions] + if ( + isinstance(refs[0], _DeferredGetChunk) + and isinstance(split := refs[0].data, _DeferredSplit) + and (refs[0].index == 0) + and all(prev.is_next_chunk(next) for prev, next in zip(refs[:-1], refs[1:])) + ): + lengths = [ref.length for ref in refs if ref.length is not None] + return ( + split.data, + lengths if len(lengths) == len(refs) else None, + split.num_splits == refs[-1].index + 1, + ) + return None, None, False + + def _set_data_ref(self, data: Union[DeferredExecution, ObjectRefType]): + """ + Set the `_data_ref` property. 
+ + Parameters + ---------- + data : DeferredExecution or ObjectRefType + """ + if isinstance(data, DeferredExecution): + data.subscribe() + self._data_ref = data + + def __del__(self): + """Unsubscribe from DeferredExecution.""" + if isinstance(self._data_ref, DeferredExecution): + self._data_ref.unsubscribe() + + @_inherit_docstrings(BaseDataframeAxisPartition.apply) + def apply( + self, + func, + *args, + num_splits=None, + other_axis_partition=None, + maintain_partitioning=True, + lengths=None, + manual_partition=False, + **kwargs, + ) -> Union[List[PandasOnRayDataframePartition], PandasOnRayDataframePartition]: + if not manual_partition: + if not self.full_axis: + # If this is not a full axis partition, it already contains a subset of + # the full axis, so we shouldn't split the result further. + num_splits = 1 + elif num_splits is None: + num_splits = self._num_splits + + if ( + num_splits == 1 + or not maintain_partitioning + or num_splits != self._num_splits + ): + lengths = None + elif lengths is None: + lengths = self._chunk_lengths + + if other_axis_partition is not None: + if isinstance(other_axis_partition, Collection): + if len(other_axis_partition) == 1: + other_part = other_axis_partition[0]._data + else: + concat_fn = ( + PandasOnRayDataframeColumnPartition + if self.axis + else PandasOnRayDataframeRowPartition + )._remote_concat + other_part = DeferredExecution( + [p._data for p in other_axis_partition], concat_fn + ) + else: + other_part = other_axis_partition._data + args = [other_part] + list(args) + + de = self._apply(func, args, kwargs) + if num_splits > 1: + de = _DeferredSplit(de, self.axis, num_splits, lengths) + if lengths is not None and len(lengths) != num_splits: + lengths = None + result = [ + PandasOnRayDataframePartition( + _DeferredGetChunk( + de, i, lengths[i] if lengths is not None else None + ) + ) + for i in range(num_splits) + ] + else: + result = [PandasOnRayDataframePartition(de)] + if self.full_axis or other_axis_partition 
is not None: + return result + else: + # If this is not a full axis partition, just take out the single split in the result. + return result[0] + + @_inherit_docstrings(PandasDataframeAxisPartition.add_to_apply_calls) + def add_to_apply_calls(self, func, *args, length=None, width=None, **kwargs): + de = self._apply(func, args, kwargs) + return type(self)( + de, self.full_axis, length, width, self._num_splits, self._chunk_lengths + ) + + @_inherit_docstrings(PandasDataframeAxisPartition.split) + def split( + self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False + ) -> List[PandasOnRayDataframePartition]: + chunks, meta, offsets = DeferredExecution( + self._data_ref, + split_func, + args=f_args, + kwargs=f_kwargs, + num_returns=num_splits, + ).exec() + return [ + PandasOnRayDataframePartition(chunks[i], meta=meta, meta_offset=offsets[i]) + for i in range(num_splits) + ] + + @property + def _length_cache(self) -> Union[int, None, ObjectRefType]: + """ + Get the cached length of the partition. + + Returns + ------- + int or None or ObjectRefType + """ + return self._meta[self._meta_offset] + + def length(self, materialize=True) -> Union[int, ObjectRefType]: + """ + Get the length of the partition. + + Parameters + ---------- + materialize : bool, default: True + + Returns + ------- + int or ObjectRefType + """ + if self._length_cache is None: + self._calculate_lengths(materialize) + elif materialize: + self._meta.materialize() + return self._length_cache + + @property + def _width_cache(self) -> Union[int, None, ObjectRefType]: + """ + Get the cached width of the partition. + + Returns + ------- + int or None or ObjectRefType + """ + return self._meta[self._meta_offset + 1] + + def width(self, materialize=True) -> Union[int, ObjectRefType]: + """ + Get the width of the partition. 
+ + Parameters + ---------- + materialize : bool, default: True + + Returns + ------- + int or ObjectRefType + """ + if self._width_cache is None: + self._calculate_lengths(materialize) + elif materialize: + self._meta.materialize() + return self._width_cache + + def _calculate_lengths(self, materialize=True): + """ + Calculate the length and width of the partition. + + Parameters + ---------- + materialize : bool, default: True + """ + if self._list_of_block_partitions is not None: + from . import PandasOnRayDataframePartitionManager + + lengths = [part.length(False) for part in self._list_of_block_partitions] + widths = [part.width(False) for part in self._list_of_block_partitions] + materialized = PandasOnRayDataframePartitionManager.materialize_futures( + lengths + widths + ) + self._meta[self._meta_offset] = sum(materialized[: len(lengths)]) + self._meta[self._meta_offset + 1] = sum(materialized[len(lengths) :]) + else: + self.force_materialization() + if materialize: + self._meta.materialize() + + @_inherit_docstrings(PandasDataframeAxisPartition.drain_call_queue) + def drain_call_queue(self, num_splits=None): + if num_splits: + self._num_splits = num_splits + + @_inherit_docstrings(PandasDataframeAxisPartition.force_materialization) + def force_materialization(self, get_ip=False): + self._data # Trigger execution + self._num_splits = 1 + self._chunk_lengths_cache = None + self._list_of_block_partitions = None + return self + + @_inherit_docstrings(PandasDataframeAxisPartition.wait) + def wait(self): + """Wait completing computations on the object wrapped by the partition.""" + RayWrapper.wait(self._data) + + @_inherit_docstrings(PandasDataframeAxisPartition.to_pandas) + def to_pandas(self): + return RayWrapper.materialize(self._data) + + @_inherit_docstrings(PandasDataframeAxisPartition.to_numpy) + def to_numpy(self): + return self.to_pandas().to_numpy() + + @_inherit_docstrings(PandasDataframeAxisPartition.mask) + def mask(self, row_indices, col_indices): + 
part = PandasOnRayDataframePartition(self._data_ref).mask( + row_indices, col_indices + ) + return type(self)(part, False) + + @property + @_inherit_docstrings(BaseDataframeAxisPartition.list_of_blocks) + def list_of_blocks(self): + return [part._data for part in self.list_of_block_partitions] + + @property + @_inherit_docstrings(PandasDataframeAxisPartition.list_of_block_partitions) + def list_of_block_partitions(self) -> list: + if self._list_of_block_partitions is not None: + return self._list_of_block_partitions + + data = self._data_ref + num_splits = self._num_splits + if num_splits > 1: + lengths = self._chunk_lengths + data = _DeferredSplit(data, self.axis, num_splits, lengths) + if lengths is not None and len(lengths) != num_splits: + lengths = None + self._list_of_block_partitions = [ + PandasOnRayDataframePartition( + _DeferredGetChunk( + data, i, lengths[i] if lengths is not None else None + ) + ) + for i in range(num_splits) + ] + else: + self._list_of_block_partitions = [PandasOnRayDataframePartition(data)] + return self._list_of_block_partitions + + @property + def list_of_ips(self): + """ + Return the list of IP worker addresses. + + Returns + ------- + list of str + """ + if (ip := self._meta[self._meta_offset + 2]) is not None: + return [ip] + if self._list_of_block_partitions is not None: + return [part.ip() for part in self._list_of_block_partitions] + return [] + + @property + def _data(self) -> ObjectRefType: + """ + Get the data wrapped by the partition. + + If the data is a `DeferredExecution`, the execution is triggered + and the result is returned. + + Returns + ------- + ObjectRefType + """ + data = self._data_ref + if isinstance(data, DeferredExecution): + data, self._meta, self._meta_offset = data.exec() + self._data_ref = data + return data + + @property + def _chunk_lengths(self) -> List[int]: + """ + Calculate the partition chunk lengths. 
+ + Returns + ------- + list of int + """ + if ( + self._chunk_lengths_cache is None + and self._list_of_block_partitions is not None + ): + attr = "length" if self.axis == 0 else "width" + self._chunk_lengths_cache = [ + getattr(p, attr)(materialize=False) + for p in self._list_of_block_partitions + ] + return self._chunk_lengths_cache + + def _apply( + self, apply_fn: Union[Callable, ObjectRefType], args: List, kwargs: Dict + ) -> DeferredExecution: + """ + Apply the function to this partition. + + Parameters + ---------- + apply_fn : callable or ObjectRefType + args : list + kwargs : dict + + Returns + ------- + DeferredExecution + """ + return DeferredExecution(self._data_ref, apply_fn, args, kwargs) + + +class _DeferredSplit(DeferredExecution): + """ + Split the DataFrame along the specified axis into the `num_splits` chunks. + + Parameters + ---------- + obj : ObjectRefOrDeType + axis : int + num_splits : int + lengths : list of int or None + """ + + def __init__( + self, + obj: ObjectRefOrDeType, + axis: int, + num_splits: int, + lengths: Union[List[Union[ObjectRefType, int]], None], + ): + self.num_splits = num_splits + self.skip_chunks = set() + args = [axis, num_splits, MinPartitionSize.get(), self.skip_chunks] + if lengths and (len(lengths) == num_splits): + args.extend(lengths) + super().__init__(obj, self._split, args, num_returns=num_splits) + + @remote_function + def _split( + df: pandas.DataFrame, + axis: int, + num_splits: int, + min_chunk_len: int, + skip_chunks: Set[int], + *lengths: Optional[List[int]], + ): # pragma: no cover # noqa: GL08 + if not lengths or (sum(lengths) != df.shape[axis]): + length = df.shape[axis] + chunk_len = max(math.ceil(length / num_splits), min_chunk_len) + lengths = [chunk_len] * num_splits + + result = [] + start = 0 + for i in range(num_splits): + if i in skip_chunks: + result.append(None) + start += lengths[i] + continue + + end = start + lengths[i] + chunk = df.iloc[start:end] if axis == 0 else df.iloc[:, 
start:end] + start = end + result.append(chunk) + if isinstance(chunk.axes[axis], pandas.MultiIndex): + chunk.set_axis( + chunk.axes[axis].remove_unused_levels(), + axis=axis, + copy=False, + ) + + return result + + +class _DeferredGetChunk(DeferredGetItem): + """ + Get the chunk with the specified index from the split. + + Parameters + ---------- + split : _DeferredSplit + index : int + length : int, optional + """ + + def __init__(self, split: _DeferredSplit, index: int, length: Optional[int] = None): + super().__init__(split, index) + self.length = length + + def __del__(self): + """Remove this chunk from _DeferredSplit if it's not executed yet.""" + if isinstance(self.data, _DeferredSplit): + self.data.skip_chunks.add(self.index) + + def is_next_chunk(self, other): + """ + Check if the other chunk is the next chunk of the same split. + + Parameters + ---------- + other : object + + Returns + ------- + bool + """ + return ( + isinstance(other, _DeferredGetChunk) + and (self.data is other.data) + and (other.index == self.index + 1) + ) + + +@_inherit_docstrings(PandasOnRayDataframeVirtualPartition.__init__) +class PandasOnRayDataframeColumnPartition(PandasOnRayDataframeVirtualPartition): + axis = 0 + + @remote_function + def _remote_concat(dfs): # pragma: no cover # noqa: GL08 + return pandas.concat(dfs, axis=0, copy=False) + + +@_inherit_docstrings(PandasOnRayDataframeVirtualPartition.__init__) +class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): + axis = 1 + + @remote_function + def _remote_concat(dfs): # pragma: no cover # noqa: GL08 + return pandas.concat(dfs, axis=1, copy=False) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index a4c35bf7e95..009d112724f 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ 
b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -23,13 +23,18 @@ from modin.config import LazyExecution from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition -from modin.core.execution.ray.common import MaterializationHook, RayWrapper +from modin.core.execution.ray.common import ( + MaterializationHook, + RayObjectRefTypes, + RayWrapper, +) from modin.core.execution.ray.common.deferred_execution import ( DeferredExecution, MetaList, MetaListHook, ) from modin.core.execution.ray.common.utils import ObjectIDType +from modin.core.execution.utils import remote_function from modin.logging import disable_logging, get_logger from modin.pandas.indexing import compute_sliced_len from modin.utils import _inherit_docstrings @@ -148,7 +153,7 @@ def add_to_apply_calls( def drain_call_queue(self): data = self._data_ref if not isinstance(data, DeferredExecution): - return data + return log = get_logger() self._is_debug(log) and log.debug( @@ -387,7 +392,7 @@ def _configure_lazy_exec(cls: LazyExecution): """Configure lazy execution mode for PandasOnRayDataframePartition.""" mode = cls.get() get_logger().debug(f"Ray lazy execution mode: {mode}") - if mode == "Auto": + if mode == "Auto" or mode == "Axis": PandasOnRayDataframePartition.apply = ( PandasOnRayDataframePartition._eager_exec_func ) @@ -419,7 +424,7 @@ def eager_exec(self, func, *args, length=None, width=None, **kwargs): LazyExecution.subscribe(_configure_lazy_exec) -class SlicerHook(MaterializationHook): +class SlicerHook(MaterializationHook, DeferredExecution): """ Used by mask() for the slilced length computation. 
@@ -432,9 +437,32 @@ class SlicerHook(MaterializationHook): """ def __init__(self, ref: ObjectIDType, slc: slice): + super().__init__(slc, remote_function(compute_sliced_len), [ref]) self.ref = ref self.slc = slc + @_inherit_docstrings(DeferredExecution.has_result) + def has_result(self): + if super().has_result: + return True + + ref = self.ref + if isinstance(ref, MetaListHook): + if ref.has_result: + ref = ref.data + else: + return False + elif isinstance(ref, RayObjectRefTypes): + try: + ref = ray.get(ref, timeout=0) + except ray.exceptions.GetTimeoutError: + return False + else: + return False + + self._set_result(compute_sliced_len(self.slc, ref)) + return True + def pre_materialize(self): """ Get the sliced length or object ref if not materialized. diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 0748efb219a..1995215e3f5 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -12,24 +12,36 @@ # governing permissions and limitations under the License. 
"""Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray.""" +import math import numpy as np +import pandas from pandas.core.dtypes.common import is_numeric_dtype -from modin.config import AsyncReadMode +from modin.config import AsyncReadMode, LazyExecution, MinPartitionSize from modin.core.execution.modin_aqp import progress_bar_wrapper from modin.core.execution.ray.common import RayWrapper +from modin.core.execution.ray.common.deferred_execution import DeferredExecution from modin.core.execution.ray.generic.partitioning import ( GenericRayDataframePartitionManager, ) +from modin.core.execution.utils import remote_function from modin.logging import get_logger from modin.utils import _inherit_docstrings from .partition import PandasOnRayDataframePartition -from .virtual_partition import ( - PandasOnRayDataframeColumnPartition, - PandasOnRayDataframeRowPartition, -) + +if LazyExecution.get() in ("On", "Axis"): + from .lazy_virtual_partition import ( + PandasOnRayDataframeColumnPartition, + PandasOnRayDataframeRowPartition, + PandasOnRayDataframeVirtualPartition, + ) +else: + from .virtual_partition import ( + PandasOnRayDataframeColumnPartition, + PandasOnRayDataframeRowPartition, + ) class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager): @@ -42,6 +54,82 @@ class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager): _execution_wrapper = RayWrapper materialize_futures = RayWrapper.materialize + if LazyExecution.get() in ("On", "Axis"): + # Try calculating the indices without triggering the lazy split. 
+ + @classmethod + @_inherit_docstrings(GenericRayDataframePartitionManager.get_indices) + def get_indices(cls, axis, partitions, index_func=None): + partitions = partitions.T if axis == 0 else partitions + if len(partitions): + partitions = [part for part in partitions[0]] + num_partitions = len(partitions) + if num_partitions == 0: + return pandas.Index([]), [] + + non_split, lengths, _ = ( + PandasOnRayDataframeVirtualPartition.find_non_split_block(partitions) + ) + if non_split is not None: + partitions = [non_split] + if lengths is None: + lengths = [] + else: + partitions = [part._data for part in partitions] + + if index_func is None: + if axis == 0: + + @remote_function + def remote_fn(*dfs): # pragma: no cover + return [df.index for df in dfs] + + else: + + @remote_function + def remote_fn(*dfs): # pragma: no cover + return [df.columns for df in dfs] + + data, args = partitions[0], partitions[1:] + else: + + @remote_function + def remote_fn(fn, *dfs): # pragma: no cover + return [fn(df) for df in dfs] + + data, args = index_func, partitions + + de = DeferredExecution(data, remote_fn, args, num_returns=len(partitions)) + part_indices = de.exec()[0] + + # If there is a non-split df, we could get the index from it and avoid + # triggering all the lazy operations and then concatenating the indices. 
+ if non_split is not None: + materialized = RayWrapper.materialize([part_indices] + lengths) + idx = materialized[0][0] + lengths = materialized[1:] + idx_len = len(idx) + + if idx_len != sum(lengths): + chunk_len = max( + math.ceil(idx_len / num_partitions), MinPartitionSize.get() + ) + lengths = [chunk_len] * num_partitions + + part_indices = [] + start = 0 + for length in lengths: + end = start + length + part_indices.append(idx[start:end]) + start = end + return idx, part_indices + + part_indices = RayWrapper.materialize(part_indices) + indices = [idx for idx in part_indices if len(idx)] + if len(indices) == 0: + return part_indices[0], part_indices + return indices[0].append(indices[1:]), part_indices + @classmethod def wait_partitions(cls, partitions): """ diff --git a/modin/tests/pandas/dataframe/test_binary.py b/modin/tests/pandas/dataframe/test_binary.py index 062250bbd8a..4014687face 100644 --- a/modin/tests/pandas/dataframe/test_binary.py +++ b/modin/tests/pandas/dataframe/test_binary.py @@ -18,8 +18,8 @@ import modin.pandas as pd from modin.config import Engine, NPartitions, StorageFormat -from modin.core.dataframe.pandas.partitioning.axis_partition import ( - PandasDataframeAxisPartition, +from modin.core.dataframe.base.partitioning.axis_partition import ( + BaseDataframeAxisPartition, ) from modin.tests.pandas.utils import ( CustomIntegerForAddition, @@ -228,7 +228,7 @@ def modin_df(is_virtual): # Modin should rebalance the partitions after the concat, producing virtual partitions. 
assert isinstance( result._query_compiler._modin_frame._partitions[0][0], - PandasDataframeAxisPartition, + BaseDataframeAxisPartition, ) return result diff --git a/modin/tests/pandas/test_groupby.py b/modin/tests/pandas/test_groupby.py index c10cba13b1d..3b3ec386d68 100644 --- a/modin/tests/pandas/test_groupby.py +++ b/modin/tests/pandas/test_groupby.py @@ -29,8 +29,8 @@ use_range_partitioning_groupby, ) from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy -from modin.core.dataframe.pandas.partitioning.axis_partition import ( - PandasDataframeAxisPartition, +from modin.core.dataframe.base.partitioning.axis_partition import ( + BaseDataframeAxisPartition, ) from modin.pandas.io import from_pandas from modin.pandas.utils import is_scalar @@ -264,6 +264,9 @@ def test_mixed_dtypes_groupby(as_index): comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)), ) # FIXME: https://github.com/modin-project/modin/issues/7032 + # Trigger execution of deferred operations. If not executed, eval_shift() below fails with + # `could not convert string to float: '\x94'`. Probably, this is also related to #7032. + modin_groupby.shift() eval_general( modin_groupby, pandas_groupby, @@ -2647,7 +2650,7 @@ def test_groupby_with_virtual_partitions(): # Check that the constructed Modin DataFrame has virtual partitions when assert issubclass( type(big_modin_df._query_compiler._modin_frame._partitions[0][0]), - PandasDataframeAxisPartition, + BaseDataframeAxisPartition, ) eval_general( big_modin_df, big_pandas_df, lambda df: df.groupby(df.columns[0]).count()