Skip to content

Commit

Permalink
Merge pull request #135 from gerlero/cases
Browse files Browse the repository at this point in the history
Refactor case runner logic
  • Loading branch information
gerlero authored Jul 28, 2024
2 parents 1aa54a8 + 3e85381 commit 8e0f3c2
Showing 1 changed file with 85 additions and 110 deletions.
195 changes: 85 additions & 110 deletions foamlib/_cases.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from contextlib import asynccontextmanager
from pathlib import Path
from typing import (
Any,
Optional,
Tuple,
Union,
overload,
)
Expand All @@ -15,13 +17,23 @@
AsyncGenerator,
Callable,
Collection,
Generator,
Iterator,
Mapping,
Sequence,
Set,
)
else:
from typing import AbstractSet as Set
from typing import AsyncGenerator, Callable, Collection, Iterator, Sequence
from typing import (
AsyncGenerator,
Callable,
Collection,
Generator,
Iterator,
Mapping,
Sequence,
)

import aioshutil

Expand Down Expand Up @@ -209,6 +221,51 @@ def _run_script(self, *, parallel: Optional[bool] = None) -> Optional[Path]:
else:
return None

def _run_cmds(
self,
cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None,
*,
script: bool = True,
parallel: Optional[bool] = None,
check: bool = True,
) -> Generator[Tuple[str, Sequence[Any], Mapping[str, Any]], None, None]:
if cmd is not None:
if parallel:
cmd = self._parallel_cmd(cmd)

yield ("_run", (cmd,), {"check": check})
else:
script_path = self._run_script(parallel=parallel) if script else None

if script_path is not None:
yield ("_run", ([script_path],), {"check": check})

else:
if not self and (self.path / "0.orig").is_dir():
yield ("restore_0_dir", (), {})

if (self.path / "system" / "blockMeshDict").is_file():
yield ("block_mesh", (), {"check": check})

if parallel is None:
parallel = (
self._nprocessors > 0
or (self.path / "system" / "decomposeParDict").is_file()
)

if parallel:
if (
self._nprocessors == 0
and (self.path / "system" / "decomposeParDict").is_file()
):
yield ("decompose_par", (), {"check": check})

yield (
"run",
([self.application],),
{"parallel": parallel, "check": check},
)

def _parallel_cmd(
self, cmd: Union[Sequence[Union[str, Path]], str, Path]
) -> Union[Sequence[Union[str, Path]], str]:
Expand Down Expand Up @@ -338,6 +395,14 @@ def clean(
else:
p.unlink()

def _run(
self,
cmd: Union[Sequence[Union[str, Path]], str, Path],
*,
check: bool = True,
) -> None:
run_process(cmd, cwd=self.path, check=check)

def run(
self,
cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None,
Expand All @@ -354,46 +419,10 @@ def run(
:param parallel: If True, run in parallel using MPI. If None, autodetect whether to run in parallel.
:param check: If True, raise a CalledProcessError if any command returns a non-zero exit code.
"""
if cmd is not None:
if parallel:
cmd = self._parallel_cmd(cmd)

run_process(
cmd,
check=check,
cwd=self.path,
)
else:
script_path = self._run_script(parallel=parallel) if script else None

if script_path is not None:
return self.run([script_path], check=check)

else:
if not self and (self.path / "0.orig").is_dir():
self.restore_0_dir()

if (self.path / "system" / "blockMeshDict").is_file():
self.block_mesh()

if parallel is None:
parallel = (
self._nprocessors > 0
or (self.path / "system" / "decomposeParDict").is_file()
)

if parallel:
if (
self._nprocessors == 0
and (self.path / "system" / "decomposeParDict").is_file()
):
self.decompose_par()

self.run(
[self.application],
parallel=parallel,
check=check,
)
for name, args, kwargs in self._run_cmds(
cmd=cmd, script=script, parallel=parallel, check=check
):
getattr(self, name)(*args, **kwargs)

def block_mesh(self, *, check: bool = True) -> None:
"""Run blockMesh on this case."""
Expand Down Expand Up @@ -502,6 +531,14 @@ async def clean(
else:
p.unlink()

async def _run(
self,
cmd: Union[Sequence[Union[str, Path]], str, Path],
*,
check: bool = True,
) -> None:
await run_process_async(cmd, cwd=self.path, check=check)

async def run(
self,
cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None,
Expand All @@ -511,79 +548,17 @@ async def run(
cpus: Optional[int] = None,
check: bool = True,
) -> None:
"""
Run this case, or a specified command in the context of this case.
:param cmd: The command to run. If None, run the case. If a sequence, the first element is the command and the rest are arguments. If a string, `cmd` is executed in a shell.
:param script: If True and `cmd` is None, use an (All)run(-parallel) script if it exists for running the case. If False or no run script is found, autodetermine the command(s) needed to run the case.
:param parallel: If True, run in parallel using MPI. If None, autodetect whether to run in parallel.
:param cpus: The number of CPUs to reserve for the run. The run will wait until the requested number of CPUs is available. If None, autodetect the number of CPUs to reserve.
:param check: If True, raise a CalledProcessError if a command returns a non-zero exit code.
"""
if cmd is not None:
for name, args, kwargs in self._run_cmds(
cmd=cmd, script=script, parallel=parallel, check=check
):
if cpus is None:
if parallel:
cpus = min(self._nprocessors, 1)
else:
cpus = 1
cpus = 1

if parallel:
cmd = self._parallel_cmd(cmd)
if name == "run" and kwargs.get("parallel", False):
cpus = min(self._nprocessors, cpus)

async with self._cpus(cpus):
await run_process_async(
cmd,
check=check,
cwd=self.path,
)
else:
script_path = self._run_script(parallel=parallel) if script else None

if script_path is not None:
if cpus is None:
if self._nprocessors > 0:
cpus = self._nprocessors
else:
nsubdomains = self._nsubdomains
if nsubdomains is not None:
cpus = nsubdomains
else:
cpus = 1

await self.run([script_path], check=check, cpus=cpus)

else:
if not self and (self.path / "0.orig").is_dir():
await self.restore_0_dir()

if (self.path / "system" / "blockMeshDict").is_file():
await self.block_mesh()

if parallel is None:
parallel = (
self._nprocessors > 0
or (self.path / "system" / "decomposeParDict").is_file()
)

if parallel:
if (
self._nprocessors == 0
and (self.path / "system" / "decomposeParDict").is_file()
):
await self.decompose_par()

if cpus is None:
cpus = min(self._nprocessors, 1)
else:
if cpus is None:
cpus = 1

await self.run(
[self.application],
parallel=parallel,
check=check,
cpus=cpus,
)
await getattr(self, name)(*args, **kwargs)

async def block_mesh(self, *, check: bool = True) -> None:
"""Run blockMesh on this case."""
Expand Down

0 comments on commit 8e0f3c2

Please sign in to comment.