From 0defdcc7ce53a3d52b5b3754c34dac1eaccf1eac Mon Sep 17 00:00:00 2001 From: Gabriel Gerlero Date: Mon, 1 Apr 2024 16:29:52 -0300 Subject: [PATCH] Use own parser for dictionary insertions and modifications --- foamlib/_dictionaries.py | 275 +++++++++++++++++++++++++-------------- 1 file changed, 177 insertions(+), 98 deletions(-) diff --git a/foamlib/_dictionaries.py b/foamlib/_dictionaries.py index 90c3a59..c66cfa2 100644 --- a/foamlib/_dictionaries.py +++ b/foamlib/_dictionaries.py @@ -1,6 +1,7 @@ from pathlib import Path from dataclasses import dataclass from contextlib import suppress +import typing from typing import ( Any, Union, @@ -42,8 +43,6 @@ else: numpy = True -from ._subprocesses import run_process, CalledProcessError - class _FoamDictionary(MutableMapping[str, Union["FoamFile.Value", "_FoamDictionary"]]): @@ -51,91 +50,32 @@ def __init__(self, _file: "FoamFile", _keywords: Sequence[str]) -> None: self._file = _file self._keywords = _keywords - def _cmd(self, args: Sequence[str], *, key: Optional[str] = None) -> str: - keywords = self._keywords - - if key is not None: - keywords = [*self._keywords, key] - - if keywords: - args = ["-entry", "/".join(keywords), *args] - - try: - return ( - run_process( - ["foamDictionary", *args, "-precision", "15", self._file.path], - ) - .stdout.decode() - .strip() - ) - except CalledProcessError as e: - stderr = e.stderr.decode() - if "Cannot find entry" in stderr: - raise KeyError(key) from None - else: - raise RuntimeError( - f"{e.cmd} failed with return code {e.returncode}\n{e.stderr.decode()}" - ) from None - - def __getitem__(self, key: str) -> Union["FoamFile.Value", "_FoamDictionary"]: - contents = self._file.path.read_text() - parsed = _parse(contents) - - _, value, _ = parsed[(*self._keywords, key)] - - if value is None: - return _FoamDictionary(self._file, [*self._keywords, key]) - else: - return value + def __getitem__(self, keyword: str) -> Union["FoamFile.Value", "_FoamDictionary"]: + return self._file[(*self._keywords, keyword)] def _setitem( self, - key: str, + keyword: str, value: Any, *, assume_field: bool = False, assume_dimensions: bool = False, ) -> None: - if isinstance(value, _FoamDictionary): - value = value._cmd(["-value"]) - elif isinstance(value, Mapping): - self._cmd(["-set", "{}"], key=key) - subdict = self[key] - assert isinstance(subdict, _FoamDictionary) - for k, v in value.items(): - subdict[k] = v - return - else: - value = serialize( - value, assume_field=assume_field, assume_dimensions=assume_dimensions - ) - - if len(value) < 1000: - self._cmd(["-set", value], key=key) - else: - self._cmd(["-set", "_foamlib_value_"], key=key) - contents = self._file.path.read_text() - contents = contents.replace("_foamlib_value_", value, 1) - self._file.path.write_text(contents) - - def __setitem__(self, key: str, value: Any) -> None: - self._setitem(key, value) - - def __delitem__(self, key: str) -> None: - contents = self._file.path.read_text() - parsed = _parse(contents) + self._file._setitem( + (*self._keywords, keyword), + value, + assume_field=assume_field, + assume_dimensions=assume_dimensions, + ) - start, _, end = parsed[(*self._keywords, key)] + def __setitem__(self, keyword: str, value: Any) -> None: + self._setitem(keyword, value) - self._file.path.write_text(contents[:start] + contents[end:]) + def __delitem__(self, keyword: str) -> None: + del self._file[(*self._keywords, keyword)] def __iter__(self) -> Iterator[str]: - contents = self._file.path.read_text() - parsed = _parse(contents) - - for keywords in parsed: - if keywords[:-1] == tuple(self._keywords): - yield keywords[-1] + return self._file._iter(tuple(self._keywords)) def __len__(self) -> int: return len(list(iter(self))) @@ -143,6 +83,22 @@ def __len__(self) -> int: def __repr__(self) -> str: return f"FoamFile.Dictionary({self._file}, {self._keywords})" + _Dict = typing.Dict[str, Union["FoamFile.Value", "_Dict"]] + + def as_dict(self) -> _Dict: + """ + Return a nested dictionary representation of the dictionary. + """ + ret = self._file.as_dict() + + for k in self._keywords: + assert isinstance(ret, dict) + v = ret[k] + assert isinstance(v, dict) + ret = v + + return ret + class FoamFile(_FoamDictionary): """An OpenFOAM dictionary file as a mutable mapping.""" @@ -184,26 +140,117 @@ def __init__(self, path: Union[str, Path]) -> None: elif not self.path.is_file(): raise FileNotFoundError(self.path) + def __getitem__( + self, keywords: Union[str, Tuple[str, ...]] + ) -> Union["FoamFile.Value", "_FoamDictionary"]: + if not isinstance(keywords, tuple): + keywords = (keywords,) + + contents = self._file.path.read_text() + parsed = _parse(contents) + + _, value, _ = parsed[keywords] + + if value is None: + return _FoamDictionary(self._file, keywords) + else: + return value + + def _setitem( + self, + keywords: Union[str, Tuple[str, ...]], + value: Any, + *, + assume_field: bool = False, + assume_dimensions: bool = False, + ) -> None: + if not isinstance(keywords, tuple): + keywords = (keywords,) + + contents = self.path.read_text() + parsed = _parse(contents) + + if isinstance(value, Mapping): + if isinstance(value, FoamFile.Dictionary): + value = value.as_dict() + + start, end = _entry_locn(parsed, keywords) + + contents = f"{contents[:start]} {keywords[-1]} {{\n}}\n {contents[end:]}" + self.path.write_text(contents) + + for k, v in value.items(): + self[(*keywords, k)] = v + else: + start, end = _entry_locn(parsed, keywords) + + value = _serialize_value( + value, assume_field=assume_field, assume_dimensions=assume_dimensions + ) + + contents = f"{contents[:start]} {keywords[-1]} {value};\n {contents[end:]}" + self.path.write_text(contents) + + def __setitem__(self, keywords: Union[str, Tuple[str, ...]], value: Any) -> None: + self._setitem(keywords, value) + + def __delitem__(self, keywords: Union[str, Tuple[str, ...]]) -> None: + if not isinstance(keywords, tuple): + keywords = (keywords,) + + contents = self.path.read_text() + parsed = _parse(contents) + + start, _, end = parsed[keywords] + + self.path.write_text(contents[:start] + contents[end:]) + + def _iter(self, keywords: Union[str, Tuple[str, ...]] = ()) -> Iterator[str]: + if not isinstance(keywords, tuple): + keywords = (keywords,) + + contents = self.path.read_text() + parsed = _parse(contents) + + yield from (k[-1] for k in parsed if k[:-1] == keywords) + + def __iter__(self) -> Iterator[str]: + return self._iter() + def __fspath__(self) -> str: return str(self.path) def __repr__(self) -> str: return f"{type(self).__name__}({self.path})" + def as_dict(self) -> _FoamDictionary._Dict: + """ + Return a nested dictionary representation of the file. + """ + contents = self.path.read_text() + parsed = _parse(contents) + ret: _FoamDictionary._Dict = {} + for keywords, (_, value, _) in parsed.items(): + + r = ret + for k in keywords[:-1]: + assert isinstance(r, dict) + v = r[k] + assert isinstance(v, dict) + r = v + + assert isinstance(r, dict) + r[keywords[-1]] = {} if value is None else value + + return ret + class FoamFieldFile(FoamFile): """An OpenFOAM dictionary file representing a field as a mutable mapping.""" class BoundariesDictionary(_FoamDictionary): - def __getitem__( - self, key: str - ) -> Union["FoamFile.Value", "FoamFieldFile.BoundaryDictionary"]: - ret = super().__getitem__(key) - if isinstance(ret, _FoamDictionary): - ret = FoamFieldFile.BoundaryDictionary( - self._file, [*self._keywords, key] - ) - return ret + def __getitem__(self, keyword: str) -> "FoamFieldFile.BoundaryDictionary": + return cast(FoamFieldFile.BoundaryDictionary, super().__getitem__(keyword)) def __repr__(self) -> str: return f"{type(self).__qualname__}({self._file}, {self._keywords})" @@ -267,19 +314,30 @@ def value(self) -> None: def __repr__(self) -> str: return f"{type(self).__qualname__}({self._file}, {self._keywords})" - def __getitem__(self, key: str) -> Union[FoamFile.Value, _FoamDictionary]: - ret = super().__getitem__(key) - if key == "boundaryField" and isinstance(ret, _FoamDictionary): - ret = FoamFieldFile.BoundariesDictionary(self, [key]) + def __getitem__( + self, keywords: Union[str, Tuple[str, ...]] + ) -> Union[FoamFile.Value, _FoamDictionary]: + if not isinstance(keywords, tuple): + keywords = (keywords,) + + ret = super().__getitem__(keywords) + if keywords[0] == "boundaryField" and isinstance(ret, _FoamDictionary): + if len(keywords) == 1: + ret = FoamFieldFile.BoundariesDictionary(self, keywords) + elif len(keywords) == 2: + ret = FoamFieldFile.BoundaryDictionary(self, keywords) return ret - def __setitem__(self, key: str, value: Any) -> None: - if key == "internalField": - self._setitem(key, value, assume_field=True) - elif key == "dimensions": - self._setitem(key, value, assume_dimensions=True) + def __setitem__(self, keywords: Union[str, Tuple[str, ...]], value: Any) -> None: + if not isinstance(keywords, tuple): + keywords = (keywords,) + + if keywords == ("internalField",): + self._setitem(keywords, value, assume_field=True) + elif keywords == ("dimensions",): + self._setitem(keywords, value, assume_dimensions=True) else: - self._setitem(key, value) + self._setitem(keywords, value) @property def dimensions(self) -> FoamFile.DimensionSet: @@ -430,6 +488,27 @@ def _parse( return ret +def _entry_locn( + parsed: Mapping[Sequence[str], Tuple[int, Optional[FoamFile.Value], int]], + keywords: Tuple[str, ...], +) -> Tuple[int, int]: + """ + Location of an entry or where it should be inserted. + """ + try: + start, _, end = parsed[keywords] + except KeyError: + if len(keywords) > 1: + _, _, end = parsed[keywords[:-1]] + end -= 1 + else: + end = -1 + + start = end + + return start, end + + def _serialize_bool(value: Any) -> str: if value is True: return "yes" @@ -450,7 +529,7 @@ def _is_sequence(value: Any) -> bool: def _serialize_list(value: Any) -> str: if _is_sequence(value): - return f"({' '.join(serialize(v) for v in value)})" + return f"({' '.join(_serialize_value(v) for v in value)})" else: raise TypeError(f"Not a valid sequence: {type(value)}") @@ -492,14 +571,14 @@ def _serialize_dimensions(value: Any) -> str: def _serialize_dimensioned(value: Any) -> str: if isinstance(value, FoamFile.Dimensioned): if value.name is not None: - return f"{value.name} {_serialize_dimensions(value.dimensions)} {serialize(value.value)}" + return f"{value.name} {_serialize_dimensions(value.dimensions)} {_serialize_value(value.value)}" else: - return f"{_serialize_dimensions(value.dimensions)} {serialize(value.value)}" + return f"{_serialize_dimensions(value.dimensions)} {_serialize_value(value.value)}" else: raise TypeError(f"Not a valid dimensioned value: {type(value)}") -def serialize( +def _serialize_value( value: Any, *, assume_field: bool = False, assume_dimensions: bool = False ) -> str: if isinstance(value, FoamFile.DimensionSet) or assume_dimensions: