From 830ea4274b89e5f93df15d9e48567f79ddc40a29 Mon Sep 17 00:00:00 2001 From: Huite Bootsma Date: Tue, 9 Jul 2024 11:09:58 +0200 Subject: [PATCH] Merge partition chunks --- docs/changelog.rst | 15 ++++++++++++++- tests/test_partitioning.py | 21 +++++++++++++++++++++ xugrid/ugrid/partitioning.py | 13 ++++++++++++- 3 files changed, 47 insertions(+), 2 deletions(-) diff --git a/docs/changelog.rst b/docs/changelog.rst index 464be000a..5614e6576 100644 --- a/docs/changelog.rst +++ b/docs/changelog.rst @@ -9,10 +9,23 @@ The format is based on `Keep a Changelog`_, and this project adheres to [Unreleased] ------------ +Fixed +~~~~~ + +- :func:`xugrid.merge_partitions` now automatically merges chunks (if defined + in the partition datasets). This removes the commonly seen + ``PerformanceWarning: Slicing with an out-of-order index is generating ... + times more chunks`` warning in subsequent operations, and also greatly + improves the performance of subsequent operations (roughly scaling linearly + with the number of partitions). The previous behavior can be maintained by + setting ``merge_ugrid_chunks=False``. This keyword will likely be deprecated + in the future as merging the UGRID dimension chunks should be superior for + (almost all?) subsquent operations. + Added ~~~~~ -- included ``edge_node_connectivity`` in :meth:`xugrid.Ugrid2d.from_meshkernel`, +- Included ``edge_node_connectivity`` in :meth:`xugrid.Ugrid2d.from_meshkernel`, so the ordering of edges is consistent with ``meshkernel``. [0.10.0] 2024-05-01 diff --git a/tests/test_partitioning.py b/tests/test_partitioning.py index 19b3a3b4c..5b0b3b255 100644 --- a/tests/test_partitioning.py +++ b/tests/test_partitioning.py @@ -358,3 +358,24 @@ def test_merge_partitions__inconsistent_grid_types(self): assert merged["c"] == 1 assert self.dataset_expected.equals(merged) + + def test_merge_partitions_merge_chunks(self): + # Dataset has no chunks defined, chunks should not appear. + merged = pt.merge_partitions(self.datasets_parts) + assert len(merged.chunks) == 0 + + # Dataset has chunks, keyword is True, chunks should be size 1. + datasets_parts = [ + part.expand_dims({"time": 3}).chunk({"time": 1}) + for part in self.datasets_parts + ] + merged = pt.merge_partitions(datasets_parts) + assert len(merged.chunks["mesh2d_nFaces"]) == 1 + assert len(merged.chunks["mesh1d_nEdges"]) == 1 + assert len(merged.chunks["time"]) == 3 + + # Dataset has chunks, keyword is False, chunks should be size npartition. + merged = pt.merge_partitions(datasets_parts, merge_ugrid_chunks=False) + assert len(merged.chunks["mesh2d_nFaces"]) == 2 + assert len(merged.chunks["mesh1d_nEdges"]) == 2 + assert len(merged.chunks["time"]) == 3 diff --git a/xugrid/ugrid/partitioning.py b/xugrid/ugrid/partitioning.py index 5d5a6e2fb..4b42e7a6c 100644 --- a/xugrid/ugrid/partitioning.py +++ b/xugrid/ugrid/partitioning.py @@ -305,7 +305,7 @@ def merge_data_along_dim( return xr.concat(to_merge, dim=merge_dim) -def merge_partitions(partitions): +def merge_partitions(partitions, merge_ugrid_chunks: bool = True): """ Merge topology and data, partitioned along UGRID dimensions, into a single UgridDataset. @@ -320,6 +320,8 @@ def merge_partitions(partitions): Parameters ---------- partitions : sequence of UgridDataset or UgridDataArray + merge_ugrid_chunks: bool, default is True. + Whether to merge chunks along the UGRID topology dimensions. Returns ------- @@ -373,4 +375,13 @@ def merge_partitions(partitions): ) merged.update(merged_selection) + # Merge chunks along the UGRID dimensions. + if merged.chunks and merge_ugrid_chunks: + chunks = dict(merged.chunks) + for dim in chunks: + # Define a single chunk for each UGRID dimension. + if dim in ugrid_dims: + chunks[dim] = (merged.dims[dim],) + merged = merged.chunk(chunks) + return UgridDataset(merged, merged_grids)