Skip to content

Commit

Permalink
Use own parser for dictionary insertions and modifications
Browse files Browse the repository at this point in the history
  • Loading branch information
gerlero committed Apr 1, 2024
1 parent b5ba7e0 commit 73c8a66
Showing 1 changed file with 177 additions and 98 deletions.
275 changes: 177 additions & 98 deletions foamlib/_dictionaries.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from pathlib import Path
from dataclasses import dataclass
from contextlib import suppress
import typing
from typing import (
Any,
Union,
Expand Down Expand Up @@ -42,107 +43,62 @@
else:
numpy = True

from ._subprocesses import run_process, CalledProcessError


class _FoamDictionary(MutableMapping[str, Union["FoamFile.Value", "_FoamDictionary"]]):

def __init__(self, _file: "FoamFile", _keywords: Sequence[str]) -> None:
self._file = _file
self._keywords = _keywords

def _cmd(self, args: Sequence[str], *, key: Optional[str] = None) -> str:
keywords = self._keywords

if key is not None:
keywords = [*self._keywords, key]

if keywords:
args = ["-entry", "/".join(keywords), *args]

try:
return (
run_process(
["foamDictionary", *args, "-precision", "15", self._file.path],
)
.stdout.decode()
.strip()
)
except CalledProcessError as e:
stderr = e.stderr.decode()
if "Cannot find entry" in stderr:
raise KeyError(key) from None
else:
raise RuntimeError(
f"{e.cmd} failed with return code {e.returncode}\n{e.stderr.decode()}"
) from None

def __getitem__(self, key: str) -> Union["FoamFile.Value", "_FoamDictionary"]:
contents = self._file.path.read_text()
parsed = _parse(contents)

_, value, _ = parsed[(*self._keywords, key)]

if value is None:
return _FoamDictionary(self._file, [*self._keywords, key])
else:
return value
def __getitem__(self, keyword: str) -> Union["FoamFile.Value", "_FoamDictionary"]:
return self._file[(*self._keywords, keyword)]

def _setitem(
self,
key: str,
keyword: str,
value: Any,
*,
assume_field: bool = False,
assume_dimensions: bool = False,
) -> None:
if isinstance(value, _FoamDictionary):
value = value._cmd(["-value"])
elif isinstance(value, Mapping):
self._cmd(["-set", "{}"], key=key)
subdict = self[key]
assert isinstance(subdict, _FoamDictionary)
for k, v in value.items():
subdict[k] = v
return
else:
value = serialize(
value, assume_field=assume_field, assume_dimensions=assume_dimensions
)

if len(value) < 1000:
self._cmd(["-set", value], key=key)
else:
self._cmd(["-set", "_foamlib_value_"], key=key)
contents = self._file.path.read_text()
contents = contents.replace("_foamlib_value_", value, 1)
self._file.path.write_text(contents)

def __setitem__(self, key: str, value: Any) -> None:
self._setitem(key, value)

def __delitem__(self, key: str) -> None:
contents = self._file.path.read_text()
parsed = _parse(contents)
self._file._setitem(
(*self._keywords, keyword),
value,
assume_field=assume_field,
assume_dimensions=assume_dimensions,
)

start, _, end = parsed[(*self._keywords, key)]
def __setitem__(self, keyword: str, value: Any) -> None:
self._setitem(keyword, value)

self._file.path.write_text(contents[:start] + contents[end:])
def __delitem__(self, keyword: str) -> None:
del self._file[(*self._keywords, keyword)]

def __iter__(self) -> Iterator[str]:
contents = self._file.path.read_text()
parsed = _parse(contents)

for keywords in parsed:
if keywords[:-1] == tuple(self._keywords):
yield keywords[-1]
return self._file._iter(tuple(self._keywords))

def __len__(self) -> int:
return len(list(iter(self)))

def __repr__(self) -> str:
return f"FoamFile.Dictionary({self._file}, {self._keywords})"

_Dict = typing.Dict[str, Union["FoamFile.Value", "_Dict"]]

def as_dict(self) -> _Dict:
"""
Return a nested dict representation of the dictionary.
"""
ret = self._file.as_dict()

for k in self._keywords:
assert isinstance(ret, dict)
v = ret[k]
assert isinstance(v, dict)
ret = v

return ret


class FoamFile(_FoamDictionary):
"""An OpenFOAM dictionary file as a mutable mapping."""
Expand Down Expand Up @@ -184,26 +140,117 @@ def __init__(self, path: Union[str, Path]) -> None:
elif not self.path.is_file():
raise FileNotFoundError(self.path)

def __getitem__(
self, keywords: Union[str, Tuple[str, ...]]
) -> Union["FoamFile.Value", "_FoamDictionary"]:
if not isinstance(keywords, tuple):
keywords = (keywords,)

contents = self._file.path.read_text()
parsed = _parse(contents)

_, value, _ = parsed[keywords]

if value is None:
return _FoamDictionary(self._file, keywords)
else:
return value

def _setitem(
self,
keywords: Union[str, Tuple[str, ...]],
value: Any,
*,
assume_field: bool = False,
assume_dimensions: bool = False,
) -> None:
if not isinstance(keywords, tuple):
keywords = (keywords,)

contents = self.path.read_text()
parsed = _parse(contents)

if isinstance(value, Mapping):
if isinstance(value, FoamFile.Dictionary):
value = value.as_dict()

start, end = _entry_locn(parsed, keywords)

contents = f"{contents[:start]} {keywords[-1]} {{\n}}\n {contents[end:]}"
self.path.write_text(contents)

for k, v in value.items():
self[(*keywords, k)] = v
else:
start, end = _entry_locn(parsed, keywords)

value = _serialize_value(
value, assume_field=assume_field, assume_dimensions=assume_dimensions
)

contents = f"{contents[:start]} {keywords[-1]} {value};\n {contents[end:]}"
self.path.write_text(contents)

def __setitem__(self, keywords: Union[str, Tuple[str, ...]], value: Any) -> None:
self._setitem(keywords, value)

def __delitem__(self, keywords: Union[str, Tuple[str, ...]]) -> None:
if not isinstance(keywords, tuple):
keywords = (keywords,)

contents = self.path.read_text()
parsed = _parse(contents)

start, _, end = parsed[keywords]

self.path.write_text(contents[:start] + contents[end:])

def _iter(self, keywords: Union[str, Tuple[str, ...]] = ()) -> Iterator[str]:
if not isinstance(keywords, tuple):
keywords = (keywords,)

contents = self.path.read_text()
parsed = _parse(contents)

yield from (k[-1] for k in parsed if k[:-1] == keywords)

def __iter__(self) -> Iterator[str]:
return self._iter()

def __fspath__(self) -> str:
return str(self.path)

def __repr__(self) -> str:
return f"{type(self).__name__}({self.path})"

def as_dict(self) -> _FoamDictionary._Dict:
"""
Return a nested dict representation of the file.
"""
contents = self.path.read_text()
parsed = _parse(contents)
ret: _FoamDictionary._Dict = {}
for keywords, (_, value, _) in parsed.items():

r = ret
for k in keywords[:-1]:
assert isinstance(r, dict)
v = r[k]
assert isinstance(v, dict)
r = v

assert isinstance(r, dict)
r[keywords[-1]] = {} if value is None else value

return ret


class FoamFieldFile(FoamFile):
"""An OpenFOAM dictionary file representing a field as a mutable mapping."""

class BoundariesDictionary(_FoamDictionary):
def __getitem__(
self, key: str
) -> Union["FoamFile.Value", "FoamFieldFile.BoundaryDictionary"]:
ret = super().__getitem__(key)
if isinstance(ret, _FoamDictionary):
ret = FoamFieldFile.BoundaryDictionary(
self._file, [*self._keywords, key]
)
return ret
def __getitem__(self, keyword: str) -> "FoamFieldFile.BoundaryDictionary":
return cast(FoamFieldFile.BoundaryDictionary, super().__getitem__(keyword))

def __repr__(self) -> str:
return f"{type(self).__qualname__}({self._file}, {self._keywords})"
Expand Down Expand Up @@ -267,19 +314,30 @@ def value(self) -> None:
def __repr__(self) -> str:
return f"{type(self).__qualname__}({self._file}, {self._keywords})"

def __getitem__(self, key: str) -> Union[FoamFile.Value, _FoamDictionary]:
ret = super().__getitem__(key)
if key == "boundaryField" and isinstance(ret, _FoamDictionary):
ret = FoamFieldFile.BoundariesDictionary(self, [key])
def __getitem__(
self, keywords: Union[str, Tuple[str, ...]]
) -> Union[FoamFile.Value, _FoamDictionary]:
if not isinstance(keywords, tuple):
keywords = (keywords,)

ret = super().__getitem__(keywords)
if keywords[0] == "boundaryField" and isinstance(ret, _FoamDictionary):
if len(keywords) == 1:
ret = FoamFieldFile.BoundariesDictionary(self, keywords)
elif len(keywords) == 2:
ret = FoamFieldFile.BoundaryDictionary(self, keywords)
return ret

def __setitem__(self, key: str, value: Any) -> None:
if key == "internalField":
self._setitem(key, value, assume_field=True)
elif key == "dimensions":
self._setitem(key, value, assume_dimensions=True)
def __setitem__(self, keywords: Union[str, Tuple[str, ...]], value: Any) -> None:
if not isinstance(keywords, tuple):
keywords = (keywords,)

if keywords == ("internalField",):
self._setitem(keywords, value, assume_field=True)
elif keywords == ("dimensions",):
self._setitem(keywords, value, assume_dimensions=True)
else:
self._setitem(key, value)
self._setitem(keywords, value)

@property
def dimensions(self) -> FoamFile.DimensionSet:
Expand Down Expand Up @@ -430,6 +488,27 @@ def _parse(
return ret


def _entry_locn(
parsed: Mapping[Sequence[str], Tuple[int, Optional[FoamFile.Value], int]],
keywords: Tuple[str, ...],
) -> Tuple[int, int]:
"""
Location of an entry or where it should be inserted.
"""
try:
start, _, end = parsed[keywords]
except KeyError:
if len(keywords) > 1:
_, _, end = parsed[keywords[:-1]]
end -= 1
else:
end = -1

start = end

return start, end


def _serialize_bool(value: Any) -> str:
if value is True:
return "yes"
Expand All @@ -450,7 +529,7 @@ def _is_sequence(value: Any) -> bool:

def _serialize_list(value: Any) -> str:
if _is_sequence(value):
return f"({' '.join(serialize(v) for v in value)})"
return f"({' '.join(_serialize_value(v) for v in value)})"
else:
raise TypeError(f"Not a valid sequence: {type(value)}")

Expand Down Expand Up @@ -492,14 +571,14 @@ def _serialize_dimensions(value: Any) -> str:
def _serialize_dimensioned(value: Any) -> str:
if isinstance(value, FoamFile.Dimensioned):
if value.name is not None:
return f"{value.name} {_serialize_dimensions(value.dimensions)} {serialize(value.value)}"
return f"{value.name} {_serialize_dimensions(value.dimensions)} {_serialize_value(value.value)}"
else:
return f"{_serialize_dimensions(value.dimensions)} {serialize(value.value)}"
return f"{_serialize_dimensions(value.dimensions)} {_serialize_value(value.value)}"
else:
raise TypeError(f"Not a valid dimensioned value: {type(value)}")


def serialize(
def _serialize_value(
value: Any, *, assume_field: bool = False, assume_dimensions: bool = False
) -> str:
if isinstance(value, FoamFile.DimensionSet) or assume_dimensions:
Expand Down

0 comments on commit 73c8a66

Please sign in to comment.