From e0221c8fa5db3bdd7dbffb27529e1b29fa037897 Mon Sep 17 00:00:00 2001
From: SergioSim
Date: Tue, 14 Nov 2023 17:50:40 +0100
Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backends)=20add=20simultaneous=20opti?=
 =?UTF-8?q?on=20for=20AsyncWritable=20data=20backends?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We align the data backend interface with the HTTP backends.
The new `simultaneous` option allows the caller to write data chunks
concurrently.
---
 src/ralph/backends/data/async_es.py     |  14 +-
 src/ralph/backends/data/async_mongo.py  |  14 +-
 src/ralph/backends/data/base.py         |  39 ++++-
 src/ralph/backends/http/async_lrs.py    |   2 +-
 tests/backends/data/test_async_es.py    |  37 +++-
 tests/backends/data/test_async_mongo.py |  36 +++-
 tests/backends/data/test_base.py        | 221 +++++++++++++++++++++++-
 tests/backends/data/test_clickhouse.py  |   2 +-
 tests/backends/data/test_es.py          |   6 +-
 tests/backends/data/test_fs.py          |   4 +-
 tests/backends/data/test_mongo.py       |   4 +-
 tests/backends/data/test_s3.py          |   2 +-
 tests/backends/data/test_swift.py       |   2 +-
 tests/backends/http/test_async_lrs.py   |   2 +-
 14 files changed, 359 insertions(+), 26 deletions(-)

