Skip to content

Commit

Permalink
Restore tests and stubs previously moved to dls-bluesky-core
Browse files Browse the repository at this point in the history
- Wrapped Bluesky plans only required for BlueAPI, able to expose through new scratch dir
  • Loading branch information
DiamondJoseph committed Mar 6, 2024
1 parent 6e85e3d commit 8348b81
Show file tree
Hide file tree
Showing 10 changed files with 569 additions and 1 deletion.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ dependencies = [
"fastapi[all]<0.99",
"uvicorn",
"requests",
"dls-bluesky-core", #requires ophyd-async
"dls-dodal",
"typing_extensions<4.6",
]
Expand Down
22 changes: 22 additions & 0 deletions src/blueapi/plans/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from .scanspec import scan
from .wrapped import count

"""
This package is intended to hold MsgGenerator functions which act as self-contained
experiments: they start runs, collect data, and close the runs. While they may be used
as building blocks for larger nested plans, they are primarily intended to be run as-is,
and any common functionality which may be useful to multiple plans extracted to stubs/.
Plans:
- Must have type hinted arguments, Should use the loosest sensible bounds
- Must have docstrings describing behaviour and arguments of the function
- Must not have variadic args or kwargs, Should pass collections instead
- Must have optional argument named 'metadata' to add metadata to run(s)
- Must add 'plan_args' to metadata with complete representation including defaults, None
- Must add 'detectors', 'motors' metadata with list of names of relevant devices
- Should pass 'shape' to metadata if the run's shape is knowable
"""

__all__ = [
"count",
"scan",
]
69 changes: 69 additions & 0 deletions src/blueapi/plans/scanspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import operator
from functools import reduce
from typing import Any, List, Mapping, Optional

import bluesky.plans as bp
from bluesky.protocols import Movable, Readable
from cycler import Cycler, cycler
from dodal.common import MsgGenerator
from scanspec.specs import Spec

"""
Plans related to the use of the `ScanSpec https://github.com/dls-controls/scanspec`
library for constructing arbitrarily complex N-dimensional trajectories, similar to
Diamond's "mapping scans" using ScanPointGenerator.
"""


def scan(
detectors: List[Readable],
axes_to_move: Mapping[str, Movable],
spec: Spec[str],
metadata: Optional[Mapping[str, Any]] = None,
) -> MsgGenerator:
"""
Scan wrapping `bp.scan_nd`
Args:
detectors: List of readable devices, will take a reading at each point
axes_to_move: All axes involved in this scan, names and objects
spec: ScanSpec modelling the path of the scan
metadata: Key-value metadata to include in exported data, defaults to None.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

_md = {
"plan_args": {
"detectors": list(map(repr, detectors)),
"axes_to_move": {k: repr(v) for k, v in axes_to_move.items()},
"spec": repr(spec),
},
"plan_name": "scan",
"shape": spec.shape(),
**(metadata or {}),
}

cycler = _scanspec_to_cycler(spec, axes_to_move)
yield from bp.scan_nd(detectors, cycler, md=_md)


def _scanspec_to_cycler(spec: Spec[str], axes: Mapping[str, Movable]) -> Cycler:
"""
Convert a scanspec to a cycler for compatibility with legacy Bluesky plans such as
`bp.scan_nd`. Use the midpoints of the scanspec since cyclers are normally used
for software triggered scans.
Args:
spec: A scanspec
axes: Names and axes to move
Returns:
Cycler: A new cycler
"""

midpoints = spec.frames().midpoints
midpoints = {axes[name]: points for name, points in midpoints.items()}

# Need to "add" the cyclers for all the axes together. The code below is
# effectively: cycler(motor1, [...]) + cycler(motor2, [...]) + ...
return reduce(operator.add, (cycler(*args) for args in midpoints.items()))
45 changes: 45 additions & 0 deletions src/blueapi/plans/wrapped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Any, List, Mapping, Optional, Union

import bluesky.plans as bp
from bluesky.protocols import Readable
from dodal.common import MsgGenerator

"""
Wrappers for Bluesky built-in plans with type hinting and consistently named metadata
Provided here until https://github.com/bluesky/bluesky/pull/1610 is merged
"""


def count(
detectors: List[Readable],
num: int = 1,
delay: Optional[Union[float, List[float]]] = None,
metadata: Optional[Mapping[str, Any]] = None,
) -> MsgGenerator:
"""
Take `n` readings from a device
Args:
detectors (List[Readable]): Readable devices to read
num (int, optional): Number of readings to take. Defaults to 1.
delay (Optional[Union[float, List[float]]], optional): Delay between readings.
Defaults to None.
metadata (Optional[Mapping[str, Any]], optional): Key-value metadata to include
in exported data. Defaults to None.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""
plan_args = {
# Until release after https://github.com/bluesky/bluesky/pull/1655 is merged
"detectors": list(map(repr, detectors)),
"num": num,
"delay": delay,
}

