From 3e85381445ce4086dceb86dd293b95e12e617006 Mon Sep 17 00:00:00 2001 From: Gabriel Gerlero Date: Sun, 28 Jul 2024 19:17:07 -0300 Subject: [PATCH] Refactor case runner logic --- foamlib/_cases.py | 195 ++++++++++++++++++++-------------------------- 1 file changed, 85 insertions(+), 110 deletions(-) diff --git a/foamlib/_cases.py b/foamlib/_cases.py index fbe518e..a06bf48 100644 --- a/foamlib/_cases.py +++ b/foamlib/_cases.py @@ -5,7 +5,9 @@ from contextlib import asynccontextmanager from pathlib import Path from typing import ( + Any, Optional, + Tuple, Union, overload, ) @@ -15,13 +17,23 @@ AsyncGenerator, Callable, Collection, + Generator, Iterator, + Mapping, Sequence, Set, ) else: from typing import AbstractSet as Set - from typing import AsyncGenerator, Callable, Collection, Iterator, Sequence + from typing import ( + AsyncGenerator, + Callable, + Collection, + Generator, + Iterator, + Mapping, + Sequence, + ) import aioshutil @@ -209,6 +221,51 @@ def _run_script(self, *, parallel: Optional[bool] = None) -> Optional[Path]: else: return None + def _run_cmds( + self, + cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None, + *, + script: bool = True, + parallel: Optional[bool] = None, + check: bool = True, + ) -> Generator[Tuple[str, Sequence[Any], Mapping[str, Any]], None, None]: + if cmd is not None: + if parallel: + cmd = self._parallel_cmd(cmd) + + yield ("_run", (cmd,), {"check": check}) + else: + script_path = self._run_script(parallel=parallel) if script else None + + if script_path is not None: + yield ("_run", ([script_path],), {"check": check}) + + else: + if not self and (self.path / "0.orig").is_dir(): + yield ("restore_0_dir", (), {}) + + if (self.path / "system" / "blockMeshDict").is_file(): + yield ("block_mesh", (), {"check": check}) + + if parallel is None: + parallel = ( + self._nprocessors > 0 + or (self.path / "system" / "decomposeParDict").is_file() + ) + + if parallel: + if ( + self._nprocessors == 0 + and (self.path / "system" / "decomposeParDict").is_file() + ): + yield ("decompose_par", (), {"check": check}) + + yield ( + "run", + ([self.application],), + {"parallel": parallel, "check": check}, + ) + def _parallel_cmd( self, cmd: Union[Sequence[Union[str, Path]], str, Path] ) -> Union[Sequence[Union[str, Path]], str]: @@ -338,6 +395,14 @@ def clean( else: p.unlink() + def _run( + self, + cmd: Union[Sequence[Union[str, Path]], str, Path], + *, + check: bool = True, + ) -> None: + run_process(cmd, cwd=self.path, check=check) + def run( self, cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None, @@ -354,46 +419,10 @@ def run( :param parallel: If True, run in parallel using MPI. If None, autodetect whether to run in parallel. :param check: If True, raise a CalledProcessError if any command returns a non-zero exit code. """ - if cmd is not None: - if parallel: - cmd = self._parallel_cmd(cmd) - - run_process( - cmd, - check=check, - cwd=self.path, - ) - else: - script_path = self._run_script(parallel=parallel) if script else None - - if script_path is not None: - return self.run([script_path], check=check) - - else: - if not self and (self.path / "0.orig").is_dir(): - self.restore_0_dir() - - if (self.path / "system" / "blockMeshDict").is_file(): - self.block_mesh() - - if parallel is None: - parallel = ( - self._nprocessors > 0 - or (self.path / "system" / "decomposeParDict").is_file() - ) - - if parallel: - if ( - self._nprocessors == 0 - and (self.path / "system" / "decomposeParDict").is_file() - ): - self.decompose_par() - - self.run( - [self.application], - parallel=parallel, - check=check, - ) + for name, args, kwargs in self._run_cmds( + cmd=cmd, script=script, parallel=parallel, check=check + ): + getattr(self, name)(*args, **kwargs) def block_mesh(self, *, check: bool = True) -> None: """Run blockMesh on this case.""" @@ -502,6 +531,14 @@ async def clean( else: p.unlink() + async def _run( + self, + cmd: Union[Sequence[Union[str, Path]], str, Path], + *, + check: bool = True, + ) -> None: + await run_process_async(cmd, cwd=self.path, check=check) + async def run( self, cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None, @@ -511,79 +548,17 @@ async def run( cpus: Optional[int] = None, check: bool = True, ) -> None: - """ - Run this case, or a specified command in the context of this case. - - :param cmd: The command to run. If None, run the case. If a sequence, the first element is the command and the rest are arguments. If a string, `cmd` is executed in a shell. - :param script: If True and `cmd` is None, use an (All)run(-parallel) script if it exists for running the case. If False or no run script is found, autodetermine the command(s) needed to run the case. - :param parallel: If True, run in parallel using MPI. If None, autodetect whether to run in parallel. - :param cpus: The number of CPUs to reserve for the run. The run will wait until the requested number of CPUs is available. If None, autodetect the number of CPUs to reserve. - :param check: If True, raise a CalledProcessError if a command returns a non-zero exit code. - """ - if cmd is not None: + for name, args, kwargs in self._run_cmds( + cmd=cmd, script=script, parallel=parallel, check=check + ): if cpus is None: - if parallel: - cpus = min(self._nprocessors, 1) - else: - cpus = 1 + cpus = 1 - if parallel: - cmd = self._parallel_cmd(cmd) + if name == "run" and kwargs.get("parallel", False): + cpus = min(self._nprocessors, cpus) async with self._cpus(cpus): - await run_process_async( - cmd, - check=check, - cwd=self.path, - ) - else: - script_path = self._run_script(parallel=parallel) if script else None - - if script_path is not None: - if cpus is None: - if self._nprocessors > 0: - cpus = self._nprocessors - else: - nsubdomains = self._nsubdomains - if nsubdomains is not None: - cpus = nsubdomains - else: - cpus = 1 - - await self.run([script_path], check=check, cpus=cpus) - - else: - if not self and (self.path / "0.orig").is_dir(): - await self.restore_0_dir() - - if (self.path / "system" / "blockMeshDict").is_file(): - await self.block_mesh() - - if parallel is None: - parallel = ( - self._nprocessors > 0 - or (self.path / "system" / "decomposeParDict").is_file() - ) - - if parallel: - if ( - self._nprocessors == 0 - and (self.path / "system" / "decomposeParDict").is_file() - ): - await self.decompose_par() - - if cpus is None: - cpus = min(self._nprocessors, 1) - else: - if cpus is None: - cpus = 1 - - await self.run( - [self.application], - parallel=parallel, - check=check, - cpus=cpus, - ) + await getattr(self, name)(*args, **kwargs) async def block_mesh(self, *, check: bool = True) -> None: """Run blockMesh on this case."""