diff --git a/src/ralph/backends/data/async_es.py b/src/ralph/backends/data/async_es.py
index 43f34733d..4fa6c995c 100644
--- a/src/ralph/backends/data/async_es.py
+++ b/src/ralph/backends/data/async_es.py
@@ -217,6 +217,8 @@ async def write(  # noqa: PLR0913
         chunk_size: Optional[int] = None,
         ignore_errors: bool = False,
         operation_type: Optional[BaseOperationType] = None,
+        simultaneous: bool = False,
+        max_num_simultaneous: Optional[int] = None,
     ) -> int:
         """Write data documents to the target index and return their count.

@@ -232,6 +234,10 @@ async def write(  # noqa: PLR0913
             operation_type (BaseOperationType or None): The mode of the write
                 operation. If `operation_type` is `None`, the `default_operation_type`
                 is used instead. See `BaseOperationType`.
+            simultaneous (bool): If `True`, chunks will be written concurrently.
+                If `False` (default), chunks will be written sequentially.
+            max_num_simultaneous (int or None): If `simultaneous` is `True`, the maximum
+                number of chunks to write concurrently. If `None`, it defaults to 1.

         Return:
             int: The number of documents written.
@@ -243,7 +249,13 @@ async def write(  # noqa: PLR0913
             supported.
         """
         return await super().write(
-            data, target, chunk_size, ignore_errors, operation_type
+            data,
+            target,
+            chunk_size,
+            ignore_errors,
+            operation_type,
+            simultaneous,
+            max_num_simultaneous,
         )

     async def _write_bytes(  # noqa: PLR0913
diff --git a/src/ralph/backends/data/async_mongo.py b/src/ralph/backends/data/async_mongo.py
index 810559217..b32883e73 100644
--- a/src/ralph/backends/data/async_mongo.py
+++ b/src/ralph/backends/data/async_mongo.py
@@ -196,6 +196,8 @@ async def write(  # noqa: PLR0913
         chunk_size: Optional[int] = None,
         ignore_errors: bool = False,
         operation_type: Optional[BaseOperationType] = None,
+        simultaneous: bool = False,
+        max_num_simultaneous: Optional[int] = None,
     ) -> int:
         """Write data documents to the target collection and return their count.

@@ -210,6 +212,10 @@ async def write(  # noqa: PLR0913
             operation_type (BaseOperationType or None): The mode of the write
                 operation. If `operation_type` is `None`, the `default_operation_type`
                 is used instead. See `BaseOperationType`.
+            simultaneous (bool): If `True`, chunks will be written concurrently.
+                If `False` (default), chunks will be written sequentially.
+            max_num_simultaneous (int or None): If `simultaneous` is `True`, the maximum
+                number of chunks to write concurrently. If `None`, it defaults to 1.

         Return:
             int: The number of documents written.
@@ -221,7 +227,13 @@ async def write(  # noqa: PLR0913
             supported.
         """
         return await super().write(
-            data, target, chunk_size, ignore_errors, operation_type
+            data,
+            target,
+            chunk_size,
+            ignore_errors,
+            operation_type,
+            simultaneous,
+            max_num_simultaneous,
         )

     async def _write_bytes(  # noqa: PLR0913
diff --git a/src/ralph/backends/data/base.py b/src/ralph/backends/data/base.py
index 47eacb3b1..9dbedf73d 100644
--- a/src/ralph/backends/data/base.py
+++ b/src/ralph/backends/data/base.py
@@ -23,6 +23,7 @@
 from ralph.conf import BaseSettingsConfig, core_settings
 from ralph.exceptions import BackendParameterException
+from ralph.utils import gather_with_limited_concurrency, iter_by_batch


 class BaseDataBackendSettings(BaseSettings):
@@ -131,7 +132,7 @@ def write(  # noqa: PLR0913
             operation_type = self.default_operation_type

         if operation_type in self.unsupported_operation_types:
-            msg = f"{operation_type.value.capitalize()} operation_type is not allowed."
+            msg = f"{operation_type.value.capitalize()} operation_type is not allowed"
             self.logger.error(msg)
             raise BackendParameterException(msg)

@@ -139,7 +140,7 @@ def write(  # noqa: PLR0913
         try:
             first_record = next(data)
         except StopIteration:
-            self.logger.info("Data Iterator is empty; skipping write to target.")
+            self.logger.info("Data Iterator is empty; skipping write to target")
             return 0

         data = chain((first_record,), data)
@@ -356,6 +357,8 @@ async def write(  # noqa: PLR0913
         chunk_size: Optional[int] = None,
         ignore_errors: bool = False,
         operation_type: Optional[BaseOperationType] = None,
+        simultaneous: bool = False,
+        max_num_simultaneous: Optional[int] = None,
     ) -> int:
         """Write `data` records to the `target` container and return their count.

@@ -371,6 +374,10 @@ async def write(  # noqa: PLR0913
             operation_type (BaseOperationType or None): The mode of the write
                 operation. If `operation_type` is `None`, the `default_operation_type`
                 is used instead. See `BaseOperationType`.
+            simultaneous (bool): If `True`, chunks will be written concurrently.
+                If `False` (default), chunks will be written sequentially.
+            max_num_simultaneous (int or None): If `simultaneous` is `True`, the maximum
+                number of chunks to write concurrently. If `None`, it defaults to 1.

         Return:
             int: The number of written records.
@@ -384,7 +391,7 @@ async def write(  # noqa: PLR0913
             operation_type = self.default_operation_type

         if operation_type in self.unsupported_operation_types:
-            msg = f"{operation_type.value.capitalize()} operation_type is not allowed."
+ msg = f"{operation_type.value.capitalize()} operation_type is not allowed" self.logger.error(msg) raise BackendParameterException(msg) @@ -392,14 +399,36 @@ async def write( # noqa: PLR0913 try: first_record = next(data) except StopIteration: - self.logger.info("Data Iterator is empty; skipping write to target.") + self.logger.info("Data Iterator is empty; skipping write to target") return 0 data = chain((first_record,), data) chunk_size = chunk_size if chunk_size else self.settings.WRITE_CHUNK_SIZE is_bytes = isinstance(first_record, bytes) writer = self._write_bytes if is_bytes else self._write_dicts - return await writer(data, target, chunk_size, ignore_errors, operation_type) + + max_num_simultaneous = max_num_simultaneous if max_num_simultaneous else 1 + if not simultaneous or max_num_simultaneous == 1: + if max_num_simultaneous != 1: + msg = "max_num_simultaneous is ignored when `simultaneous=False`" + self.logger.warning(msg) + return await writer(data, target, chunk_size, ignore_errors, operation_type) + + if max_num_simultaneous < 1: + msg = "max_num_simultaneous must be a strictly positive integer" + self.logger.error(msg) + raise BackendParameterException(msg) + + count = 0 + batches = iter_by_batch(iter_by_batch(data, chunk_size), max_num_simultaneous) + for batch in batches: + tasks = set() + for chunk in batch: + task = writer(chunk, target, chunk_size, ignore_errors, operation_type) + tasks.add(task) + result = await gather_with_limited_concurrency(max_num_simultaneous, *tasks) + count += sum(result) + return count @abstractmethod async def _write_bytes( # noqa: PLR0913 diff --git a/src/ralph/backends/http/async_lrs.py b/src/ralph/backends/http/async_lrs.py index a92d40fc1..b94e7ad34 100644 --- a/src/ralph/backends/http/async_lrs.py +++ b/src/ralph/backends/http/async_lrs.py @@ -238,7 +238,7 @@ async def write( # noqa: PLR0913 try: first_record = next(data) except StopIteration: - logger.info("Data Iterator is empty; skipping write to target.") + logger.info("Data Iterator is empty; skipping write to target") return 0 if not operation_type: diff --git a/tests/backends/data/test_async_es.py b/tests/backends/data/test_async_es.py index 6c2d3c3f5..35ccee09f 100644 --- a/tests/backends/data/test_async_es.py +++ b/tests/backends/data/test_async_es.py @@ -491,6 +491,37 @@ async def test_backends_data_async_es_read_with_query(es, async_es_backend, capl await backend.close() +@pytest.mark.anyio +async def test_backends_data_async_es_write_with_simultaneous( + es, async_es_backend, caplog +): + """Test the `AsyncESDataBackend.write` method, given `simultaneous` set to `True`, + should insert the target documents concurrently. 
+ """ + backend = async_es_backend() + data = ({"value": str(idx)} for idx in range(5)) + + with caplog.at_level(logging.INFO): + assert ( + await backend.write( + data, chunk_size=3, simultaneous=True, max_num_simultaneous=2 + ) + == 5 + ) + + assert ( + "ralph.backends.data.async_es", + logging.INFO, + "Finished writing 3 documents with success", + ) in caplog.record_tuples + + assert ( + "ralph.backends.data.async_es", + logging.INFO, + "Finished writing 2 documents with success", + ) in caplog.record_tuples + + @pytest.mark.anyio async def test_backends_data_async_es_write_with_create_operation( es, async_es_backend, caplog @@ -510,7 +541,7 @@ async def test_backends_data_async_es_write_with_create_operation( assert ( "ralph.backends.data.async_es", logging.INFO, - "Data Iterator is empty; skipping write to target.", + "Data Iterator is empty; skipping write to target", ) in caplog.record_tuples # Given an iterator with multiple documents, the write method should write the @@ -626,7 +657,7 @@ async def test_backends_data_async_es_write_with_append_operation( should raise a `BackendParameterException`. """ backend = async_es_backend() - msg = "Append operation_type is not allowed." + msg = "Append operation_type is not allowed" with pytest.raises(BackendParameterException, match=msg): with caplog.at_level(logging.ERROR): await backend.write(data=[{}], operation_type=BaseOperationType.APPEND) @@ -634,7 +665,7 @@ async def test_backends_data_async_es_write_with_append_operation( assert ( "ralph.backends.data.async_es", logging.ERROR, - "Append operation_type is not allowed.", + "Append operation_type is not allowed", ) in caplog.record_tuples await backend.close() diff --git a/tests/backends/data/test_async_mongo.py b/tests/backends/data/test_async_mongo.py index 79f4438bb..d5011f8de 100644 --- a/tests/backends/data/test_async_mongo.py +++ b/tests/backends/data/test_async_mongo.py @@ -591,6 +591,38 @@ async def test_backends_data_async_mongo_read_with_query( ] == expected +@pytest.mark.anyio +async def test_backends_data_async_mongo_write_with_simultaneous( + mongo, async_mongo_backend, caplog +): + """Test the `AsyncMongoDataBackend.write` method, given `simultaneous` set to + `True`, should insert the target documents concurrently. + """ + backend = async_mongo_backend() + timestamp = {"timestamp": "2022-06-27T15:36:50"} + documents = [{"id": str(i), **timestamp} for i in range(5)] + + with caplog.at_level(logging.INFO): + assert ( + await backend.write( + documents, chunk_size=3, simultaneous=True, max_num_simultaneous=2 + ) + == 5 + ) + + assert ( + "ralph.backends.data.async_mongo", + logging.INFO, + "Inserted 3 documents with success", + ) in caplog.record_tuples + + assert ( + "ralph.backends.data.async_mongo", + logging.INFO, + "Inserted 2 documents with success", + ) in caplog.record_tuples + + @pytest.mark.anyio async def test_backends_data_async_mongo_write_with_target( mongo, @@ -899,7 +931,7 @@ async def test_backends_data_async_mongo_write_with_append_operation( `operation_type`, should raise a `BackendParameterException`. """ backend = async_mongo_backend() - msg = "Append operation_type is not allowed." 
+ msg = "Append operation_type is not allowed" with pytest.raises(BackendParameterException, match=msg): with caplog.at_level(logging.ERROR): await backend.write(data=[], operation_type=BaseOperationType.APPEND) @@ -1008,7 +1040,7 @@ async def test_backends_data_async_mongo_write_with_no_data( with caplog.at_level(logging.INFO): assert await backend.write(data=[]) == 0 - msg = "Data Iterator is empty; skipping write to target." + msg = "Data Iterator is empty; skipping write to target" assert ( "ralph.backends.data.async_mongo", logging.INFO, diff --git a/tests/backends/data/test_base.py b/tests/backends/data/test_base.py index fd55e702a..aa918cdee 100644 --- a/tests/backends/data/test_base.py +++ b/tests/backends/data/test_base.py @@ -6,10 +6,13 @@ import pytest from ralph.backends.data.base import ( + AsyncWritable, BaseAsyncDataBackend, BaseDataBackend, BaseDataBackendSettings, + BaseOperationType, BaseQuery, + Writable, get_backend_generic_argument, ) from ralph.exceptions import BackendParameterException @@ -171,10 +174,10 @@ async def _read_bytes(self, *args): for _ in range(3): yield b"" - def status(self): + async def status(self): pass - def close(self): + async def close(self): pass backend = MockAsyncBaseDataBackend() @@ -197,3 +200,217 @@ def close(self): assert not [_ async for _ in backend.read(max_statements=0)] assert not [_ async for _ in backend.read(max_statements=0, raw_output=True)] + + +@pytest.mark.parametrize( + "chunk_size,max_num_simultaneous,expected_item_count,expected_write_calls", + [ + # Given a chunk size equal to the size of the data, only one write call should + # be performed, regardless of how many simultaneous requests are allowed. + (4, None, {4}, 1), + (4, 1, {4}, 1), + (4, 20, {4}, 1), + # Given a chunk size equal to half (or a bit more) of the data, two write + # calls should be performed. + (2, 2, {2}, 2), + (2, 20, {2}, 2), + (3, 2, {1, 3}, 2), + (3, 20, {1, 3}, 2), + # However, given a limit of one simultaneous request, only one write call is + # allowed. + (2, 1, {4}, 1), + (3, 1, {4}, 1), + # Given a chunk size equal to one, up to four simultaneous write calls can be + # performed. + (1, 1, {4}, 1), + (1, 2, {1}, 4), + (1, 20, {1}, 4), + ], +) +@pytest.mark.anyio +async def test_backends_data_base_async_write_with_simultaneous( + chunk_size, max_num_simultaneous, expected_item_count, expected_write_calls +): + """Test the async `AsyncWritable.write` method with `simultaneous` parameter.""" + + write_calls = {"count": 0} + expected_data = {0, 1, 2, 3} + + class MockAsyncBaseDataBackend( + BaseAsyncDataBackend[BaseDataBackendSettings, BaseQuery], AsyncWritable + ): + """A class mocking the base database class.""" + + async def _read_dicts(self, *args): + pass + + async def _read_bytes(self, *args): + pass + + async def _write_bytes(self, data, *args): + pass + + async def _write_dicts(self, data, *args): + write_calls["count"] += 1 + item_count = 0 + for item in data: + expected_data.remove(item) + item_count += 1 + + assert item_count in expected_item_count + return item_count + + async def status(self): + pass + + async def close(self): + pass + + backend = MockAsyncBaseDataBackend() + assert ( + await backend.write( + (i for i in range(4)), + chunk_size=chunk_size, + simultaneous=True, + max_num_simultaneous=max_num_simultaneous, + ) + ) == 4 + # All data should be consumed. 
+    assert not expected_data
+    assert write_calls["count"] == expected_write_calls
+
+
+@pytest.mark.anyio
+async def test_backends_data_base_write_with_invalid_parameters(caplog):
+    """Test the Writable backend `write` method, given invalid parameters."""
+
+    class MockBaseDataBackend(
+        BaseDataBackend[BaseDataBackendSettings, BaseQuery], Writable
+    ):
+        """A class mocking the base database class."""
+
+        unsupported_operation_types = {BaseOperationType.DELETE}
+
+        def _read_dicts(self, *args):
+            pass
+
+        def _read_bytes(self, *args):
+            pass
+
+        def _write_bytes(self, *args):
+            pass
+
+        def _write_dicts(self, *args):
+            return 1
+
+        def status(self):
+            pass
+
+        def close(self):
+            pass
+
+    backend = MockBaseDataBackend()
+    # Given an unsupported `operation_type`, the write method should raise a
+    # `BackendParameterException` and log an error.
+    msg = "Delete operation_type is not allowed"
+    with pytest.raises(BackendParameterException, match=msg):
+        with caplog.at_level(logging.ERROR):
+            assert backend.write([{}], operation_type=BaseOperationType.DELETE)
+
+    assert (
+        "tests.backends.data.test_base",
+        logging.ERROR,
+        msg,
+    ) in caplog.record_tuples
+
+    # Given an empty data iterator, the write method should log an info and return 0.
+    msg = "Data Iterator is empty; skipping write to target"
+    with caplog.at_level(logging.INFO):
+        assert backend.write([]) == 0
+
+    assert (
+        "tests.backends.data.test_base",
+        logging.INFO,
+        msg,
+    ) in caplog.record_tuples
+
+
+@pytest.mark.anyio
+async def test_backends_data_base_async_write_with_invalid_parameters(caplog):
+    """Test the AsyncWritable backend `write` method, given invalid parameters."""
+
+    class MockAsyncBaseDataBackend(
+        BaseAsyncDataBackend[BaseDataBackendSettings, BaseQuery], AsyncWritable
+    ):
+        """A class mocking the base database class."""
+
+        unsupported_operation_types = {BaseOperationType.DELETE}
+
+        async def _read_dicts(self, *args):
+            pass
+
+        async def _read_bytes(self, *args):
+            pass
+
+        async def _write_bytes(self, *args):
+            pass
+
+        async def _write_dicts(self, *args):
+            return 1
+
+        async def status(self):
+            pass
+
+        async def close(self):
+            pass
+
+    backend = MockAsyncBaseDataBackend()
+
+    # Given `max_num_simultaneous` is set to a negative value and `simultaneous` is True,
+    # the write method should raise a `BackendParameterException` and log an error.
+    msg = "max_num_simultaneous must be a strictly positive integer"
+    with pytest.raises(BackendParameterException, match=msg):
+        with caplog.at_level(logging.ERROR):
+            assert await backend.write([{}], simultaneous=True, max_num_simultaneous=-1)
+
+    assert (
+        "tests.backends.data.test_base",
+        logging.ERROR,
+        msg,
+    ) in caplog.record_tuples
+
+    # Given `max_num_simultaneous` is set and `simultaneous` is False, the write method
+    # should log a warning.
+    msg = "max_num_simultaneous is ignored when `simultaneous=False`"
+    with caplog.at_level(logging.WARNING):
+        assert await backend.write([{}], max_num_simultaneous=-1) == 1
+
+    assert (
+        "tests.backends.data.test_base",
+        logging.WARNING,
+        msg,
+    ) in caplog.record_tuples
+
+    # Given an unsupported `operation_type`, the write method should raise a
+    # `BackendParameterException` and log an error.
+ msg = "Delete operation_type is not allowed" + with pytest.raises(BackendParameterException, match=msg): + with caplog.at_level(logging.ERROR): + assert await backend.write([{}], operation_type=BaseOperationType.DELETE) + + assert ( + "tests.backends.data.test_base", + logging.ERROR, + msg, + ) in caplog.record_tuples + + # Given an empty data iterator, the write method should log an info and return 0. + msg = "Data Iterator is empty; skipping write to target" + with caplog.at_level(logging.INFO): + assert await backend.write([]) == 0 + + assert ( + "tests.backends.data.test_base", + logging.INFO, + msg, + ) in caplog.record_tuples diff --git a/tests/backends/data/test_clickhouse.py b/tests/backends/data/test_clickhouse.py index 950f37416..a3f6f93ca 100644 --- a/tests/backends/data/test_clickhouse.py +++ b/tests/backends/data/test_clickhouse.py @@ -631,7 +631,7 @@ def test_backends_data_clickhouse_write_wrong_operation_type( ] backend = clickhouse_backend() - msg = "Append operation_type is not allowed." + msg = "Append operation_type is not allowed" with pytest.raises(BackendParameterException, match=msg): backend.write(data=statements, operation_type=BaseOperationType.APPEND) backend.close() diff --git a/tests/backends/data/test_es.py b/tests/backends/data/test_es.py index 31a427c4c..8552abf5c 100644 --- a/tests/backends/data/test_es.py +++ b/tests/backends/data/test_es.py @@ -463,7 +463,7 @@ def test_backends_data_es_write_with_create_operation(es, es_backend, caplog): assert ( "ralph.backends.data.es", logging.INFO, - "Data Iterator is empty; skipping write to target.", + "Data Iterator is empty; skipping write to target", ) in caplog.record_tuples # Given an iterator with multiple documents, the write method should write the @@ -569,7 +569,7 @@ def test_backends_data_es_write_with_append_operation(es_backend, caplog): should raise a `BackendParameterException`. """ backend = es_backend() - msg = "Append operation_type is not allowed." + msg = "Append operation_type is not allowed" with pytest.raises(BackendParameterException, match=msg): with caplog.at_level(logging.ERROR): backend.write(data=[{}], operation_type=BaseOperationType.APPEND) @@ -577,7 +577,7 @@ def test_backends_data_es_write_with_append_operation(es_backend, caplog): assert ( "ralph.backends.data.es", logging.ERROR, - "Append operation_type is not allowed.", + "Append operation_type is not allowed", ) in caplog.record_tuples backend.close() diff --git a/tests/backends/data/test_fs.py b/tests/backends/data/test_fs.py index e235122c5..23223288e 100644 --- a/tests/backends/data/test_fs.py +++ b/tests/backends/data/test_fs.py @@ -787,7 +787,7 @@ def test_backends_data_fs_write_with_delete_operation( backend = fs_backend() - msg = "Delete operation_type is not allowed." + msg = "Delete operation_type is not allowed" with pytest.raises(BackendParameterException, match=msg): backend.write(data=[b"foo"], operation_type=BaseOperationType.DELETE) @@ -958,7 +958,7 @@ def test_backends_data_fs_write_with_no_data(fs_backend, caplog): with caplog.at_level(logging.INFO): assert backend.write(data=[]) == 0 - msg = "Data Iterator is empty; skipping write to target." 
+ msg = "Data Iterator is empty; skipping write to target" assert ("ralph.backends.data.fs", logging.INFO, msg) in caplog.record_tuples diff --git a/tests/backends/data/test_mongo.py b/tests/backends/data/test_mongo.py index 3cdae3b3c..618b670ca 100644 --- a/tests/backends/data/test_mongo.py +++ b/tests/backends/data/test_mongo.py @@ -753,7 +753,7 @@ def test_backends_data_mongo_write_with_append_operation(mongo_backend, caplog): should raise a `BackendParameterException`. """ backend = mongo_backend() - msg = "Append operation_type is not allowed." + msg = "Append operation_type is not allowed" with caplog.at_level(logging.ERROR): with pytest.raises(BackendParameterException, match=msg): backend.write(data=[], operation_type=BaseOperationType.APPEND) @@ -845,7 +845,7 @@ def test_backends_data_mongo_write_with_no_data(mongo_backend, caplog): with caplog.at_level(logging.INFO): assert backend.write(data=[]) == 0 - msg = "Data Iterator is empty; skipping write to target." + msg = "Data Iterator is empty; skipping write to target" assert ("ralph.backends.data.mongo", logging.INFO, msg) in caplog.record_tuples backend.close() diff --git a/tests/backends/data/test_s3.py b/tests/backends/data/test_s3.py index 992c3e75d..8f9062229 100644 --- a/tests/backends/data/test_s3.py +++ b/tests/backends/data/test_s3.py @@ -518,7 +518,7 @@ def test_backends_data_s3_write_with_append_or_delete_operation( backend = s3_backend() with pytest.raises( BackendParameterException, - match=f"{operation_type.value.capitalize()} operation_type is not allowed.", + match=f"{operation_type.value.capitalize()} operation_type is not allowed", ): backend.write(data=[b"foo"], operation_type=operation_type) backend.close() diff --git a/tests/backends/data/test_swift.py b/tests/backends/data/test_swift.py index d93edb22f..5800645ec 100644 --- a/tests/backends/data/test_swift.py +++ b/tests/backends/data/test_swift.py @@ -612,7 +612,7 @@ def test_backends_data_swift_write_with_invalid_operation( backend = swift_backend() - msg = f"{operation_type.value.capitalize()} operation_type is not allowed." + msg = f"{operation_type.value.capitalize()} operation_type is not allowed" with pytest.raises(BackendParameterException, match=msg): backend.write(data=[b"foo"], operation_type=operation_type) diff --git a/tests/backends/http/test_async_lrs.py b/tests/backends/http/test_async_lrs.py index 096036b70..9c6b6df2f 100644 --- a/tests/backends/http/test_async_lrs.py +++ b/tests/backends/http/test_async_lrs.py @@ -713,7 +713,7 @@ async def test_backends_http_async_lrs_write_without_data(caplog): assert ( "ralph.backends.http.async_lrs", logging.INFO, - "Data Iterator is empty; skipping write to target.", + "Data Iterator is empty; skipping write to target", ) in caplog.record_tuples assert result == 0