diff --git a/pyproject.toml b/pyproject.toml index 68dc1817f..68cfde65f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,6 @@ dependencies = [ "fastapi[all]<0.99", "uvicorn", "requests", - "dls-bluesky-core", #requires ophyd-async "dls-dodal", "typing_extensions<4.6", ] diff --git a/src/blueapi/plans/__init__.py b/src/blueapi/plans/__init__.py new file mode 100644 index 000000000..d7521474c --- /dev/null +++ b/src/blueapi/plans/__init__.py @@ -0,0 +1,22 @@ +from .scanspec import scan +from .wrapped import count + +""" +This package is intended to hold MsgGenerator functions which act as self-contained +experiments: they start runs, collect data, and close the runs. While they may be used +as building blocks for larger nested plans, they are primarily intended to be run as-is, +and any common functionality which may be useful to multiple plans extracted to stubs/. +Plans: +- Must have type hinted arguments, Should use the loosest sensible bounds +- Must have docstrings describing behaviour and arguments of the function +- Must not have variadic args or kwargs, Should pass collections instead +- Must have optional argument named 'metadata' to add metadata to run(s) +- Must add 'plan_args' to metadata with complete representation including defaults, None +- Must add 'detectors', 'motors' metadata with list of names of relevant devices +- Should pass 'shape' to metadata if the run's shape is knowable +""" + +__all__ = [ + "count", + "scan", +] diff --git a/src/blueapi/plans/scanspec.py b/src/blueapi/plans/scanspec.py new file mode 100644 index 000000000..aedd9cf7a --- /dev/null +++ b/src/blueapi/plans/scanspec.py @@ -0,0 +1,69 @@ +import operator +from functools import reduce +from typing import Any, List, Mapping, Optional + +import bluesky.plans as bp +from bluesky.protocols import Movable, Readable +from cycler import Cycler, cycler +from dodal.common import MsgGenerator +from scanspec.specs import Spec + +""" +Plans related to the use of the `ScanSpec https://github.com/dls-controls/scanspec` +library for constructing arbitrarily complex N-dimensional trajectories, similar to +Diamond's "mapping scans" using ScanPointGenerator. +""" + + +def scan( + detectors: List[Readable], + axes_to_move: Mapping[str, Movable], + spec: Spec[str], + metadata: Optional[Mapping[str, Any]] = None, +) -> MsgGenerator: + """ + Scan wrapping `bp.scan_nd` + Args: + detectors: List of readable devices, will take a reading at each point + axes_to_move: All axes involved in this scan, names and objects + spec: ScanSpec modelling the path of the scan + metadata: Key-value metadata to include in exported data, defaults to None. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + _md = { + "plan_args": { + "detectors": list(map(repr, detectors)), + "axes_to_move": {k: repr(v) for k, v in axes_to_move.items()}, + "spec": repr(spec), + }, + "plan_name": "scan", + "shape": spec.shape(), + **(metadata or {}), + } + + cycler = _scanspec_to_cycler(spec, axes_to_move) + yield from bp.scan_nd(detectors, cycler, md=_md) + + +def _scanspec_to_cycler(spec: Spec[str], axes: Mapping[str, Movable]) -> Cycler: + """ + Convert a scanspec to a cycler for compatibility with legacy Bluesky plans such as + `bp.scan_nd`. Use the midpoints of the scanspec since cyclers are normally used + for software triggered scans. + Args: + spec: A scanspec + axes: Names and axes to move + Returns: + Cycler: A new cycler + """ + + midpoints = spec.frames().midpoints + midpoints = {axes[name]: points for name, points in midpoints.items()} + + # Need to "add" the cyclers for all the axes together. The code below is + # effectively: cycler(motor1, [...]) + cycler(motor2, [...]) + ... + return reduce(operator.add, (cycler(*args) for args in midpoints.items())) diff --git a/src/blueapi/plans/wrapped.py b/src/blueapi/plans/wrapped.py new file mode 100644 index 000000000..10716db69 --- /dev/null +++ b/src/blueapi/plans/wrapped.py @@ -0,0 +1,45 @@ +from typing import Any, List, Mapping, Optional, Union + +import bluesky.plans as bp +from bluesky.protocols import Readable +from dodal.common import MsgGenerator + +""" +Wrappers for Bluesky built-in plans with type hinting and consistently named metadata +Provided here until https://github.com/bluesky/bluesky/pull/1610 is merged +""" + + +def count( + detectors: List[Readable], + num: int = 1, + delay: Optional[Union[float, List[float]]] = None, + metadata: Optional[Mapping[str, Any]] = None, +) -> MsgGenerator: + """ + Take `n` readings from a device + Args: + detectors (List[Readable]): Readable devices to read + num (int, optional): Number of readings to take. Defaults to 1. + delay (Optional[Union[float, List[float]]], optional): Delay between readings. + Defaults to None. + metadata (Optional[Mapping[str, Any]], optional): Key-value metadata to include + in exported data. Defaults to None. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + plan_args = { + # Until release after https://github.com/bluesky/bluesky/pull/1655 is merged + "detectors": list(map(repr, detectors)), + "num": num, + "delay": delay, + } + + _md = { + "plan_args": plan_args, + **(metadata or {}), + } + + yield from bp.count(detectors, num, delay=delay, md=_md) diff --git a/src/blueapi/stubs/__init__.py b/src/blueapi/stubs/__init__.py new file mode 100644 index 000000000..dc2dabc22 --- /dev/null +++ b/src/blueapi/stubs/__init__.py @@ -0,0 +1,29 @@ +from typing import List + +from .wrapped import move, move_relative, set_absolute, set_relative, sleep, wait + +""" +This package is intended to hold MsgGenerator functions which are not self-contained +data collections: while they may start runs, collect data, or close runs, they are +blocks for larger nested plans, and may not make sense to be run as-is. Functions that +may make sense as isolated blocks of functionality (e.g. moving a motor) should be added +to the __export__ list: without this list, it is assumed that all MsgGenerator functions +in the package should be imported by any services which respect it. +Functions that yield from multiple stubs and offer a complete workflow +should be moved to plans/. +This package should never have a dependency on plans/. +Stubs: +- Must have type hinted arguments, Should use the loosest sensible bounds +- Must have docstrings describing behaviour and arguments of the function +- Must not have variadic args or kwargs, Should pass collections instead +- Allow metadata to be propagated through if calling other stubs that take metadata +""" + +__all__: List[str] = [ # Available for import by BlueAPI and other modules + "set_absolute", + "set_relative", + "move", + "move_relative", + "sleep", + "wait", +] diff --git a/src/blueapi/stubs/flyable.py b/src/blueapi/stubs/flyable.py new file mode 100644 index 000000000..3cdd0dacd --- /dev/null +++ b/src/blueapi/stubs/flyable.py @@ -0,0 +1,71 @@ +import time +from typing import Protocol, runtime_checkable + +import bluesky.plan_stubs as bps +from bluesky.protocols import Collectable, Flyable +from dodal.common import MsgGenerator, group_uuid + + +@runtime_checkable +class FlyableCollectable(Flyable, Collectable, Protocol): + """ + A Device which implements both the Flyable and Collectable protocols. + i.e., a device which can be set off, then polled repeatedly to construct documents + with the data it has collected so far. A typical pattern for "hardware" scans. + """ + + +def fly_and_collect( + flyer: FlyableCollectable, + flush_period: float = 0.5, + checkpoint_every_collect: bool = False, + stream_name: str = "primary", + timeout: float = 7_200, # 2 hours +) -> MsgGenerator: + """Fly and collect a flyer, waiting for collect to finish with a period. + flyer.kickoff and complete are called, which starts the fly scanning process. + bps.wait is called, which finishes after each flush period and then repeats, until + complete finishes. At this point, bps.collect is called to gather the documents + produced. + For some flyers, this plan will need to be called in succession in order to, for + example, set up a flyer to send triggers multiple times and collect data. For such + a use case, this plan can be setup to checkpoint for each collect. + Note: this plan must be wrapped with calls to open and close run, and the flyer + must implement the Collectable protocol. See tests/stubs/test_flyables for an + example. + Args: + flyer (FlyableCollectable): ophyd-async device which implements + Flyable and Collectable. + flush_period (float): How often to check if flyer.complete has finished in + seconds. Default 0.5 + checkpoint_every_collect (bool): whether or not to checkpoint after + flyer.collect has been called. Default False. + stream_name (str): name of the stream to collect from. Default "primary". + timeout (float): total time allowed for this stub before timing out in seconds. + Default 7,200 (2 hours). + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + yield from bps.kickoff(flyer) + complete_group = group_uuid("complete") + yield from bps.complete(flyer, group=complete_group) + start_time = time.time() + done = False + + while not done: + if time.time() - start_time < timeout: + try: + yield from bps.wait(group=complete_group, timeout=flush_period) + except TimeoutError: + pass + else: + done = True + yield from bps.collect( + flyer, stream=True, return_payload=False, name=stream_name + ) + if checkpoint_every_collect: + yield from bps.checkpoint() + else: + raise TimeoutError("fly_and_collect took longer than {timeout} to complete") diff --git a/src/blueapi/stubs/wrapped.py b/src/blueapi/stubs/wrapped.py new file mode 100644 index 000000000..a88422221 --- /dev/null +++ b/src/blueapi/stubs/wrapped.py @@ -0,0 +1,122 @@ +import itertools +from typing import Any, Mapping, Optional, TypeVar + +import bluesky.plan_stubs as bps +from bluesky.protocols import Movable +from dodal.common.types import Group, MsgGenerator + +""" +Wrappers for Bluesky built-in plan stubs with type hinting +Provided here until https://github.com/bluesky/bluesky/pull/1610 is merged +""" + +T = TypeVar("T") + + +def set_absolute( + movable: Movable, value: T, group: Optional[Group] = None, wait: bool = False +) -> MsgGenerator: + """ + Set a device, wrapper for `bp.abs_set`. + Args: + movable (Movable): The device to set + value (T): The new value + group (Optional[Group], optional): The message group to associate with the + setting, for sequencing. Defaults to None. + wait (bool, optional): The group should wait until all setting is complete + (e.g. a motor has finished moving). Defaults to False. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + return (yield from bps.abs_set(movable, value, group=group, wait=wait)) + + +def set_relative( + movable: Movable, value: T, group: Optional[Group] = None, wait: bool = False +) -> MsgGenerator: + """ + Change a device, wrapper for `bp.rel_set`. + Args: + movable (Movable): The device to set + value (T): The new value + group (Optional[Group], optional): The message group to associate with the + setting, for sequencing. Defaults to None. + wait (bool, optional): The group should wait until all setting is complete + (e.g. a motor has finished moving). Defaults to False. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + return (yield from bps.rel_set(movable, value, group=group, wait=wait)) + + +def move(moves: Mapping[Movable, Any], group: Optional[Group] = None) -> MsgGenerator: + """ + Move a device, wrapper for `bp.mv`. + Args: + moves (Mapping[Movable, Any]): Mapping of Movables to target positions + group (Optional[Group], optional): The message group to associate with the + setting, for sequencing. Defaults to None. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + return ( + yield from bps.mv(*itertools.chain.from_iterable(moves.items()), group=group) + ) + + +def move_relative( + moves: Mapping[Movable, Any], group: Optional[Group] = None +) -> MsgGenerator: + """ + Move a device relative to its current position, wrapper for `bp.mvr`. + Args: + moves (Mapping[Movable, Any]): Mapping of Movables to target deltas + group (Optional[Group], optional): The message group to associate with the + setting, for sequencing. Defaults to None. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + return ( + yield from bps.mvr(*itertools.chain.from_iterable(moves.items()), group=group) + ) + + +def sleep(time: float) -> MsgGenerator: + """ + Suspend all action for a given time, wrapper for `bp.sleep` + Args: + time (float): Time to wait in seconds + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + return (yield from bps.sleep(time)) + + +def wait(group: Optional[Group] = None) -> MsgGenerator: + """ + Wait for a group status to complete, wrapper for `bp.wait` + Args: + group (Optional[Group], optional): The name of the group to wait for, defaults + to None. + Returns: + MsgGenerator: Plan + Yields: + Iterator[MsgGenerator]: Bluesky messages + """ + + return (yield from bps.wait(group)) diff --git a/tests/plans/test_compliance.py b/tests/plans/test_compliance.py new file mode 100644 index 000000000..346fc09c5 --- /dev/null +++ b/tests/plans/test_compliance.py @@ -0,0 +1,68 @@ +import inspect +from types import ModuleType +from typing import Any, List, Mapping, Optional, get_type_hints + +import dodal.plans as plans +import dodal.stubs as stubs +from dodal.common import MsgGenerator, PlanGenerator + + +def is_bluesky_plan_generator(func: Any) -> bool: + try: + return get_type_hints(func).get("return") == MsgGenerator + except TypeError: + # get_type_hints fails on some objects (such as Union or Optional) + return False + + +def get_all_available_generators(mod: ModuleType): + def get_named_subset(names: List[str]): + for name in names: + yield getattr(mod, name) + + if "__export__" in mod.__dict__: + yield from get_named_subset(getattr(mod, "__export__")) + elif "__all__" in mod.__dict__: + yield from get_named_subset(getattr(mod, "__all__")) + else: + for name, value in mod.__dict__.items(): + if not name.startswith("_"): + yield value + + +def assert_hard_requirements(plan: PlanGenerator, signature: inspect.Signature): + assert plan.__doc__ is not None, f"'{plan.__name__}' has no docstring" + for parameter in signature.parameters.values(): + assert ( + parameter.kind is not parameter.VAR_POSITIONAL + and parameter.kind is not parameter.VAR_KEYWORD + ), f"'{plan.__name__}' has variadic arguments" + + +def assert_metadata_requirements(plan: PlanGenerator, signature: inspect.Signature): + assert ( + "metadata" in signature.parameters + ), f"'{plan.__name__}' does not allow metadata" + metadata = signature.parameters["metadata"] + assert ( + metadata.annotation == Optional[Mapping[str, Any]] + and metadata.default is not inspect.Parameter.empty + ), f"'{plan.__name__}' metadata is not optional" + assert metadata.default is None, f"'{plan.__name__}' metadata default is mutable" + + +def test_plans_comply(): + for plan in get_all_available_generators(plans): + if is_bluesky_plan_generator(plan): + signature = inspect.Signature.from_callable(plan) + assert_hard_requirements(plan, signature) + assert_metadata_requirements(plan, signature) + + +def test_stubs_comply(): + for plan in get_all_available_generators(stubs): + if is_bluesky_plan_generator(plan): + signature = inspect.Signature.from_callable(plan) + assert_hard_requirements(plan, signature) + if "metadata" in signature.parameters: + assert_metadata_requirements(plan, signature) diff --git a/tests/plans/test_scanspec.py b/tests/plans/test_scanspec.py new file mode 100644 index 000000000..d1b4d0d9f --- /dev/null +++ b/tests/plans/test_scanspec.py @@ -0,0 +1,88 @@ +from asyncio import Future + +import pytest +from bluesky import RunEngine +from ophyd.sim import Syn2DGauss, SynAxis, SynGauss +from scanspec.specs import Line, Spiral + +from blueapi.plans.scanspec import scan + + +@pytest.fixture +def x() -> SynAxis: + return SynAxis(name="x") + + +@pytest.fixture +def y() -> SynAxis: + return SynAxis(name="y") + + +def capture_document_return_token(future: Future, run_engine: RunEngine, name) -> int: + def set_result(_, doc): + future.set_result(doc) + + return run_engine.subscribe(name=name, func=set_result) + + +def test_metadata_of_simple_spec(RE, x, event_loop): + det = SynGauss( + name="det", + motor=x, + motor_field=x.name, + center=0.0, + Imax=1, + labels={"detectors"}, + ) + spec = Line(axis=x.name, start=1, stop=2, num=3) + + start_future = event_loop.create_future() + tok = capture_document_return_token(start_future, RE, "start") + RE(scan([det], {x.name: x}, spec)) + RE.unsubscribe(tok) + + start_document = start_future.result() + plan_args = start_document["plan_args"] + + assert len(plan_args) == 3 + assert plan_args["detectors"] == [repr(det)] + assert plan_args["spec"] == repr(spec) + assert plan_args["axes_to_move"] == { + x.name: repr(x), + } + + assert start_document["motors"] == [x.name] + assert start_document["detectors"] == [det.name] + + +def test_metadata_of_spiral_spec(RE, x, y, event_loop): + det = Syn2DGauss( + name="det", + motor0=x, + motor_field0=x.name, + motor1=y, + motor_field1=y.name, + center=(0.0, 0.0), + Imax=1, + labels={"detectors"}, + ) + spec = Spiral.spaced(x.name, y.name, 0, 0, 5, 1) + + start_future = event_loop.create_future() + tok = capture_document_return_token(start_future, RE, "start") + RE(scan([det], {x.name: x, y.name: y}, spec)) + RE.unsubscribe(tok) + + start_document = start_future.result() + plan_args = start_document["plan_args"] + + assert len(plan_args) == 3 + assert plan_args["detectors"] == [repr(det)] + assert plan_args["spec"] == repr(spec) + assert plan_args["axes_to_move"] == {x.name: repr(x), y.name: repr(y)} + + assert set(start_document["motors"]) == { + x.name, + y.name, + } # Order of motors in scan_nd not guaranteed + assert start_document["detectors"] == [det.name] diff --git a/tests/stubs/test_flyable.py b/tests/stubs/test_flyable.py new file mode 100644 index 000000000..0efc14fed --- /dev/null +++ b/tests/stubs/test_flyable.py @@ -0,0 +1,55 @@ +import asyncio +from typing import Dict + +import bluesky.plan_stubs as bps +import pytest +from bluesky.protocols import Collectable, Descriptor, Flyable +from ophyd_async.core import AsyncStatus + +from blueapi.stubs.flyable import fly_and_collect + + +class DummyFlyer(Flyable, Collectable): + def __init__(self, name: str) -> None: + self._name = name + self.has_flown = False + + @property + def name(self) -> str: + return self._name + + @AsyncStatus.wrap + async def kickoff(self) -> None: + self._fly_status = AsyncStatus(self._fly()) + + async def _fly(self) -> None: + self.has_flown = True + await asyncio.sleep(0.1) + + def complete(self) -> AsyncStatus: + return self._fly_status + + def describe_collect(self) -> Dict[str, Descriptor]: + return { + self.name: Descriptor( + source="some:source", shape=[], dtype="array", external="STREAM:" + ) + } + + +@pytest.fixture +def flyer() -> Flyable: + return DummyFlyer("test") + + +@pytest.mark.asyncio +async def test_fly_and_collect(RE, flyer: DummyFlyer): + def open_and_close_run_for_fly_and_collect(): + yield from bps.open_run() + yield from fly_and_collect( + flyer, flush_period=0.01, checkpoint_every_collect=True + ) + yield from bps.close_run() + + RE(open_and_close_run_for_fly_and_collect()) + assert flyer.has_flown is True