-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ability to batch messages in topic reader #491
Conversation
7aa3545
to
2e6ad26
Compare
🌋 Here are results of SLO test for Python SDK over Table Service: |
🌋 Here are results of SLO test for Python SDK over Query Service: |
24f684c
to
60a4504
Compare
@@ -264,7 +264,7 @@ class ReaderStream: | |||
|
|||
_state_changed: asyncio.Event | |||
_closed: bool | |||
_message_batches: typing.Deque[datatypes.PublicBatch] | |||
_message_batches: typing.Dict[int, datatypes.PublicBatch] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_message_batches: typing.Dict[int, datatypes.PublicBatch] | |
_message_batches: typing.Dict[int, datatypes.PublicBatch] # keys are partition session ID |
@@ -359,29 +359,38 @@ async def wait_messages(self): | |||
await self._state_changed.wait() | |||
self._state_changed.clear() | |||
|
|||
def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]: | |||
first_id, batch = self._message_batches.popitem(last=False) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
first_id -> partition_session_id - for understand mean of the number
def _add_batch_to_queue(self, batch: datatypes.PublicBatch): | ||
part_sess_id = batch._partition_session.id | ||
if part_sess_id in self._message_batches: | ||
self._message_batches[part_sess_id].messages.extend(batch.messages) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about implement _push/_pop method for PublicBatch? it will be easer refctor internals in the future.
initial_batches = batch_count() | ||
initial_batch_size = batch_size() if not new_batch else 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about invert condition?
initial_batch_size = batch_size() if not new_batch else 0 | |
initial_batch_size = 0 if new_batch else batch_size() |
Pull request type
Please check the type of change your PR introduces:
What is the current behavior?
Issue Number: N/A
What is the new behavior?
Other information