From 9534478597c74a70943cba6fdde5f78f48a8fce9 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Sun, 27 Nov 2022 23:31:17 +0100 Subject: [PATCH] PERF-#5268: Call `get` on all partitions at once in `to_pandas` (#4776) Co-authored-by: Vasily Litvinov Co-authored-by: Dmitry Chigarev Signed-off-by: Myachev --- .../pandas/partitioning/partition_manager.py | 19 ++++++++++++++++++- .../partitioning/partition_manager.py | 3 +++ .../partitioning/partition_manager.py | 3 +++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 9003fc527b0..c0f41740030 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -641,7 +641,24 @@ def to_pandas(cls, partitions): pandas.DataFrame A pandas DataFrame """ - retrieved_objects = [[obj.to_pandas() for obj in part] for part in partitions] + retrieved_objects = cls.get_objects_from_partitions(partitions.flatten()) + if all( + isinstance(obj, (pandas.DataFrame, pandas.Series)) + for obj in retrieved_objects + ): + height, width, *_ = tuple(partitions.shape) + (0,) + # restore 2d array + objs = iter(retrieved_objects) + retrieved_objects = [ + [next(objs) for _ in range(width)] for __ in range(height) + ] + else: + # Partitions do not always contain pandas objects, for example, hdk uses pyarrow tables. + # This implementation comes from the fact that calling `partition.get` + # function is not always equivalent to `partition.to_pandas`. + retrieved_objects = [ + [obj.to_pandas() for obj in part] for part in partitions + ] if all( isinstance(part, pandas.Series) for row in retrieved_objects for part in row ): diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py index d32243c9747..b945f3d65b4 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/partition_manager.py @@ -51,6 +51,9 @@ def get_objects_from_partitions(cls, partitions): list The objects wrapped by `partitions`. """ + for idx, part in enumerate(partitions): + if hasattr(part, "force_materialization"): + partitions[idx] = part.force_materialization() assert all( [len(partition.list_of_blocks) == 1 for partition in partitions] ), "Implementation assumes that each partition contains a signle block." diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py index 91a720bcb3e..baa4d5cb38c 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition_manager.py @@ -108,6 +108,9 @@ def get_objects_from_partitions(cls, partitions): list The objects wrapped by `partitions`. """ + for idx, part in enumerate(partitions): + if hasattr(part, "force_materialization"): + partitions[idx] = part.force_materialization() assert all( [len(partition.list_of_blocks) == 1 for partition in partitions] ), "Implementation assumes that each partition contains a signle block."