From 128509da3279af7c680a626e12709e19ff847d9f Mon Sep 17 00:00:00 2001 From: Andrey Pavlenko Date: Tue, 19 Mar 2024 00:42:12 +0100 Subject: [PATCH] perf --- .../ray/common/deferred_execution.py | 111 ++++++------------ .../execution/ray/common/engine_wrapper.py | 2 +- .../partitioning/partition_manager.py | 68 ++++++++++- .../partitioning/virtual_partition.py | 84 +++++++++---- 4 files changed, 162 insertions(+), 103 deletions(-) diff --git a/modin/core/execution/ray/common/deferred_execution.py b/modin/core/execution/ray/common/deferred_execution.py index dc066e737ec..e2067a0bb9a 100644 --- a/modin/core/execution/ray/common/deferred_execution.py +++ b/modin/core/execution/ray/common/deferred_execution.py @@ -161,24 +161,6 @@ def exec( and self.flat_kwargs and self.num_returns == 1 ): - # self.data = RayWrapper.materialize(self.data) - # self.args = [ - # RayWrapper.materialize(o) if isinstance(o, ray.ObjectRef) else o - # for o in self.args - # ] - # self.kwargs = { - # k: RayWrapper.materialize(o) if isinstance(o, ray.ObjectRef) else o - # for k, o in self.kwargs.items() - # } - # obj = _REMOTE_EXEC.exec_func( - # RayWrapper.materialize(self.func), self.data, self.args, self.kwargs - # ) - # result, length, width, ip = ( - # obj, - # len(obj) if hasattr(obj, "__len__") else 0, - # len(obj.columns) if hasattr(obj, "columns") else 0, - # "", - # ) result, length, width, ip = remote_exec_func.remote( self.func, self.data, *self.args, **self.kwargs ) @@ -191,13 +173,6 @@ def exec( self.subscribers += 2 consumers, output = self._deconstruct() - # assert not any(isinstance(o, ListOrTuple) for o in output) - # tmp = [ - # RayWrapper.materialize(o) if isinstance(o, ray.ObjectRef) else o - # for o in output - # ] - # list(_REMOTE_EXEC.construct(tmp)) - # The last result is the MetaList, so adding +1 here. num_returns = sum(c.num_returns for c in consumers) + 1 results = self._remote_exec_chain(num_returns, *output) @@ -336,7 +311,9 @@ def _deconstruct_chain( out_extend = output.extend while True: de.unsubscribe() - if not de.has_result and (out_pos := getattr(de, "out_pos", None)): + if not (has_result := de.has_result) and ( + out_pos := getattr(de, "out_pos", None) + ): out_append(_Tag.REF) out_append(out_pos) output[out_pos] = out_pos @@ -357,7 +334,7 @@ def _deconstruct_chain( ) else: out_append(data) - if not de.has_result: + if not has_result: stack.append(de) break else: @@ -425,28 +402,24 @@ def _deconstruct_list( """ for obj in lst: if isinstance(obj, DeferredExecution): - if out_pos := getattr(obj, "out_pos", None): + if obj.has_result: + obj = obj.data + elif out_pos := getattr(obj, "out_pos", None): obj.unsubscribe() - if obj.has_result: - if isinstance(obj.data, ListOrTuple): - out_append(_Tag.LIST) - yield cls._deconstruct_list( - obj.data, output, stack, result_consumers, out_append - ) - else: - out_append(obj.data) - else: - out_append(_Tag.REF) - out_append(out_pos) - output[out_pos] = out_pos - if obj.subscribers == 0: - output[out_pos + 1] = 0 - result_consumers.remove(obj) + out_append(_Tag.REF) + out_append(out_pos) + output[out_pos] = out_pos + if obj.subscribers == 0: + output[out_pos + 1] = 0 + result_consumers.remove(obj) + continue else: out_append(_Tag.CHAIN) yield cls._deconstruct_chain(obj, output, stack, result_consumers) out_append(_Tag.END) - elif isinstance(obj, ListOrTuple): + continue + + if isinstance(obj, ListOrTuple): out_append(_Tag.LIST) yield cls._deconstruct_list( obj, output, stack, result_consumers, out_append @@ -517,27 +490,13 @@ class DeferredGetItem(DeferredExecution): ---------- data : ObjectRefOrDeType The object to get the item from. - idx : int + index : int The item index. """ - def __init__(self, data: ObjectRefOrDeType, idx: int): - super().__init__(data, self._remote_fn(), [idx]) - self.index = idx - - @_inherit_docstrings(DeferredExecution.exec) - def exec(self) -> Tuple[ObjectRefType, "MetaList", int]: - if self.has_result: - return self.data, self.meta, self.meta_offset - - if not isinstance(self.data, DeferredExecution) or self.data.num_returns == 1: - return super().exec() - - # If `data` is a `DeferredExecution`, that returns multiple results, - # it's not required to execute `_remote_fn()`. We can only execute - # `data` and get the result by index. - self._data_exec() - return self.data, self.meta, self.meta_offset + def __init__(self, data: ObjectRefOrDeType, index: int): + super().__init__(data, self._remote_fn(), [index]) + self.index = index @property @_inherit_docstrings(DeferredExecution.has_result) @@ -550,16 +509,18 @@ def has_result(self): and self.data.has_result and self.data.num_returns != 1 ): - self._data_exec() + # If `data` is a `DeferredExecution`, that returns multiple results, + # it's not required to execute `_remote_fn()`. We can only execute + # `data` and get the result by index. + self._set_result( + self.data.data[self.index], + self.data.meta, + self.data.meta_offset[self.index], + ) return True return False - def _data_exec(self): - """Execute the `data` task and get the result.""" - obj, meta, offsets = self.data.exec() - self._set_result(obj[self.index], meta, offsets[self.index]) - @classmethod def _remote_fn(cls) -> ObjectRefType: """ @@ -592,7 +553,8 @@ def __init__(self, obj: Union[ray.ObjectID, ClientObjectRef, List]): def materialize(self): """Materialized the list, if required.""" - self._obj = RayWrapper.materialize(self._obj) + if not isinstance(self._obj, list): + self._obj = RayWrapper.materialize(self._obj) def __getitem__(self, index): """ @@ -632,14 +594,13 @@ class MetaListHook(MaterializationHook, DeferredGetItem): ---------- meta : MetaList Non-materialized list to get the value from. - idx : int + index : int The value index in the list. """ - def __init__(self, meta: MetaList, idx: int): - super().__init__(meta._obj, idx) + def __init__(self, meta: MetaList, index: int): + super().__init__(meta._obj, index) self.meta = meta - self.idx = idx def pre_materialize(self): """ @@ -650,7 +611,7 @@ def pre_materialize(self): object """ obj = self.meta._obj - return obj[self.idx] if isinstance(obj, list) else obj + return obj[self.index] if isinstance(obj, list) else obj def post_materialize(self, materialized): """ @@ -665,7 +626,7 @@ def post_materialize(self, materialized): object """ self.meta._obj = materialized - return materialized[self.idx] + return materialized[self.index] class _Tag(Enum): # noqa: PR01 diff --git a/modin/core/execution/ray/common/engine_wrapper.py b/modin/core/execution/ray/common/engine_wrapper.py index 9050e168ab4..178b25e1d03 100644 --- a/modin/core/execution/ray/common/engine_wrapper.py +++ b/modin/core/execution/ray/common/engine_wrapper.py @@ -106,7 +106,7 @@ def materialize(cls, obj_id): Parameters ---------- - obj_id : ray.ObjectID + obj_id : ObjectRefTypes Ray object identifier to get the value by. Returns diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 0748efb219a..06029a91bcd 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -12,13 +12,16 @@ # governing permissions and limitations under the License. """Module houses class that implements ``GenericRayDataframePartitionManager`` using Ray.""" +import math import numpy as np +import pandas from pandas.core.dtypes.common import is_numeric_dtype -from modin.config import AsyncReadMode +from modin.config import AsyncReadMode, MinPartitionSize from modin.core.execution.modin_aqp import progress_bar_wrapper from modin.core.execution.ray.common import RayWrapper +from modin.core.execution.ray.common.deferred_execution import DeferredExecution from modin.core.execution.ray.generic.partitioning import ( GenericRayDataframePartitionManager, ) @@ -29,6 +32,7 @@ from .virtual_partition import ( PandasOnRayDataframeColumnPartition, PandasOnRayDataframeRowPartition, + PandasOnRayDataframeVirtualPartition, ) @@ -42,6 +46,68 @@ class PandasOnRayDataframePartitionManager(GenericRayDataframePartitionManager): _execution_wrapper = RayWrapper materialize_futures = RayWrapper.materialize + @classmethod + @_inherit_docstrings(GenericRayDataframePartitionManager.get_indices) + def get_indices(cls, axis, partitions, index_func=None): + partitions = partitions.T if axis == 0 else partitions + if len(partitions) == 0: + return pandas.Index([]), [] + + partitions = [part for part in partitions[0]] + non_split, lengths, _ = ( + PandasOnRayDataframeVirtualPartition.find_non_split_block(partitions) + ) + if non_split is not None: + partitions = [non_split] + else: + partitions = [part._data for part in partitions] + + if index_func is None: + attr_name = f"_GET_AXIS_{axis}" + if (fn := getattr(cls, attr_name, None)) is None: + + def get_cols(*dfs, axis=axis): + return [df.axes[axis] for df in dfs] + + setattr(cls, attr_name, get_cols) + fn = RayWrapper.put(get_cols) + data, args = partitions[0], partitions[1:] + else: + if (fn := getattr(cls, "_GET_AXIS_FN", None)) is None: + + def apply_index(fn, *dfs): + return [fn(df) for df in dfs] + + cls._GET_AXIS_FN = fn = RayWrapper.put(apply_index) + data, args = index_func, partitions + + de = DeferredExecution(data, fn, args, num_returns=len(partitions)) + part_indices = de.exec()[0] + + if non_split is not None: + materialized = RayWrapper.materialize([part_indices] + lengths) + idx = materialized[0][0] + lengths = materialized[1:] + + if (idx_len := len(idx)) != sum(lengths): + count = len(lengths) + chunk_len = max(math.ceil(idx_len / count), MinPartitionSize.get()) + lengths = [chunk_len] * count + + part_indices = [] + start = 0 + for length in lengths: + end = start + length + part_indices.append(idx[start:end]) + start = end + return idx, part_indices + + part_indices = RayWrapper.materialize(part_indices) + indices = [idx for idx in part_indices if len(idx)] + if len(indices) == 0: + return part_indices[0], part_indices + return indices[0].append(indices[1:]), part_indices + @classmethod def wait_partitions(cls, partitions): """ diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 17b3027c467..55295fe9ec1 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -13,7 +13,7 @@ """Module houses classes responsible for storing a virtual partition and applying a function to it.""" import math -from typing import Collection, List, Optional, Set, Union +from typing import Collection, Iterable, List, Optional, Set, Tuple, Union import pandas import ray @@ -95,28 +95,60 @@ def __init__( self._num_splits = len(data) self._list_of_block_partitions = data - refs = [part._data_ref for part in data] + non_split, lengths, full_concat = self.find_non_split_block(data) + if non_split is not None: + self._chunk_lengths_cache = lengths + if full_concat: + self._set_data_ref(non_split) + return + # TODO: We have a subset of the same frame here and can just get a single chunk + # from the original frame instead of concatenating all these chunks. + + self._set_data_ref(self._concat([part._data for part in data])) + + @staticmethod + def find_non_split_block( + partitions: Iterable[PandasOnRayDataframePartition], + ) -> Tuple[ + Union[ObjectRefOrDeType, None], + Union[List[Union[ObjectRefType, int]], None], + bool, + ]: + """ + Find a non-split block if there is one. + + If all the partitions are the sequential chunks of the same DataFrame, + the concatenation of all these chunks will give an identical DF. Thus, we + don't need to concatenate but can get the original one instead. + + Parameters + ---------- + partitions : list of PandasOnRayDataframePartition + + Returns + ------- + ObjectRefOrDeType or None + The non-split block or None if not found. + list of lengths or None + Estimated chunk lengths, that could be different form the real ones. + bool + Whether the specified partitions represent the full block or just the + first part of this block. + """ + refs = [part._data_ref for part in partitions] if ( isinstance(refs[0], _DeferredGetChunk) and isinstance(split := refs[0].data, _DeferredSplit) and (refs[0].index == 0) and all(prev.is_next_chunk(next) for prev, next in zip(refs[:-1], refs[1:])) ): - if all(chunk.length is not None for chunk in refs): - self._chunk_lengths_cache = [chunk.length for chunk in refs] - - if split.num_splits == refs[-1].index + 1: - # All the partitions are the chunks of the same DataFrame. Concatenation of - # all these chunks will get a df identical to the original one. Thus, we - # don't need to concatenate but can get the original one instead. - self._set_data_ref(split.data) - return - - # TODO: We have a subset of the same frame here and can just get a single chunk - # from the original frame instead of concatenating all these chunks. - - self._set_data_ref(self._concat([part._data for part in data])) + return ( + split.data, + [ref.length for ref in refs], + split.num_splits == refs[-1].index + 1, + ) + return None, None, False def _set_data_ref( self, data: Union[DeferredExecution, ObjectRefType] @@ -202,20 +234,17 @@ def add_to_apply_calls(self, func, *args, length=None, width=None, **kwargs): def split( self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False ) -> List[PandasOnRayDataframePartition]: - de = DeferredExecution( + chunks, meta, offsets = DeferredExecution( self._data_ref, split_func, args=f_args, kwargs=f_kwargs, num_returns=num_splits, - ) - - if num_splits > 1: - return [ - PandasOnRayDataframePartition(DeferredGetItem(de, i)) - for i in range(num_splits) - ] - return [PandasOnRayDataframePartition(de)] + ).exec() + return [ + PandasOnRayDataframePartition(chunks[i], meta=meta, meta_offset=offsets[i]) + for i in range(num_splits) + ] @property def _length_cache(self): # noqa: GL08 @@ -224,6 +253,8 @@ def _length_cache(self): # noqa: GL08 def length(self, materialize=True): # noqa: GL08 if self._length_cache is None: self._calculate_lengths(materialize) + elif materialize: + self._meta.materialize() return self._length_cache @property @@ -233,6 +264,8 @@ def _width_cache(self): # noqa: GL08 def width(self, materialize=True): # noqa: GL08 if self._width_cache is None: self._calculate_lengths(materialize) + elif materialize: + self._meta.materialize() return self._width_cache def _calculate_lengths(self, materialize=True): # noqa: GL08 @@ -422,7 +455,6 @@ def __init__( class _DeferredGetChunk(DeferredGetItem): # noqa: GL08 def __init__(self, split: _DeferredSplit, index: int, length: Optional[int] = None): super().__init__(split, index) - self.split = split self.length = length def __del__(self):