Skip to content

Commit

Permalink
fixup! update greedy option to use an async task and rename it to prefetch
Browse files Browse the repository at this point in the history
  • Loading branch information
SergioSim committed Nov 29, 2023
1 parent beaa03d commit 34bd639
Show file tree
Hide file tree
Showing 20 changed files with 162 additions and 193 deletions.
20 changes: 14 additions & 6 deletions src/ralph/backends/data/async_es.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
prefetch: Optional[int] = None,
max_statements: Optional[PositiveInt] = None,
) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
"""Read documents matching the query in the target index and yield them.
Expand All @@ -131,10 +131,12 @@ async def read( # noqa: PLR0913
raw_output (bool): Controls whether to yield dictionaries or bytes.
ignore_errors (bool): No impact as encoding errors are not expected in
Elasticsearch results.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
prefetch: The number of records to prefetch (queue) while yielding.
If `prefetch` is `None` or `0` it defaults to `1` - no records are
prefetched.
If `prefetch` is less than zero, all records are prefetched.
Caution: setting `prefetch<0` might potentially lead to large amounts
of API calls and to the memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -146,7 +148,13 @@ async def read( # noqa: PLR0913
BackendException: If a failure occurs during Elasticsearch connection.
"""
statements = super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query,
target,
chunk_size,
raw_output,
ignore_errors,
prefetch,
max_statements,
)
async for statement in statements:
yield statement
Expand Down
20 changes: 14 additions & 6 deletions src/ralph/backends/data/async_mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
prefetch: Optional[int] = None,
max_statements: Optional[PositiveInt] = None,
) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
"""Read documents matching the `query` from `target` collection and yield them.
Expand All @@ -138,10 +138,12 @@ async def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
prefetch: The number of records to prefetch (queue) while yielding.
If `prefetch` is `None` or `0` it defaults to `1` - no records are
prefetched.
If `prefetch` is less than zero, all records are prefetched.
Caution: setting `prefetch<0` might potentially lead to large amounts
of API calls and to the memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -155,7 +157,13 @@ async def read( # noqa: PLR0913
BackendParameterException: If the `target` is not a valid collection name.
"""
statements = super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query,
target,
chunk_size,
raw_output,
ignore_errors,
prefetch,
max_statements,
)
async for statement in statements:
yield statement
Expand Down
83 changes: 44 additions & 39 deletions src/ralph/backends/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import logging
from abc import ABC, abstractmethod
from asyncio import Queue, create_task
from enum import Enum, unique
from functools import cached_property
from io import IOBase
Expand Down Expand Up @@ -279,7 +280,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read records matching the `query` in the `target` container and yield them.
Expand All @@ -299,10 +299,6 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -315,19 +311,6 @@ def read( # noqa: PLR0913
during encoding records and `ignore_errors` is set to `False`.
BackendParameterException: If a backend argument value is not valid.
"""
if greedy:
yield from list(
self.read(
query,
target,
chunk_size,
raw_output,
ignore_errors,
False,
max_statements,
)
)
return
chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE
query = validate_backend_query(query, self.query_class, self.logger)
reader = self._read_bytes if raw_output else self._read_dicts
Expand Down Expand Up @@ -529,7 +512,7 @@ async def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
prefetch: Optional[int] = None,
max_statements: Optional[PositiveInt] = None,
) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
"""Read records matching the `query` in the `target` container and yield them.
Expand All @@ -549,10 +532,12 @@ async def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
prefetch: The number of records to prefetch (queue) while yielding.
If `prefetch` is `None` or `0` it defaults to `1` - no records are
prefetched.
If `prefetch` is less than zero, all records are prefetched.
Caution: setting `prefetch<0` might potentially lead to large amounts
of API calls and to the memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -565,22 +550,28 @@ async def read( # noqa: PLR0913
during encoding records and `ignore_errors` is set to `False`.
BackendParameterException: If a backend argument value is not valid.
"""
if greedy:
greedy_statements = [
statement
async for statement in self.read(
query,
target,
chunk_size,
raw_output,
ignore_errors,
False,
max_statements,
)
]

for greedy_statement in greedy_statements:
yield greedy_statement
if prefetch and prefetch != 1:
queue = Queue(prefetch - 1)
statements = self.read(
query,
target,
chunk_size,
raw_output,
ignore_errors,
None,
max_statements,
)
task = create_task(self._queue_records(queue, statements))
while True:
statement = await queue.get()
if statement is None:
error = task.exception()
if error:
raise error

break

yield statement

return

Expand Down Expand Up @@ -623,6 +614,20 @@ async def close(self) -> None:
BackendException: If a failure occurs during the close operation.
"""

async def _queue_records(
self, queue: Queue, records: Union[AsyncIterator[bytes], AsyncIterator[dict]]
):
"""Iterate over the `records` and put them into the `queue`."""
try:
async for record in records:
await queue.put(record)
except Exception as error:
# None signals that the queue is done
await queue.put(None)
raise error

await queue.put(None)


def get_backend_generic_argument(
backend_class: Type[Union[BaseDataBackend, BaseAsyncDataBackend]], position: int
Expand Down
7 changes: 1 addition & 6 deletions src/ralph/backends/data/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read documents matching the query in the target table and yield them.
Expand All @@ -226,10 +225,6 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -242,7 +237,7 @@ def read( # noqa: PLR0913
during encoding documents and `ignore_errors` is set to `False`.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query, target, chunk_size, raw_output, ignore_errors, max_statements
)

def _read_bytes(
Expand Down
7 changes: 1 addition & 6 deletions src/ralph/backends/data/es.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read documents matching the query in the target index and yield them.
Expand All @@ -215,10 +214,6 @@ def read( # noqa: PLR0913
raw_output (bool): Controls whether to yield dictionaries or bytes.
ignore_errors (bool): No impact as encoding errors are not expected in
Elasticsearch results.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -230,7 +225,7 @@ def read( # noqa: PLR0913
BackendException: If a failure occurs during Elasticsearch connection.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query, target, chunk_size, raw_output, ignore_errors, max_statements
)

def _read_bytes(
Expand Down
7 changes: 1 addition & 6 deletions src/ralph/backends/data/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read files matching the query in the target folder and yield them.
Expand All @@ -172,10 +171,6 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -188,7 +183,7 @@ def read( # noqa: PLR0913
during JSON encoding lines and `ignore_errors` is set to `False`.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query, target, chunk_size, raw_output, ignore_errors, max_statements
)

def _read_bytes(
Expand Down
7 changes: 1 addition & 6 deletions src/ralph/backends/data/ldp.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = True,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read an archive matching the query in the target stream_id and yield it.
Expand All @@ -170,10 +169,6 @@ def read( # noqa: PLR0913
If `chunk_size` is `None` it defaults to `READ_CHUNK_SIZE`.
raw_output (bool): Should always be set to `True`.
ignore_errors (bool): No impact as no encoding operation is performed.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -185,7 +180,7 @@ def read( # noqa: PLR0913
BackendParameterException: If the `query` argument is not an archive name.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query, target, chunk_size, raw_output, ignore_errors, max_statements
)

def _read_dicts(
Expand Down
7 changes: 1 addition & 6 deletions src/ralph/backends/data/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read documents matching the `query` from `target` collection and yield them.
Expand All @@ -193,10 +192,6 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -210,7 +205,7 @@ def read( # noqa: PLR0913
BackendParameterException: If the `target` is not a valid collection name.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query, target, chunk_size, raw_output, ignore_errors, max_statements
)

def _read_bytes(
Expand Down
7 changes: 1 addition & 6 deletions src/ralph/backends/data/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ def read( # noqa: PLR0913
chunk_size: Optional[int] = None,
raw_output: bool = False,
ignore_errors: bool = False,
greedy: bool = False,
max_statements: Optional[PositiveInt] = None,
) -> Union[Iterator[bytes], Iterator[dict]]:
"""Read an object matching the `query` in the `target` bucket and yield it.
Expand All @@ -186,10 +185,6 @@ def read( # noqa: PLR0913
ignore_errors (bool): If `True`, encoding errors during the read operation
will be ignored and logged.
If `False` (default), a `BackendException` is raised on any error.
greedy: If set to `True`, the client will fetch all available records
before they are yielded by the generator. Caution:
this might potentially lead to large amounts of API calls and to the
memory filling up.
max_statements (int): The maximum number of statements to yield.
If `None` (default), there is no maximum.
Expand All @@ -203,7 +198,7 @@ def read( # noqa: PLR0913
BackendParameterException: If a backend argument value is not valid.
"""
yield from super().read(
query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
query, target, chunk_size, raw_output, ignore_errors, max_statements
)

def _read_bytes(
Expand Down
Loading

0 comments on commit 34bd639

Please sign in to comment.