_md = {
"plan_args": plan_args,
**(metadata or {}),
}

yield from bp.count(detectors, num, delay=delay, md=_md)
29 changes: 29 additions & 0 deletions src/blueapi/stubs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from typing import List

from .wrapped import move, move_relative, set_absolute, set_relative, sleep, wait

"""
This package is intended to hold MsgGenerator functions which are not self-contained
data collections: while they may start runs, collect data, or close runs, they are
blocks for larger nested plans, and may not make sense to be run as-is. Functions that
may make sense as isolated blocks of functionality (e.g. moving a motor) should be added
to the __export__ list: without this list, it is assumed that all MsgGenerator functions
in the package should be imported by any services which respect it.
Functions that yield from multiple stubs and offer a complete workflow
should be moved to plans/.
This package should never have a dependency on plans/.
Stubs:
- Must have type hinted arguments, Should use the loosest sensible bounds
- Must have docstrings describing behaviour and arguments of the function
- Must not have variadic args or kwargs, Should pass collections instead
- Allow metadata to be propagated through if calling other stubs that take metadata
"""

__all__: List[str] = [ # Available for import by BlueAPI and other modules
"set_absolute",
"set_relative",
"move",
"move_relative",
"sleep",
"wait",
]
71 changes: 71 additions & 0 deletions src/blueapi/stubs/flyable.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import time
from typing import Protocol, runtime_checkable

import bluesky.plan_stubs as bps
from bluesky.protocols import Collectable, Flyable
from dodal.common import MsgGenerator, group_uuid


@runtime_checkable
class FlyableCollectable(Flyable, Collectable, Protocol):
"""
A Device which implements both the Flyable and Collectable protocols.
i.e., a device which can be set off, then polled repeatedly to construct documents
with the data it has collected so far. A typical pattern for "hardware" scans.
"""


