Skip to content

Commit

Permalink
Add configuration option to disable status updates via message bus
Browse files Browse the repository at this point in the history
  • Loading branch information
callumforrester committed Nov 9, 2023
1 parent 7994906 commit a4817be
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
8 changes: 8 additions & 0 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ class DataWritingConfig(BlueapiBaseModel):
group_name: str = "example"


class WorkerEventConfig(BlueapiBaseModel):
"""
Config for event broadcasting via the message bus
"""
broadcast_status_events: bool = True


class EnvironmentConfig(BlueapiBaseModel):
"""
Config for the RunEngine environment
Expand All @@ -60,6 +67,7 @@ class EnvironmentConfig(BlueapiBaseModel):
Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"),
]
data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig)
events: WorkerEventConfig = Field(default_factory=WorkerEventConfig)


class LoggingConfig(BlueapiBaseModel):
Expand Down
2 changes: 1 addition & 1 deletion src/blueapi/service/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(

self.context.with_config(self.config.env)

self.worker = worker or RunEngineWorker(self.context)
self.worker = worker or RunEngineWorker(self.context, broadcast_statuses=self.config.env.events.broadcast_status_events,)
self.messaging_template = (
messaging_template
or StompMessagingTemplate.autoconfigured(self.config.stomp)
Expand Down
5 changes: 4 additions & 1 deletion src/blueapi/worker/reworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(
self,
ctx: BlueskyContext,
start_stop_timeout: float = DEFAULT_START_STOP_TIMEOUT,
broadcast_statuses: bool = True,
) -> None:
self._ctx = ctx
self._start_stop_timeout = start_stop_timeout
Expand All @@ -90,6 +91,7 @@ def __init__(
self._stopping = Event()
self._stopped = Event()
self._stopped.set()
self._broadcast_statuses = broadcast_statuses

def clear_task(self, task_id: str) -> str:
task = self._pending_tasks.pop(task_id)
Expand Down Expand Up @@ -197,7 +199,8 @@ def run(self) -> None:
LOGGER.info("Worker starting")
self._ctx.run_engine.state_hook = self._on_state_change
self._ctx.run_engine.subscribe(self._on_document)
self._ctx.run_engine.waiting_hook = self._waiting_hook
if self._broadcast_statuses:
self._ctx.run_engine.waiting_hook = self._waiting_hook

self._stopped.clear()
self._started.set()
Expand Down

0 comments on commit a4817be

Please sign in to comment.