-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #136 from gerlero/cases
Refactor internal cases module
- Loading branch information
Showing
4 changed files
with
289 additions
and
253 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from ._async import AsyncFoamCase | ||
from ._base import FoamCaseBase | ||
from ._sync import FoamCase | ||
|
||
__all__ = [ | ||
"FoamCaseBase", | ||
"FoamCase", | ||
"AsyncFoamCase", | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
import asyncio | ||
import multiprocessing | ||
import sys | ||
from contextlib import asynccontextmanager | ||
from pathlib import Path | ||
from typing import ( | ||
Optional, | ||
Union, | ||
) | ||
|
||
if sys.version_info >= (3, 9): | ||
from collections.abc import AsyncGenerator, Sequence | ||
else: | ||
from typing import AsyncGenerator, Sequence | ||
|
||
import aioshutil | ||
|
||
from .._util import run_process_async | ||
from ._base import FoamCaseBase | ||
|
||
|
||
class AsyncFoamCase(FoamCaseBase): | ||
""" | ||
An OpenFOAM case with asynchronous support. | ||
Provides methods for running and cleaning cases, as well as accessing files. | ||
Access the time directories of the case as a sequence, e.g. `case[0]` or `case[-1]`. | ||
:param path: The path to the case directory. | ||
""" | ||
|
||
max_cpus = multiprocessing.cpu_count() | ||
""" | ||
Maximum number of CPUs to use for running `AsyncFoamCase`s concurrently. Defaults to the number of CPUs on the system. | ||
""" | ||
|
||
_reserved_cpus = 0 | ||
_cpus_cond = None # Cannot be initialized here yet | ||
|
||
@staticmethod | ||
@asynccontextmanager | ||
async def _cpus(cpus: int) -> AsyncGenerator[None, None]: | ||
if AsyncFoamCase._cpus_cond is None: | ||
AsyncFoamCase._cpus_cond = asyncio.Condition() | ||
|
||
cpus = min(cpus, AsyncFoamCase.max_cpus) | ||
if cpus > 0: | ||
async with AsyncFoamCase._cpus_cond: | ||
await AsyncFoamCase._cpus_cond.wait_for( | ||
lambda: AsyncFoamCase.max_cpus - AsyncFoamCase._reserved_cpus | ||
>= cpus | ||
) | ||
AsyncFoamCase._reserved_cpus += cpus | ||
try: | ||
yield | ||
finally: | ||
if cpus > 0: | ||
async with AsyncFoamCase._cpus_cond: | ||
AsyncFoamCase._reserved_cpus -= cpus | ||
AsyncFoamCase._cpus_cond.notify(cpus) | ||
|
||
async def clean( | ||
self, | ||
*, | ||
script: bool = True, | ||
check: bool = False, | ||
) -> None: | ||
""" | ||
Clean this case. | ||
:param script: If True, use an (All)clean script if it exists. If False, ignore any clean scripts. | ||
:param check: If True, raise a CalledProcessError if the clean script returns a non-zero exit code. | ||
""" | ||
script_path = self._clean_script() if script else None | ||
|
||
if script_path is not None: | ||
await self.run([script_path], check=check) | ||
else: | ||
for p in self._clean_paths(): | ||
if p.is_dir(): | ||
await aioshutil.rmtree(p) # type: ignore [call-arg] | ||
else: | ||
p.unlink() | ||
|
||
async def _run( | ||
self, | ||
cmd: Union[Sequence[Union[str, Path]], str, Path], | ||
*, | ||
check: bool = True, | ||
) -> None: | ||
await run_process_async(cmd, cwd=self.path, check=check) | ||
|
||
async def run( | ||
self, | ||
cmd: Optional[Union[Sequence[Union[str, Path]], str, Path]] = None, | ||
*, | ||
script: bool = True, | ||
parallel: Optional[bool] = None, | ||
cpus: Optional[int] = None, | ||
check: bool = True, | ||
) -> None: | ||
for name, args, kwargs in self._run_cmds( | ||
cmd=cmd, script=script, parallel=parallel, check=check | ||
): | ||
if cpus is None: | ||
cpus = 1 | ||
|
||
if name == "run" and kwargs.get("parallel", False): | ||
cpus = min(self._nprocessors, cpus) | ||
|
||
async with self._cpus(cpus): | ||
await getattr(self, name)(*args, **kwargs) | ||
|
||
async def block_mesh(self, *, check: bool = True) -> None: | ||
"""Run blockMesh on this case.""" | ||
await self.run(["blockMesh"], check=check) | ||
|
||
async def decompose_par(self, *, check: bool = True) -> None: | ||
"""Decompose this case for parallel running.""" | ||
await self.run(["decomposePar"], check=check) | ||
|
||
async def reconstruct_par(self, *, check: bool = True) -> None: | ||
"""Reconstruct this case after parallel running.""" | ||
await self.run(["reconstructPar"], check=check) | ||
|
||
async def restore_0_dir(self) -> None: | ||
"""Restore the 0 directory from the 0.orig directory.""" | ||
await aioshutil.rmtree(self.path / "0", ignore_errors=True) # type: ignore [call-arg] | ||
await aioshutil.copytree(self.path / "0.orig", self.path / "0") | ||
|
||
async def copy(self, dest: Union[Path, str]) -> "AsyncFoamCase": | ||
""" | ||
Make a copy of this case. | ||
:param dest: The destination path. | ||
""" | ||
return AsyncFoamCase(await aioshutil.copytree(self.path, dest, symlinks=True)) | ||
|
||
async def clone(self, dest: Union[Path, str]) -> "AsyncFoamCase": | ||
""" | ||
Clone this case (make a clean copy). | ||
:param dest: The destination path. | ||
""" | ||
if self._clean_script() is not None: | ||
copy = await self.copy(dest) | ||
await copy.clean() | ||
return copy | ||
|
||
dest = Path(dest) | ||
|
||
await aioshutil.copytree( | ||
self.path, dest, symlinks=True, ignore=self._clone_ignore() | ||
) | ||
|
||
return AsyncFoamCase(dest) |
Oops, something went wrong.