From 2560e5fdf71f55a93ef2b6123e245abaafdc8377 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Fri, 27 Sep 2024 11:59:01 +0300 Subject: [PATCH] review fixes --- ydb/_topic_reader/datatypes.py | 10 +++++++++- ydb/_topic_reader/topic_reader_asyncio.py | 7 +++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/ydb/_topic_reader/datatypes.py b/ydb/_topic_reader/datatypes.py index 28155ea7..0f15ff85 100644 --- a/ydb/_topic_reader/datatypes.py +++ b/ydb/_topic_reader/datatypes.py @@ -7,7 +7,7 @@ from collections import deque from dataclasses import dataclass, field import datetime -from typing import Union, Any, List, Dict, Deque, Optional +from typing import Union, Any, List, Dict, Deque, Optional, Tuple from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange, Codec from ydb._topic_reader import topic_reader_asyncio @@ -171,3 +171,11 @@ def alive(self) -> bool: def pop_message(self) -> PublicMessage: return self.messages.pop(0) + + def _extend(self, batch: PublicBatch) -> None: + self.messages.extend(batch.messages) + self._bytes_size += batch._bytes_size + + def _pop(self) -> Tuple[List[PublicMessage], bool]: + msgs_left = True if len(self.messages) > 1 else False + return self.messages.pop(0), msgs_left diff --git a/ydb/_topic_reader/topic_reader_asyncio.py b/ydb/_topic_reader/topic_reader_asyncio.py index 62751c35..92cd78c2 100644 --- a/ydb/_topic_reader/topic_reader_asyncio.py +++ b/ydb/_topic_reader/topic_reader_asyncio.py @@ -384,9 +384,9 @@ def receive_message_nowait(self): part_sess_id, batch = self._get_first_batch() - message = batch.messages.pop(0) + message, msgs_left = batch._pop() - if len(batch.messages) == 0: + if not msgs_left: self._buffer_release_bytes(batch._bytes_size) else: # TODO: we should somehow release bytes from single message as well @@ -620,8 +620,7 @@ async def _decode_batches_loop(self): def _add_batch_to_queue(self, batch: datatypes.PublicBatch): part_sess_id = batch._partition_session.id if part_sess_id in self._message_batches: - self._message_batches[part_sess_id].messages.extend(batch.messages) - self._message_batches[part_sess_id]._bytes_size += batch._bytes_size + self._message_batches[part_sess_id]._extend(batch) return self._message_batches[part_sess_id] = batch