Skip to content

Commit

Permalink
refactor cutting new batch
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Sep 27, 2024
1 parent 56e3700 commit ffdd843
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
16 changes: 16 additions & 0 deletions ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,3 +179,19 @@ def _extend(self, batch: PublicBatch) -> None:
def _pop(self) -> Tuple[List[PublicMessage], bool]:
msgs_left = True if len(self.messages) > 1 else False
return self.messages.pop(0), msgs_left

def _pop_batch(self, size: int) -> PublicBatch:
initial_length = len(self.messages)
one_message_size = self._bytes_size // initial_length

new_batch = PublicBatch(
messages=self.messages[:size],
_partition_session=self._partition_session,
_bytes_size=one_message_size * size,
_codec=self._codec,
)

self.messages = self.messages[size:]
self._bytes_size = one_message_size * (initial_length - size)

return new_batch
4 changes: 2 additions & 2 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,9 +421,9 @@ def receive_batch_nowait(self, max_messages: Optional[int] = None):
self._buffer_release_bytes(batch._bytes_size)
return batch

cutted_batch, remaining_batch = self._cut_batch_by_max_messages(batch, max_messages)
cutted_batch = batch._pop_batch(size=max_messages)

self._message_batches[part_sess_id] = remaining_batch
self._message_batches[part_sess_id] = batch
self._buffer_release_bytes(cutted_batch._bytes_size)

return cutted_batch
Expand Down

0 comments on commit ffdd843

Please sign in to comment.