From 36787a925010d7675c5e863b5fee0662dd9816b9 Mon Sep 17 00:00:00 2001 From: Gabriel Gerlero Date: Wed, 30 Oct 2024 15:35:07 -0300 Subject: [PATCH] Improve file handling --- foamlib/_files/_base.py | 17 ++------ foamlib/_files/_files.py | 74 ++++++++++++++++++-------------- foamlib/_files/_io.py | 56 ++++++++++-------------- foamlib/_files/_parsing.py | 53 +++++++++++++++++++++-- foamlib/_files/_serialization.py | 2 +- 5 files changed, 117 insertions(+), 85 deletions(-) diff --git a/foamlib/_files/_base.py b/foamlib/_files/_base.py index bca60b7..f116c5a 100644 --- a/foamlib/_files/_base.py +++ b/foamlib/_files/_base.py @@ -43,6 +43,9 @@ def __post_init__(self) -> None: DimensionSet, Sequence["Data"], Mapping[str, "Data"], + "np.ndarray[Tuple[()], np.dtype[np.generic]]", + "np.ndarray[Tuple[int], np.dtype[np.generic]]", + "np.ndarray[Tuple[int, int], np.dtype[np.generic]]", ] """ A value that can be stored in an OpenFOAM file. @@ -50,17 +53,3 @@ def __post_init__(self) -> None: _Dict = Dict[str, Union["Data", "_Dict"]] _File = Dict[Optional[str], Union["Data", "_Dict"]] - - _SetData = Union[ - str, - int, - float, - bool, - Dimensioned, - DimensionSet, - Sequence["_SetData"], - Mapping[str, "_SetData"], - "np.ndarray[Tuple[()], np.dtype[np.generic]]", - "np.ndarray[Tuple[int], np.dtype[np.generic]]", - "np.ndarray[Tuple[int, int], np.dtype[np.generic]]", - ] diff --git a/foamlib/_files/_files.py b/foamlib/_files/_files.py index 1c80cd7..d885136 100644 --- a/foamlib/_files/_files.py +++ b/foamlib/_files/_files.py @@ -1,4 +1,5 @@ import sys +from copy import deepcopy from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, cast if sys.version_info >= (3, 8): @@ -53,7 +54,7 @@ def __getitem__( def __setitem__( self, keyword: str, - data: "FoamFile._SetData", + data: "FoamFile.Data", ) -> None: self._file[(*self._keywords, keyword)] = data @@ -164,16 +165,16 @@ def __getitem__( elif not isinstance(keywords, tuple): keywords = (keywords,) - _, parsed = self._read() + parsed = self._get_parsed() value = parsed[keywords] if value is ...: return FoamFile.SubDict(self, keywords) - return value + return deepcopy(value) def __setitem__( - self, keywords: Optional[Union[str, Tuple[str, ...]]], data: "FoamFile._SetData" + self, keywords: Optional[Union[str, Tuple[str, ...]]], data: "FoamFile.Data" ) -> None: with self: if not keywords: @@ -230,20 +231,28 @@ def __setitem__( self[keywords] = data else: - contents, parsed = self._read(missing_ok=True) + parsed = self._get_parsed(missing_ok=True) start, end = parsed.entry_location(keywords, missing_ok=True) - before = contents[:start].rstrip() + b"\n" - if len(keywords) <= 1: - before += b"\n" - - after = contents[end:] - if after.startswith(b"}"): - after = b" " * (len(keywords) - 2) + after - if not after or after[:1] != b"\n": - after = b"\n" + after - if len(keywords) <= 1 and len(after) > 1 and after[:2] != b"\n\n": + before = b"" + if parsed.contents[:start] and not parsed.contents[:start].endswith( + b"\n" + ): + before = b"\n" + if ( + parsed.contents[:start] + and len(keywords) <= 1 + and not parsed.contents[:start].endswith(b"\n\n") + ): + before = b"\n\n" + + after = b"" + if parsed.contents[end:].startswith(b"}"): + after = b" " * (len(keywords) - 2) + if not parsed.contents[end:] or not parsed.contents[end:].startswith( + b"\n" + ): after = b"\n" + after indentation = b" " * (len(keywords) - 1) @@ -252,7 +261,9 @@ def __setitem__( if isinstance(data, (FoamFile, FoamFile.SubDict)): data = data.as_dict() - self._write( + parsed.put( + keywords, + ..., before + indentation + dumps(keywords[-1]) @@ -261,25 +272,27 @@ def __setitem__( + b"{\n" + indentation + b"}" - + after + + after, ) for k, v in data.items(): self[(*keywords, k)] = v elif keywords: - self._write( + parsed.put( + keywords, + data, before + indentation + dumps(keywords[-1]) + b" " + dumps(data, kind=kind) + b";" - + after + + after, ) else: - self._write(before + dumps(data, kind=kind) + after) + parsed.put(keywords, data, before + dumps(data, kind=kind) + after) def __delitem__(self, keywords: Optional[Union[str, Tuple[str, ...]]]) -> None: if not keywords: @@ -287,15 +300,13 @@ def __delitem__(self, keywords: Optional[Union[str, Tuple[str, ...]]]) -> None: elif not isinstance(keywords, tuple): keywords = (keywords,) - contents, parsed = self._read() - - start, end = parsed.entry_location(keywords) - - self._write(contents[:start] + contents[end:]) + with self: + del self._get_parsed()[keywords] def _iter(self, keywords: Tuple[str, ...] = ()) -> Iterator[Optional[str]]: - _, parsed = self._read() - yield from (k[-1] if k else None for k in parsed if k[:-1] == keywords) + yield from ( + k[-1] if k else None for k in self._get_parsed() if k[:-1] == keywords + ) def __iter__(self) -> Iterator[Optional[str]]: yield from (k for k in self._iter() if k != "FoamFile") @@ -306,9 +317,7 @@ def __contains__(self, keywords: object) -> bool: elif not isinstance(keywords, tuple): keywords = (keywords,) - _, parsed = self._read() - - return keywords in parsed + return keywords in self._get_parsed() def __len__(self) -> int: return len(list(iter(self))) @@ -330,11 +339,10 @@ def as_dict(self, *, include_header: bool = False) -> FoamFileBase._File: :param include_header: Whether to include the "FoamFile" header in the output. """ - _, parsed = self._read() - d = parsed.as_dict() + d = self._get_parsed().as_dict() if not include_header: d.pop("FoamFile", None) - return d + return deepcopy(d) class FoamFieldFile(FoamFile): diff --git a/foamlib/_files/_io.py b/foamlib/_files/_io.py index 51cc85e..541678a 100644 --- a/foamlib/_files/_io.py +++ b/foamlib/_files/_io.py @@ -1,12 +1,10 @@ import gzip import sys -from copy import deepcopy from pathlib import Path from types import TracebackType from typing import ( TYPE_CHECKING, Optional, - Tuple, Type, Union, ) @@ -26,14 +24,13 @@ class FoamFileIO: def __init__(self, path: Union["os.PathLike[str]", str]) -> None: self.path = Path(path).absolute() - self.__contents: Optional[bytes] = None self.__parsed: Optional[Parsed] = None + self.__missing: Optional[bool] = None self.__defer_io = 0 - self.__dirty = False def __enter__(self) -> Self: if self.__defer_io == 0: - self._read(missing_ok=True) + self._get_parsed(missing_ok=True) self.__defer_io += 1 return self @@ -44,47 +41,38 @@ def __exit__( exc_tb: Optional[TracebackType], ) -> None: self.__defer_io -= 1 - if self.__defer_io == 0 and self.__dirty: - assert self.__contents is not None - self._write(self.__contents) + if self.__defer_io == 0: + assert self.__parsed is not None + if self.__parsed.modified or self.__missing: + contents = self.__parsed.contents + + if self.path.suffix == ".gz": + contents = gzip.compress(contents) - def _read(self, *, missing_ok: bool = False) -> Tuple[bytes, Parsed]: + self.path.write_bytes(contents) + + def _get_parsed(self, *, missing_ok: bool = False) -> Parsed: if not self.__defer_io: try: contents = self.path.read_bytes() except FileNotFoundError: - contents = None + self.__missing = True + contents = b"" else: - assert isinstance(contents, bytes) + self.__missing = False if self.path.suffix == ".gz": contents = gzip.decompress(contents) - if contents != self.__contents: - self.__contents = contents - self.__parsed = None + if self.__parsed is None or self.__parsed.contents != contents: + self.__parsed = Parsed(contents) - if self.__contents is None: - if missing_ok: - return b"", Parsed(b"") - raise FileNotFoundError(self.path) + assert self.__parsed is not None + assert self.__missing is not None - if self.__parsed is None: - parsed = Parsed(self.__contents) - self.__parsed = parsed - - return self.__contents, deepcopy(self.__parsed) - - def _write(self, contents: bytes) -> None: - self.__contents = contents - self.__parsed = None - if not self.__defer_io: - if self.path.suffix == ".gz": - contents = gzip.compress(contents) + if self.__missing and not missing_ok: + raise FileNotFoundError(self.path) - self.path.write_bytes(contents) - self.__dirty = False - else: - self.__dirty = True + return self.__parsed def __repr__(self) -> str: return f"{type(self).__qualname__}('{self.path}')" diff --git a/foamlib/_files/_parsing.py b/foamlib/_files/_parsing.py index 1cb00ff..873a988 100644 --- a/foamlib/_files/_parsing.py +++ b/foamlib/_files/_parsing.py @@ -1,4 +1,5 @@ import array +import contextlib import sys from typing import Tuple, Union, cast @@ -213,13 +214,14 @@ def __init__(self, contents: bytes) -> None: Tuple[str, ...], Tuple[int, Union[FoamFileBase.Data, EllipsisType], int], ] = {} - self._end = len(contents) - for parse_result in _FILE.parse_string( contents.decode("latin-1"), parse_all=True ): self._parsed.update(self._flatten_result(parse_result)) + self.contents = contents + self.modified = False + @staticmethod def _flatten_result( parse_result: ParseResults, *, _keywords: Tuple[str, ...] = () @@ -263,6 +265,51 @@ def __getitem__( _, data, _ = self._parsed[keywords] return data + def put( + self, + keywords: Tuple[str, ...], + data: Union[FoamFileBase.Data, EllipsisType], + content: bytes, + ) -> None: + assert not isinstance(data, Mapping) + + with contextlib.suppress(KeyError): + del self[keywords] + + start, end = self.entry_location(keywords, missing_ok=True) + diff = len(content) - (end - start) + + self._parsed[keywords] = (start, data, end + diff) + for k, (s, d, e) in self._parsed.items(): + if s > end: + self._parsed[k] = (s + diff, d, e + diff) + elif e > start: + self._parsed[k] = (s, d, e + diff) + + self.contents = self.contents[:start] + content + self.contents[end:] + self.modified = True + + def __delitem__(self, keywords: Union[str, Tuple[str, ...]]) -> None: + if isinstance(keywords, str): + keywords = (keywords,) + + start, end = self.entry_location(keywords) + del self._parsed[keywords] + + for k in list(self._parsed): + if keywords == k[: len(keywords)]: + del self._parsed[k] + + diff = end - start + for k, (s, d, e) in self._parsed.items(): + if s > end: + self._parsed[k] = (s - diff, d, e - diff) + elif e > start: + self._parsed[k] = (s, d, e - diff) + + self.contents = self.contents[:start] + self.contents[end:] + self.modified = True + def __contains__(self, keywords: object) -> bool: return keywords in self._parsed @@ -283,7 +330,7 @@ def entry_location( _, _, end = self._parsed[keywords[:-1]] end -= 1 else: - end = self._end + end = len(self.contents) start = end else: diff --git a/foamlib/_files/_serialization.py b/foamlib/_files/_serialization.py index 2d76263..29332c5 100644 --- a/foamlib/_files/_serialization.py +++ b/foamlib/_files/_serialization.py @@ -28,7 +28,7 @@ class Kind(Enum): def dumps( - data: FoamFileBase._SetData, + data: FoamFileBase.Data, *, kind: Kind = Kind.DEFAULT, ) -> bytes: