fixup! use concurrency instead of simultaneous/max_num_simultaneous
SergioSim committed Nov 29, 2023
1 parent e0221c8 commit c9c908e
Showing 6 changed files with 81 additions and 123 deletions.
17 changes: 4 additions & 13 deletions src/ralph/backends/data/async_es.py
@@ -217,8 +217,7 @@ async def write( # noqa: PLR0913
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
simultaneous: bool = False,
max_num_simultaneous: Optional[int] = None,
concurrency: Optional[int] = None,
) -> int:
"""Write data documents to the target index and return their count.
@@ -234,10 +233,8 @@ async def write( # noqa: PLR0913
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
simultaneous (bool): If `True`, chunks will be written concurrently.
If `False` (default), chunks will be written sequentially.
max_num_simultaneous (int or None): If simultaneous is `True`, the maximum
number of chunks to write concurrently. If `None` it defaults to 1.
concurrency (int or None): The number of chunks to write concurrently.
If `None`, it defaults to `1`.
Return:
int: The number of documents written.
@@ -249,13 +246,7 @@ async def write( # noqa: PLR0913
supported.
"""
return await super().write(
data,
target,
chunk_size,
ignore_errors,
operation_type,
simultaneous,
max_num_simultaneous,
data, target, chunk_size, ignore_errors, operation_type, concurrency
)

async def _write_bytes( # noqa: PLR0913
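A minimal usage sketch of the renamed argument (not part of the diff): it writes generated documents in chunks of 500, with at most 4 chunks in flight at a time. The default backend settings, the document shape, and the chunking values are illustrative assumptions.

import asyncio

from ralph.backends.data.async_es import AsyncESDataBackend


async def main():
    # Assumes an Elasticsearch instance reachable with the default backend settings.
    backend = AsyncESDataBackend()
    data = ({"id": str(idx)} for idx in range(10000))
    # Split `data` into chunks of 500 documents; write up to 4 chunks concurrently.
    count = await backend.write(data, chunk_size=500, concurrency=4)
    print(f"Wrote {count} documents")
    await backend.close()


asyncio.run(main())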
17 changes: 4 additions & 13 deletions src/ralph/backends/data/async_mongo.py
@@ -196,8 +196,7 @@ async def write( # noqa: PLR0913
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
simultaneous: bool = False,
max_num_simultaneous: Optional[int] = None,
concurrency: Optional[int] = None,
) -> int:
"""Write data documents to the target collection and return their count.
@@ -212,10 +211,8 @@ async def write( # noqa: PLR0913
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
simultaneous (bool): If `True`, chunks will be written concurrently.
If `False` (default), chunks will be written sequentially.
max_num_simultaneous (int or None): If simultaneous is `True`, the maximum
number of chunks to write concurrently. If `None` it defaults to 1.
concurrency (int or None): The number of chunks to write concurrently.
If `None`, it defaults to `1`.
Return:
int: The number of documents written.
@@ -227,13 +224,7 @@ async def write( # noqa: PLR0913
supported.
"""
return await super().write(
data,
target,
chunk_size,
ignore_errors,
operation_type,
simultaneous,
max_num_simultaneous,
data, target, chunk_size, ignore_errors, operation_type, concurrency
)

async def _write_bytes( # noqa: PLR0913
25 changes: 9 additions & 16 deletions src/ralph/backends/data/base.py
@@ -357,8 +357,7 @@ async def write( # noqa: PLR0913
chunk_size: Optional[int] = None,
ignore_errors: bool = False,
operation_type: Optional[BaseOperationType] = None,
simultaneous: bool = False,
max_num_simultaneous: Optional[int] = None,
concurrency: Optional[int] = None,
) -> int:
"""Write `data` records to the `target` container and return their count.
@@ -374,10 +373,8 @@ async def write( # noqa: PLR0913
operation_type (BaseOperationType or None): The mode of the write operation.
If `operation_type` is `None`, the `default_operation_type` is used
instead. See `BaseOperationType`.
simultaneous (bool): If `True`, chunks will be written concurrently.
If `False` (default), chunks will be written sequentially.
max_num_simultaneous (int or None): If simultaneous is `True`, the maximum
number of chunks to write concurrently. If `None` it defaults to 1.
concurrency (int or None): The number of chunks to write concurrently.
If `None`, it defaults to `1`.
Return:
int: The number of written records.
@@ -407,26 +404,22 @@ async def write( # noqa: PLR0913
is_bytes = isinstance(first_record, bytes)
writer = self._write_bytes if is_bytes else self._write_dicts

max_num_simultaneous = max_num_simultaneous if max_num_simultaneous else 1
if not simultaneous or max_num_simultaneous == 1:
if max_num_simultaneous != 1:
msg = "max_num_simultaneous is ignored when `simultaneous=False`"
self.logger.warning(msg)
# `None` selects the default of `1`; an explicit `0` falls through to the validation below.
concurrency = 1 if concurrency is None else concurrency
if concurrency == 1:
return await writer(data, target, chunk_size, ignore_errors, operation_type)

if max_num_simultaneous < 1:
msg = "max_num_simultaneous must be a strictly positive integer"
if concurrency < 1:
msg = "concurrency must be a strictly positive integer"
self.logger.error(msg)
raise BackendParameterException(msg)

count = 0
batches = iter_by_batch(iter_by_batch(data, chunk_size), max_num_simultaneous)
for batch in batches:
for batch in iter_by_batch(iter_by_batch(data, chunk_size), concurrency):
tasks = set()
for chunk in batch:
task = writer(chunk, target, chunk_size, ignore_errors, operation_type)
tasks.add(task)
result = await gather_with_limited_concurrency(max_num_simultaneous, *tasks)
result = await gather_with_limited_concurrency(concurrency, *tasks)
count += sum(result)
return count

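The rewritten code path leans on two `ralph.utils` helpers: `iter_by_batch`, applied twice to turn the data into batches of chunks, and `gather_with_limited_concurrency`, which awaits one batch of write coroutines at a time. A hedged sketch of plausible implementations follows, to make the batching concrete; the actual helpers in `ralph.utils` may differ.

import asyncio
from typing import Any, Awaitable, Iterable, Iterator, List


def iter_by_batch(iterable: Iterable, batch_size: int) -> Iterator[list]:
    """Yield lists of at most `batch_size` items from `iterable`."""
    batch = []
    for item in iterable:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


async def gather_with_limited_concurrency(num_tasks: int, *tasks: Awaitable) -> List[Any]:
    """Await `tasks`, letting at most `num_tasks` of them run at once."""
    semaphore = asyncio.Semaphore(num_tasks)

    async def bounded(task):
        async with semaphore:
            return await task

    return await asyncio.gather(*(bounded(task) for task in tasks))

Under these assumptions, `iter_by_batch(iter_by_batch(data, chunk_size), concurrency)` yields groups of at most `concurrency` chunks, and each group goes through one `gather_with_limited_concurrency` call, so no more than `concurrency` chunks are written at any moment.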
39 changes: 14 additions & 25 deletions tests/backends/data/test_async_es.py
@@ -18,7 +18,7 @@
ESDataBackendSettings,
ESQuery,
)
from ralph.backends.data.base import BaseOperationType, DataBackendStatus
from ralph.backends.data.base import AsyncWritable, BaseOperationType, DataBackendStatus
from ralph.backends.data.es import ESClientOptions
from ralph.exceptions import BackendException, BackendParameterException
from ralph.utils import now
@@ -492,34 +492,23 @@ async def test_backends_data_async_es_read_with_query(es, async_es_backend, caplog


@pytest.mark.anyio
async def test_backends_data_async_es_write_with_simultaneous(
es, async_es_backend, caplog
async def test_backends_data_async_es_write_with_concurrency(
async_es_backend, monkeypatch
):
"""Test the `AsyncESDataBackend.write` method, given `simultaneous` set to `True`,
should insert the target documents concurrently.
"""Test the `AsyncESDataBackend.write` method, given `concurrency` set,
should pass the `concurrency` value to `AsyncWritable.write`.
"""
backend = async_es_backend()
data = ({"value": str(idx)} for idx in range(5))

with caplog.at_level(logging.INFO):
assert (
await backend.write(
data, chunk_size=3, simultaneous=True, max_num_simultaneous=2
)
== 5
)

assert (
"ralph.backends.data.async_es",
logging.INFO,
"Finished writing 3 documents with success",
) in caplog.record_tuples
async def mock_write( # noqa: PLR0913
self, data, target, chunk_size, ignore_errors, operation_type, concurrency
):
"""Mock the AsyncWritable `write` method."""
assert concurrency == 4
return 3

assert (
"ralph.backends.data.async_es",
logging.INFO,
"Finished writing 2 documents with success",
) in caplog.record_tuples
backend = async_es_backend()
monkeypatch.setattr(AsyncWritable, "write", mock_write)
assert await backend.write([b"bar"], concurrency=4) == 3


@pytest.mark.anyio
40 changes: 14 additions & 26 deletions tests/backends/data/test_async_mongo.py
@@ -14,7 +14,7 @@
MongoDataBackendSettings,
MongoQuery,
)
from ralph.backends.data.base import BaseOperationType, DataBackendStatus
from ralph.backends.data.base import AsyncWritable, BaseOperationType, DataBackendStatus
from ralph.backends.data.mongo import MongoClientOptions
from ralph.exceptions import BackendException, BackendParameterException

@@ -592,35 +592,23 @@ async def test_backends_data_async_mongo_read_with_query(


@pytest.mark.anyio
async def test_backends_data_async_mongo_write_with_simultaneous(
mongo, async_mongo_backend, caplog
async def test_backends_data_async_mongo_write_with_concurrency(
async_mongo_backend, monkeypatch
):
"""Test the `AsyncMongoDataBackend.write` method, given `simultaneous` set to
`True`, should insert the target documents concurrently.
"""Test the `AsyncMongoDataBackend.write` method, given `concurrency` set,
should pass the `concurrency` value to `AsyncWritable.write`.
"""
backend = async_mongo_backend()
timestamp = {"timestamp": "2022-06-27T15:36:50"}
documents = [{"id": str(i), **timestamp} for i in range(5)]

with caplog.at_level(logging.INFO):
assert (
await backend.write(
documents, chunk_size=3, simultaneous=True, max_num_simultaneous=2
)
== 5
)
async def mock_write( # noqa: PLR0913
self, data, target, chunk_size, ignore_errors, operation_type, concurrency
):
"""Mock the AsyncWritable `write` method."""
assert concurrency == 4
return 3

assert (
"ralph.backends.data.async_mongo",
logging.INFO,
"Inserted 3 documents with success",
) in caplog.record_tuples

assert (
"ralph.backends.data.async_mongo",
logging.INFO,
"Inserted 2 documents with success",
) in caplog.record_tuples
backend = async_mongo_backend()
monkeypatch.setattr(AsyncWritable, "write", mock_write)
assert await backend.write([b"bar"], concurrency=4) == 3


@pytest.mark.anyio
66 changes: 36 additions & 30 deletions tests/backends/data/test_base.py
@@ -16,6 +16,7 @@
get_backend_generic_argument,
)
from ralph.exceptions import BackendParameterException
from ralph.utils import gather_with_limited_concurrency


@pytest.mark.parametrize(
@@ -203,10 +204,10 @@ async def close(self):


@pytest.mark.parametrize(
"chunk_size,max_num_simultaneous,expected_item_count,expected_write_calls",
"chunk_size,concurrency,expected_item_count,expected_write_calls",
[
# Given a chunk size equal to the size of the data, only one write call should
# be performed, regardless of how many simultaneous requests are allowed.
# be performed, regardless of how many concurrent requests are allowed.
(4, None, {4}, 1),
(4, 1, {4}, 1),
(4, 20, {4}, 1),
@@ -216,25 +217,28 @@ async def close(self):
(2, 20, {2}, 2),
(3, 2, {1, 3}, 2),
(3, 20, {1, 3}, 2),
# However, given a limit of one simultaneous request, only one write call is
# However, given a limit of one concurrent request, only one write call is
# allowed.
(2, 1, {4}, 1),
(3, 1, {4}, 1),
# Given a chunk size equal to one, up to four simultaneous write calls can be
# Given a chunk size equal to one, up to four concurrent write calls can be
# performed.
(1, 1, {4}, 1),
(1, 2, {1}, 4),
(1, 20, {1}, 4),
],
)
@pytest.mark.anyio
async def test_backends_data_base_async_write_with_simultaneous(
chunk_size, max_num_simultaneous, expected_item_count, expected_write_calls
async def test_backends_data_base_async_write_with_concurrency(
chunk_size, concurrency, expected_item_count, expected_write_calls, monkeypatch
):
"""Test the async `AsyncWritable.write` method with `simultaneous` parameter."""
"""Test the async `AsyncWritable.write` method with `concurrency` argument."""

write_calls = {"count": 0}
gather_calls = {"count": 0}
data = (i for i in range(4))
expected_data = {0, 1, 2, 3}
expected_concurrency = concurrency if concurrency else 1

class MockAsyncBaseDataBackend(
BaseAsyncDataBackend[BaseDataBackendSettings, BaseQuery], AsyncWritable
@@ -266,19 +270,33 @@ async def status(self):
async def close(self):
pass

async def mock_gather_with_limited_concurrency(num_tasks, *tasks):
"""Mock the gather_with_limited_concurrency method."""
assert len(tasks) <= expected_concurrency
assert num_tasks == expected_concurrency
gather_calls["count"] += 1
return await gather_with_limited_concurrency(num_tasks, *tasks)

backend = MockAsyncBaseDataBackend()
monkeypatch.setattr(
"ralph.backends.data.base.gather_with_limited_concurrency",
mock_gather_with_limited_concurrency,
)

assert (
await backend.write(
(i for i in range(4)),
chunk_size=chunk_size,
simultaneous=True,
max_num_simultaneous=max_num_simultaneous,
)
) == 4
await backend.write(data, chunk_size=chunk_size, concurrency=concurrency) == 4
)
# All data should be consumed.
assert not expected_data
assert write_calls["count"] == expected_write_calls

if expected_concurrency == 1:
assert not gather_calls["count"]
else:
assert gather_calls["count"] == max(
1, int(4 / chunk_size / expected_concurrency)
)


@pytest.mark.anyio
async def test_backends_data_base_write_with_invalid_parameters(caplog):
@@ -366,31 +384,19 @@ async def close(self):

backend = MockAsyncBaseDataBackend()

# Given `max_num_simultation` is set to a negative value and `simultaneous` is True,
# the write method should raise a `BackendParameterException` and log an error.
msg = "max_num_simultaneous must be a strictly positive integer"
# Given `concurrency` is set to a negative value, the write method should raise a
# `BackendParameterException` and produce an error log.
msg = "concurrency must be a strictly positive integer"
with pytest.raises(BackendParameterException, match=msg):
with caplog.at_level(logging.ERROR):
assert await backend.write([{}], simultaneous=True, max_num_simultaneous=-1)
assert await backend.write([{}], concurrency=-1)

assert (
"tests.backends.data.test_base",
logging.ERROR,
msg,
) in caplog.record_tuples

# Given `max_num_simultation` is set and `simultaneous` is False, the write method
# should log a warning.
msg = "max_num_simultaneous is ignored when `simultaneous=False`"
with caplog.at_level(logging.WARNING):
assert await backend.write([{}], max_num_simultaneous=-1) == 1

assert (
"tests.backends.data.test_base",
logging.WARNING,
msg,
) in caplog.record_tuples

# Given an unsupported `operation_type`, the write method should raise a
# `BackendParameterException` and log an error.
msg = "Delete operation_type is not allowed"
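A note on the expected gather-call counts asserted above: the 4 data items are split into ceil(4 / chunk_size) chunks, the chunks are grouped `concurrency` at a time, and each group costs one gather call. The closed form used by the test, max(1, int(4 / chunk_size / concurrency)), agrees with that counting for every concurrent parameter set exercised here. An illustrative standalone check (not part of the commit):

import math

# (chunk_size, concurrency, expected gather calls) for the concurrent cases above.
cases = [(4, 20, 1), (2, 20, 1), (3, 2, 1), (3, 20, 1), (1, 2, 2), (1, 20, 1)]
for chunk_size, concurrency, expected in cases:
    chunks = math.ceil(4 / chunk_size)  # chunks produced from the 4 items
    gather_calls = math.ceil(chunks / concurrency)  # one gather call per batch
    assert gather_calls == expected == max(1, int(4 / chunk_size / concurrency))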
