
Ability to batch messages in topic reader #491

Merged: 5 commits merged into main from topic_batch_messages on Sep 27, 2024
Conversation

@vgvoleg (Collaborator) commented Sep 24, 2024

Pull request type

Please check the type of change your PR introduces:

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • Documentation content changes
  • Other (please describe):

What is the current behavior?

Issue Number: N/A

What is the new behavior?

Other information

github-actions bot commented Sep 24, 2024

🌋 Here are results of SLO test for Python SDK over Table Service:

Grafana Dashboard

SLO-sync-python-table

github-actions bot commented Sep 24, 2024

🌋 Here are results of SLO test for Python SDK over Query Service:

Grafana Dashboard

SLO-sync-python-query

@@ -264,7 +264,7 @@ class ReaderStream:

     _state_changed: asyncio.Event
     _closed: bool
-    _message_batches: typing.Deque[datatypes.PublicBatch]
+    _message_batches: typing.Dict[int, datatypes.PublicBatch]
Member commented:
Suggested change:
-    _message_batches: typing.Dict[int, datatypes.PublicBatch]
+    _message_batches: typing.Dict[int, datatypes.PublicBatch]  # keys are partition session ID
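For illustration, a minimal standalone sketch of the per-partition dict idea discussed above (names and types are simplified stand-ins, not the SDK's actual `PublicBatch` API): keying pending batches by partition session ID lets newly arrived messages be merged into the existing pending batch for that partition instead of queuing a separate one.

```python
# Hypothetical sketch, assuming batches are plain lists of messages
# keyed by an integer partition session ID.
message_batches: dict[int, list[str]] = {}

def add_batch(partition_session_id: int, messages: list[str]) -> None:
    """Merge new messages into the pending batch for this partition."""
    if partition_session_id in message_batches:
        # A batch for this partition is already queued: extend it.
        message_batches[partition_session_id].extend(messages)
    else:
        # First batch for this partition: queue it as-is.
        message_batches[partition_session_id] = list(messages)

add_batch(1, ["a", "b"])
add_batch(2, ["x"])
add_batch(1, ["c"])  # merged into the pending batch for partition 1
```

With a deque, the two deliveries for partition 1 would sit in the queue as two separate batches; with the dict they coalesce into one.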

@@ -359,29 +359,38 @@ async def wait_messages(self):
         await self._state_changed.wait()
         self._state_changed.clear()

+    def _get_first_batch(self) -> typing.Tuple[int, datatypes.PublicBatch]:
+        first_id, batch = self._message_batches.popitem(last=False)
Member commented:
Rename first_id to partition_session_id, to make clear what the number means.
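One detail worth noting about the `popitem(last=False)` call in the diff: the built-in `dict` preserves insertion order (Python 3.7+), but `dict.popitem()` takes no arguments and removes the *last* inserted item. Popping the oldest entry FIFO-style requires `collections.OrderedDict`. A small sketch (the IDs are hypothetical):

```python
from collections import OrderedDict

# OrderedDict.popitem(last=False) pops the oldest (first-inserted)
# entry; a plain dict's popitem() has no such parameter.
batches = OrderedDict()
batches[11] = ["msg-a"]  # 11 and 17 are illustrative partition session IDs
batches[17] = ["msg-b"]

partition_session_id, batch = batches.popitem(last=False)
```

Here `partition_session_id` is `11`, the oldest queued partition, and `17` remains in the queue.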

+    def _add_batch_to_queue(self, batch: datatypes.PublicBatch):
+        part_sess_id = batch._partition_session.id
+        if part_sess_id in self._message_batches:
+            self._message_batches[part_sess_id].messages.extend(batch.messages)
Member commented:
What about implementing _push/_pop methods for PublicBatch? It would make it easier to refactor the internals in the future.
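A minimal sketch of what the suggested `_push`/`_pop` helpers could look like, so callers never touch `.messages` directly (the `Batch` class and its fields here are illustrative, not the SDK's actual `PublicBatch`):

```python
from dataclasses import dataclass, field

# Hypothetical batch type encapsulating its message storage behind
# _push/_pop, so the internal representation can change later without
# touching call sites.
@dataclass
class Batch:
    messages: list = field(default_factory=list)

    def _push(self, other: "Batch") -> None:
        """Append another batch's messages onto this one."""
        self.messages.extend(other.messages)

    def _pop(self) -> "Batch":
        """Detach all buffered messages as a new batch, leaving this one empty."""
        popped, self.messages = Batch(self.messages), []
        return popped
```

With this, the `_add_batch_to_queue` branch above would become `self._message_batches[part_sess_id]._push(batch)`.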

initial_batches = batch_count()
initial_batch_size = batch_size() if not new_batch else 0
Member commented:
What about inverting the condition?

Suggested change:
-    initial_batch_size = batch_size() if not new_batch else 0
+    initial_batch_size = 0 if new_batch else batch_size()
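The two spellings are equivalent; putting the simple branch first just avoids the double negative. A self-contained check (with `new_batch` and `batch_size` as illustrative stand-ins for the test's locals):

```python
# Both conditional expressions pick 0 for a new batch and the current
# size otherwise; only the readability differs.
def initial_size(new_batch: bool, batch_size: int) -> int:
    return 0 if new_batch else batch_size

assert all(
    initial_size(nb, 5) == (5 if not nb else 0) for nb in (True, False)
)
```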

@vgvoleg vgvoleg merged commit 4877cc8 into main Sep 27, 2024
11 checks passed
@vgvoleg vgvoleg deleted the topic_batch_messages branch September 27, 2024 10:34