Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Sep 27, 2024
1 parent a1888c7 commit 2560e5f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
10 changes: 9 additions & 1 deletion ydb/_topic_reader/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import deque
from dataclasses import dataclass, field
import datetime
from typing import Union, Any, List, Dict, Deque, Optional
from typing import Union, Any, List, Dict, Deque, Optional, Tuple

from ydb._grpc.grpcwrapper.ydb_topic import OffsetsRange, Codec
from ydb._topic_reader import topic_reader_asyncio
Expand Down Expand Up @@ -171,3 +171,11 @@ def alive(self) -> bool:

def pop_message(self) -> PublicMessage:
return self.messages.pop(0)

def _extend(self, batch: PublicBatch) -> None:
self.messages.extend(batch.messages)
self._bytes_size += batch._bytes_size

def _pop(self) -> Tuple[List[PublicMessage], bool]:
msgs_left = True if len(self.messages) > 1 else False
return self.messages.pop(0), msgs_left
7 changes: 3 additions & 4 deletions ydb/_topic_reader/topic_reader_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,9 @@ def receive_message_nowait(self):

part_sess_id, batch = self._get_first_batch()

message = batch.messages.pop(0)
message, msgs_left = batch._pop()

if len(batch.messages) == 0:
if not msgs_left:
self._buffer_release_bytes(batch._bytes_size)
else:
# TODO: we should somehow release bytes from single message as well
Expand Down Expand Up @@ -620,8 +620,7 @@ async def _decode_batches_loop(self):
def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
part_sess_id = batch._partition_session.id
if part_sess_id in self._message_batches:
self._message_batches[part_sess_id].messages.extend(batch.messages)
self._message_batches[part_sess_id]._bytes_size += batch._bytes_size
self._message_batches[part_sess_id]._extend(batch)
return

self._message_batches[part_sess_id] = batch
Expand Down

0 comments on commit 2560e5f

Please sign in to comment.