Skip to content

Commit

Permalink
Refactor internal dictionaries module
Browse files Browse the repository at this point in the history
  • Loading branch information
gerlero committed Mar 25, 2024
1 parent 1e987de commit 27510f8
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 209 deletions.
204 changes: 16 additions & 188 deletions foamlib/_dictionaries.py → foamlib/_dictionaries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,200 +9,28 @@
MutableMapping,
cast,
)
from collections import namedtuple
from dataclasses import dataclass
from contextlib import suppress

from ._subprocesses import run_process, CalledProcessError
from ._values import FoamDimensionSet, FoamDimensioned, FoamValue
from ._parsing import parse
from ._serialization import serialize
from .._subprocesses import run_process, CalledProcessError

try:
import numpy as np
from numpy.typing import NDArray
except ModuleNotFoundError:
numpy = False
else:
numpy = True

from pyparsing import (
Forward,
Group,
Keyword,
Literal,
Opt,
QuotedString,
Word,
common,
printables,
)

FoamDimensionSet = namedtuple(
"FoamDimensionSet",
[
"mass",
"length",
"time",
"temperature",
"moles",
"current",
"luminous_intensity",
],
defaults=(0, 0, 0, 0, 0, 0, 0),
)
pass


@dataclass
class FoamDimensioned:
value: Union[int, float, Sequence[Union[int, float]]] = 0
dimensions: Union[FoamDimensionSet, Sequence[Union[int, float]]] = (
FoamDimensionSet()
)
name: Optional[str] = None

def __post_init__(self) -> None:
if not isinstance(self.dimensions, FoamDimensionSet):
self.dimensions = FoamDimensionSet(*self.dimensions)


