From e78f3ca811416dc28795c38a5e42b67278b8249e Mon Sep 17 00:00:00 2001 From: David Zwicker Date: Mon, 16 Oct 2023 14:19:40 +0100 Subject: [PATCH] Multiple smaller improvements * Replace Union by | * Allow multiprocessing in determining emulsions from data * Add function for refining multiple droplets --- codecov.yml | 11 ----- droplets/droplet_tracks.py | 53 +++++++++++++++----- droplets/emulsions.py | 56 +++++++++++++++------ droplets/image_analysis.py | 96 +++++++++++++++++++++++++++++++----- droplets/trackers.py | 10 ++-- tests/test_emulsion.py | 25 +++++++++- tests/test_image_analysis.py | 20 ++++++++ 7 files changed, 213 insertions(+), 58 deletions(-) delete mode 100644 codecov.yml diff --git a/codecov.yml b/codecov.yml deleted file mode 100644 index 8a414c2..0000000 --- a/codecov.yml +++ /dev/null @@ -1,11 +0,0 @@ -coverage: - status: - project: - default: - threshold: 1% - patch: off -ignore: - - "**/tests/*" - - "**/tests/*.py" - - "*/tests/*.py" - - "test_*.py" diff --git a/droplets/droplet_tracks.py b/droplets/droplet_tracks.py index 5a43e8c..165c7c9 100644 --- a/droplets/droplet_tracks.py +++ b/droplets/droplet_tracks.py @@ -15,7 +15,7 @@ import json import logging -from typing import Callable, List, Optional, Union +from typing import Callable, List, Literal, Optional import numpy as np from numpy.lib import recfunctions as rfn @@ -118,7 +118,7 @@ def __len__(self): """number of time points""" return len(self.times) - def __getitem__(self, key: Union[int, slice]): + def __getitem__(self, key: int | slice): """return the droplets identified by the given index/slice""" result = self.droplets.__getitem__(key) if isinstance(key, slice): @@ -357,7 +357,12 @@ def to_file(self, path: str, info: Optional[InfoDict] = None) -> None: @plot_on_axes() def plot( - self, attribute: str = "radius", smoothing: float = 0, ax=None, **kwargs + self, + attribute: str = "radius", + smoothing: float = 0, + t_max: Optional[float] = None, + ax=None, + **kwargs, ) -> PlotReference: """plot the time evolution of the droplet @@ -390,7 +395,14 @@ def plot( data = self.get_trajectory(smoothing=smoothing, attribute=attribute) ylabel = attribute.capitalize() - (line,) = ax.plot(self.times, data, **kwargs) + if t_max is not None and len(self.times) >= 2 and self.times[-1] < t_max: + dt = self.times[-1] - self.times[-2] + times = np.r_[self.times, self.times[-1] + dt] + data = np.r_[data, 0] + else: + times = self.times + + (line,) = ax.plot(times, data, **kwargs) ax.set_xlabel("Time") ax.set_ylabel(ylabel) return PlotReference(ax, line, {"attribute": attribute}) @@ -469,7 +481,7 @@ def plot_positions( class DropletTrackList(list): """a list of instances of :class:`DropletTrack`""" - def __getitem__(self, key: Union[int, slice]): # type: ignore + def __getitem__(self, key: int | slice): # type: ignore """return the droplets identified by the given index/slice""" result = super().__getitem__(key) if isinstance(key, slice): @@ -482,7 +494,7 @@ def from_emulsion_time_course( cls, time_course: EmulsionTimeCourse, *, - method: str = "overlap", + method: Literal["distance", "overlap"] = "overlap", grid: Optional[GridBase] = None, progress: bool = False, **kwargs, @@ -554,7 +566,7 @@ def match_tracks( # calculate the distance between droplets if tracks_alive: if grid is None: - metric: Union[str, Callable] = "euclidean" + metric: str | Callable = "euclidean" else: metric = grid.distance_real points_prev = [track.last.position for track in tracks_alive] @@ -603,8 +615,10 @@ def match_tracks( def from_storage( cls, storage: StorageBase, + *, + method: Literal["distance", "overlap"] = "overlap", refine: bool = False, - method: str = "overlap", + num_processes: int | Literal["auto"] = 1, progress: Optional[bool] = None, ) -> DropletTrackList: r"""obtain droplet tracks from stored scalar field data @@ -615,14 +629,17 @@ def from_storage( Args: storage (:class:`~pde.storage.base.StorageBase`): The phase fields for many time instances - refine (bool): - Flag determining whether the droplet properties should be refined - using fitting. This is a potentially slow procedure. method (str): The method used for tracking droplet identities. Possible methods are "overlap" (adding droplets that overlap with those in previous frames) and "distance" (matching droplets to minimize center-to-center distances). + refine (bool): + Flag determining whether the droplet properties should be refined + using fitting. This is a potentially slow procedure. + num_processes (int or "auto"): + Number of processes used for the refinement. If set to "auto", the + number of processes is choosen automatically. progress (bool): Whether to show the progress of the process. If `None`, the progress is not shown, except for the first step if `refine` is `True`. @@ -630,7 +647,9 @@ def from_storage( Returns: :class:`DropletTrackList`: the resulting droplet tracks """ - etc = EmulsionTimeCourse.from_storage(storage, refine=refine, progress=progress) + etc = EmulsionTimeCourse.from_storage( + storage, refine=refine, num_processes=num_processes, progress=progress + ) if progress is None: progress = False return cls.from_emulsion_time_course(etc, method=method, progress=progress) @@ -717,11 +736,19 @@ def plot(self, attribute: str = "radius", ax=None, **kwargs) -> PlotReference: else: kwargs["color"] = "k" # use black by default + # get maximal time + if self: + t_max = max(track.times[-1] for track in self if len(track.times) > 0) + else: + t_max = None + # adjust alpha such that multiple tracks are visible well kwargs.setdefault("alpha", min(0.8, 20 / len(self))) elements = [] for track in self: - elements.append(track.plot(attribute=attribute, ax=ax, **kwargs).element) + elements.append( + track.plot(attribute=attribute, t_max=t_max, ax=ax, **kwargs).element + ) kwargs["label"] = "" # set potential plot label only for first track return PlotReference(ax, elements, {"attribute": attribute}) diff --git a/droplets/emulsions.py b/droplets/emulsions.py index b7f0597..abf6f87 100644 --- a/droplets/emulsions.py +++ b/droplets/emulsions.py @@ -12,6 +12,7 @@ from __future__ import annotations +import functools import json import logging import math @@ -24,11 +25,11 @@ Iterable, Iterator, List, + Literal, Optional, Sequence, Tuple, Type, - Union, overload, ) @@ -62,7 +63,7 @@ def __init__( droplets: Optional[Iterable[SphericalDroplet]] = None, *, copy: bool = True, - dtype: Union[np.typing.DTypeLike, np.ndarray, SphericalDroplet] = None, + dtype: np.typing.DTypeLike | np.ndarray | SphericalDroplet = None, force_consistency: bool = False, grid: Optional[GridBase] = None, ): @@ -121,8 +122,8 @@ def empty(cls, droplet: SphericalDroplet) -> Emulsion: def from_random( cls, num: int, - grid_or_bounds: Union[GridBase, Sequence[Tuple[float, float]]], - radius: Union[float, Tuple[float, float]], + grid_or_bounds: GridBase | Sequence[Tuple[float, float]], + radius: float | Tuple[float, float], *, remove_overlapping: bool = True, droplet_class: Type[SphericalDroplet] = SphericalDroplet, @@ -267,7 +268,10 @@ def append( force_consistency (bool, optional): Whether to ensure that all droplets are of the same type """ - if self.dtype is None: + # during some multiprocessing examples, Emulsions might apparently not have + # a proper `dtype` define. This might be because they use some copying or + # __getstate__ methods of the underlying list class + if not hasattr(self, "dtype") or self.dtype is None: self.dtype = droplet.data.dtype elif force_consistency and self.dtype != droplet.data.dtype: raise ValueError( @@ -646,7 +650,7 @@ def plot( color_value: Optional[Callable] = None, cmap=None, norm=None, - colorbar: Union[bool, str] = True, + colorbar: bool | str = True, **kwargs, ) -> PlotReference: """plot the current emulsion together with a corresponding field @@ -791,7 +795,7 @@ class EmulsionTimeCourse: def __init__( self, emulsions: Optional[Iterable[Emulsion]] = None, - times: Union[np.ndarray, Sequence[float], None] = None, + times: np.ndarray | Sequence[float] | None = None, ) -> None: """ Args: @@ -858,7 +862,7 @@ def __repr__(self): def __len__(self): return len(self.times) - def __getitem__(self, key: Union[int, slice]): + def __getitem__(self, key: int | slice): """return the information for the given index""" result = self.emulsions.__getitem__(key) if isinstance(key, slice): @@ -882,6 +886,8 @@ def __eq__(self, other): def from_storage( cls, storage: StorageBase, + *, + num_processes: int | Literal["auto"] = 1, refine: bool = False, progress: Optional[bool] = None, **kwargs, @@ -894,9 +900,13 @@ def from_storage( refine (bool): Flag determining whether the droplet properties should be refined using fitting. This is a potentially slow procedure. + num_processes (int or "auto"): + Number of processes used for the refinement. If set to "auto", the + number of processes is choosen automatically. progress (bool): Whether to show the progress of the process. If `None`, the progress is - only shown when `refine` is `True`. + only shown when `refine` is `True`. Progress bars are only shown for + serial calculations (where `num_processes == 1`). \**kwargs: All other parameters are forwarded to the :meth:`~droplets.image_analysis.locate_droplets`. @@ -906,14 +916,28 @@ def from_storage( """ from .image_analysis import locate_droplets - if progress is None: - progress = refine # show progress only when refining by default + if num_processes == 1: + # obtain the emulsion data for all frames in this process - # obtain the emulsion data for all frames - emulsions = ( - locate_droplets(frame, refine=refine, **kwargs) - for frame in display_progress(storage, enabled=progress) - ) + if progress is None: + progress = refine # show progress only when refining by default + + emulsions: Iterable[Emulsion] = ( + locate_droplets(frame, refine=refine, **kwargs) + for frame in display_progress(storage, enabled=progress) + ) + + else: + # use multiprocessing to obtain emulsion data + from concurrent.futures import ProcessPoolExecutor + + _get_emulsion: Callable[[Emulsion], Emulsion] = functools.partial( + locate_droplets, refine=refine, **kwargs + ) + + max_workers = None if num_processes == "auto" else num_processes + with ProcessPoolExecutor(max_workers=max_workers) as executor: + emulsions = list(executor.map(_get_emulsion, storage)) return cls(emulsions, times=storage.times) diff --git a/droplets/image_analysis.py b/droplets/image_analysis.py index a4e91dc..ff4ae0e 100644 --- a/droplets/image_analysis.py +++ b/droplets/image_analysis.py @@ -5,6 +5,7 @@ :nosignatures: locate_droplets + refine_droplets refine_droplet get_structure_factor get_length_scale @@ -13,12 +14,25 @@ .. codeauthor:: David Zwicker """ +from __future__ import annotations + +import functools import logging import math import warnings from functools import reduce from itertools import product -from typing import Any, Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Literal, + Optional, + Sequence, + Tuple, +) import numpy as np from numpy.lib.recfunctions import ( @@ -120,8 +134,8 @@ def _locate_droplets_in_mask_cartesian(mask: ScalarField) -> Emulsion: # connect clusters linked viaperiodic boundary conditions for ax in np.flatnonzero(grid.periodic): # look at all periodic axes # compile list of all boundary points connected along the current axis - low: List[Union[List[int], np.ndarray]] = [] - high: List[Union[List[int], np.ndarray]] = [] + low: List[List[int] | np.ndarray] = [] + high: List[List[int] | np.ndarray] = [] for a in range(grid.num_axes): if a == ax: low.append([0]) @@ -344,13 +358,14 @@ def locate_droplets_in_mask(mask: ScalarField) -> Emulsion: def locate_droplets( phase_field: ScalarField, - threshold: Union[float, Literal["auto", "extrema", "mean", "otsu"]] = 0.5, + threshold: float | Literal["auto", "extrema", "mean", "otsu"] = 0.5, *, minimal_radius: float = 0, modes: int = 0, interface_width: Optional[float] = None, refine: bool = False, refine_args: Optional[Dict[str, Any]] = None, + num_processes: int | Literal["auto"] = 1, ) -> Emulsion: """Locates droplets in the phase field @@ -404,6 +419,9 @@ def locate_droplets( refine_args (dict): Additional keyword arguments passed on to :func:`refine_droplet`. Only has an effect if `refine=True`. + num_processes (int): + Number of processes used for the refinement. If set to "auto", the number of + processes is choosen automatically. Returns: :class:`~droplets.emulsions.Emulsion`: All detected droplets @@ -461,14 +479,14 @@ def locate_droplets( if droplet_class != droplet.__class__: droplet = droplet_class.from_droplet(droplet, **args) - # refine droplets if necessary - if refine: - try: - droplet = refine_droplet(phase_field, droplet, **refine_args) - except ValueError: - continue # do not add the droplet to the list droplets.append(droplet) + # refine droplets if necessary + if refine: + droplets = refine_droplets( + phase_field, droplets, num_processes=num_processes, **refine_args + ) + # return droplets as an emulsion emulsion = Emulsion(droplets) if minimal_radius > -np.inf: @@ -476,6 +494,58 @@ def locate_droplets( return emulsion +def refine_droplets( + phase_field: ScalarField, + candidates: Iterable[DiffuseDroplet], + *, + num_processes: int | Literal["auto"] = 1, + **kwargs, +) -> List[DiffuseDroplet]: + r"""Refines many droplets by fitting to phase field + + Args: + phase_field (:class:`~pde.fields.ScalarField`): + Phase_field that is being used to refine the droplet + droplets (sequence of :class:`~droplets.droplets.SphericalDroplet`): + Droplets that should be refined. + num_processes (int or "auto"): + Number of processes used for the refinement. If set to "auto", the number of + processes is choosen automatically. + \**kwargs (dict): + Additional keyword arguments are passed on to :func:`refine_droplet`. + + Returns: + list of :class:`~droplets.droplets.DiffuseDroplet`: + The refined droplets + """ + + if num_processes == 1: + # refine droplets serially in this process + droplets = [ + drop + for candidate in candidates + if (drop := refine_droplet(phase_field, candidate, **kwargs)) is not None + ] + + else: + # use multiprocessing to refine droplets + from concurrent.futures import ProcessPoolExecutor + + _refine_one: Callable[[DiffuseDroplet], DiffuseDroplet] = functools.partial( + refine_droplet, phase_field, **kwargs + ) + + max_workers = None if num_processes == "auto" else num_processes + with ProcessPoolExecutor(max_workers=max_workers) as executor: + droplets = [ + candidate_refined + for candidate_refined in executor.map(_refine_one, candidates) + if candidate_refined is not None + ] + + return droplets + + def refine_droplet( phase_field: ScalarField, droplet: DiffuseDroplet, @@ -609,8 +679,8 @@ def _image_deviation(params): def get_structure_factor( scalar_field: ScalarField, - smoothing: Union[None, float, Literal["auto", "none"]] = "auto", - wave_numbers: Union[Sequence[float], Literal["auto"]] = "auto", + smoothing: None | float | Literal["auto", "none"] = "auto", + wave_numbers: Sequence[float] | Literal["auto"] = "auto", add_zero: bool = False, ) -> Tuple[np.ndarray, np.ndarray]: r"""Calculates the structure factor associated with a field @@ -721,7 +791,7 @@ def get_length_scale( "structure_factor_mean", "structure_factor_maximum", "droplet_detection" ] = "structure_factor_maximum", **kwargs, -) -> Union[float, Tuple[float, Any]]: +) -> float | Tuple[float, Any]: """Calculates a length scale associated with a phase field Args: diff --git a/droplets/trackers.py b/droplets/trackers.py index 87dfced..4162320 100644 --- a/droplets/trackers.py +++ b/droplets/trackers.py @@ -10,8 +10,10 @@ .. codeauthor:: David Zwicker """ +from __future__ import annotations + import math -from typing import Any, Callable, Dict, List, Literal, Optional, Union +from typing import Any, Callable, Dict, List, Literal, Optional from pde.fields.base import FieldBase from pde.tools.docstrings import fill_in_docstring @@ -39,7 +41,7 @@ def __init__( method: Literal[ "structure_factor_mean", "structure_factor_maximum", "droplet_detection" ] = "structure_factor_mean", - source: Union[None, int, Callable] = None, + source: None | int | Callable = None, verbose: bool = False, ): r""" @@ -132,8 +134,8 @@ def __init__( filename: Optional[str] = None, *, emulsion_timecourse=None, - source: Union[None, int, Callable] = None, - threshold: Union[float, Literal["auto", "extrema", "mean", "otsu"]] = 0.5, + source: None | int | Callable = None, + threshold: float | Literal["auto", "extrema", "mean", "otsu"] = 0.5, minimal_radius: float = 0, refine: bool = False, refine_args: Optional[Dict[str, Any]] = None, diff --git a/tests/test_emulsion.py b/tests/test_emulsion.py index f2587c3..4cd689e 100644 --- a/tests/test_emulsion.py +++ b/tests/test_emulsion.py @@ -8,10 +8,11 @@ import numpy as np import pytest -from pde import CartesianGrid, ScalarField, UnitGrid +from pde import CartesianGrid, MemoryStorage, ScalarField, UnitGrid from pde.tools.misc import skipUnlessModule from droplets import DiffuseDroplet, Emulsion, SphericalDroplet, droplets, emulsions +from droplets.emulsions import EmulsionTimeCourse def test_empty_emulsion(caplog): @@ -293,3 +294,25 @@ def test_emulsion_random(dim, grid, rng): assert em.dim == dim assert np.all(em.data["position"] > 10) and np.all(em.data["position"] < 30) assert np.all(em.data["radius"] > 1) and np.all(em.data["radius"] < 2) + + +@pytest.mark.parametrize("proc", [1, 2]) +def test_emulsion_from_storage(proc): + """test reading emulsion time courses from a file""" + grid = UnitGrid([32, 32]) + radii = [3, 2.7, 4.3] + pos = [[7, 8], [9, 22], [22, 10]] + em1 = Emulsion( + [DiffuseDroplet(p, r, interface_width=1) for p, r in zip(pos, radii)] + ) + field1 = em1.get_phasefield(grid) + em2 = em1[1:] + field2 = em2.get_phasefield(grid) + + storage = MemoryStorage() + storage.start_writing(field1) + storage.append(field1, 1) + storage.append(field2, 2) + + etc = EmulsionTimeCourse.from_storage(storage, num_processes=proc, refine=True) + assert len(etc) == 2 diff --git a/tests/test_image_analysis.py b/tests/test_image_analysis.py index 19d07c2..62017cf 100644 --- a/tests/test_image_analysis.py +++ b/tests/test_image_analysis.py @@ -358,3 +358,23 @@ def test_droplets_on_periodic_grids(dim, pos): assert len(em) == 1 assert em[0].volume == pytest.approx(field.integral) np.testing.assert_allclose(em[0].position, np.full(dim, pos), atol=0.1) + + +@pytest.mark.parametrize("num_processes", [1, 2]) +def test_droplet_refine_parallel(num_processes): + """tests droplets localization in 2d with and without multiprocessing""" + grid = UnitGrid([32, 32]) + radii = [3, 2.7, 4.3] + pos = [[7, 8], [9, 22], [22, 10]] + em = Emulsion([DiffuseDroplet(p, r, interface_width=1) for p, r in zip(pos, radii)]) + field = em.get_phasefield(grid) + + emulsion = image_analysis.locate_droplets( + field, refine=True, num_processes=num_processes + ) + assert len(emulsion) == 3 + + for droplet, p, r in zip(emulsion, pos, radii): + np.testing.assert_almost_equal(droplet.position, p, decimal=4) + assert droplet.radius == pytest.approx(r, rel=1e-4) + assert droplet.interface_width == pytest.approx(1, rel=1e-4)