Skip to content

Commit

Permalink
Merge branch 'main' into allow_passing_kwargs_to_dodal_module
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicOram authored Oct 10, 2023
2 parents 080f6e2 + 5aa646d commit 22e0a1c
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions src/blueapi/cli/amq.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import threading
from typing import Callable, Optional
from typing import Callable, Optional, Union

from bluesky.callbacks.best_effort import BestEffortCallback

from blueapi.core import DataEvent
from blueapi.messaging import MessageContext, MessagingTemplate
from blueapi.worker import WorkerEvent
from blueapi.worker import ProgressEvent, WorkerEvent

from .updates import CliEventRenderer


class BlueskyRemoteError(Exception):
Expand Down Expand Up @@ -33,12 +38,22 @@ def subscribe_to_topics(
) -> None:
"""Run callbacks on events/progress events with a given correlation id."""

def on_event_wrapper(ctx: MessageContext, event: WorkerEvent) -> None:
if (on_event is not None) and (ctx.correlation_id == correlation_id):
on_event(event)
progress_bar = CliEventRenderer(correlation_id)
callback = BestEffortCallback()

def on_event_wrapper(
ctx: MessageContext, event: Union[WorkerEvent, ProgressEvent, DataEvent]
) -> None:
if isinstance(event, WorkerEvent):
if (on_event is not None) and (ctx.correlation_id == correlation_id):
on_event(event)

if (event.is_complete()) and (ctx.correlation_id == correlation_id):
self.complete.set()
if (event.is_complete()) and (ctx.correlation_id == correlation_id):
self.complete.set()
elif isinstance(event, ProgressEvent):
progress_bar.on_progress_event(event)
elif isinstance(event, DataEvent):
callback(event.name, event.doc)

self.app.subscribe(
self.app.destinations.topic("public.worker.event"),
Expand Down

0 comments on commit 22e0a1c

Please sign in to comment.