Implement max_messages on receive_batch
vgvoleg committed Sep 25, 2024
1 parent 60a4504 commit e0f4e4b
Showing 2 changed files with 42 additions and 8 deletions.
46 changes: 39 additions & 7 deletions ydb/_topic_reader/topic_reader_asyncio.py
@@ -97,6 +97,7 @@ async def wait_message(self):

     async def receive_batch(
         self,
+        max_messages: typing.Union[int, None] = None,
     ) -> typing.Union[datatypes.PublicBatch, None]:
         """
         Get one messages batch from reader.
@@ -105,7 +106,9 @@ async def wait_message(self):
         use asyncio.wait_for for wait with timeout.
         """
         await self._reconnector.wait_message()
-        return self._reconnector.receive_batch_nowait()
+        return self._reconnector.receive_batch_nowait(
+            max_messages=max_messages,
+        )

     async def receive_message(self) -> typing.Optional[datatypes.PublicMessage]:
         """
@@ -212,8 +215,10 @@ async def wait_message(self):
         await self._state_changed.wait()
         self._state_changed.clear()

-    def receive_batch_nowait(self):
-        return self._stream_reader.receive_batch_nowait()
+    def receive_batch_nowait(self, max_messages: Optional[int] = None):
+        return self._stream_reader.receive_batch_nowait(
+            max_messages=max_messages,
+        )

     def receive_message_nowait(self):
         return self._stream_reader.receive_message_nowait()
@@ -363,17 +368,44 @@ def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
         first_id, batch = self._message_batches.popitem(last=False)
         return first_id, batch

-    def receive_batch_nowait(self):
+    def _cut_batch_by_max_messages(
+        batch: datatypes.PublicBatch,
+        max_messages: int,
+    ) -> typing.Tuple[datatypes.PublicBatch, datatypes.PublicBatch]:
+        initial_length = len(batch.messages)
+        one_message_size = batch._bytes_size // initial_length
+
+        new_batch = datatypes.PublicBatch(
+            messages=batch.messages[:max_messages],
+            _partition_session=batch._partition_session,
+            _bytes_size=one_message_size * max_messages,
+            _codec=batch._codec,
+        )
+
+        batch.messages = batch.messages[max_messages:]
+        batch._bytes_size = one_message_size * (initial_length - max_messages)
+
+        return new_batch, batch
+
+    def receive_batch_nowait(self, max_messages: Optional[int] = None):
         if self._get_first_error():
             raise self._get_first_error()

         if not self._message_batches:
             return None

-        _, batch = self._get_first_batch()
-        self._buffer_release_bytes(batch._bytes_size)
+        part_sess_id, batch = self._get_first_batch()
+
+        if max_messages is None or len(batch.messages) <= max_messages:
+            self._buffer_release_bytes(batch._bytes_size)
+            return batch
+
+        cutted_batch, remaining_batch = self._cut_batch_by_max_messages(batch, max_messages)
+
+        self._message_batches[part_sess_id] = remaining_batch
+        self._buffer_release_bytes(cutted_batch._bytes_size)

-        return batch
+        return cutted_batch

     def receive_message_nowait(self):
         if self._get_first_error():
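The split above apportions the batch's tracked byte size by integer division: each message is assumed to cost _bytes_size // len(messages) bytes, the head batch releases that estimate times max_messages back to the read buffer, and the rest stays accounted against the remaining tail. A minimal standalone sketch of that accounting (not SDK code; split_bytes is a made-up name for illustration):

def split_bytes(bytes_size: int, total_messages: int, max_messages: int):
    # Mirrors the byte accounting in _cut_batch_by_max_messages:
    # every message is charged the same estimated size.
    one_message_size = bytes_size // total_messages
    head_bytes = one_message_size * max_messages  # released to the buffer now
    tail_bytes = one_message_size * (total_messages - max_messages)  # stays with the remainder
    return head_bytes, tail_bytes

# Example: a 1000-byte batch of 7 messages cut at max_messages=3.
# 1000 // 7 = 142, so the head releases 142 * 3 = 426 bytes and the
# tail keeps 142 * 4 = 568 bytes; 6 bytes are dropped by the integer
# division, matching the diff above.
print(split_bytes(1000, 7, 3))  # (426, 568)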
4 changes: 3 additions & 1 deletion ydb/_topic_reader/topic_reader_sync.py
@@ -103,7 +103,9 @@ def receive_batch(
         self._check_closed()

         return self._caller.safe_call_with_result(
-            self._async_reader.receive_batch(),
+            self._async_reader.receive_batch(
+                max_messages=max_messages,
+            ),
             timeout,
         )

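With both readers wired through, max_messages caps how many messages a single receive_batch call returns, while the surplus stays buffered for the next call. A hedged usage sketch for the asyncio reader: it assumes reader is an already-opened topic reader (e.g. from driver.topic_client.reader(...)), that messages expose .data, and that the reader's commit() accepts a batch; drain_in_chunks is a made-up helper name.

async def drain_in_chunks(reader):
    # Illustrative loop: each call returns at most 10 messages; anything
    # beyond that stays buffered and comes back on the next call.
    # Wrap the await in asyncio.wait_for(...) if a timeout is needed,
    # as the docstring in the diff suggests.
    while True:
        batch = await reader.receive_batch(max_messages=10)
        if batch is None:  # reader closed
            break
        for message in batch.messages:
            print(message.data)
        reader.commit(batch)

The sync reader takes the same keyword and simply forwards it through safe_call_with_result with its own timeout argument, as shown in the topic_reader_sync.py hunk above.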
