✨(backends) add greedy option for data backends
We try to align the data backend interface with the HTTP backends.
The `greedy` option allows the caller to greedily read all records
before they are yielded by the generator.
SergioSim committed Nov 29, 2023
1 parent 9143d64 commit beaa03d
Showing 20 changed files with 222 additions and 52 deletions.
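
As an illustration, a minimal usage sketch of the new flag (the filesystem backend is just one example; the `process` handler is a hypothetical placeholder, not part of this commit):

from ralph.backends.data.fs import FSDataBackend

backend = FSDataBackend()

# Default (lazy) read: records are fetched chunk by chunk while the
# generator is being consumed.
for record in backend.read():
    process(record)  # hypothetical downstream handler

# Greedy read: all records are fetched up front, then replayed one by
# one. This decouples iteration from the data source, at the cost of
# holding the full result set in memory.
for record in backend.read(greedy=True):
    process(record)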
7 changes: 6 additions & 1 deletion src/ralph/backends/data/async_es.py
@@ -115,6 +115,7 @@ async def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
"""Read documents matching the query in the target index and yield them.
@@ -130,6 +131,10 @@ async def read( # noqa: PLR0913
raw_output (bool): Controls whether to yield dictionaries or bytes.
ignore_errors (bool): No impact as encoding errors are not expected in
Elasticsearch results.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -141,7 +146,7 @@ async def read( # noqa: PLR0913
BackendException: If a failure occurs during Elasticsearch connection.
"""
statements = super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)
async for statement in statements:
yield statement
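
The async backends expose the same flag. A hedged sketch of greedy reading with the asynchronous Elasticsearch backend (default construction is assumed here; a real setup would need valid connection settings):

import asyncio

from ralph.backends.data.async_es import AsyncESDataBackend


async def dump_all() -> list:
    backend = AsyncESDataBackend()
    # With greedy=True, the generator drains Elasticsearch completely
    # before the first document is yielded.
    return [doc async for doc in backend.read(greedy=True)]


documents = asyncio.run(dump_all())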
7 changes: 6 additions & 1 deletion src/ralph/backends/data/async_mongo.py
@@ -123,6 +123,7 @@ async def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
"""Read documents matching the `query` from `target` collection and yield them.
@@ -137,6 +138,10 @@ async def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -150,7 +155,7 @@ async def read( # noqa: PLR0913
BackendParameterException: If the `target` is not a valid collection name.
"""
statements = super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)
async for statement in statements:
yield statement
56 changes: 53 additions & 3 deletions src/ralph/backends/data/base.py
@@ -279,6 +279,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read records matching the `query` in the `target` container and yield them.
@@ -298,6 +299,10 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -310,6 +315,19 @@ def read( # noqa: PLR0913
during encoding records and `ignore_errors` is set to `False`.
BackendParameterException: If a backend argument value is not valid.
"""
if greedy:
yield from list(
self.read(
query,
target,
chunk_size,
raw_output,
ignore_errors,
False,
max_statements,
)
)
return
chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE
query = validate_backend_query(query, self.query_class, self.logger)
reader = self._read_bytes if raw_output else self._read_dicts
@@ -318,10 +336,14 @@ def read( # noqa: PLR0913
yield from statements
return

if not max_statements:
return

max_statements -= 1
for i, statement in enumerate(statements):
    yield statement
    if i >= max_statements:
        return

@abstractmethod
def _read_bytes(
@@ -507,6 +529,7 @@ async def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
"""Read records matching the `query` in the `target` container and yield them.
@@ -526,6 +549,10 @@ async def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -538,6 +565,25 @@ async def read( # noqa: PLR0913
during encoding records and `ignore_errors` is set to `False`.
BackendParameterException: If a backend argument value is not valid.
"""
if greedy:
greedy_statements = [
statement
async for statement in self.read(
query,
target,
chunk_size,
raw_output,
ignore_errors,
False,
max_statements,
)
]

for greedy_statement in greedy_statements:
yield greedy_statement

return

chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE
query = validate_backend_query(query, self.query_class, self.logger)
reader = self._read_bytes if raw_output else self._read_dicts
@@ -546,12 +592,16 @@ async def read( # noqa: PLR0913
async for statement in statements:
yield statement
return

if not max_statements:
return

i = 0
async for statement in statements:
    yield statement
    i += 1
    if i >= max_statements:
        return

@abstractmethod
async def _read_bytes(
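
The base-class hunk above carries the whole mechanism: when `greedy` is `True`, `read` calls itself with `greedy=False`, materializes the resulting generator into a list, and only then starts yielding. A standalone sketch of that pattern, detached from Ralph's classes (all names are illustrative):

from typing import Iterator, Optional


def read(source: Iterator[dict], greedy: bool = False,
         max_statements: Optional[int] = None) -> Iterator[dict]:
    """Yield records from `source`, optionally fetching them all first."""
    if greedy:
        # Run the lazy generator to completion, then replay its output.
        # The caller still receives a generator, but all fetching has
        # happened by the time the first record is yielded.
        yield from list(read(source, False, max_statements))
        return
    if max_statements is None:
        yield from source
        return
    if not max_statements:
        return
    max_statements -= 1
    for i, record in enumerate(source):
        yield record
        if i >= max_statements:
            return

Keeping the greedy branch in the base class means each concrete backend only has to forward the flag, which is exactly what the per-backend diffs below do.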
7 changes: 6 additions & 1 deletion src/ralph/backends/data/clickhouse.py
@@ -211,6 +211,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read documents matching the query in the target table and yield them.
@@ -225,6 +226,10 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -237,7 +242,7 @@ def read( # noqa: PLR0913
during encoding documents and `ignore_errors` is set to `False`.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/es.py
@@ -199,6 +199,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read documents matching the query in the target index and yield them.
@@ -214,6 +215,10 @@ def read( # noqa: PLR0913
raw_output (bool): Controls whether to yield dictionaries or bytes.
ignore_errors (bool): No impact as encoding errors are not expected in
Elasticsearch results.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -225,7 +230,7 @@ def read( # noqa: PLR0913
BackendException: If a failure occurs during Elasticsearch connection.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/fs.py
@@ -154,6 +154,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read files matching the query in the target folder and yield them.
@@ -171,6 +172,10 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -183,7 +188,7 @@ def read( # noqa: PLR0913
during JSON encoding lines and `ignore_errors` is set to `False`.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/ldp.py
@@ -157,6 +157,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = True,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read an archive matching the query in the target stream_id and yield it.
@@ -169,6 +170,10 @@ def read( # noqa: PLR0913
If `chunk_size` is `None` it defaults to `READ_CHUNK_SIZE`.
raw_output (bool): Should always be set to `True`.
ignore_errors (bool): No impact as no encoding operation is performed.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -180,7 +185,7 @@ def read( # noqa: PLR0913
BackendParameterException: If the `query` argument is not an archive name.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_dicts(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/mongo.py
@@ -178,6 +178,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read documents matching the `query` from `target` collection and yield them.
@@ -192,6 +193,10 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -205,7 +210,7 @@ def read( # noqa: PLR0913
BackendParameterException: If the `target` is not a valid collection name.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/s3.py
@@ -170,6 +170,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read an object matching the `query` in the `target` bucket and yield it.
@@ -185,6 +186,10 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -198,7 +203,7 @@ def read( # noqa: PLR0913
BackendParameterException: If a backend argument value is not valid.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/swift.py
@@ -180,6 +180,7 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read objects matching the `query` in the `target` container and yield them.
@@ -199,6 +200,10 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy (bool): If set to `True`, the client fetches all available records
    before they are yielded by the generator. Caution: this may lead to a
    large number of API calls and to high memory usage.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
@@ -212,7 +217,7 @@ def read( # noqa: PLR0913
BackendParameterException: If a backend argument value is not valid.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
)

def _read_bytes(