From beaa03d6866defdc0b16ef58eeb3ec6ee6035e05 Mon Sep 17 00:00:00 2001 From: SergioSim Date: Tue, 14 Nov 2023 21:22:04 +0100 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backends)=20add=20greedy=20option=20f?= =?UTF-8?q?or=20data=20backends?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We try to align the data backend interface with the http backends. The greedy option allows the caller to greedily read all records before they are yielded by the generator. --- src/ralph/backends/data/async_es.py | 7 +++- src/ralph/backends/data/async_mongo.py | 7 +++- src/ralph/backends/data/base.py | 56 +++++++++++++++++++++++-- src/ralph/backends/data/clickhouse.py | 7 +++- src/ralph/backends/data/es.py | 7 +++- src/ralph/backends/data/fs.py | 7 +++- src/ralph/backends/data/ldp.py | 7 +++- src/ralph/backends/data/mongo.py | 7 +++- src/ralph/backends/data/s3.py | 7 +++- src/ralph/backends/data/swift.py | 7 +++- tests/backends/data/test_async_es.py | 14 +++++-- tests/backends/data/test_async_mongo.py | 11 +++-- tests/backends/data/test_base.py | 50 ++++++++++++++++++++++ tests/backends/data/test_clickhouse.py | 11 +++-- tests/backends/data/test_es.py | 10 +++-- tests/backends/data/test_fs.py | 22 ++++++---- tests/backends/data/test_ldp.py | 7 +++- tests/backends/data/test_mongo.py | 10 +++-- tests/backends/data/test_s3.py | 10 ++--- tests/backends/data/test_swift.py | 10 +++-- 20 files changed, 222 insertions(+), 52 deletions(-) diff --git a/src/ralph/backends/data/async_es.py b/src/ralph/backends/data/async_es.py index 311ddf137..9800cf2ac 100644 --- a/src/ralph/backends/data/async_es.py +++ b/src/ralph/backends/data/async_es.py @@ -115,6 +115,7 @@ async def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]: """Read documents matching the query in the target index and yield them. @@ -130,6 +131,10 @@ async def read( # noqa: PLR0913 raw_output (bool): Controls whether to yield dictionaries or bytes. ignore_errors (bool): No impact as encoding errors are not expected in Elasticsearch results. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -141,7 +146,7 @@ async def read( # noqa: PLR0913 BackendException: If a failure occurs during Elasticsearch connection. """ statements = super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) async for statement in statements: yield statement diff --git a/src/ralph/backends/data/async_mongo.py b/src/ralph/backends/data/async_mongo.py index 811e10388..2d7cb2ff3 100644 --- a/src/ralph/backends/data/async_mongo.py +++ b/src/ralph/backends/data/async_mongo.py @@ -123,6 +123,7 @@ async def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]: """Read documents matching the `query` from `target` collection and yield them. @@ -137,6 +138,10 @@ async def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -150,7 +155,7 @@ async def read( # noqa: PLR0913 BackendParameterException: If the `target` is not a valid collection name. """ statements = super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) async for statement in statements: yield statement diff --git a/src/ralph/backends/data/base.py b/src/ralph/backends/data/base.py index 308a1423b..29d09af2e 100644 --- a/src/ralph/backends/data/base.py +++ b/src/ralph/backends/data/base.py @@ -279,6 +279,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read records matching the `query` in the `target` container and yield them. @@ -298,6 +299,10 @@ def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -310,6 +315,19 @@ def read( # noqa: PLR0913 during encoding records and `ignore_errors` is set to `False`. BackendParameterException: If a backend argument value is not valid. """ + if greedy: + yield from list( + self.read( + query, + target, + chunk_size, + raw_output, + ignore_errors, + False, + max_statements, + ) + ) + return chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE query = validate_backend_query(query, self.query_class, self.logger) reader = self._read_bytes if raw_output else self._read_dicts @@ -318,10 +336,14 @@ def read( # noqa: PLR0913 yield from statements return + if not max_statements: + return + + max_statements -= 1 for i, statement in enumerate(statements): + yield statement if i >= max_statements: return - yield statement @abstractmethod def _read_bytes( @@ -507,6 +529,7 @@ async def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[AsyncIterator[bytes], AsyncIterator[dict]]: """Read records matching the `query` in the `target` container and yield them. @@ -526,6 +549,10 @@ async def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -538,6 +565,25 @@ async def read( # noqa: PLR0913 during encoding records and `ignore_errors` is set to `False`. BackendParameterException: If a backend argument value is not valid. """ + if greedy: + greedy_statements = [ + statement + async for statement in self.read( + query, + target, + chunk_size, + raw_output, + ignore_errors, + False, + max_statements, + ) + ] + + for greedy_statement in greedy_statements: + yield greedy_statement + + return + chunk_size = chunk_size if chunk_size else self.settings.READ_CHUNK_SIZE query = validate_backend_query(query, self.query_class, self.logger) reader = self._read_bytes if raw_output else self._read_dicts @@ -546,12 +592,16 @@ async def read( # noqa: PLR0913 async for statement in statements: yield statement return + + if not max_statements: + return + i = 0 async for statement in statements: - if i >= max_statements: - return yield statement i += 1 + if i >= max_statements: + return @abstractmethod async def _read_bytes( diff --git a/src/ralph/backends/data/clickhouse.py b/src/ralph/backends/data/clickhouse.py index 1841b44c2..4b9d2d5d1 100755 --- a/src/ralph/backends/data/clickhouse.py +++ b/src/ralph/backends/data/clickhouse.py @@ -211,6 +211,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read documents matching the query in the target table and yield them. @@ -225,6 +226,10 @@ def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -237,7 +242,7 @@ def read( # noqa: PLR0913 during encoding documents and `ignore_errors` is set to `False`. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_bytes( diff --git a/src/ralph/backends/data/es.py b/src/ralph/backends/data/es.py index 0020a24fd..85c3e26aa 100644 --- a/src/ralph/backends/data/es.py +++ b/src/ralph/backends/data/es.py @@ -199,6 +199,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read documents matching the query in the target index and yield them. @@ -214,6 +215,10 @@ def read( # noqa: PLR0913 raw_output (bool): Controls whether to yield dictionaries or bytes. ignore_errors (bool): No impact as encoding errors are not expected in Elasticsearch results. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -225,7 +230,7 @@ def read( # noqa: PLR0913 BackendException: If a failure occurs during Elasticsearch connection. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_bytes( diff --git a/src/ralph/backends/data/fs.py b/src/ralph/backends/data/fs.py index f111fbff1..3257d6d8a 100644 --- a/src/ralph/backends/data/fs.py +++ b/src/ralph/backends/data/fs.py @@ -154,6 +154,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read files matching the query in the target folder and yield them. @@ -171,6 +172,10 @@ def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -183,7 +188,7 @@ def read( # noqa: PLR0913 during JSON encoding lines and `ignore_errors` is set to `False`. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_bytes( diff --git a/src/ralph/backends/data/ldp.py b/src/ralph/backends/data/ldp.py index 53b2e6d9b..207daf398 100644 --- a/src/ralph/backends/data/ldp.py +++ b/src/ralph/backends/data/ldp.py @@ -157,6 +157,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = True, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read an archive matching the query in the target stream_id and yield it. @@ -169,6 +170,10 @@ def read( # noqa: PLR0913 If `chunk_size` is `None` it defaults to `READ_CHUNK_SIZE`. raw_output (bool): Should always be set to `True`. ignore_errors (bool): No impact as no encoding operation is performed. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -180,7 +185,7 @@ def read( # noqa: PLR0913 BackendParameterException: If the `query` argument is not an archive name. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_dicts( diff --git a/src/ralph/backends/data/mongo.py b/src/ralph/backends/data/mongo.py index 3f960694f..9bcc67864 100644 --- a/src/ralph/backends/data/mongo.py +++ b/src/ralph/backends/data/mongo.py @@ -178,6 +178,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read documents matching the `query` from `target` collection and yield them. @@ -192,6 +193,10 @@ def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -205,7 +210,7 @@ def read( # noqa: PLR0913 BackendParameterException: If the `target` is not a valid collection name. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_bytes( diff --git a/src/ralph/backends/data/s3.py b/src/ralph/backends/data/s3.py index 3643b8ff1..22071bf8b 100644 --- a/src/ralph/backends/data/s3.py +++ b/src/ralph/backends/data/s3.py @@ -170,6 +170,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read an object matching the `query` in the `target` bucket and yield it. @@ -185,6 +186,10 @@ def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -198,7 +203,7 @@ def read( # noqa: PLR0913 BackendParameterException: If a backend argument value is not valid. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_bytes( diff --git a/src/ralph/backends/data/swift.py b/src/ralph/backends/data/swift.py index 006a8cc92..fdff26527 100644 --- a/src/ralph/backends/data/swift.py +++ b/src/ralph/backends/data/swift.py @@ -180,6 +180,7 @@ def read( # noqa: PLR0913 chunk_size: Optional[int] = None, raw_output: bool = False, ignore_errors: bool = False, + greedy: bool = False, max_statements: Optional[PositiveInt] = None, ) -> Union[Iterator[bytes], Iterator[dict]]: """Read objects matching the `query` in the `target` container and yield them. @@ -199,6 +200,10 @@ def read( # noqa: PLR0913 ignore_errors (bool): If `True`, encoding errors during the read operation will be ignored and logged. If `False` (default), a `BackendException` is raised on any error. + greedy: If set to `True`, the client will fetch all available records + before they are yielded by the generator. Caution: + this might potentially lead to large amounts of API calls and to the + memory filling up. max_statements (int): The maximum number of statements to yield. If `None` (default), there is no maximum. @@ -212,7 +217,7 @@ def read( # noqa: PLR0913 BackendParameterException: If a backend argument value is not valid. """ yield from super().read( - query, target, chunk_size, raw_output, ignore_errors, max_statements + query, target, chunk_size, raw_output, ignore_errors, greedy, max_statements ) def _read_bytes( diff --git a/tests/backends/data/test_async_es.py b/tests/backends/data/test_async_es.py index cf6e9f735..c6374e00b 100644 --- a/tests/backends/data/test_async_es.py +++ b/tests/backends/data/test_async_es.py @@ -398,13 +398,16 @@ async def mock_async_es_search(**kwargs): @pytest.mark.anyio -async def test_backends_data_async_es_read_with_raw_ouput(es, async_es_backend): +@pytest.mark.parametrize("greedy", [False, True]) +async def test_backends_data_async_es_read_with_raw_ouput(greedy, es, async_es_backend): """Test the `AsyncESDataBackend.read` method with `raw_output` set to `True`.""" backend = async_es_backend() documents = [{"id": idx, "timestamp": now()} for idx in range(10)] assert await backend.write(documents) == 10 - hits = [statement async for statement in backend.read(raw_output=True)] + hits = [ + statement async for statement in backend.read(raw_output=True, greedy=greedy) + ] for i, hit in enumerate(hits): assert isinstance(hit, bytes) assert json.loads(hit).get("_source") == documents[i] @@ -413,13 +416,16 @@ async def test_backends_data_async_es_read_with_raw_ouput(es, async_es_backend): @pytest.mark.anyio -async def test_backends_data_async_es_read_without_raw_ouput(es, async_es_backend): +@pytest.mark.parametrize("greedy", [False, True]) +async def test_backends_data_async_es_read_without_raw_ouput( + greedy, es, async_es_backend +): """Test the `AsyncESDataBackend.read` method with `raw_output` set to `False`.""" backend = async_es_backend() documents = [{"id": idx, "timestamp": now()} for idx in range(10)] assert await backend.write(documents) == 10 - hits = [statement async for statement in backend.read()] + hits = [statement async for statement in backend.read(greedy=greedy)] for i, hit in enumerate(hits): assert isinstance(hit, dict) assert hit.get("_source") == documents[i] diff --git a/tests/backends/data/test_async_mongo.py b/tests/backends/data/test_async_mongo.py index fb3badae9..a53288e21 100644 --- a/tests/backends/data/test_async_mongo.py +++ b/tests/backends/data/test_async_mongo.py @@ -314,7 +314,9 @@ async def test_backends_data_async_mongo_list_with_history( @pytest.mark.anyio +@pytest.mark.parametrize("greedy", [False, True]) async def test_backends_data_async_mongo_read_with_raw_output( + greedy, mongo, async_mongo_backend, ): @@ -334,7 +336,9 @@ async def test_backends_data_async_mongo_read_with_raw_output( await backend.collection.insert_many(documents) await backend.database.foobar.insert_many(documents[:2]) - result = [statement async for statement in backend.read(raw_output=True)] + result = [ + statement async for statement in backend.read(raw_output=True, greedy=greedy) + ] assert result == expected result = [ statement async for statement in backend.read(raw_output=True, target="foobar") @@ -351,8 +355,9 @@ async def test_backends_data_async_mongo_read_with_raw_output( @pytest.mark.anyio +@pytest.mark.parametrize("greedy", [False, True]) async def test_backends_data_async_mongo_read_without_raw_output( - mongo, async_mongo_backend + greedy, mongo, async_mongo_backend ): """Test the `AsyncMongoDataBackend.read` method with `raw_output` set to `False`. @@ -372,7 +377,7 @@ async def test_backends_data_async_mongo_read_without_raw_output( await backend.collection.insert_many(documents) await backend.database.foobar.insert_many(documents[:2]) - assert [statement async for statement in backend.read()] == expected + assert [statement async for statement in backend.read(greedy=greedy)] == expected assert [statement async for statement in backend.read(target="foobar")] == expected[ :2 ] diff --git a/tests/backends/data/test_base.py b/tests/backends/data/test_base.py index f67f6b5cc..9fa28c203 100644 --- a/tests/backends/data/test_base.py +++ b/tests/backends/data/test_base.py @@ -158,6 +158,56 @@ def close(self): assert not list(backend.read(max_statements=0, raw_output=True)) +def test_backends_data_base_read_with_greedy(): + """Test the `BaseDataBackend.read` method with `greedy` argument.""" + + def get_mock_read_dicts(data): + """Return a `_read_dict` mock method yielding data.""" + + def mock_read_dicts(self, *args): + """Mock the `BaseDataBackend._read_dicts` method yielding data.""" + for item in data: + yield item + + return mock_read_dicts + + class MockBaseDataBackend(BaseDataBackend[BaseDataBackendSettings, BaseQuery]): + """A class mocking the base database class.""" + + def _read_dicts(self, *args): + pass + + def _read_bytes(self, *args): + pass + + def status(self): + pass + + def close(self): + pass + + backend = MockBaseDataBackend() + # Given `greedy` set to `False`, the `read` method should consume data on demand. + data = ({"foo": "bar"} for _ in range(4)) + backend._read_dicts = get_mock_read_dicts(data) + assert next(backend.read()) == {"foo": "bar"} + assert len(list(data)) == 3 # one item was requested - one item was consumed. + + # Given `greedy` set to `True`, the `read` method should consume all data. + data = ({"foo": "bar"} for _ in range(4)) + backend._read_dicts = get_mock_read_dicts(data) + assert next(backend.read(greedy=True)) == {"foo": "bar"} + assert not list(data) # all items are consumed. + + # Given `greedy` set to `True` and a `max_statements` limit, the `read` method + # should consume all data until reaching `max_statements`. + data = ({"foo": "bar"} for _ in range(4)) + backend._read_dicts = get_mock_read_dicts(data) + statements = backend.read(greedy=True, max_statements=3) + assert next(statements) == {"foo": "bar"} + assert list(data) == [{"foo": "bar"}] # 3 items are consumed. + + @pytest.mark.anyio async def test_backends_data_base_async_read_with_max_statements(): """Test the async `BaseDataBackend.read` method with `max_statements` argument.""" diff --git a/tests/backends/data/test_clickhouse.py b/tests/backends/data/test_clickhouse.py index a3f6f93ca..3714eb0b7 100644 --- a/tests/backends/data/test_clickhouse.py +++ b/tests/backends/data/test_clickhouse.py @@ -110,7 +110,10 @@ def mock_query(*_, **__): backend.close() -def test_backends_data_clickhouse_read_with_raw_output(clickhouse, clickhouse_backend): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_clickhouse_read_with_raw_output( + greedy, clickhouse, clickhouse_backend +): """Test the `ClickHouseDataBackend.read` method.""" # Create records @@ -127,19 +130,19 @@ def test_backends_data_clickhouse_read_with_raw_output(clickhouse, clickhouse_ba backend = clickhouse_backend() backend.write(statements) - results = list(backend.read()) + results = list(backend.read(greedy=greedy)) assert len(results) == 3 assert results[0]["event"] == statements[0] assert results[1]["event"] == statements[1] assert results[2]["event"] == statements[2] - results = list(backend.read(chunk_size=10)) + results = list(backend.read(chunk_size=10, greedy=greedy)) assert len(results) == 3 assert results[0]["event"] == statements[0] assert results[1]["event"] == statements[1] assert results[2]["event"] == statements[2] - results = list(backend.read(raw_output=True)) + results = list(backend.read(raw_output=True, greedy=greedy)) assert len(results) == 3 assert isinstance(results[0], bytes) assert json.loads(results[0])["event"] == statements[0] diff --git a/tests/backends/data/test_es.py b/tests/backends/data/test_es.py index 8552abf5c..f8db4a756 100644 --- a/tests/backends/data/test_es.py +++ b/tests/backends/data/test_es.py @@ -358,13 +358,14 @@ def mock_es_search(**kwargs): backend.close() -def test_backends_data_es_read_with_raw_ouput(es, es_backend): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_es_read_with_raw_ouput(greedy, es, es_backend): """Test the `ESDataBackend.read` method with `raw_output` set to `True`.""" backend = es_backend() documents = [{"id": idx, "timestamp": now()} for idx in range(10)] assert backend.write(documents) == 10 - hits = list(backend.read(raw_output=True)) + hits = list(backend.read(raw_output=True, greedy=greedy)) for i, hit in enumerate(hits): assert isinstance(hit, bytes) assert json.loads(hit).get("_source") == documents[i] @@ -372,13 +373,14 @@ def test_backends_data_es_read_with_raw_ouput(es, es_backend): backend.close() -def test_backends_data_es_read_without_raw_ouput(es, es_backend): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_es_read_without_raw_ouput(greedy, es, es_backend): """Test the `ESDataBackend.read` method with `raw_output` set to `False`.""" backend = es_backend() documents = [{"id": idx, "timestamp": now()} for idx in range(10)] assert backend.write(documents) == 10 - hits = backend.read() + hits = backend.read(greedy=greedy) for i, hit in enumerate(hits): assert isinstance(hit, dict) assert hit.get("_source") == documents[i] diff --git a/tests/backends/data/test_fs.py b/tests/backends/data/test_fs.py index 23223288e..8b6cf79b9 100644 --- a/tests/backends/data/test_fs.py +++ b/tests/backends/data/test_fs.py @@ -358,7 +358,8 @@ def test_backends_data_fs_list_with_history_and_details(fs_backend, fs): assert sorted(result, key=itemgetter("path")) == expected -def test_backends_data_fs_read_with_raw_ouput(fs_backend, fs, monkeypatch): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_fs_read_with_raw_ouput(greedy, fs_backend, fs, monkeypatch): """Test the `FSDataBackend.read` method with `raw_output` set to `True`.""" # Create files in absolute path directory. @@ -378,7 +379,7 @@ def test_backends_data_fs_read_with_raw_ouput(fs_backend, fs, monkeypatch): # Given no `target`, the `read` method should read all files in the default # directory and yield bytes. - result = backend.read(raw_output=True) + result = backend.read(raw_output=True, greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [b"baz"] @@ -397,7 +398,7 @@ def test_backends_data_fs_read_with_raw_ouput(fs_backend, fs, monkeypatch): # Given an absolute `target` path, the `read` method should read all files in the # target directory and yield bytes. - result = backend.read(raw_output=True, target=absolute_path) + result = backend.read(raw_output=True, target=absolute_path, greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [b"foo", b"bar"] @@ -424,7 +425,7 @@ def test_backends_data_fs_read_with_raw_ouput(fs_backend, fs, monkeypatch): # Given a relative `target` path, the `read` method should read all files in the # target directory relative to the default directory and yield bytes. - result = backend.read(raw_output=True, target="./bar") + result = backend.read(raw_output=True, target="./bar", greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [b"qux"] @@ -444,7 +445,9 @@ def test_backends_data_fs_read_with_raw_ouput(fs_backend, fs, monkeypatch): # Given a `chunk_size` and an absolute `target` path, # the `read` method should write the output bytes in chunks of the specified # `chunk_size`. - result = backend.read(raw_output=True, target=absolute_path, chunk_size=2) + result = backend.read( + raw_output=True, target=absolute_path, chunk_size=2, greedy=greedy + ) assert isinstance(result, Iterable) assert list(result) == [b"fo", b"o", b"ba", b"r"] @@ -470,7 +473,8 @@ def test_backends_data_fs_read_with_raw_ouput(fs_backend, fs, monkeypatch): ] -def test_backends_data_fs_read_without_raw_output(fs_backend, fs, monkeypatch): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_fs_read_without_raw_output(greedy, fs_backend, fs, monkeypatch): """Test the `FSDataBackend.read` method with `raw_output` set to `False`.""" # File contents. @@ -495,7 +499,7 @@ def test_backends_data_fs_read_without_raw_output(fs_backend, fs, monkeypatch): # Given no `target`, the `read` method should read all files in the default # directory and yield dictionaries. - result = backend.read(raw_output=False) + result = backend.read(raw_output=False, greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [valid_dictionary, valid_dictionary] @@ -514,7 +518,7 @@ def test_backends_data_fs_read_without_raw_output(fs_backend, fs, monkeypatch): # Given an absolute `target` path, the `read` method should read all files in the # target directory and yield dictionaries. - result = backend.read(raw_output=False, target=absolute_path) + result = backend.read(raw_output=False, target=absolute_path, greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [valid_dictionary] @@ -533,7 +537,7 @@ def test_backends_data_fs_read_without_raw_output(fs_backend, fs, monkeypatch): # Given a relative `target` path, the `read` method should read all files in the # target directory relative to the default directory and yield dictionaries. - result = backend.read(raw_output=False, target="bar") + result = backend.read(raw_output=False, target="bar", greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [valid_dictionary, valid_dictionary, valid_dictionary] diff --git a/tests/backends/data/test_ldp.py b/tests/backends/data/test_ldp.py index ad2c2d1e3..677204674 100644 --- a/tests/backends/data/test_ldp.py +++ b/tests/backends/data/test_ldp.py @@ -564,7 +564,8 @@ def mock_requests_get(url, stream=True, timeout=None): next(backend.read(query="foo")) -def test_backends_data_ldp_read_with_query(ldp_backend, monkeypatch, fs): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_ldp_read_with_query(greedy, ldp_backend, monkeypatch, fs): """Test the `LDPDataBackend.read` method, given a query argument.""" # Create fake archive to stream. @@ -614,7 +615,9 @@ def mock_ovh_get(url): with requests_mock.Mocker() as request_mocker: request_mocker.get("http://example.com", content=archive) - result = b"".join(backend.read(query="5d5c4c93-04a4-42c5-9860-f51fa4044aa1")) + result = b"".join( + backend.read(query="5d5c4c93-04a4-42c5-9860-f51fa4044aa1", greedy=greedy) + ) assert os.path.exists(settings.HISTORY_FILE) assert backend.history == [ diff --git a/tests/backends/data/test_mongo.py b/tests/backends/data/test_mongo.py index 618b670ca..7173b9f20 100644 --- a/tests/backends/data/test_mongo.py +++ b/tests/backends/data/test_mongo.py @@ -250,7 +250,8 @@ def test_backends_data_mongo_list_with_history(mongo_backend, caplog): backend.close() -def test_backends_data_mongo_read_with_raw_output(mongo, mongo_backend): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_mongo_read_with_raw_output(greedy, mongo, mongo_backend): """Test the `MongoDataBackend.read` method with `raw_output` set to `True`.""" backend = mongo_backend() @@ -266,14 +267,15 @@ def test_backends_data_mongo_read_with_raw_output(mongo, mongo_backend): ] backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) - assert list(backend.read(raw_output=True)) == expected + assert list(backend.read(raw_output=True, greedy=greedy)) == expected assert list(backend.read(raw_output=True, target="foobar")) == expected[:2] assert list(backend.read(raw_output=True, chunk_size=2)) == expected assert list(backend.read(raw_output=True, chunk_size=1000)) == expected backend.close() -def test_backends_data_mongo_read_without_raw_output(mongo, mongo_backend): +@pytest.mark.parametrize("greedy", [False, True]) +def test_backends_data_mongo_read_without_raw_output(greedy, mongo, mongo_backend): """Test the `MongoDataBackend.read` method with `raw_output` set to `False`.""" backend = mongo_backend() @@ -289,7 +291,7 @@ def test_backends_data_mongo_read_without_raw_output(mongo, mongo_backend): ] backend.collection.insert_many(documents) backend.database.foobar.insert_many(documents[:2]) - assert list(backend.read()) == expected + assert list(backend.read(greedy=greedy)) == expected assert list(backend.read(target="foobar")) == expected[:2] assert list(backend.read(chunk_size=2)) == expected assert list(backend.read(chunk_size=1000)) == expected diff --git a/tests/backends/data/test_s3.py b/tests/backends/data/test_s3.py index 8f9062229..c6a8d3ef8 100644 --- a/tests/backends/data/test_s3.py +++ b/tests/backends/data/test_s3.py @@ -226,7 +226,9 @@ def test_backends_data_s3_list_with_failed_connection_should_log_the_error( @mock_s3 +@pytest.mark.parametrize("greedy", [False, True]) def test_backends_data_s3_read_with_valid_name_should_write_to_history( + greedy, s3_backend, monkeypatch, ): @@ -267,6 +269,7 @@ def test_backends_data_s3_read_with_valid_name_should_write_to_history( target=bucket_name, chunk_size=1000, raw_output=True, + greedy=greedy, ) ) @@ -278,12 +281,7 @@ def test_backends_data_s3_read_with_valid_name_should_write_to_history( "timestamp": freezed_now, } in backend.history - list( - backend.read( - query="2022-09-30.gz", - raw_output=False, - ) - ) + list(backend.read(query="2022-09-30.gz", raw_output=False, greedy=greedy)) assert { "backend": "s3", diff --git a/tests/backends/data/test_swift.py b/tests/backends/data/test_swift.py index 5800645ec..eace4efe1 100644 --- a/tests/backends/data/test_swift.py +++ b/tests/backends/data/test_swift.py @@ -278,8 +278,9 @@ def mock_get_container(*args, **kwargs): backend.close() +@pytest.mark.parametrize("greedy", [False, True]) def test_backends_data_swift_read_with_raw_output( - swift_backend, monkeypatch, fs, settings_fs + greedy, swift_backend, monkeypatch, fs, settings_fs ): """Test the `SwiftDataBackend.read` method with `raw_output` set to `True`.""" @@ -300,7 +301,7 @@ def mock_get_object(*args, **kwargs): fs.create_file(settings.HISTORY_FILE, contents=json.dumps([])) # The `read` method should read the object and yield bytes. - result = backend.read(raw_output=True, query="2020-04-29.gz") + result = backend.read(raw_output=True, query="2020-04-29.gz", greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [content] @@ -339,8 +340,9 @@ def mock_get_object(*args, **kwargs): backend.close() +@pytest.mark.parametrize("greedy", [False, True]) def test_backends_data_swift_read_without_raw_output( - swift_backend, monkeypatch, fs, settings_fs + greedy, swift_backend, monkeypatch, fs, settings_fs ): """Test the `SwiftDataBackend.read` method with `raw_output` set to `False`.""" @@ -362,7 +364,7 @@ def mock_get_object(*args, **kwargs): fs.create_file(settings.HISTORY_FILE, contents=json.dumps([])) # The `read` method should read the object and yield bytes. - result = backend.read(raw_output=False, query="2020-04-29.gz") + result = backend.read(raw_output=False, query="2020-04-29.gz", greedy=greedy) assert isinstance(result, Iterable) assert list(result) == [content_dict]