From 390aa773cb2e2a2c6fa8fb718f08d3ceb91a9568 Mon Sep 17 00:00:00 2001
From: DiamondJoseph <53935796+DiamondJoseph@users.noreply.github.com>
Date: Mon, 29 Apr 2024 15:59:24 +0100
Subject: [PATCH] Remove DirectoryProvider and pre-processor handling to Dodal
 (#376)

---
 pyproject.toml                               |   2 +-
 src/blueapi/config.py                        |   7 -
 src/blueapi/core/context.py                  |  14 +-
 src/blueapi/data_management/__init__.py      |   0
 .../visit_directory_provider.py              | 127 ------
 src/blueapi/preprocessors/attach_metadata.py |  41 --
 src/blueapi/service/handler.py               |  40 --
 src/blueapi/worker/task.py                   |   4 +-
 .../test_visit_directory_provider.py         |  66 ---
 tests/preprocessors/__init__.py              |   0
 tests/preprocessors/test_attach_metadata.py  | 399 ------------------
 11 files changed, 3 insertions(+), 697 deletions(-)
 delete mode 100644 src/blueapi/data_management/__init__.py
 delete mode 100644 src/blueapi/data_management/visit_directory_provider.py
 delete mode 100644 src/blueapi/preprocessors/attach_metadata.py
 delete mode 100644 tests/data_management/test_visit_directory_provider.py
 delete mode 100644 tests/preprocessors/__init__.py
 delete mode 100644 tests/preprocessors/test_attach_metadata.py

diff --git a/pyproject.toml b/pyproject.toml
index cc0f65aeb..393a0f459 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -26,7 +26,7 @@ dependencies = [
     "uvicorn",
     "requests",
     "dls-bluesky-core", #requires ophyd-async
-    "dls-dodal<1.21",
+    "dls-dodal",
 ]
 dynamic = ["version"]
 license.file = "LICENSE"
diff --git a/src/blueapi/config.py b/src/blueapi/config.py
index 235a0ab74..b66dbcf37 100644
--- a/src/blueapi/config.py
+++ b/src/blueapi/config.py
@@ -51,12 +51,6 @@ class StompConfig(BaseModel):
     auth: BasicAuthentication | None = None
 
 
-class DataWritingConfig(BlueapiBaseModel):
-    visit_service_url: str | None = None  # e.g. "http://localhost:8088/api"
-    visit_directory: Path = Path("/tmp/0-0")
-    group_name: str = "example"
-
-
 class WorkerEventConfig(BlueapiBaseModel):
     """
     Config for event broadcasting via the message bus
@@ -78,7 +72,6 @@ class EnvironmentConfig(BlueapiBaseModel):
         Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.plans"),
         Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"),
     ]
-    data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig)
     events: WorkerEventConfig = Field(default_factory=WorkerEventConfig)
 
 
diff --git a/src/blueapi/core/context.py b/src/blueapi/core/context.py
index a6d5ba2ff..98493a913 100644
--- a/src/blueapi/core/context.py
+++ b/src/blueapi/core/context.py
@@ -1,6 +1,5 @@
-import functools
 import logging
-from collections.abc import Callable, Sequence
+from collections.abc import Callable
 from dataclasses import dataclass, field
 from importlib import import_module
 from inspect import Parameter, signature
@@ -22,10 +21,8 @@
     BLUESKY_PROTOCOLS,
     Device,
     HasName,
-    MsgGenerator,
     Plan,
     PlanGenerator,
-    PlanWrapper,
     is_bluesky_compatible_device,
     is_bluesky_plan_generator,
 )
@@ -45,7 +42,6 @@ class BlueskyContext:
     run_engine: RunEngine = field(
         default_factory=lambda: RunEngine(context_managers=[])
     )
-    plan_wrappers: Sequence[PlanWrapper] = field(default_factory=list)
     plans: dict[str, Plan] = field(default_factory=dict)
     devices: dict[str, Device] = field(default_factory=dict)
     plan_functions: dict[str, PlanGenerator] = field(default_factory=dict)
@@ -53,14 +49,6 @@ class BlueskyContext:
 
     _reference_cache: dict[type, type] = field(default_factory=dict)
 
-    def wrap(self, plan: MsgGenerator) -> MsgGenerator:
-        wrapped_plan = functools.reduce(
-            lambda wrapped, next_wrapper: next_wrapper(wrapped),
-            self.plan_wrappers,
-            plan,
-        )
-        yield from wrapped_plan
-
     def find_device(self, addr: str | list[str]) -> Device | None:
         """
         Find a device in this context, allows for recursive search.
diff --git a/src/blueapi/data_management/__init__.py b/src/blueapi/data_management/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/src/blueapi/data_management/visit_directory_provider.py b/src/blueapi/data_management/visit_directory_provider.py
deleted file mode 100644
index bc05040b7..000000000
--- a/src/blueapi/data_management/visit_directory_provider.py
+++ /dev/null
@@ -1,127 +0,0 @@
-import logging
-from abc import ABC, abstractmethod
-from pathlib import Path
-
-from aiohttp import ClientSession
-from ophyd_async.core import DirectoryInfo, DirectoryProvider
-from pydantic import BaseModel
-
-
-class DataCollectionIdentifier(BaseModel):
-    collectionNumber: int
-
-
-class VisitServiceClientBase(ABC):
-    """
-    Object responsible for I/O in determining collection number
-    """
-
-    @abstractmethod
-    async def create_new_collection(self) -> DataCollectionIdentifier:
-        """Create new collection"""
-
-    @abstractmethod
-    async def get_current_collection(self) -> DataCollectionIdentifier:
-        """Get current collection"""
-
-
-class VisitServiceClient(VisitServiceClientBase):
-    _url: str
-
-    def __init__(self, url: str) -> None:
-        self._url = url
-
-    async def create_new_collection(self) -> DataCollectionIdentifier:
-        async with ClientSession() as session:
-            async with session.post(f"{self._url}/numtracker") as response:
-                if response.status == 200:
-                    json = await response.json()
-                    return DataCollectionIdentifier.parse_obj(json)
-                else:
-                    raise Exception(response.status)
-
-    async def get_current_collection(self) -> DataCollectionIdentifier:
-        async with ClientSession() as session:
-            async with session.get(f"{self._url}/numtracker") as response:
-                if response.status == 200:
-                    json = await response.json()
-                    return DataCollectionIdentifier.parse_obj(json)
-                else:
-                    raise Exception(response.status)
-
-
-class LocalVisitServiceClient(VisitServiceClientBase):
-    _count: int
-
-    def __init__(self) -> None:
-        self._count = 0
-
-    async def create_new_collection(self) -> DataCollectionIdentifier:
-        self._count += 1
-        return DataCollectionIdentifier(collectionNumber=self._count)
-
-    async def get_current_collection(self) -> DataCollectionIdentifier:
-        return DataCollectionIdentifier(collectionNumber=self._count)
-
-
-class VisitDirectoryProvider(DirectoryProvider):
-    """
-    Gets information from a remote service to construct the path that detectors
-    should write to, and determine how their files should be named.
-    """
-
-    _data_group_name: str
-    _data_directory: Path
-
-    _client: VisitServiceClientBase
-    _current_collection: DirectoryInfo | None
-    _session: ClientSession | None
-
-    def __init__(
-        self,
-        data_group_name: str,
-        data_directory: Path,
-        client: VisitServiceClientBase,
-    ):
-        self._data_group_name = data_group_name
-        self._data_directory = data_directory
-        self._client = client
-
-        self._current_collection = None
-        self._session = None
-
-    async def update(self) -> None:
-        """
-        Calls the visit service to create a new data collection in the current visit.
-        """
-        # TODO: After visit service is more feature complete:
-        # TODO: Allow selecting visit as part of the request to BlueAPI
-        # TODO: Consume visit information from BlueAPI and pass down to this class
-        # TODO: Query visit service to get information about visit and data collection
-        # TODO: Use AuthN information as part of verification with visit service
-
-        try:
-            collection_id_info = await self._client.create_new_collection()
-            self._current_collection = self._generate_directory_info(collection_id_info)
-        except Exception as ex:
-            # TODO: The catch all is needed because the RunEngine will not
-            # currently handle it, see
-            # https://github.com/bluesky/bluesky/pull/1623
-            self._current_collection = None
-            logging.exception(ex)
-
-    def _generate_directory_info(
-        self,
-        collection_id_info: DataCollectionIdentifier,
-    ) -> DirectoryInfo:
-        collection_id = collection_id_info.collectionNumber
-        file_prefix = f"{self._data_group_name}-{collection_id}"
-        return DirectoryInfo(str(self._data_directory), file_prefix)
-
-    def __call__(self) -> DirectoryInfo:
-        if self._current_collection is not None:
-            return self._current_collection
-        else:
-            raise ValueError(
-                "No current collection, update() needs to be called at least once"
-            )
diff --git a/src/blueapi/preprocessors/attach_metadata.py b/src/blueapi/preprocessors/attach_metadata.py
deleted file mode 100644
index 21d9ed8b4..000000000
--- a/src/blueapi/preprocessors/attach_metadata.py
+++ /dev/null
@@ -1,41 +0,0 @@
-import bluesky.plan_stubs as bps
-import bluesky.preprocessors as bpp
-from bluesky.utils import make_decorator
-
-from blueapi.core import MsgGenerator
-from blueapi.data_management.visit_directory_provider import VisitDirectoryProvider
-
-DATA_SESSION = "data_session"
-DATA_GROUPS = "data_groups"
-
-
-def attach_metadata(
-    plan: MsgGenerator,
-    provider: VisitDirectoryProvider,
-) -> MsgGenerator:
-    """
-    Attach data session metadata to the runs within a plan and make it correlate
-    with an ophyd-async DirectoryProvider.
-
-    This updates the directory provider (which in turn makes a call to to a service
-    to figure out which scan number we are using for such a scan), and ensures the
-    start document contains the correct data session.
-
-    Args:
-        plan: The plan to preprocess
-        provider: The directory provider that participating detectors are aware of.
-
-    Returns:
-        MsgGenerator: A plan
-
-    Yields:
-        Iterator[Msg]: Plan messages
-    """
-    yield from bps.wait_for([provider.update])
-    directory_info = provider()
-    yield from bpp.inject_md_wrapper(
-        plan, md={DATA_SESSION: directory_info.filename_prefix}
-    )
-
-
-attach_metadata_decorator = make_decorator(attach_metadata)
diff --git a/src/blueapi/service/handler.py b/src/blueapi/service/handler.py
index 0280c5d3d..afa9818c1 100644
--- a/src/blueapi/service/handler.py
+++ b/src/blueapi/service/handler.py
@@ -5,15 +5,8 @@
 from blueapi.config import ApplicationConfig
 from blueapi.core import BlueskyContext
 from blueapi.core.event import EventStream
-from blueapi.data_management.visit_directory_provider import (
-    LocalVisitServiceClient,
-    VisitDirectoryProvider,
-    VisitServiceClient,
-    VisitServiceClientBase,
-)
 from blueapi.messaging import StompMessagingTemplate
 from blueapi.messaging.base import MessagingTemplate
-from blueapi.preprocessors.attach_metadata import attach_metadata
 from blueapi.service.handler_base import BlueskyHandler
 from blueapi.service.model import DeviceModel, PlanModel, WorkerTask
 from blueapi.worker.event import WorkerState
@@ -159,42 +152,9 @@ def setup_handler(
 ) -> None:
     global HANDLER
 
-    provider = None
-    plan_wrappers = []
-    if config:
-        visit_service_client: VisitServiceClientBase
-        if config.env.data_writing.visit_service_url is not None:
-            visit_service_client = VisitServiceClient(
-                config.env.data_writing.visit_service_url
-            )
-        else:
-            visit_service_client = LocalVisitServiceClient()
-
-        provider = VisitDirectoryProvider(
-            data_group_name=config.env.data_writing.group_name,
-            data_directory=config.env.data_writing.visit_directory,
-            client=visit_service_client,
-        )
-
-        # Make all dodal devices created by the context use provider if they can
-        try:
-            from dodal.parameters.gda_directory_provider import (
-                set_directory_provider_singleton,
-            )
-
-            set_directory_provider_singleton(provider)
-        except ImportError:
-            logging.error(
-                "Unable to set directory provider for ophyd-async devices, "
-                "a newer version of dodal is required"
-            )
-
-        plan_wrappers.append(lambda plan: attach_metadata(plan, provider))
-
     handler = Handler(
         config,
         context=BlueskyContext(
-            plan_wrappers=plan_wrappers,
             sim=False,
         ),
     )
diff --git a/src/blueapi/worker/task.py b/src/blueapi/worker/task.py
index f080be875..1e48cb4d8 100644
--- a/src/blueapi/worker/task.py
+++ b/src/blueapi/worker/task.py
@@ -28,9 +28,7 @@ def do_task(self, ctx: BlueskyContext) -> None:
         func = ctx.plan_functions[self.name]
         prepared_params = self.prepare_params(ctx)
 
-        plan_generator = func(**prepared_params.dict())
-        wrapped_plan_generator = ctx.wrap(plan_generator)
-        ctx.run_engine(wrapped_plan_generator)
+        ctx.run_engine(func(**prepared_params.dict()))
 
 
 def _lookup_params(ctx: BlueskyContext, task: Task) -> BaseModel:
diff --git a/tests/data_management/test_visit_directory_provider.py b/tests/data_management/test_visit_directory_provider.py
deleted file mode 100644
index 57d93d0ef..000000000
--- a/tests/data_management/test_visit_directory_provider.py
+++ /dev/null
@@ -1,66 +0,0 @@
-from pathlib import Path
-
-import pytest
-from ophyd_async.core import DirectoryInfo
-
-from blueapi.data_management.visit_directory_provider import (
-    DataCollectionIdentifier,
-    LocalVisitServiceClient,
-    VisitDirectoryProvider,
-    VisitServiceClientBase,
-)
-
-
-@pytest.fixture
-def visit_service_client() -> VisitServiceClientBase:
-    return LocalVisitServiceClient()
-
-
-@pytest.fixture
-def visit_directory_provider(
-    visit_service_client: VisitServiceClientBase,
-) -> VisitDirectoryProvider:
-    return VisitDirectoryProvider("example", Path("/tmp"), visit_service_client)
-
-
-@pytest.mark.asyncio
-async def test_client_can_view_collection(
-    visit_service_client: VisitServiceClientBase,
-) -> None:
-    collection = await visit_service_client.get_current_collection()
-    assert collection == DataCollectionIdentifier(collectionNumber=0)
-
-
-@pytest.mark.asyncio
-async def test_client_can_create_collection(
-    visit_service_client: VisitServiceClientBase,
-) -> None:
-    collection = await visit_service_client.create_new_collection()
-    assert collection == DataCollectionIdentifier(collectionNumber=1)
-
-
-@pytest.mark.asyncio
-async def test_update_sets_collection_number(
-    visit_directory_provider: VisitDirectoryProvider,
-) -> None:
-    await visit_directory_provider.update()
-    assert visit_directory_provider() == DirectoryInfo(
-        directory_path="/tmp",
-        filename_prefix="example-1",
-    )
-
-
-@pytest.mark.asyncio
-async def test_update_sets_collection_number_multi(
-    visit_directory_provider: VisitDirectoryProvider,
-) -> None:
-    await visit_directory_provider.update()
-    assert visit_directory_provider() == DirectoryInfo(
-        directory_path="/tmp",
-        filename_prefix="example-1",
-    )
-    await visit_directory_provider.update()
-    assert visit_directory_provider() == DirectoryInfo(
-        directory_path="/tmp",
-        filename_prefix="example-2",
-    )
diff --git a/tests/preprocessors/__init__.py b/tests/preprocessors/__init__.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/tests/preprocessors/test_attach_metadata.py b/tests/preprocessors/test_attach_metadata.py
deleted file mode 100644
index 8f879fc26..000000000
--- a/tests/preprocessors/test_attach_metadata.py
+++ /dev/null
@@ -1,399 +0,0 @@
-from collections.abc import Callable, Mapping
-from pathlib import Path
-from typing import Any
-
-import bluesky.plan_stubs as bps
-import bluesky.plans as bp
-import pytest
-from bluesky import RunEngine
-from bluesky.preprocessors import (
-    run_decorator,
-    run_wrapper,
-    set_run_key_decorator,
-    set_run_key_wrapper,
-    stage_wrapper,
-)
-from bluesky.protocols import HasName, Readable, Reading, Status, Triggerable
-from event_model.documents.event_descriptor import DataKey
-from ophyd.status import StatusBase
-from ophyd_async.core import DirectoryProvider
-
-from blueapi.core import DataEvent, MsgGenerator
-from blueapi.data_management.visit_directory_provider import (
-    DataCollectionIdentifier,
-    VisitDirectoryProvider,
-    VisitServiceClient,
-)
-from blueapi.preprocessors.attach_metadata import DATA_SESSION, attach_metadata
-
-DATA_DIRECTORY = Path("/tmp")
-DATA_GROUP_NAME = "test"
-
-
-RUN_0 = DATA_DIRECTORY / f"{DATA_GROUP_NAME}-0"
-RUN_1 = DATA_DIRECTORY / f"{DATA_GROUP_NAME}-1"
-RUN_2 = DATA_DIRECTORY / f"{DATA_GROUP_NAME}-2"
-
-
-class MockVisitServiceClient(VisitServiceClient):
-    _count: int
-    _fail: bool
-
-    def __init__(self) -> None:
-        super().__init__("http://example.com")
-        self._count = 0
-        self._fail = False
-
-    def always_fail(self) -> None:
-        self._fail = True
-
-    async def create_new_collection(self) -> DataCollectionIdentifier:
-        if self._fail:
-            raise ConnectionError()
-
-        count = self._count
-        self._count += 1
-        return DataCollectionIdentifier(collectionNumber=count)
-
-    async def get_current_collection(self) -> DataCollectionIdentifier:
-        if self._fail:
-            raise ConnectionError()
-
-        return DataCollectionIdentifier(collectionNumber=self._count)
-
-
-@pytest.fixture
-def client() -> VisitServiceClient:
-    return MockVisitServiceClient()
-
-
-@pytest.fixture
-def provider(client: VisitServiceClient) -> VisitDirectoryProvider:
-    return VisitDirectoryProvider(
-        data_directory=DATA_DIRECTORY,
-        data_group_name=DATA_GROUP_NAME,
-        client=client,
-    )
-
-
-@pytest.fixture
-def run_engine() -> RunEngine:
-    return RunEngine()
-
-
-class FakeDetector(Readable, HasName, Triggerable):
-    _name: str
-    _provider: DirectoryProvider
-
-    def __init__(
-        self,
-        name: str,
-        provider: DirectoryProvider,
-    ) -> None:
-        self._name = name
-        self._provider = provider
-
-    async def read(self) -> dict[str, Reading]:
-        return {
-            f"{self.name}_data": {
-                "value": "test",
-                "timestamp": 0.0,
-            },
-        }
-
-    async def describe(self) -> dict[str, DataKey]:
-        directory_info = self._provider()
-        path = f"{directory_info.directory_path}/{directory_info.filename_prefix}"
-        return {
-            f"{self.name}_data": {
-                "dtype": "string",
-                "shape": [1],
-                "source": path,
-            }
-        }
-
-    def trigger(self) -> Status:
-        status = StatusBase()
-        status.set_finished()
-        return status
-
-    @property
-    def name(self) -> str:
-        return self._name
-
-    @property
-    def parent(self) -> None:
-        return None
-
-
-@pytest.fixture(params=[1, 2])
-def detectors(request, provider: VisitDirectoryProvider) -> list[Readable]:
-    number_of_detectors = request.param
-    return [
-        FakeDetector(
-            name=f"test_detector_{i}",
-            provider=provider,
-        )
-        for i in range(number_of_detectors)
-    ]
-
-
-def simple_run(detectors: list[Readable]) -> MsgGenerator:
-    yield from bp.count(detectors)
-
-
-def multi_run(detectors: list[Readable]) -> MsgGenerator:
-    yield from bp.count(detectors)
-    yield from bp.count(detectors)
-
-
-def multi_nested_plan(detectors: list[Readable]) -> MsgGenerator:
-    yield from simple_run(detectors)
-    yield from simple_run(detectors)
-
-
-def multi_run_single_stage(detectors: list[Readable]) -> MsgGenerator:
-    def stageless_count() -> MsgGenerator:
-        return (yield from bps.one_shot(detectors))
-
-    def inner_plan() -> MsgGenerator:
-        yield from run_wrapper(stageless_count())
-        yield from run_wrapper(stageless_count())
-
-    yield from stage_wrapper(inner_plan(), detectors)
-
-
-def multi_run_single_stage_multi_group(
-    detectors: list[Readable],
-) -> MsgGenerator:
-    def stageless_count() -> MsgGenerator:
-        return (yield from bps.one_shot(detectors))
-
-    def inner_plan() -> MsgGenerator:
-        yield from run_wrapper(stageless_count(), md={DATA_SESSION: 1})
-        yield from run_wrapper(stageless_count(), md={DATA_SESSION: 1})
-        yield from run_wrapper(stageless_count(), md={DATA_SESSION: 2})
-        yield from run_wrapper(stageless_count(), md={DATA_SESSION: 2})
-
-    yield from stage_wrapper(inner_plan(), detectors)
-
-
-@run_decorator(md={DATA_SESSION: 12345})
-@set_run_key_decorator("outer")
-def nested_run_with_metadata(detectors: list[Readable]) -> MsgGenerator:
-    yield from set_run_key_wrapper(bp.count(detectors), "inner")
-    yield from set_run_key_wrapper(bp.count(detectors), "inner")
-
-
-@run_decorator()
-@set_run_key_decorator("outer")
-def nested_run_without_metadata(
-    detectors: list[Readable],
-) -> MsgGenerator:
-    yield from set_run_key_wrapper(bp.count(detectors), "inner")
-    yield from set_run_key_wrapper(bp.count(detectors), "inner")
-
-
-def test_simple_run_gets_scan_number(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-) -> None:
-    docs = collect_docs(
-        run_engine,
-        simple_run(detectors),
-        provider,
-    )
-    assert docs[0].name == "start"
-    assert docs[0].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert_all_detectors_used_collection_numbers(docs, detectors, [RUN_0])
-
-
-@pytest.mark.parametrize("plan", [multi_run, multi_nested_plan])
-def test_multi_run_gets_scan_numbers(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    plan: Callable[[list[Readable]], MsgGenerator],
-    provider: DirectoryProvider,
-) -> None:
-    """Test is here to demonstrate that multi run plans will overwrite files."""
-    docs = collect_docs(
-        run_engine,
-        plan(detectors),
-        provider,
-    )
-    start_docs = find_start_docs(docs)
-    assert len(start_docs) == 2
-    assert start_docs[0].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[1].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert_all_detectors_used_collection_numbers(docs, detectors, [RUN_0, RUN_0])
-
-
-def test_multi_run_single_stage(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-) -> None:
-    docs = collect_docs(
-        run_engine,
-        multi_run_single_stage(detectors),
-        provider,
-    )
-    start_docs = find_start_docs(docs)
-    assert len(start_docs) == 2
-    assert start_docs[0].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[1].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert_all_detectors_used_collection_numbers(
-        docs,
-        detectors,
-        [
-            RUN_0,
-            RUN_0,
-        ],
-    )
-
-
-def test_multi_run_single_stage_multi_group(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-) -> None:
-    docs = collect_docs(
-        run_engine,
-        multi_run_single_stage_multi_group(detectors),
-        provider,
-    )
-    start_docs = find_start_docs(docs)
-    assert len(start_docs) == 4
-    assert start_docs[0].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[1].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[2].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[3].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert_all_detectors_used_collection_numbers(
-        docs,
-        detectors,
-        [
-            RUN_0,
-            RUN_0,
-            RUN_0,
-            RUN_0,
-        ],
-    )
-
-
-def test_nested_run_with_metadata(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-) -> None:
-    """Test is here to demonstrate that nested runs will be treated as a single run.
-
-    That means detectors in such runs will overwrite files.
-    """
-    docs = collect_docs(
-        run_engine,
-        nested_run_with_metadata(detectors),
-        provider,
-    )
-    start_docs = find_start_docs(docs)
-    assert len(start_docs) == 3
-    assert start_docs[0].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[1].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[2].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert_all_detectors_used_collection_numbers(docs, detectors, [RUN_0, RUN_0])
-
-
-def test_nested_run_without_metadata(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-) -> None:
-    """Test is here to demonstrate that nested runs will be treated as a single run.
-
-    That means detectors in such runs will overwrite files.
-    """
-    docs = collect_docs(
-        run_engine,
-        nested_run_without_metadata(detectors),
-        provider,
-    )
-    start_docs = find_start_docs(docs)
-    assert len(start_docs) == 3
-    assert start_docs[0].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[1].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert start_docs[2].doc[DATA_SESSION] == f"{DATA_GROUP_NAME}-0"
-    assert_all_detectors_used_collection_numbers(docs, detectors, [RUN_0, RUN_0])
-
-
-def test_visit_directory_provider_fails(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-    client: MockVisitServiceClient,
-) -> None:
-    client.always_fail()
-    with pytest.raises(ValueError):
-        collect_docs(
-            run_engine,
-            simple_run(detectors),
-            provider,
-        )
-
-
-def test_visit_directory_provider_fails_after_one_sucess(
-    run_engine: RunEngine,
-    detectors: list[Readable],
-    provider: DirectoryProvider,
-    client: MockVisitServiceClient,
-) -> None:
-    collect_docs(
-        run_engine,
-        simple_run(detectors),
-        provider,
-    )
-    client.always_fail()
-    with pytest.raises(ValueError):
-        collect_docs(
-            run_engine,
-            simple_run(detectors),
-            provider,
-        )
-
-
-def collect_docs(
-    run_engine: RunEngine,
-    plan: MsgGenerator,
-    provider: DirectoryProvider,
-) -> list[DataEvent]:
-    events = []
-
-    def on_event(name: str, doc: Mapping[str, Any]) -> None:
-        events.append(DataEvent(name=name, doc=doc))
-
-    wrapped_plan = attach_metadata(plan, provider)
-    run_engine(wrapped_plan, on_event)
-    return events
-
-
-def assert_all_detectors_used_collection_numbers(
-    docs: list[DataEvent],
-    detectors: list[Readable],
-    source_history: list[Path],
-) -> None:
-    descriptors = find_descriptor_docs(docs)
-    assert len(descriptors) == len(source_history)
-
-    for descriptor, expected_source in zip(descriptors, source_history, strict=False):
-        for detector in detectors:
-            source = descriptor.doc.get("data_keys", {}).get(f"{detector.name}_data")[
-                "source"
-            ]
-            assert Path(source) == expected_source
-
-
-def find_start_docs(docs: list[DataEvent]) -> list[DataEvent]:
-    return list(filter(lambda event: event.name == "start", docs))
-
-
-def find_descriptor_docs(docs: list[DataEvent]) -> list[DataEvent]:
-    return list(filter(lambda event: event.name == "descriptor", docs))
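Note: with attach_metadata and VisitDirectoryProvider removed above, the directory-provider wiring is expected to move downstream to dodal. For reference, the metadata-injection step the deleted preprocessor performed can be reproduced with bluesky's public preprocessors alone; a rough sketch under that assumption (the plan and session string are placeholders, and this is not the dodal replacement):

    import bluesky.preprocessors as bpp
    from bluesky import RunEngine
    from bluesky.plans import count

    DATA_SESSION = "data_session"  # metadata key the deleted preprocessor wrote

    def attach_session(plan, session: str):
        # Inject the data session into every start document the plan emits,
        # as attach_metadata did with the provider's filename prefix.
        yield from bpp.inject_md_wrapper(plan, md={DATA_SESSION: session})

    RE = RunEngine()
    # Placeholder usage: RE(attach_session(count([some_detector]), "example-1"))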