diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py index 06cb1bd65dd..cdbcc1a36e6 100644 --- a/modin/core/dataframe/pandas/partitioning/axis_partition.py +++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py @@ -388,6 +388,7 @@ def deploy_axis_func( *partitions, lengths=None, manual_partition=False, + split_func=split_result_of_axis_func_pandas, ): """ Deploy a function along a full axis. @@ -413,13 +414,19 @@ def deploy_axis_func( The list of lengths to shuffle the object. manual_partition : bool, default: False If True, partition the result with `lengths`. + split_func : callable, optional + Split the result with the specified function. Returns ------- list A list of pandas DataFrames. """ - dataframe = pandas.concat(list(partitions), axis=axis, copy=False) + dataframe = ( + partitions[0] + if len(partitions) == 1 + else pandas.concat(partitions, axis=axis, copy=False) + ) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning) try: @@ -451,7 +458,7 @@ def deploy_axis_func( lengths = [len(part.columns) for part in partitions] if sum(lengths) != len(result.columns): lengths = None - return split_result_of_axis_func_pandas(axis, num_splits, result, lengths) + return split_func(axis, num_splits, result, lengths) @classmethod def deploy_func_between_two_axis_partitions( @@ -464,6 +471,7 @@ def deploy_func_between_two_axis_partitions( len_of_left, other_shape, *partitions, + split_func=split_result_of_axis_func_pandas, ): """ Deploy a function along a full axis between two data sets. @@ -487,6 +495,8 @@ def deploy_func_between_two_axis_partitions( (other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition. *partitions : iterable All partitions that make up the full axis (row or column) for both data sets. + split_func : callable, optional + Split the result with the specified function. Returns ------- @@ -510,7 +520,7 @@ def deploy_func_between_two_axis_partitions( with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=FutureWarning) result = func(lt_frame, rt_frame, *f_args, **f_kwargs) - return split_result_of_axis_func_pandas(axis, num_splits, result) + return split_func(axis, num_splits, result) @classmethod def drain(cls, df: pandas.DataFrame, call_queue: list): diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py index 5198d83502e..643723f1d23 100644 --- a/modin/core/execution/ray/common/deferred_execution.py +++ b/modin/core/execution/ray/common/deferred_execution.py @@ -72,12 +72,14 @@ class DeferredExecution: The execution input. func : callable or ObjectRefType A function to be executed. - args : list or tuple + args : list or tuple, optional Additional positional arguments to be passed in `func`. - kwargs : dict + kwargs : dict, optional Additional keyword arguments to be passed in `func`. - num_returns : int + num_returns : int, default: 1 The number of the return values. + flat_data : bool + True means that the data is neither DeferredExecution nor list. flat_args : bool True means that there are no lists or DeferredExecution objects in `args`. In this case, no arguments processing is performed and `args` is passed @@ -88,26 +90,29 @@ class DeferredExecution: def __init__( self, - data: Union[ - ObjectRefType, - "DeferredExecution", - List[Union[ObjectRefType, "DeferredExecution"]], - ], + data: Any, func: Union[Callable, ObjectRefType], - args: Union[List[Any], Tuple[Any]], - kwargs: Dict[str, Any], + args: Union[List[Any], Tuple[Any]] = None, + kwargs: Dict[str, Any] = None, num_returns=1, ): - if isinstance(data, DeferredExecution): - data.subscribe() + self.flat_data = self._flat_args((data,)) self.data = data self.func = func - self.args = args - self.kwargs = kwargs self.num_returns = num_returns - self.flat_args = self._flat_args(args) - self.flat_kwargs = self._flat_args(kwargs.values()) self.subscribers = 0 + if args is not None: + self.args = args + self.flat_args = self._flat_args(args) + else: + self.args = () + self.flat_args = True + if kwargs is not None: + self.kwargs = kwargs + self.flat_kwargs = self._flat_args(kwargs.values()) + else: + self.kwargs = {} + self.flat_kwargs = True @classmethod def _flat_args(cls, args: Iterable): @@ -134,7 +139,7 @@ def _flat_args(cls, args: Iterable): def exec( self, - ) -> Tuple[ObjectRefOrListType, Union["MetaList", List], Union[int, List[int]]]: + ) -> Tuple[ObjectRefOrListType, "MetaList", Union[int, List[int]]]: """ Execute this task, if required. @@ -150,7 +155,7 @@ def exec( return self.data, self.meta, self.meta_offset if ( - not isinstance(self.data, DeferredExecution) + self.flat_data and self.flat_args and self.flat_kwargs and self.num_returns == 1 @@ -166,6 +171,7 @@ def exec( # it back. After the execution, the result is saved and the counter has no effect. self.subscribers += 2 consumers, output = self._deconstruct() + assert not any(isinstance(o, ListOrTuple) for o in output) # The last result is the MetaList, so adding +1 here. num_returns = sum(c.num_returns for c in consumers) + 1 results = self._remote_exec_chain(num_returns, *output) @@ -173,7 +179,8 @@ def exec( meta_offset = 0 results = iter(results) for de in consumers: - if de.num_returns == 1: + num_returns = de.num_returns + if num_returns == 1: de._set_result(next(results), meta, meta_offset) meta_offset += 2 else: @@ -318,6 +325,7 @@ def _deconstruct_chain( break elif not isinstance(data := de.data, DeferredExecution): if isinstance(data, ListOrTuple): + out_append(_Tag.LIST) yield cls._deconstruct_list( data, output, stack, result_consumers, out_append ) @@ -394,7 +402,13 @@ def _deconstruct_list( if out_pos := getattr(obj, "out_pos", None): obj.unsubscribe() if obj.has_result: - out_append(obj.data) + if isinstance(obj.data, ListOrTuple): + out_append(_Tag.LIST) + yield cls._deconstruct_list( + obj.data, output, stack, result_consumers, out_append + ) + else: + out_append(obj.data) else: out_append(_Tag.REF) out_append(out_pos) @@ -432,13 +446,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]: list The execution results. The last element of this list is the ``MetaList``. """ - # Prefer _remote_exec_single_chain(). It has fewer arguments and - # does not require the num_returns to be specified in options. + # Prefer _remote_exec_single_chain(). It does not require the num_returns + # to be specified in options. if num_returns == 2: return _remote_exec_single_chain.remote(*args) else: return _remote_exec_multi_chain.options(num_returns=num_returns).remote( - num_returns, *args + *args ) def _set_result( @@ -456,7 +470,7 @@ def _set_result( meta : MetaList meta_offset : int or list of int """ - del self.func, self.args, self.kwargs, self.flat_args, self.flat_kwargs + del self.func, self.args, self.kwargs self.data = result self.meta = meta self.meta_offset = meta_offset @@ -564,7 +578,7 @@ def exec_func(fn: Callable, obj: Any, args: Tuple, kwargs: Dict) -> Any: raise err @classmethod - def construct(cls, num_returns: int, args: Tuple): # pragma: no cover + def construct(cls, args: Tuple): # pragma: no cover """ Construct and execute the specified chain. @@ -574,7 +588,6 @@ def construct(cls, num_returns: int, args: Tuple): # pragma: no cover Parameters ---------- - num_returns : int args : tuple Yields @@ -646,7 +659,7 @@ def construct_chain( while chain: fn = pop() - if fn == tg_e: + if fn is tg_e: lst.append(obj) break @@ -676,10 +689,10 @@ def construct_chain( itr = iter([obj] if num_returns == 1 else obj) for _ in range(num_returns): - obj = next(itr) - meta.append(len(obj) if hasattr(obj, "__len__") else 0) - meta.append(len(obj.columns) if hasattr(obj, "columns") else 0) - yield obj + o = next(itr) + meta.append(len(o) if hasattr(o, "__len__") else 0) + meta.append(len(o.columns) if hasattr(o, "columns") else 0) + yield o @classmethod def construct_list( @@ -793,20 +806,18 @@ def _remote_exec_single_chain( ------- Generator """ - return remote_executor.construct(num_returns=2, args=args) + return remote_executor.construct(args=args) @ray.remote def _remote_exec_multi_chain( - num_returns: int, *args: Tuple, remote_executor=_REMOTE_EXEC + *args: Tuple, remote_executor=_REMOTE_EXEC ) -> Generator: # pragma: no cover """ Execute the deconstructed chain with a multiple return values in a worker process. Parameters ---------- - num_returns : int - The number of return values. *args : tuple A deconstructed chain to be executed. remote_executor : _RemoteExecutor, default: _REMOTE_EXEC @@ -816,4 +827,4 @@ def _remote_exec_multi_chain( ------- Generator """ - return remote_executor.construct(num_returns, args) + return remote_executor.construct(args) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index 2e0ded45428..c9944e9283e 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -148,7 +148,7 @@ def add_to_apply_calls( def drain_call_queue(self): data = self._data_ref if not isinstance(data, DeferredExecution): - return data + return log = get_logger() self._is_debug(log) and log.debug( diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 2f67bf94d73..172694a45a3 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -12,245 +12,446 @@ # governing permissions and limitations under the License. """Module houses classes responsible for storing a virtual partition and applying a function to it.""" +import math +from typing import List, Tuple, Union, Iterable, Optional import pandas import ray -from ray.util import get_node_ip_address +from modin.config import MinPartitionSize +from modin.core.dataframe.base.partitioning.axis_partition import ( + BaseDataframeAxisPartition, +) from modin.core.dataframe.pandas.partitioning.axis_partition import ( PandasDataframeAxisPartition, ) from modin.core.execution.ray.common import RayWrapper +from modin.core.execution.ray.common.deferred_execution import ( + DeferredExecution, + MetaList, + ObjectRefType, +) +from modin.core.execution.ray.common.utils import ObjectIDType from modin.utils import _inherit_docstrings from .partition import PandasOnRayDataframePartition -class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition): +class PandasOnRayDataframeVirtualPartition(BaseDataframeAxisPartition): """ The class implements the interface in ``PandasDataframeAxisPartition``. Parameters ---------- - list_of_partitions : Union[list, PandasOnRayDataframePartition] - List of ``PandasOnRayDataframePartition`` and - ``PandasOnRayDataframeVirtualPartition`` objects, or a single - ``PandasOnRayDataframePartition``. - get_ip : bool, default: False - Whether to get node IP addresses to conforming partitions or not. + data : DeferredExecution or list of PandasOnRayDataframePartition full_axis : bool, default: True Whether or not the virtual partition encompasses the whole axis. - call_queue : list, optional - A list of tuples (callable, args, kwargs) that contains deferred calls. length : ray.ObjectRef or int, optional Length, or reference to length, of wrapped ``pandas.DataFrame``. width : ray.ObjectRef or int, optional Width, or reference to width, of wrapped ``pandas.DataFrame``. + num_splits : int + The number of chunks to split the results on. """ - _PARTITIONS_METADATA_LEN = 3 # (length, width, ip) partition_type = PandasOnRayDataframePartition instance_type = ray.ObjectRef axis = None - # these variables are intentionally initialized at runtime (see #6023) - _DEPLOY_AXIS_FUNC = None - _DEPLOY_SPLIT_FUNC = None - _DRAIN_FUNC = None + def __init__( + self, + data: Union[ + DeferredExecution, + PandasOnRayDataframePartition, + List[PandasOnRayDataframePartition], + ], + full_axis: bool = True, + length: Union[int, ObjectRefType] = None, + width: Union[int, ObjectRefType] = None, + num_splits=None, + ): + self.full_axis = full_axis + self._meta = MetaList([length, width, None]) + self._meta_offset = 0 + + if isinstance(data, DeferredExecution): + self._set_axis_data(data) + self._num_splits = num_splits + self._list_of_block_partitions = None + return + + if isinstance(data, Iterable): + data = list(data) + else: + data = [data] + + self._axis_data = None + self._num_splits = len(data) + self._list_of_block_partitions = data + + def __del__(self): + """Unsubscribe from DeferredExecution.""" + if isinstance(self._axis_data, DeferredExecution): + self._axis_data.unsubscribe() + + @_inherit_docstrings(BaseDataframeAxisPartition.apply) + def apply( + self, + func, + *args, + num_splits=None, + other_axis_partition=None, + maintain_partitioning=True, + lengths=None, + manual_partition=False, + **kwargs, + ) -> Union[List[PandasOnRayDataframePartition], PandasOnRayDataframePartition]: + if not self.full_axis: + # If this is not a full axis partition, it already contains a subset of + # the full axis, so we shouldn't split the result further. + num_splits = 1 + elif num_splits is None: + num_splits = self._num_splits + if num_splits != self._num_splits or not maintain_partitioning: + lengths = None + elif lengths is not None: + lengths = None if num_splits == 1 else list(lengths) + + if other_axis_partition is not None: + assert isinstance(other_axis_partition, type(self)) + args = [other_axis_partition._data_ref] + list(args) + + de = self._apply(func, args, kwargs) + if num_splits > 1: + get_chunk = self._get_chunk + result = [ + PandasOnRayDataframePartition(get_chunk(de, i, num_splits, lengths)) + for i in range(num_splits) + ] + else: + result = [PandasOnRayDataframePartition(de)] + if self.full_axis: + return result + else: + # If this is not a full axis partition, just take out the single split in the result. + return result[0] + + @_inherit_docstrings(PandasDataframeAxisPartition.split) + def split( + self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False + ) -> List[PandasOnRayDataframePartition]: + de = DeferredExecution( + self._data_ref, + split_func, + args=f_args, + kwargs=f_kwargs, + num_returns=num_splits, + ) - @classmethod - def _get_deploy_axis_func(cls): # noqa: GL08 - if cls._DEPLOY_AXIS_FUNC is None: - cls._DEPLOY_AXIS_FUNC = RayWrapper.put( - PandasDataframeAxisPartition.deploy_axis_func - ) - return cls._DEPLOY_AXIS_FUNC + if num_splits > 1: + return [ + PandasOnRayDataframePartition( + _GetChunk(de, None, (i, num_splits, None, None)) + ) + for i in range(num_splits) + ] + return [PandasOnRayDataframePartition(de)] - @classmethod - def _get_deploy_split_func(cls): # noqa: GL08 - if cls._DEPLOY_SPLIT_FUNC is None: - cls._DEPLOY_SPLIT_FUNC = RayWrapper.put( - PandasDataframeAxisPartition.deploy_splitting_func - ) - return cls._DEPLOY_SPLIT_FUNC + @_inherit_docstrings(PandasDataframeAxisPartition.add_to_apply_calls) + def add_to_apply_calls(self, func, *args, length=None, width=None, **kwargs): + de = self._apply(func, args, kwargs) + return type(self)(de, self.full_axis, length, width, self._num_splits) - @classmethod - def _get_drain_func(cls): # noqa: GL08 - if cls._DRAIN_FUNC is None: - cls._DRAIN_FUNC = RayWrapper.put(PandasDataframeAxisPartition.drain) - return cls._DRAIN_FUNC + @property + def _length_cache(self): # noqa: GL08 + return self._meta[self._meta_offset] + + @_length_cache.setter + def _length_cache(self, value): # noqa: GL08 + self._meta[self._meta_offset] = value + + def length(self, materialize=True): # noqa: GL08 + if (length := self._length_cache) is not None: + return length + if self._list_of_block_partitions is None: + return self._get_lengths()[0] + if self.axis == 1: + length = self._list_of_block_partitions[0].length(materialize) + else: + from . import PandasOnRayDataframePartitionManager + + lengths = [part.length(False) for part in self._list_of_block_partitions] + lengths = PandasOnRayDataframePartitionManager.materialize_futures(lengths) + length = sum(lengths) + self._length_cache = length + return length + + @property + def _width_cache(self): # noqa: GL08 + return self._meta[self._meta_offset + 1] + + @_width_cache.setter + def _width_cache(self, value): # noqa: GL08 + self._meta[self._meta_offset + 1] = value + + def width(self, materialize=True): # noqa: GL08 + if (width := self._width_cache) is not None: + return width + if self._list_of_block_partitions is None: + return self._get_lengths()[1] + if self.axis == 0: + width = self._list_of_block_partitions[0].width(materialize) + else: + from . import PandasOnRayDataframePartitionManager + + widths = [part.width(False) for part in self._list_of_block_partitions] + widths = PandasOnRayDataframePartitionManager.materialize_futures(widths) + width = sum(widths) + self._width_cache = width + + def _get_lengths(self, materialize=True) -> Tuple[int, int]: # noqa: GL08 + self._data # Trigger lazy execution + meta = self._meta + off = self._meta_offset + if materialize and any( + isinstance(o, ObjectIDType) for o in meta[off : off + 1] + ): + if all(isinstance(o, ObjectIDType) for o in meta[off : off + 1]): + meta[off], meta[off + 1] = RayWrapper.materialize(meta[off : off + 1]) + elif isinstance(meta[off], ObjectIDType): + meta[off] = RayWrapper.materialize(meta[off]) + else: + meta[off + 1] = RayWrapper.materialize(meta[off + 1]) + return meta[off], meta[off + 1] + + @_inherit_docstrings(PandasDataframeAxisPartition.drain_call_queue) + def drain_call_queue(self, num_splits=None): + if num_splits: + self._num_splits = num_splits + + @_inherit_docstrings(PandasDataframeAxisPartition.force_materialization) + def force_materialization(self, get_ip=False): + return type(self)(PandasOnRayDataframePartition(self._data_ref)) + + @_inherit_docstrings(PandasDataframeAxisPartition.wait) + def wait(self): + """Wait completing computations on the object wrapped by the partition.""" + if self._axis_data is not None: + RayWrapper.wait(self._data) + + @_inherit_docstrings(PandasDataframeAxisPartition.to_pandas) + def to_pandas(self): + return RayWrapper.materialize(self._data) + + @_inherit_docstrings(PandasDataframeAxisPartition.to_numpy) + def to_numpy(self): + return self.to_pandas().to_numpy() + + @_inherit_docstrings(PandasDataframeAxisPartition.mask) + def mask(self, row_indices, col_indices): + part = PandasOnRayDataframePartition(self._data_ref).mask( + row_indices, col_indices + ) + return type(self)(part, False) + + @property + @_inherit_docstrings(BaseDataframeAxisPartition.list_of_blocks) + def list_of_blocks(self): + return [part._data for part in self.list_of_block_partitions] + + @property + @_inherit_docstrings(PandasDataframeAxisPartition.list_of_block_partitions) + def list_of_block_partitions(self) -> list: + if self._list_of_block_partitions is not None: + return self._list_of_block_partitions + + data = self._data_ref + num_splits = self._num_splits + if num_splits > 1: + get_chunk = self._get_chunk + return [ + PandasOnRayDataframePartition(get_chunk(data, i, num_splits, None)) + for i in range(num_splits) + ] + return [PandasOnRayDataframePartition(data)] @property def list_of_ips(self): """ - Get the IPs holding the physical objects composing this partition. + Return the list of IP worker addresses. Returns ------- - List - A list of IPs as ``ray.ObjectRef`` or str. + list of str """ - # Defer draining call queue until we get the ip address - result = [None] * len(self.list_of_block_partitions) - for idx, partition in enumerate(self.list_of_block_partitions): - partition.drain_call_queue() - result[idx] = partition.ip(materialize=False) - return result + if (ip := self._meta[self._meta_offset + 2]) is not None: + return [ip] + if self._list_of_block_partitions is not None: + return [part.ip() for part in self._list_of_block_partitions] + return [] + + def _set_axis_data( + self, data: Union[DeferredExecution, ObjectRefType] + ): # noqa: GL08 + if isinstance(data, DeferredExecution): + data.subscribe() + self._axis_data = data - @classmethod - @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func) - def deploy_splitting_func( - cls, - axis, - func, - f_args, - f_kwargs, - num_splits, - *partitions, - extract_metadata=False, - ): - return _deploy_ray_func.options( - num_returns=( - num_splits * (1 + cls._PARTITIONS_METADATA_LEN) - if extract_metadata - else num_splits - ), - ).remote( - cls._get_deploy_split_func(), - *f_args, - num_splits, - *partitions, - axis=axis, - f_to_deploy=func, - f_len_args=len(f_args), - f_kwargs=f_kwargs, - extract_metadata=extract_metadata, - ) + @property + def _data_ref(self): # noqa: GL08 + if (data := self._axis_data) is not None: + return data + + data = [part._data_ref for part in self._list_of_block_partitions] + + if len(data) == 1: + self._set_axis_data(data[0]) + return data[0] + + if ( + isinstance(data[0], _GetChunk) + and (data[0].index == 0) + and (len(data) == data[0].num_splits) + and all(p.is_next_chunk(n) for p, n in zip(data[:-1], data[1:])) + ): + data = data[0].data + self._set_axis_data(data) + return data + + data = self._concat(data) + self._set_axis_data(data) + return data + + @property + def _data(self): # noqa: GL08 + data = self._axis_data + if isinstance(data, DeferredExecution): + data, self._meta, self._meta_offset = data.exec() + self._axis_data = data + return data @classmethod - def deploy_axis_func( - cls, - axis, - func, - f_args, - f_kwargs, - num_splits, - maintain_partitioning, - *partitions, - lengths=None, - manual_partition=False, - max_retries=None, - ): - """ - Deploy a function along a full axis. - - Parameters - ---------- - axis : {0, 1} - The axis to perform the function along. - func : callable - The function to perform. - f_args : list or tuple - Positional arguments to pass to ``func``. - f_kwargs : dict - Keyword arguments to pass to ``func``. - num_splits : int - The number of splits to return (see ``split_result_of_axis_func_pandas``). - maintain_partitioning : bool - If True, keep the old partitioning if possible. - If False, create a new partition layout. - *partitions : iterable - All partitions that make up the full axis (row or column). - lengths : list, optional - The list of lengths to shuffle the object. - manual_partition : bool, default: False - If True, partition the result with `lengths`. - max_retries : int, default: None - The max number of times to retry the func. + def _concat(cls, data): # noqa: GL08 + if (fn := getattr(cls, "_CONCAT_FN", None)) is None: + + def concat(dfs, axis=cls.axis): + assert len(dfs) > 1 + lengths = [df.shape[axis] for df in dfs] + df = pandas.concat(dfs, axis=0, copy=False) + df._modin_axis_partition_lengths = lengths + return df + + cls._CONCAT_FN = fn = RayWrapper.put(concat) + return DeferredExecution(data, fn) + + def _apply(self, apply_fn, args, kwargs) -> DeferredExecution: # noqa: GL08 + cls = PandasOnRayDataframeVirtualPartition + if (fn := getattr(cls, "_APPLY_FN", None)) is None: + + def apply(df, fn, *args, **kwargs): + result = fn(df, *args, **kwargs) + result._modin_axis_partition_lengths = getattr( + df, "_modin_axis_partition_lengths", None + ) + return result + + cls._APPLY_FN = fn = RayWrapper.put(apply) + fn_args = [apply_fn] + if args is not None: + fn_args.extend(args) + return DeferredExecution(self._data_ref, fn, fn_args, kwargs) - Returns - ------- - list - A list of ``ray.ObjectRef``-s. - """ - return _deploy_ray_func.options( - num_returns=(num_splits if lengths is None else len(lengths)) - * (1 + cls._PARTITIONS_METADATA_LEN), - **({"max_retries": max_retries} if max_retries is not None else {}), - ).remote( - cls._get_deploy_axis_func(), - *f_args, - num_splits, - maintain_partitioning, - *partitions, - axis=axis, - f_to_deploy=func, - f_len_args=len(f_args), - f_kwargs=f_kwargs, - manual_partition=manual_partition, - lengths=lengths, - ) + @classmethod + def _get_chunk( + cls, data, idx: int, num_splits: int, lengths: Optional[List[int]] + ) -> DeferredExecution: # noqa: GL08 + if (fn := getattr(cls, "_GET_CHUNK_FN", None)) is None: + + def get_chunk( + df, + idx, + num_splits, + start, + end, + min_chunk_len, + axis=cls.axis, + ): + if start is None: + lengths = getattr(df, "_modin_axis_partition_lengths", None) + if lengths and ( + (len(lengths) >= idx) or (sum(lengths) != df.shape[axis]) + ): + lengths = None + if lengths: + start = 0 if idx == 0 else lengths[idx - 1] + end = lengths[idx] + else: + length = df.shape[axis] + chunk_len = max(math.ceil(length / num_splits), min_chunk_len) + start = chunk_len * idx + end = start + chunk_len + chunk = df.iloc[start:end] if axis == 0 else df.iloc[:, start:end] + if isinstance(chunk.axes[axis], pandas.MultiIndex): + chunk.set_axis( + chunk.axes[axis].remove_unused_levels(), axis=axis, copy=False + ) + return chunk + + cls._GET_CHUNK_FN = fn = RayWrapper.put(get_chunk) + + if lengths: + start = 0 if idx == 0 else lengths[idx - 1] + end = lengths[idx] + else: + start = end = None + args = (idx, num_splits, start, end, MinPartitionSize.get()) + return _GetChunk(data, fn, args) + + +class _GetChunk(DeferredExecution): # noqa: GL08 + def __init__(self, data, fn, args): + self.index = args[0] + self.num_splits = args[1] + if fn is None: # The input is already split + self.is_split = True + fn = self._remote_func() + args = (args[0],) + else: + self.is_split = False + super().__init__(data, fn, args) + + def is_next_chunk(self, other): # noqa: GL08 + if not (isinstance(other, _GetChunk) and (self.args[0] == other.args[0] + 1)): + return False + d1 = self.data + d2 = other.data + if d1 is d2: + return True + if isinstance(d1, DeferredExecution) and d1.has_result: + d1 = d1.data + if isinstance(d2, DeferredExecution) and d2.has_result: + d2 = d2.data + return d1 is d2 + + @_inherit_docstrings(DeferredExecution.exec) + def exec(self): + if not self.is_split: + return super().exec() + if not self.has_result: + chunks, meta, offsets = self.data.exec() + idx = self.index + self._set_result(chunks[idx], meta, offsets[idx]) + return self.data, self.meta, self.meta_offset @classmethod - def deploy_func_between_two_axis_partitions( - cls, - axis, - func, - f_args, - f_kwargs, - num_splits, - len_of_left, - other_shape, - *partitions, - ): - """ - Deploy a function along a full axis between two data sets. - - Parameters - ---------- - axis : {0, 1} - The axis to perform the function along. - func : callable - The function to perform. - f_args : list or tuple - Positional arguments to pass to ``func``. - f_kwargs : dict - Keyword arguments to pass to ``func``. - num_splits : int - The number of splits to return (see ``split_result_of_axis_func_pandas``). - len_of_left : int - The number of values in `partitions` that belong to the left data set. - other_shape : np.ndarray - The shape of right frame in terms of partitions, i.e. - (other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition. - *partitions : iterable - All partitions that make up the full axis (row or column) for both data sets. + def _remote_func(cls): # noqa: GL08 + if (fn := getattr(cls, "_REMOTE_FN", None)) is None: - Returns - ------- - list - A list of ``ray.ObjectRef``-s. - """ - return _deploy_ray_func.options( - num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) - ).remote( - PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, - *f_args, - num_splits, - len_of_left, - other_shape, - *partitions, - axis=axis, - f_to_deploy=func, - f_len_args=len(f_args), - f_kwargs=f_kwargs, - ) + def get_chunk(chunks, index): + return chunks[index] - def wait(self): - """Wait completing computations on the object wrapped by the partition.""" - self.drain_call_queue() - futures = self.list_of_blocks - RayWrapper.wait(futures) + cls._REMOTE_FN = fn = RayWrapper.put(get_chunk) + return fn @_inherit_docstrings(PandasOnRayDataframeVirtualPartition.__init__) @@ -261,71 +462,3 @@ class PandasOnRayDataframeColumnPartition(PandasOnRayDataframeVirtualPartition): @_inherit_docstrings(PandasOnRayDataframeVirtualPartition.__init__) class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): axis = 1 - - -@ray.remote -def _deploy_ray_func( - deployer, - *positional_args, - axis, - f_to_deploy, - f_len_args, - f_kwargs, - extract_metadata=True, - **kwargs, -): # pragma: no cover - """ - Execute a function on an axis partition in a worker process. - - This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func`` - or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both - serve to deploy another dataframe function on a Ray worker process. The provided `positional_args` - contains positional arguments for both: `deployer` and for `f_to_deploy`, the parameters can be separated - using the `f_len_args` value. The parameters are combined so they will be deserialized by Ray before the - kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization). - - Parameters - ---------- - deployer : callable - A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``. - *positional_args : list - The first `f_len_args` elements in this list represent positional arguments - to pass to the `f_to_deploy`. The rest are positional arguments that will be - passed to `deployer`. - axis : {0, 1} - The axis to perform the function along. This argument is keyword only. - f_to_deploy : callable or RayObjectID - The function to deploy. This argument is keyword only. - f_len_args : int - Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only. - f_kwargs : dict - Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only. - extract_metadata : bool, default: True - Whether to return metadata (length, width, ip) of the result. Passing `False` may relax - the load on object storage as the remote function would return 4 times fewer futures. - Passing `False` makes sense for temporary results where you know for sure that the - metadata will never be requested. This argument is keyword only. - **kwargs : dict - Keyword arguments to pass to ``deployer``. - - Returns - ------- - list : Union[tuple, list] - The result of the function call, and metadata for it. - - Notes - ----- - Ray functions are not detected by codecov (thus pragma: no cover). - """ - f_args = positional_args[:f_len_args] - deploy_args = positional_args[f_len_args:] - result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs) - if not extract_metadata: - return result - ip = get_node_ip_address() - if isinstance(result, pandas.DataFrame): - return result, len(result), len(result.columns), ip - elif all(isinstance(r, pandas.DataFrame) for r in result): - return [i for r in result for i in [r, len(r), len(r.columns), ip]] - else: - return [i for r in result for i in [r, None, None, ip]]