✨(backends) add simultaneous option for AsyncWritable data backends
We try to align the data backend interface with the HTTP backends. The `simultaneous` option allows the caller to write data chunks concurrently.
SergioSim committed Nov 14, 2023
1 parent 38949c7 commit 27ef4ff
Showing 14 changed files with 359 additions and 26 deletions.
14 changes: 13 additions & 1 deletion src/ralph/backends/data/async_es.py
@@ -219,6 +219,8 @@ async def write( # noqa: PLR0913
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
simultaneous: bool = False,
max_num_simultaneous: Optional[int] = None,
) -> int:
"""Write data documents to the target index and return their count.
@@ -234,6 +236,10 @@ async def write( # noqa: PLR0913
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
simultaneous (bool): If `True`, chunks will be written concurrently.
If `False` (default), chunks will be written sequentially.
max_num_simultaneous (int or None): If simultaneous is `True`, the maximum
number of chunks to write concurrently. If `None` it defaults to 1.
Return:
int: The number of documents written.
@@ -245,7 +251,13 @@ async def write( # noqa: PLR0913
supported.
"""
return await super().write(
data, target, chunk_size, ignore_errors, operation_type
data,
target,
chunk_size,
ignore_errors,
operation_type,
simultaneous,
max_num_simultaneous,
)

async def _write_bytes( # noqa: PLR0913
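For orientation, here is a minimal caller-side sketch of the new option, mirroring the test added further down (five documents, chunks of three, at most two chunks in flight). The class name `AsyncESDataBackend` and its no-argument construction are assumed for illustration and are not part of this commit:

import asyncio

from ralph.backends.data.async_es import AsyncESDataBackend


async def main():
    # Assumes a reachable Elasticsearch instance and default backend settings.
    backend = AsyncESDataBackend()
    data = ({"value": str(idx)} for idx in range(5))
    # Split into chunks of up to 3 documents; write at most 2 chunks concurrently.
    count = await backend.write(
        data, chunk_size=3, simultaneous=True, max_num_simultaneous=2
    )
    assert count == 5
    await backend.close()


asyncio.run(main())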
14 changes: 13 additions & 1 deletion src/ralph/backends/data/async_mongo.py
@@ -191,6 +191,8 @@ async def write( # noqa: PLR0913
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
simultaneous: bool = False,
max_num_simultaneous: Optional[int] = None,
) -> int:
"""Write data documents to the target collection and return their count.
@@ -203,6 +205,10 @@ async def write( # noqa: PLR0913
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
simultaneous (bool): If `True`, chunks will be written concurrently.
If `False` (default), chunks will be written sequentially.
max_num_simultaneous (int or None): If simultaneous is `True`, the maximum
number of chunks to write concurrently. If `None` it defaults to 1.
Return:
int: The number of documents written.
@@ -214,7 +220,13 @@ async def write( # noqa: PLR0913
supported.
"""
return await super().write(
data, target, chunk_size, ignore_errors, operation_type
data,
target,
chunk_size,
ignore_errors,
operation_type,
simultaneous,
max_num_simultaneous,
)

async def _write_bytes( # noqa: PLR0913
39 changes: 34 additions & 5 deletions src/ralph/backends/data/base.py
@@ -23,6 +23,7 @@

from ralph.conf import BaseSettingsConfig, core_settings
from ralph.exceptions import BackendParameterException
from ralph.utils import gather_with_limited_concurrency, iter_by_batch


class BaseDataBackendSettings(BaseSettings):
@@ -132,15 +133,15 @@ def write( # noqa: PLR0913
operation_type = self.default_operation_type

if operation_type in self.unsupported_operation_types:
msg = f"{operation_type.value.capitalize()} operation_type is not allowed."
msg = f"{operation_type.value.capitalize()} operation_type is not allowed"
self.logger.error(msg)
raise BackendParameterException(msg)

data = iter(data)
try:
first_record = next(data)
except StopIteration:
self.logger.info("Data Iterator is empty; skipping write to target.")
self.logger.info("Data Iterator is empty; skipping write to target")
return 0
data = chain((first_record,), data)

@@ -356,6 +357,8 @@ async def write( # noqa: PLR0913
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
simultaneous: bool = False,
max_num_simultaneous: Optional[int] = None,
) -> int:
"""Write `data` records to the `target` container and return their count.
@@ -372,6 +375,10 @@ async def write( # noqa: PLR0913
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
simultaneous (bool): If `True`, chunks will be written concurrently.
If `False` (default), chunks will be written sequentially.
max_num_simultaneous (int or None): If simultaneous is `True`, the maximum
number of chunks to write concurrently. If `None` it defaults to 1.
Return:
int: The number of written records.
@@ -385,22 +392,44 @@ async def write( # noqa: PLR0913
operation_type = self.default_operation_type

if operation_type in self.unsupported_operation_types:
msg = f"{operation_type.value.capitalize()} operation_type is not allowed."
msg = f"{operation_type.value.capitalize()} operation_type is not allowed"
self.logger.error(msg)
raise BackendParameterException(msg)

data = iter(data)
try:
first_record = next(data)
except StopIteration:
self.logger.info("Data Iterator is empty; skipping write to target.")
self.logger.info("Data Iterator is empty; skipping write to target")
return 0
data = chain((first_record,), data)

chunk_size = chunk_size if chunk_size else self.settings.WRITE_CHUNK_SIZE
is_bytes = isinstance(first_record, bytes)
writer = self._write_bytes if is_bytes else self._write_dicts
return await writer(data, target, chunk_size, ignore_errors, operation_type)

max_num_simultaneous = max_num_simultaneous if max_num_simultaneous else 1
if not simultaneous or max_num_simultaneous == 1:
if max_num_simultaneous != 1:
msg = "max_num_simultaneous is ignored when `simultaneous=False`"
self.logger.warning(msg)
return await writer(data, target, chunk_size, ignore_errors, operation_type)

if max_num_simultaneous < 1:
msg = "max_num_simultaneous must be a strictly positive integer"
self.logger.error(msg)
raise BackendParameterException(msg)

count = 0
batches = iter_by_batch(iter_by_batch(data, chunk_size), max_num_simultaneous)
for batch in batches:
tasks = set()
for chunk in batch:
task = writer(chunk, target, chunk_size, ignore_errors, operation_type)
tasks.add(task)
result = await gather_with_limited_concurrency(max_num_simultaneous, *tasks)
count += sum(result)
return count

@abstractmethod
async def _write_bytes( # noqa: PLR0913
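The concurrent path above relies on two helpers imported from `ralph.utils` — `iter_by_batch` and `gather_with_limited_concurrency` — whose implementations are not shown in this diff. The following is a rough sketch of what they plausibly look like, assuming `iter_by_batch` yields fixed-size lists and `gather_with_limited_concurrency` caps in-flight awaitables with a semaphore; the real implementations may differ:

import asyncio
from itertools import islice
from typing import Any, Awaitable, Iterable, Iterator, List


def iter_by_batch(iterable: Iterable, batch_size: int) -> Iterator[List]:
    """Yield successive lists of at most `batch_size` items from `iterable`."""
    iterator = iter(iterable)
    while batch := list(islice(iterator, batch_size)):
        yield batch


async def gather_with_limited_concurrency(
    num_tasks: int, *tasks: Awaitable
) -> List[Any]:
    """Await the given tasks with at most `num_tasks` of them running at once."""
    semaphore = asyncio.Semaphore(num_tasks)

    async def bounded(task: Awaitable) -> Any:
        async with semaphore:
            return await task

    return await asyncio.gather(*(bounded(task) for task in tasks))

Under these assumptions, writing five records with `chunk_size=3` and `max_num_simultaneous=2` first yields chunks of three and two records, then groups both chunks into one batch that is awaited concurrently — consistent with the "3 documents" / "2 documents" log lines asserted in the tests below. Passing `max_num_simultaneous` without `simultaneous=True` only logs a warning, and a value below 1 raises a `BackendParameterException`.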
2 changes: 1 addition & 1 deletion src/ralph/backends/http/async_lrs.py
@@ -238,7 +238,7 @@ async def write( # noqa: PLR0913
try:
first_record = next(data)
except StopIteration:
logger.info("Data Iterator is empty; skipping write to target.")
logger.info("Data Iterator is empty; skipping write to target")
return 0

if not operation_type:
37 changes: 34 additions & 3 deletions tests/backends/data/test_async_es.py
@@ -475,6 +475,37 @@ async def test_backends_data_async_es_read_with_query(es, async_es_backend, capl
await backend.close()


@pytest.mark.anyio
async def test_backends_data_async_es_write_with_simultaneous(
es, async_es_backend, caplog
):
"""Test the `AsyncESDataBackend.write` method, given `simultaneous` set to `True`,
should insert the target documents concurrently.
"""
backend = async_es_backend()
data = ({"value": str(idx)} for idx in range(5))

with caplog.at_level(logging.INFO):
assert (
await backend.write(
data, chunk_size=3, simultaneous=True, max_num_simultaneous=2
)
== 5
)

assert (
"ralph.backends.data.async_es",
logging.INFO,
"Finished writing 3 documents with success",
) in caplog.record_tuples

assert (
"ralph.backends.data.async_es",
logging.INFO,
"Finished writing 2 documents with success",
) in caplog.record_tuples


@pytest.mark.anyio
async def test_backends_data_async_es_write_with_create_operation(
es, async_es_backend, caplog
@@ -494,7 +525,7 @@ async def test_backends_data_async_es_write_with_create_operation(
assert (
"ralph.backends.data.async_es",
logging.INFO,
"Data Iterator is empty; skipping write to target.",
"Data Iterator is empty; skipping write to target",
) in caplog.record_tuples

# Given an iterator with multiple documents, the write method should write the
@@ -610,15 +641,15 @@ async def test_backends_data_async_es_write_with_append_operation(
should raise a `BackendParameterException`.
"""
backend = async_es_backend()
msg = "Append operation_type is not allowed."
msg = "Append operation_type is not allowed"
with pytest.raises(BackendParameterException, match=msg):
with caplog.at_level(logging.ERROR):
await backend.write(data=[{}], operation_type=BaseOperationType.APPEND)

assert (
"ralph.backends.data.async_es",
logging.ERROR,
"Append operation_type is not allowed.",
"Append operation_type is not allowed",
) in caplog.record_tuples

await backend.close()
36 changes: 34 additions & 2 deletions tests/backends/data/test_async_mongo.py
@@ -591,6 +591,38 @@ async def test_backends_data_async_mongo_read_with_query(
] == expected


@pytest.mark.anyio
async def test_backends_data_async_mongo_write_with_simultaneous(
mongo, async_mongo_backend, caplog
):
"""Test the `AsyncMongoDataBackend.write` method, given `simultaneous` set to
`True`, should insert the target documents concurrently.
"""
backend = async_mongo_backend()
timestamp = {"timestamp": "2022-06-27T15:36:50"}
documents = [{"id": str(i), **timestamp} for i in range(5)]

with caplog.at_level(logging.INFO):
assert (
await backend.write(
documents, chunk_size=3, simultaneous=True, max_num_simultaneous=2
)
== 5
)

assert (
"ralph.backends.data.async_mongo",
logging.INFO,
"Inserted 3 documents with success",
) in caplog.record_tuples

assert (
"ralph.backends.data.async_mongo",
logging.INFO,
"Inserted 2 documents with success",
) in caplog.record_tuples


@pytest.mark.anyio
async def test_backends_data_async_mongo_write_with_target(
mongo,
@@ -864,7 +896,7 @@ async def test_backends_data_async_mongo_write_with_append_operation(
`operation_type`, should raise a `BackendParameterException`.
"""
backend = async_mongo_backend()
msg = "Append operation_type is not allowed."
msg = "Append operation_type is not allowed"
with pytest.raises(BackendParameterException, match=msg):
with caplog.at_level(logging.ERROR):
await backend.write(data=[], operation_type=BaseOperationType.APPEND)
@@ -973,7 +1005,7 @@ async def test_backends_data_async_mongo_write_with_no_data(
with caplog.at_level(logging.INFO):
assert await backend.write(data=[]) == 0

msg = "Data Iterator is empty; skipping write to target."
msg = "Data Iterator is empty; skipping write to target"
assert (
"ralph.backends.data.async_mongo",
logging.INFO,
Expand Down