FoamValue = Union[
str, int, float, bool, FoamDimensioned, FoamDimensionSet, Sequence["FoamValue"]
__all__ = [
"FoamDictionary",
"FoamBoundaryDictionary",
"FoamBoundariesDictionary",
"FoamFile",
"FoamFieldFile",
"FoamDimensionSet",
"FoamDimensioned",
"FoamValue",
]
"""
A value that can be stored in an OpenFOAM dictionary.
"""

_YES = Keyword("yes").set_parse_action(lambda s, loc, tks: True)
_NO = Keyword("no").set_parse_action(lambda s, loc, tks: False)
_DIMENSIONS = (
Literal("[").suppress() + common.number * 7 + Literal("]").suppress()
).set_parse_action(lambda s, loc, tks: FoamDimensionSet(*tks))
_TOKEN = common.identifier | QuotedString('"', unquote_results=False)
_ITEM = Forward()
_LIST = Opt(
Literal("List") + Literal("<") + common.identifier + Literal(">")
).suppress() + (
(
Opt(common.integer).suppress()
+ Literal("(").suppress()
+ Group(_ITEM[...])
+ Literal(")").suppress()
)
| (
common.integer + Literal("{").suppress() + _ITEM + Literal("}").suppress()
).set_parse_action(lambda s, loc, tks: [tks[1]] * tks[0])
)
_FIELD = (Keyword("uniform").suppress() + _ITEM) | (
Keyword("nonuniform").suppress() + _LIST
)
_DIMENSIONED = (Opt(common.identifier) + _DIMENSIONS + _ITEM).set_parse_action(
lambda s, loc, tks: FoamDimensioned(*reversed(tks.as_list()))
)
_ITEM <<= (
_FIELD | _LIST | _DIMENSIONED | _DIMENSIONS | common.number | _YES | _NO | _TOKEN
)

_TOKENS = (QuotedString('"', unquote_results=False) | Word(printables))[
1, ...
].set_parse_action(lambda s, loc, tks: " ".join(tks))

_VALUE = _ITEM ^ _TOKENS


def _parse(value: str) -> FoamValue:
return cast(FoamValue, _VALUE.parse_string(value, parse_all=True).as_list()[0])


def _serialize_bool(value: Any) -> str:
if value is True:
return "yes"
elif value is False:
return "no"
else:
raise TypeError(f"Not a bool: {type(value)}")


def _is_sequence(value: Any) -> bool:
return (
isinstance(value, Sequence)
and not isinstance(value, str)
or numpy
and isinstance(value, np.ndarray)
)


def _serialize_list(value: Any) -> str:
if _is_sequence(value):
return f"({' '.join(_serialize(v) for v in value)})"
else:
raise TypeError(f"Not a valid sequence: {type(value)}")


def _serialize_field(value: Any) -> str:
if _is_sequence(value):
try:
s = _serialize_list(value)
except TypeError:
raise TypeError(f"Not a valid field: {type(value)}") from None
else:
if len(value) < 10:
return f"uniform {s}"
else:
if isinstance(value[0], (int, float)):
kind = "scalar"
elif len(value[0]) == 3:
kind = "vector"
elif len(value[0]) == 6:
kind = "symmTensor"
elif len(value[0]) == 9:
kind = "tensor"
else:
raise TypeError(
f"Unsupported sequence length for field: {len(value[0])}"
)
return f"nonuniform List<{kind}> {len(value)}{s}"
else:
return f"uniform {value}"


def _serialize_dimensions(value: Any) -> str:
if _is_sequence(value) and len(value) == 7:
return f"[{' '.join(str(v) for v in value)}]"
else:
raise TypeError(f"Not a valid dimension set: {type(value)}")


def _serialize_dimensioned(value: Any) -> str:
if isinstance(value, FoamDimensioned):
if value.name is not None:
return f"{value.name} {_serialize_dimensions(value.dimensions)} {_serialize(value.value)}"
else:
return (
f"{_serialize_dimensions(value.dimensions)} {_serialize(value.value)}"
)
else:
raise TypeError(f"Not a valid dimensioned value: {type(value)}")


def _serialize(
value: Any, *, assume_field: bool = False, assume_dimensions: bool = False
) -> str:
if isinstance(value, FoamDimensionSet) or assume_dimensions:
with suppress(TypeError):
return _serialize_dimensions(value)

if assume_field:
with suppress(TypeError):
return _serialize_field(value)

with suppress(TypeError):
return _serialize_dimensioned(value)

with suppress(TypeError):
return _serialize_list(value)

with suppress(TypeError):
return _serialize_bool(value)

return str(value)


class FoamDictionary(MutableMapping[str, Union[FoamValue, "FoamDictionary"]]):
Expand Down Expand Up @@ -245,7 +73,7 @@ def __getitem__(self, key: str) -> Union[FoamValue, "FoamDictionary"]:
assert value.endswith("}")
return FoamDictionary(self._file, [*self._keywords, key])
else:
return _parse(value)
return parse(value)

def _setitem(
self,
Expand All @@ -265,7 +93,7 @@ def _setitem(
subdict[k] = v
return
else:
value = _serialize(
value = serialize(
value, assume_field=assume_field, assume_dimensions=assume_dimensions
)

Expand Down
56 changes: 56 additions & 0 deletions foamlib/_dictionaries/_parsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from typing import cast

from pyparsing import (
Forward,
Group,
Keyword,
Literal,
Opt,
QuotedString,
Word,
common,
printables,
)

from ._values import FoamValue, FoamDimensionSet, FoamDimensioned


_YES = Keyword("yes").set_parse_action(lambda s, loc, tks: True)
_NO = Keyword("no").set_parse_action(lambda s, loc, tks: False)
_DIMENSIONS = (
Literal("[").suppress() + common.number * 7 + Literal("]").suppress()
).set_parse_action(lambda s, loc, tks: FoamDimensionSet(*tks))
_TOKEN = common.identifier | QuotedString('"', unquote_results=False)
_ITEM = Forward()
_LIST = Opt(
Literal("List") + Literal("<") + common.identifier + Literal(">")
).suppress() + (
(
Opt(common.integer).suppress()
+ Literal("(").suppress()
+ Group(_ITEM[...])
+ Literal(")").suppress()
)
| (
common.integer + Literal("{").suppress() + _ITEM + Literal("}").suppress()
).set_parse_action(lambda s, loc, tks: [tks[1]] * tks[0])
)
_FIELD = (Keyword("uniform").suppress() + _ITEM) | (
Keyword("nonuniform").suppress() + _LIST
)
_DIMENSIONED = (Opt(common.identifier) + _DIMENSIONS + _ITEM).set_parse_action(
lambda s, loc, tks: FoamDimensioned(*reversed(tks.as_list()))
)
_ITEM <<= (
_FIELD | _LIST | _DIMENSIONED | _DIMENSIONS | common.number | _YES | _NO | _TOKEN
)

_TOKENS = (QuotedString('"', unquote_results=False) | Word(printables))[
1, ...
].set_parse_action(lambda s, loc, tks: " ".join(tks))

_VALUE = _ITEM ^ _TOKENS


def parse(value: str) -> FoamValue:
return cast(FoamValue, _VALUE.parse_string(value, parse_all=True).as_list()[0])
103 changes: 103 additions & 0 deletions foamlib/_dictionaries/_serialization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from contextlib import suppress
from typing import Any, Sequence

try:
import numpy as np
except ModuleNotFoundError:
numpy = False
else:
numpy = True

from ._values import FoamDimensioned, FoamDimensionSet


def _serialize_bool(value: Any) -> str:
if value is True:
return "yes"
elif value is False:
return "no"
else:
raise TypeError(f"Not a bool: {type(value)}")


def _is_sequence(value: Any) -> bool:
return (
isinstance(value, Sequence)
and not isinstance(value, str)
or numpy
and isinstance(value, np.ndarray)
)


def _serialize_list(value: Any) -> str:
if _is_sequence(value):
return f"({' '.join(serialize(v) for v in value)})"
else:
raise TypeError(f"Not a valid sequence: {type(value)}")


def _serialize_field(value: Any) -> str:
if _is_sequence(value):
try:
s = _serialize_list(value)
except TypeError:
raise TypeError(f"Not a valid field: {type(value)}") from None
else:
if len(value) < 10:
return f"uniform {s}"
else:
if isinstance(value[0], (int, float)):
kind = "scalar"
elif len(value[0]) == 3:
kind = "vector"
elif len(value[0]) == 6:
kind = "symmTensor"
elif len(value[0]) == 9:
kind = "tensor"
else:
raise TypeError(
f"Unsupported sequence length for field: {len(value[0])}"
)
return f"nonuniform List<{kind}> {len(value)}{s}"
else:
return f"uniform {value}"


def _serialize_dimensions(value: Any) -> str:
if _is_sequence(value) and len(value) == 7:
return f"[{' '.join(str(v) for v in value)}]"
else:
raise TypeError(f"Not a valid dimension set: {type(value)}")


def _serialize_dimensioned(value: Any) -> str:
if isinstance(value, FoamDimensioned):
if value.name is not None:
return f"{value.name} {_serialize_dimensions(value.dimensions)} {serialize(value.value)}"
else:
return f"{_serialize_dimensions(value.dimensions)} {serialize(value.value)}"
else:
raise TypeError(f"Not a valid dimensioned value: {type(value)}")


def serialize(
value: Any, *, assume_field: bool = False, assume_dimensions: bool = False
) -> str:
if isinstance(value, FoamDimensionSet) or assume_dimensions:
with suppress(TypeError):
return _serialize_dimensions(value)

if assume_field:
with suppress(TypeError):
return _serialize_field(value)

with suppress(TypeError):
return _serialize_dimensioned(value)

with suppress(TypeError):
return _serialize_list(value)

with suppress(TypeError):
return _serialize_bool(value)

return str(value)
Loading

0 comments on commit 27510f8

Please sign in to comment.