def fly_and_collect(
flyer: FlyableCollectable,
flush_period: float = 0.5,
checkpoint_every_collect: bool = False,
stream_name: str = "primary",
timeout: float = 7_200, # 2 hours
) -> MsgGenerator:
"""Fly and collect a flyer, waiting for collect to finish with a period.
flyer.kickoff and complete are called, which starts the fly scanning process.
bps.wait is called, which finishes after each flush period and then repeats, until
complete finishes. At this point, bps.collect is called to gather the documents
produced.
For some flyers, this plan will need to be called in succession in order to, for
example, set up a flyer to send triggers multiple times and collect data. For such
a use case, this plan can be setup to checkpoint for each collect.
Note: this plan must be wrapped with calls to open and close run, and the flyer
must implement the Collectable protocol. See tests/stubs/test_flyables for an
example.
Args:
flyer (FlyableCollectable): ophyd-async device which implements
Flyable and Collectable.
flush_period (float): How often to check if flyer.complete has finished in
seconds. Default 0.5
checkpoint_every_collect (bool): whether or not to checkpoint after
flyer.collect has been called. Default False.
stream_name (str): name of the stream to collect from. Default "primary".
timeout (float): total time allowed for this stub before timing out in seconds.
Default 7,200 (2 hours).
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""
yield from bps.kickoff(flyer)
complete_group = group_uuid("complete")
yield from bps.complete(flyer, group=complete_group)
start_time = time.time()
done = False

while not done:
if time.time() - start_time < timeout:
try:
yield from bps.wait(group=complete_group, timeout=flush_period)
except TimeoutError:
pass
else:
done = True
yield from bps.collect(
flyer, stream=True, return_payload=False, name=stream_name
)
if checkpoint_every_collect:
yield from bps.checkpoint()
else:
raise TimeoutError("fly_and_collect took longer than {timeout} to complete")
122 changes: 122 additions & 0 deletions src/blueapi/stubs/wrapped.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import itertools
from typing import Any, Mapping, Optional, TypeVar

import bluesky.plan_stubs as bps
from bluesky.protocols import Movable
from dodal.common.types import Group, MsgGenerator

"""
Wrappers for Bluesky built-in plan stubs with type hinting
Provided here until https://github.com/bluesky/bluesky/pull/1610 is merged
"""

T = TypeVar("T")


def set_absolute(
movable: Movable, value: T, group: Optional[Group] = None, wait: bool = False
) -> MsgGenerator:
"""
Set a device, wrapper for `bp.abs_set`.
Args:
movable (Movable): The device to set
value (T): The new value
group (Optional[Group], optional): The message group to associate with the
setting, for sequencing. Defaults to None.
wait (bool, optional): The group should wait until all setting is complete
(e.g. a motor has finished moving). Defaults to False.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

return (yield from bps.abs_set(movable, value, group=group, wait=wait))


def set_relative(
movable: Movable, value: T, group: Optional[Group] = None, wait: bool = False
) -> MsgGenerator:
"""
Change a device, wrapper for `bp.rel_set`.
Args:
movable (Movable): The device to set
value (T): The new value
group (Optional[Group], optional): The message group to associate with the
setting, for sequencing. Defaults to None.
wait (bool, optional): The group should wait until all setting is complete
(e.g. a motor has finished moving). Defaults to False.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

return (yield from bps.rel_set(movable, value, group=group, wait=wait))


def move(moves: Mapping[Movable, Any], group: Optional[Group] = None) -> MsgGenerator:
"""
Move a device, wrapper for `bp.mv`.
Args:
moves (Mapping[Movable, Any]): Mapping of Movables to target positions
group (Optional[Group], optional): The message group to associate with the
setting, for sequencing. Defaults to None.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

return (
yield from bps.mv(*itertools.chain.from_iterable(moves.items()), group=group)
)


def move_relative(
moves: Mapping[Movable, Any], group: Optional[Group] = None
) -> MsgGenerator:
"""
Move a device relative to its current position, wrapper for `bp.mvr`.
Args:
moves (Mapping[Movable, Any]): Mapping of Movables to target deltas
group (Optional[Group], optional): The message group to associate with the
setting, for sequencing. Defaults to None.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

return (
yield from bps.mvr(*itertools.chain.from_iterable(moves.items()), group=group)
)


def sleep(time: float) -> MsgGenerator:
"""
Suspend all action for a given time, wrapper for `bp.sleep`
Args:
time (float): Time to wait in seconds
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

return (yield from bps.sleep(time))


def wait(group: Optional[Group] = None) -> MsgGenerator:
"""
Wait for a group status to complete, wrapper for `bp.wait`
Args:
group (Optional[Group], optional): The name of the group to wait for, defaults
to None.
Returns:
MsgGenerator: Plan
Yields:
Iterator[MsgGenerator]: Bluesky messages
"""

return (yield from bps.wait(group))
Loading

0 comments on commit 8348b81

Please sign in to comment.