diff --git a/helm/blueapi/templates/deployment.yaml b/helm/blueapi/templates/deployment.yaml index 7032e816a..92063d3b0 100644 --- a/helm/blueapi/templates/deployment.yaml +++ b/helm/blueapi/templates/deployment.yaml @@ -70,6 +70,21 @@ spec: env: - name: SCRATCH_AREA value: {{ .Values.scratch.containerPath }} + {{- if .Values.listener.enabled -}} + - name: {{ .Chart.Name }}-document-listener + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + resources: + {{- toYaml .Values.listener.resources | nindent 12 }} + args: + - "-c" + - "/config/config.yaml" + {{- with .Values.existingSecret }} + - "-c" + - "/config/secret.yaml" + {{- end }} + - "listen" + {{- end }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/helm/blueapi/values.yaml b/helm/blueapi/values.yaml index a449e3c3f..4e7bbb259 100644 --- a/helm/blueapi/values.yaml +++ b/helm/blueapi/values.yaml @@ -70,6 +70,10 @@ affinity: {} hostNetwork: false # May be needed for talking to arcane protocols such as EPICS +listener: + enabled: true + resources: {} + scratch: hostPath: "" # example: /usr/local/blueapi-software-scratch containerPath: /blueapi-plugins/scratch diff --git a/src/blueapi/cli/amq.py b/src/blueapi/cli/amq.py index abd85678d..b9b6c0b3c 100644 --- a/src/blueapi/cli/amq.py +++ b/src/blueapi/cli/amq.py @@ -15,6 +15,9 @@ def __init__(self, message: str) -> None: super().__init__(message) +_Event = Union[WorkerEvent, ProgressEvent, DataEvent] + + class AmqClient: app: MessagingTemplate complete: threading.Event @@ -42,7 +45,8 @@ def subscribe_to_topics( callback = BestEffortCallback() def on_event_wrapper( - ctx: MessageContext, event: Union[WorkerEvent, ProgressEvent, DataEvent] + ctx: MessageContext, + event: _Event, ) -> None: if isinstance(event, WorkerEvent): if (on_event is not None) and (ctx.correlation_id == correlation_id): @@ -55,9 +59,15 @@ def on_event_wrapper( elif isinstance(event, DataEvent): callback(event.name, event.doc) + self.subscribe_to_all_events(on_event_wrapper) + + def subscribe_to_all_events( + self, + on_event: Callable[[MessageContext, _Event], None], + ) -> None: self.app.subscribe( self.app.destinations.topic("public.worker.event"), - on_event_wrapper, + on_event, ) def wait_for_complete(self, timeout: Optional[float] = None) -> None: diff --git a/src/blueapi/cli/cli.py b/src/blueapi/cli/cli.py index 3c90effb8..3e2e458c9 100644 --- a/src/blueapi/cli/cli.py +++ b/src/blueapi/cli/cli.py @@ -12,6 +12,8 @@ from blueapi import __version__ from blueapi.cli.amq import AmqClient from blueapi.config import ApplicationConfig, ConfigLoader +from blueapi.core import DataEvent +from blueapi.messaging import MessageContext from blueapi.messaging.stomptemplate import StompMessagingTemplate from blueapi.service.main import start from blueapi.service.model import WorkerTask @@ -21,7 +23,7 @@ print_schema_as_yaml, write_schema_as_yaml, ) -from blueapi.worker import RunPlan, WorkerEvent, WorkerState +from blueapi.worker import ProgressEvent, RunPlan, WorkerEvent, WorkerState from .rest import BlueapiRestClient @@ -127,6 +129,30 @@ def get_devices(obj: dict) -> None: pprint(client.get_devices().dict()) +@controller.command(name="listen") +@check_connection +@click.pass_obj +def listen_to_events(obj: dict) -> None: + """Listen to events output by blueapi""" + config: ApplicationConfig = obj["config"] + amq_client = AmqClient(StompMessagingTemplate.autoconfigured(config.stomp)) + + def on_event( + context: MessageContext, + event: Union[WorkerEvent, ProgressEvent, DataEvent], + ) -> None: + converted = json.dumps(event.dict(), indent=2) + print(converted) + + print( + "Subscribing to all bluesky events from " + f"{config.stomp.host}:{config.stomp.port}" + ) + with amq_client: + amq_client.subscribe_to_all_events(on_event) + input("Press enter to exit") + + @controller.command(name="run") @click.argument("name", type=str) @click.argument("parameters", type=str, required=False)