From a4817be5241354d0a476af83fc51fcefbccb3d03 Mon Sep 17 00:00:00 2001 From: Callum Forrester Date: Thu, 9 Nov 2023 19:03:40 +0000 Subject: [PATCH] Add configuration option to disable status updates via message bus --- src/blueapi/config.py | 8 ++++++++ src/blueapi/service/handler.py | 2 +- src/blueapi/worker/reworker.py | 5 ++++- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/blueapi/config.py b/src/blueapi/config.py index 02560ea7f..25f8c72d0 100644 --- a/src/blueapi/config.py +++ b/src/blueapi/config.py @@ -46,6 +46,13 @@ class DataWritingConfig(BlueapiBaseModel): group_name: str = "example" +class WorkerEventConfig(BlueapiBaseModel): + """ + Config for event broadcasting via the message bus + """ + broadcast_status_events: bool = True + + class EnvironmentConfig(BlueapiBaseModel): """ Config for the RunEngine environment @@ -60,6 +67,7 @@ class EnvironmentConfig(BlueapiBaseModel): Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"), ] data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig) + events: WorkerEventConfig = Field(default_factory=WorkerEventConfig) class LoggingConfig(BlueapiBaseModel): diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py index 4a006a960..88bf1597c 100644 --- a/src/blueapi/service/handler.py +++ b/src/blueapi/service/handler.py @@ -37,7 +37,7 @@ def __init__( self.context.with_config(self.config.env) - self.worker = worker or RunEngineWorker(self.context) + self.worker = worker or RunEngineWorker(self.context, broadcast_statuses=self.config.env.events.broadcast_status_events,) self.messaging_template = ( messaging_template or StompMessagingTemplate.autoconfigured(self.config.stomp) diff --git a/src/blueapi/worker/reworker.py b/src/blueapi/worker/reworker.py index 7444abd79..0ff3a6ec2 100644 --- a/src/blueapi/worker/reworker.py +++ b/src/blueapi/worker/reworker.py @@ -69,6 +69,7 @@ def __init__( self, ctx: BlueskyContext, start_stop_timeout: float = DEFAULT_START_STOP_TIMEOUT, + broadcast_statuses: bool = True, ) -> None: self._ctx = ctx self._start_stop_timeout = start_stop_timeout @@ -90,6 +91,7 @@ def __init__( self._stopping = Event() self._stopped = Event() self._stopped.set() + self._broadcast_statuses = broadcast_statuses def clear_task(self, task_id: str) -> str: task = self._pending_tasks.pop(task_id) @@ -197,7 +199,8 @@ def run(self) -> None: LOGGER.info("Worker starting") self._ctx.run_engine.state_hook = self._on_state_change self._ctx.run_engine.subscribe(self._on_document) - self._ctx.run_engine.waiting_hook = self._waiting_hook + if self._broadcast_statuses: + self._ctx.run_engine.waiting_hook = self._waiting_hook self._stopped.clear() self._started.set()