-
Notifications
You must be signed in to change notification settings - Fork 178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(robot-server, api): Wire up protocol engine event bubbling to robot server #14766
Conversation
…notification client Instead of having the robot-server notification publishers poll the engine constantly for state updates, let's have protocol engine notify the notification client that a state update occurred. Publishers can then diff the engine store for state changes of interest and emit publish events as necessary.
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## edge #14766 +/- ##
==========================================
- Coverage 67.19% 67.19% -0.01%
==========================================
Files 2495 2495
Lines 71518 71516 -2
Branches 9020 9020
==========================================
- Hits 48054 48052 -2
Misses 21342 21342
Partials 2122 2122
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, very nice!
@pytest.mark.parametrize("_test_repetition", range(10)) | ||
async def test_multiple_subscribers(_test_repetition: int) -> None: | ||
"""Test that multiple subscribers can wait for a notification. | ||
|
||
This test checks that the subscribers are awoken in the order they | ||
subscribed. This may or may not be guarenteed according to the | ||
implementations of both ChangeNotifier and the event loop. | ||
This test functions as a canary, given that our code may relies | ||
on this ordering for determinism. | ||
|
||
This test runs multiple times to check for flakyness. | ||
""" | ||
subject = ChangeNotifier() | ||
results = [] | ||
|
||
async def _do_task_1() -> None: | ||
await subject.wait() | ||
results.append(1) | ||
|
||
async def _do_task_2() -> None: | ||
await subject.wait() | ||
results.append(2) | ||
|
||
async def _do_task_3() -> None: | ||
await subject.wait() | ||
results.append(3) | ||
|
||
task_1 = asyncio.create_task(_do_task_1()) | ||
task_2 = asyncio.create_task(_do_task_2()) | ||
task_3 = asyncio.create_task(_do_task_3()) | ||
|
||
asyncio.get_running_loop().call_soon(subject.notify) | ||
await asyncio.gather(task_1, task_2, task_3) | ||
|
||
assert results == [1, 2, 3] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a good idea. I like it
…o robot server (#14766) Closes EXEC-358 Wire up PE event bubbling to the robot server for notifications as an alternative to the current polling that occurs. There are no functional changes. PublisherNotifier is the new interface that handles event management for publishers, using a generic ChangeNotifier that is given to PE as a callback. When PE reports a change in state, the callback fires. PublisherNotifier then iterates through each callback, invoking them. In the future, each publisher that requires access to PE state updates (eg, RunsPublisher) will add relevant callbacks during their initialization via register_publish_callbacks. Each callback will contain the conditional logic required for an MQTT publish to occur.
Closes EXEC-358
Overview
This PR serves to wire up PE event bubbling to the robot server for notifications as an alternative to the current polling that occurs. There are no functional changes.
PublisherNotifier
is the new interface that handles event management for publishers, using a genericChangeNotifier
that is given to PE as a callback. When PE reports a change in state, the callback fires.PublisherNotifier
then iterates through each callback, invoking them.In the future, each publisher that requires access to PE state updates (eg,
RunsPublisher
) will add relevant callbacks during their initialization viaregister_publish_callbacks
. Each callback will contain the conditional logic required for an MQTT publish to occur.Test Plan
Risk assessment
low