Skip to content

Commit

Permalink
Use external service to configure ophyd-async detector data writing (#…
Browse files Browse the repository at this point in the history
…315)

Production version of #283 

This PR allows coordination with a central service for creating unique
groups of data (called collections) and configures ophyd-async detectors
to write their data to the same location for a given collection.

Changes:
- Add mechanism to preprocess all plans with set bluesky
[preprocessors](https://blueskyproject.io/bluesky/plans.html#plan-preprocessors)
- Create directory provider that knows how to talk to GDA's visit
directory API and provide a unique collection number to group data files
- Create dummy directory provider that works in a similar way without
the need for an external server (useful for development)
- Create preprocessor that uses the directory provider and groups
detectors by staging, also bundles the data group information into run
start documents on a best effort basis.
- Add tests

---------

Co-authored-by: Rose Yemelyanova <[email protected]>
  • Loading branch information
callumforrester and Rose Yemelyanova authored Oct 25, 2023
1 parent c3b2c83 commit d80eca0
Show file tree
Hide file tree
Showing 22 changed files with 791 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/actions/install_requirements/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ runs:
- name: Create lockfile
run: |
mkdir -p lockfiles
pip freeze --exclude-editable --exclude dodal > lockfiles/${{ inputs.requirements_file }}
pip freeze --exclude-editable --exclude ophyd-async --exclude dls-dodal --exclude dls-bluesky-core --exclude bluesky > lockfiles/${{ inputs.requirements_file }}
# delete the self referencing line and make sure it isn't blank
sed -i '/file:/d' lockfiles/${{ inputs.requirements_file }}
shell: bash
Expand Down
7 changes: 3 additions & 4 deletions .github/workflows/code.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,13 @@ jobs:
# https://github.com/pytest-dev/pytest/issues/2042
PY_IGNORE_IMPORTMISMATCH: "1"
BLUEAPI_TEST_STOMP_PORTS: "[61613,61614]"


steps:
- name: Start RabbitMQ
uses: namoshek/rabbitmq-github-action@v1
with:
ports: '61614:61613'
plugins: rabbitmq_stomp
ports: "61614:61613"
plugins: rabbitmq_stomp

- name: Checkout
uses: actions/checkout@v3
Expand Down Expand Up @@ -165,7 +164,7 @@ jobs:
uses: docker/build-push-action@v3
with:
build-args: |
PIP_OPTIONS=-r lockfiles/requirements.txt dist/*.whl
PIP_OPTIONS=-r lockfiles/requirements.txt git+https://github.com/bluesky/bluesky.git git+https://github.com/DiamondLightSource/dls-bluesky-core.git git+https://github.com/DiamondLightSource/dodal.git@directory_provider dist/*.whl
push: ${{ github.event_name == 'push' && startsWith(github.ref, 'refs/tags') }}
load: ${{ ! (github.event_name == 'push' && startsWith(github.ref, 'refs/tags')) }}
tags: ${{ steps.meta.outputs.tags }}
Expand Down
2 changes: 1 addition & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@
},
"esbonio.server.enabled": true,
"esbonio.sphinx.confDir": "",
}
}
3 changes: 2 additions & 1 deletion docs/developer/explanations/lifecycle.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ of being written, loaded and run. Take the following plan.
from typing import Any, List, Mapping, Optional, Union
import bluesky.plans as bp
from blueapi.core import MsgGenerator, inject
from blueapi.core import MsgGenerator
from dls_bluesky_core.core import inject
from bluesky.protocols import Readable
Expand Down
8 changes: 5 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,20 @@ classifiers = [
]
description = "Lightweight Bluesky-as-a-service wrapper application. Also usable as a library."
dependencies = [
"bluesky<1.11",
"bluesky @ git+https://github.com/bluesky/bluesky.git",
"ophyd",
"nslsii",
"pyepics",
"pydantic<2.0",
"stomp.py",
"aiohttp",
"PyYAML",
"click<8.1.4",
"fastapi[all]<0.100",
"uvicorn",
"requests",
"dls_bluesky_core",
"dls-dodal",
"dls-bluesky-core @ git+https://github.com/DiamondLightSource/dls-bluesky-core.git", #requires ophyd-async
"dls-dodal @ git+https://github.com/DiamondLightSource/dodal.git@directory_provider", # requires aioca...
"typing_extensions<4.6",
]
dynamic = ["version"]
Expand All @@ -43,6 +44,7 @@ dev = [
"pre-commit",
"pydata-sphinx-theme>=0.12",
"pytest-cov",
"pytest-asyncio",
"sphinx-autobuild",
"sphinx-copybutton",
"sphinx-click",
Expand Down
12 changes: 7 additions & 5 deletions src/blueapi/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class ScratchConfig(BlueapiBaseModel):
auto_make_directory: bool = Field(default=False)


class DataWritingConfig(BlueapiBaseModel):
visit_service_url: Optional[str] = None # e.g. "http://localhost:8088/api"
visit_directory: Path = Path("/tmp/0-0")
group_name: str = "example"


class EnvironmentConfig(BlueapiBaseModel):
"""
Config for the RunEngine environment
Expand All @@ -63,11 +69,7 @@ class EnvironmentConfig(BlueapiBaseModel):
Source(kind=SourceKind.PLAN_FUNCTIONS, module="dls_bluesky_core.stubs"),
]
scratch: Optional[ScratchConfig] = Field(default=None)

def __eq__(self, other: object) -> bool:
if isinstance(other, EnvironmentConfig):
return str(self.sources) == str(other.sources)
return False
data_writing: DataWritingConfig = Field(default_factory=DataWritingConfig)


class LoggingConfig(BlueapiBaseModel):
Expand Down
2 changes: 0 additions & 2 deletions src/blueapi/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
is_bluesky_plan_generator,
)
from .context import BlueskyContext
from .device_lookup import inject
from .event import EventPublisher, EventStream

__all__ = [
Expand All @@ -26,7 +25,6 @@
"EventStream",
"DataEvent",
"WatchableStatus",
"inject",
"is_bluesky_compatible_device",
"is_bluesky_plan_generator",
"is_bluesky_compatible_device_type",
Expand Down
4 changes: 4 additions & 0 deletions src/blueapi/core/bluesky_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
WritesExternalAssets,
)
from dls_bluesky_core.core import MsgGenerator, PlanGenerator
from ophyd_async.core import Device as AsyncDevice
from pydantic import BaseModel, Field

from blueapi.utils import BlueapiBaseModel
Expand All @@ -28,6 +29,8 @@
except ImportError:
from typing_extensions import Protocol, runtime_checkable # type: ignore

PlanWrapper = Callable[[MsgGenerator], MsgGenerator]

#: An object that encapsulates the device to do useful things to produce
# data (e.g. move and read)
Device = Union[
Expand All @@ -45,6 +48,7 @@
WritesExternalAssets,
Configurable,
Triggerable,
AsyncDevice,
]

#: Protocols defining interface to hardware
Expand Down
36 changes: 33 additions & 3 deletions src/blueapi/core/context.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import logging
from dataclasses import dataclass, field
from importlib import import_module
Expand All @@ -10,6 +11,7 @@
Generic,
List,
Optional,
Sequence,
Tuple,
Type,
TypeVar,
Expand All @@ -19,19 +21,24 @@
get_type_hints,
)

from bluesky import RunEngine
from bluesky.run_engine import RunEngine, call_in_bluesky_event_loop
from ophyd_async.core import Device as AsyncDevice
from ophyd_async.core import wait_for_connection
from pydantic import create_model
from pydantic.fields import FieldInfo, ModelField

from blueapi.config import EnvironmentConfig, SourceKind
from blueapi.data_management.gda_directory_provider import VisitDirectoryProvider
from blueapi.utils import BlueapiPlanModelConfig, load_module_all

from .bluesky_types import (
BLUESKY_PROTOCOLS,
Device,
HasName,
MsgGenerator,
Plan,
PlanGenerator,
PlanWrapper,
is_bluesky_compatible_device,
is_bluesky_plan_generator,
)
Expand All @@ -51,12 +58,23 @@ class BlueskyContext:
run_engine: RunEngine = field(
default_factory=lambda: RunEngine(context_managers=[])
)
plan_wrappers: Sequence[PlanWrapper] = field(default_factory=list)
plans: Dict[str, Plan] = field(default_factory=dict)
devices: Dict[str, Device] = field(default_factory=dict)
plan_functions: Dict[str, PlanGenerator] = field(default_factory=dict)
directory_provider: Optional[VisitDirectoryProvider] = field(default=None)
sim: bool = field(default=False)

_reference_cache: Dict[Type, Type] = field(default_factory=dict)

def wrap(self, plan: MsgGenerator) -> MsgGenerator:
wrapped_plan = functools.reduce(
lambda wrapped, next_wrapper: next_wrapper(wrapped),
self.plan_wrappers,
plan,
)
yield from wrapped_plan

def find_device(self, addr: Union[str, List[str]]) -> Optional[Device]:
"""
Find a device in this context, allows for recursive search.
Expand Down Expand Up @@ -86,6 +104,18 @@ def with_config(self, config: EnvironmentConfig) -> None:
elif source.kind is SourceKind.DODAL:
self.with_dodal_module(mod)

call_in_bluesky_event_loop(self.connect_devices(self.sim))

async def connect_devices(self, sim: bool = False) -> None:
coros = {}
for device_name, device in self.devices.items():
if isinstance(device, AsyncDevice):
device.set_name(device_name)
coros[device_name] = device.connect(sim)

if len(coros) > 0:
await wait_for_connection(**coros)

def with_plan_module(self, module: ModuleType) -> None:
"""
Register all functions in the module supplied as plans.
Expand Down Expand Up @@ -113,10 +143,10 @@ def plan_2(...) -> MsgGenerator:
def with_device_module(self, module: ModuleType) -> None:
self.with_dodal_module(module)

def with_dodal_module(self, module: ModuleType, **kwargs) -> None:
def with_dodal_module(self, module: ModuleType) -> None:
from dodal.utils import make_all_devices

for device in make_all_devices(module, **kwargs).values():
for device in make_all_devices(module).values():
self.device(device)

def plan(self, plan: PlanGenerator) -> PlanGenerator:
Expand Down
18 changes: 0 additions & 18 deletions src/blueapi/core/device_lookup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,3 @@ def find_component(obj: Any, addr: List[str]) -> Optional[D]:
f"Found {component} in {obj} while searching for {addr} "
"but it is not a device"
)


def inject(name: str):
"""
Function to mark a default argument of a plan method as a reference to a device
that is stored in the Blueapi context.
Bypasses mypy linting, returning x as Any and therefore valid as a default
argument.
Args:
name (str): Name of a device to be fetched from the Blueapi context
Returns:
Any: name but without typing checking, valid as any default type
"""

return name
Empty file.
128 changes: 128 additions & 0 deletions src/blueapi/data_management/gda_directory_provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

from aiohttp import ClientSession
from ophyd_async.core import DirectoryInfo, DirectoryProvider
from pydantic import BaseModel


class DataCollectionIdentifier(BaseModel):
collectionNumber: int


class VisitServiceClientBase(ABC):
"""
Object responsible for I/O in determining collection number
"""

@abstractmethod
async def create_new_collection(self) -> DataCollectionIdentifier:
...

@abstractmethod
async def get_current_collection(self) -> DataCollectionIdentifier:
...


class VisitServiceClient(VisitServiceClientBase):
_url: str

def __init__(self, url: str) -> None:
self._url = url

async def create_new_collection(self) -> DataCollectionIdentifier:
async with ClientSession() as session:
async with session.post(f"{self._url}/numtracker") as response:
if response.status == 200:
json = await response.json()
return DataCollectionIdentifier.parse_obj(json)
else:
raise Exception(response.status)

async def get_current_collection(self) -> DataCollectionIdentifier:
async with ClientSession() as session:
async with session.get(f"{self._url}/numtracker") as response:
if response.status == 200:
json = await response.json()
return DataCollectionIdentifier.parse_obj(json)
else:
raise Exception(response.status)


class LocalVisitServiceClient(VisitServiceClientBase):
_count: int

def __init__(self) -> None:
self._count = 0

async def create_new_collection(self) -> DataCollectionIdentifier:
self._count += 1
return DataCollectionIdentifier(collectionNumber=self._count)

async def get_current_collection(self) -> DataCollectionIdentifier:
return DataCollectionIdentifier(collectionNumber=self._count)


class VisitDirectoryProvider(DirectoryProvider):
"""
Gets information from a remote service to construct the path that detectors
should write to, and determine how their files should be named.
"""

_data_group_name: str
_data_directory: Path

_client: VisitServiceClientBase
_current_collection: Optional[DirectoryInfo]
_session: Optional[ClientSession]

def __init__(
self,
data_group_name: str,
data_directory: Path,
client: VisitServiceClientBase,
):
self._data_group_name = data_group_name
self._data_directory = data_directory
self._client = client

self._current_collection = None
self._session = None

async def update(self) -> None:
"""
Calls the visit service to create a new data collection in the current visit.
"""
# TODO: After visit service is more feature complete:
# TODO: Allow selecting visit as part of the request to BlueAPI
# TODO: Consume visit information from BlueAPI and pass down to this class
# TODO: Query visit service to get information about visit and data collection
# TODO: Use AuthN information as part of verification with visit service

try:
collection_id_info = await self._client.create_new_collection()
self._current_collection = self._generate_directory_info(collection_id_info)
except Exception as ex:
# TODO: The catch all is needed because the RunEngine will not
# currently handle it, see
# https://github.com/bluesky/bluesky/pull/1623
self._current_collection = None
logging.exception(ex)

def _generate_directory_info(
self,
collection_id_info: DataCollectionIdentifier,
) -> DirectoryInfo:
collection_id = collection_id_info.collectionNumber
file_prefix = f"{self._data_group_name}-{collection_id}"
return DirectoryInfo(str(self._data_directory), file_prefix)

def __call__(self) -> DirectoryInfo:
if self._current_collection is not None:
return self._current_collection
else:
raise ValueError(
"No current collection, update() needs to be called at least once"
)
Loading

0 comments on commit d80eca0

Please sign in to comment.