✨(backends) add greedy option for data backends
We try to align the data backend interface with the HTTP backends.
The greedy option allows the caller to greedily read all records
before they are yielded by the generator.
SergioSim committed Nov 14, 2023
1 parent 27ef4ff commit a80bb8d
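As a rough usage sketch of the new option (the backend class, settings, and query value below are illustrative, not taken from this commit), the flag is passed directly to `read`:

```python
from ralph.backends.data.fs import FSDataBackend

backend = FSDataBackend()  # hypothetical: default settings

# Default, lazy behaviour: records are fetched as the caller iterates.
lazy_records = backend.read(query="statements.jsonl")

# Greedy behaviour: every record is fetched up front and replayed from
# memory, so iterating no longer touches the backend.
greedy_records = backend.read(query="statements.jsonl", greedy=True)
```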
Showing 20 changed files with 222 additions and 52 deletions.
7 changes: 6 additions & 1 deletion src/ralph/backends/data/async_es.py
@@ -114,6 +114,7 @@ async def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
         """Read documents matching the query in the target index and yield them.
@@ -130,6 +131,10 @@ async def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the encoding operation
                 will be ignored and logged. If `False` (default), a `BackendException`
                 will be raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -140,7 +145,7 @@ async def read( # noqa: PLR0913
             BackendException: If a failure occurs during Elasticsearch connection.
         """
         statements = super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )
         async for statement in statements:
             yield statement
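The async backends take the same flag. A minimal consumption sketch (assuming the `AsyncESDataBackend` class defined in this module, reachable with its default connection settings):

```python
import asyncio

from ralph.backends.data.async_es import AsyncESDataBackend


async def main() -> None:
    backend = AsyncESDataBackend()  # assumes default settings are valid here
    # With greedy=True, all matching documents are fetched from
    # Elasticsearch before the first one is yielded.
    async for statement in backend.read(greedy=True, max_statements=100):
        print(statement)


asyncio.run(main())
```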
7 changes: 6 additions & 1 deletion src/ralph/backends/data/async_mongo.py
@@ -122,6 +122,7 @@ async def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
         """Read documents matching the `query` from `target` collection and yield them.
@@ -134,6 +135,10 @@ async def read( # noqa: PLR0913
                 If `chunk_size` is `None` it defaults to `READ_CHUNK_SIZE`.
             raw_output (bool): Whether to yield dictionaries or bytes.
             ignore_errors (bool): Whether to ignore errors when reading documents.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -145,7 +150,7 @@ async def read( # noqa: PLR0913
             BackendParameterException: If a failure occurs with MongoDB collection.
         """
         statements = super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )
         async for statement in statements:
             yield statement
56 changes: 53 additions & 3 deletions src/ralph/backends/data/base.py
@@ -280,6 +280,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read records matching the `query` in the `target` container and yield them.
@@ -299,6 +300,10 @@ def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the read operation
                 are ignored and logged. If `False` (default), a `BackendException`
                 is raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -310,6 +315,19 @@ def read( # noqa: PLR0913
                 `ignore_errors` is set to `False`.
             BackendParameterException: If a backend argument value is not valid.
         """
+        if greedy:
+            yield from list(
+                self.read(
+                    query,
+                    target,
+                    chunk_size,
+                    raw_output,
+                    ignore_errors,
+                    False,
+                    max_statements,
+                )
+            )
+            return
         chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE
         query = validate_backend_query(query, self.query_class, self.logger)
         reader = self._read_bytes if raw_output else self._read_dicts
@@ -318,10 +336,14 @@ def read( # noqa: PLR0913
             yield from statements
             return

+        if not max_statements:
+            return
+
+        max_statements -= 1
         for i, statement in enumerate(statements):
+            yield statement
             if i >= max_statements:
                 return
-            yield statement

     @abstractmethod
     def _read_bytes(
@@ -515,6 +537,7 @@ async def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]:
         """Read records matching the `query` in the `target` container and yield them.
@@ -534,6 +557,10 @@ async def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the read operation
                 are ignored and logged. If `False` (default), a `BackendException`
                 is raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -545,6 +572,25 @@ async def read( # noqa: PLR0913
                 `ignore_errors` is set to `False`.
             BackendParameterException: If a backend argument value is not valid.
         """
+        if greedy:
+            greedy_statements = [
+                statement
+                async for statement in self.read(
+                    query,
+                    target,
+                    chunk_size,
+                    raw_output,
+                    ignore_errors,
+                    False,
+                    max_statements,
+                )
+            ]
+
+            for greedy_statement in greedy_statements:
+                yield greedy_statement
+
+            return
+
         chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE
         query = validate_backend_query(query, self.query_class, self.logger)
         reader = self._read_bytes if raw_output else self._read_dicts
@@ -553,12 +599,16 @@ async def read( # noqa: PLR0913
             async for statement in statements:
                 yield statement
             return

+        if not max_statements:
+            return
+
         i = 0
         async for statement in statements:
-            if i >= max_statements:
-                return
             yield statement
             i += 1
+            if i >= max_statements:
+                return

     @abstractmethod
     async def _read_bytes(
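The substance of the change is in the `base.py` hunks above: when `greedy` is set, `read` calls itself once with `greedy=False`, drains that generator into a list, and only then starts yielding, so query validation, `max_statements`, and error handling behave identically in both modes. Distilled to its core, the pattern looks like this (a simplified sketch, not the exact Ralph code):

```python
from typing import Iterator


def read(records: Iterator[dict], greedy: bool = False) -> Iterator[dict]:
    """Yield records, optionally materializing them all up front."""
    if greedy:
        # Drain the lazy path first: all fetching (and any fetch errors)
        # happens here, before the first record is yielded.
        yield from list(read(records, greedy=False))
        return
    # Lazy path: records are pulled one at a time as the caller iterates.
    yield from records
```

The reworked `max_statements` loop is a related refinement: it now yields each statement before checking the (decremented) counter, so the generator stops immediately after the last requested statement instead of pulling one more record from the backend first.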
7 changes: 6 additions & 1 deletion src/ralph/backends/data/clickhouse.py
@@ -211,6 +211,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read documents matching the query in the target table and yield them.
@@ -225,6 +226,10 @@ def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the encoding operation
                 will be ignored and logged. If `False` (default), a `BackendException`
                 will be raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -235,7 +240,7 @@ def read( # noqa: PLR0913
             BackendException: If a failure occurs during ClickHouse connection.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/es.py
@@ -199,6 +199,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read documents matching the query in the target index and yield them.
@@ -215,6 +216,10 @@ def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the encoding operation
                 will be ignored and logged. If `False` (default), a `BackendException`
                 will be raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -225,7 +230,7 @@ def read( # noqa: PLR0913
             BackendException: If a failure occurs during Elasticsearch connection.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/fs.py
@@ -152,6 +152,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read files matching the query in the target folder and yield them.
@@ -169,6 +170,10 @@ def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the read operation
                 will be ignored and logged. If `False` (default), a `BackendException`
                 will be raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -180,7 +185,7 @@ def read( # noqa: PLR0913
                 `ignore_errors` is set to `False`.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/ldp.py
@@ -156,6 +156,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = True,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read an archive matching the query in the target stream_id and yield it.
@@ -168,6 +169,10 @@ def read( # noqa: PLR0913
                 If `chunk_size` is `None` it defaults to `READ_CHUNK_SIZE`.
             raw_output (bool): Should always be set to `True`.
             ignore_errors (bool): Ignored.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -179,7 +184,7 @@ def read( # noqa: PLR0913
                 or if the `raw_output` argument is set to `False`.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_dicts(
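Note the LDP quirk preserved here: `read` only supports `raw_output=True`, so a greedy read buffers a whole raw archive in memory. A sketch (class name per this module; the archive id and default settings are illustrative):

```python
from ralph.backends.data.ldp import LDPDataBackend

backend = LDPDataBackend()  # hypothetical: default OVH settings
# greedy=True fetches the entire archive before yielding any chunk.
archive_bytes = b"".join(backend.read(query="<archive_id>", greedy=True))
```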
7 changes: 6 additions & 1 deletion src/ralph/backends/data/mongo.py
@@ -178,6 +178,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read documents matching the `query` from `target` collection and yield them.
@@ -190,6 +191,10 @@ def read( # noqa: PLR0913
                 If `chunk_size` is `None` it defaults to `READ_CHUNK_SIZE`.
             raw_output (bool): Whether to yield dictionaries or bytes.
             ignore_errors (bool): Whether to ignore errors when reading documents.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -201,7 +206,7 @@ def read( # noqa: PLR0913
             BackendParameterException: If the `target` is not a valid collection name.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/s3.py
@@ -169,6 +169,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read an object matching the `query` in the `target` bucket and yield it.
@@ -183,6 +184,10 @@ def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the read operation
                 will be ignored and logged. If `False` (default), a `BackendException`
                 will be raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -196,7 +201,7 @@ def read( # noqa: PLR0913
                 `ignore_errors` is set to `False`.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_bytes(
7 changes: 6 additions & 1 deletion src/ralph/backends/data/swift.py
@@ -179,6 +179,7 @@ def read( # noqa: PLR0913
         chunk_size: Optional[int] = None,
         raw_output: bool = False,
         ignore_errors: bool = False,
+        greedy: bool = False,
         max_statements: Optional[int] = None,
     ) -> Union[Iterator[bytes], Iterator[dict]]:
         """Read objects matching the `query` in the `target` container and yield them.
@@ -198,6 +199,10 @@ def read( # noqa: PLR0913
             ignore_errors (bool): If `True`, errors during the read operation
                 are ignored and logged. If `False` (default), a `BackendException`
                 is raised if an error occurs.
+            greedy (bool): If `True`, the client fetches all available records
+                before they are yielded by the generator. Caution:
+                this may lead to a large number of API calls and to
+                high memory usage.
             max_statements (int): The maximum number of statements to yield.

         Yields:
@@ -210,7 +215,7 @@ def read( # noqa: PLR0913
             BackendParameterException: If a backend argument value is not valid.
         """
         yield from super().read(
-            query, target, chunk_size, raw_output, ignore_errors, max_statements
+            query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements
         )

     def _read_bytes(