Skip to content

Commit

Permalink
Move ADAravis detectors to ophyd-async
Browse files Browse the repository at this point in the history
  • Loading branch information
DiamondJoseph committed Apr 10, 2024
1 parent 1936186 commit 4758850
Show file tree
Hide file tree
Showing 5 changed files with 446 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/ophyd_async/epics/areadetector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .aravis import ADAravisDetector
from .single_trigger_det import SingleTriggerDet
from .utils import (
FileWriteMode,
Expand All @@ -9,6 +10,7 @@
)

__all__ = [
"ADAravisDetector",
"SingleTriggerDet",
"FileWriteMode",
"ImageMode",
Expand Down
71 changes: 71 additions & 0 deletions src/ophyd_async/epics/areadetector/aravis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from typing import get_args

from bluesky.protocols import HasHints, Hints
from ophyd_async.core import (
DirectoryProvider,
StandardDetector,
TriggerInfo,
)
from ophyd_async.epics.areadetector.controllers.aravis_controller import (
ADAravisController,
)
from ophyd_async.epics.areadetector.drivers import ADBaseShapeProvider
from ophyd_async.epics.areadetector.drivers.aravis_driver import ADAravisDriver
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF


class ADAravisDetector(StandardDetector, HasHints):
"""
Ophyd-async implementation of an ADAravis Detector.
The detector may be configured for an external trigger on a GPIO port,
which must be done prior to preparing the detector
"""

_controller: ADAravisController
_writer: HDFWriter

def __init__(
self,
prefix: str,
directory_provider: DirectoryProvider,
name: str,
gpio_number: ADAravisController.GPIO_NUMBER = 1,
**scalar_sigs: str,
):
# Must be child of Detector to pick up connect()
self.drv = ADAravisDriver(prefix + "DET:")
self.hdf = NDFileHDF(prefix + "HDF5:")

super().__init__(
ADAravisController(self.drv, gpio_number=gpio_number),
HDFWriter(
self.hdf,
directory_provider,
lambda: self.name,
ADBaseShapeProvider(self.drv),
**scalar_sigs,
),
config_sigs=(self.drv.acquire_time, self.drv.acquire),
name=name,
)

async def _prepare(self, value: TriggerInfo) -> None:
await self._controller._fetch_deadtime()
await super()._prepare(value)

def get_external_trigger_gpio(self):
return self._controller.gpio_number

def set_external_trigger_gpio(self, gpio_number: ADAravisController.GPIO_NUMBER):
supported_gpio_numbers = get_args(ADAravisController.GPIO_NUMBER)
if gpio_number not in supported_gpio_numbers:
raise ValueError(
f"{self.__class__.__name__} only supports the following GPIO "
f"indices: {supported_gpio_numbers} but was asked to "
f"use {gpio_number}"
)
self._controller.gpio_number = gpio_number

@property
def hints(self) -> Hints:
return self._writer.hints
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import asyncio
from typing import Literal, Optional, Tuple

from ophyd_async.core import (
AsyncStatus,
DetectorControl,
DetectorTrigger,
set_and_wait_for_value,
)
from ophyd_async.epics.areadetector.drivers.aravis_driver import (
ADAravisDriver,
ADAravisTriggerMode,
ADAravisTriggerSource,
)
from ophyd_async.epics.areadetector.utils import (
ImageMode,
stop_busy_record,
)


class ADAravisController(DetectorControl):
GPIO_NUMBER = Literal[1, 2, 3, 4]

def __init__(self, driver: ADAravisDriver, gpio_number: GPIO_NUMBER) -> None:
self._drv = driver
self.gpio_number = gpio_number
self.dead_time: Optional[float] = None

def get_deadtime(self, exposure: float) -> float:
return self.dead_time or 0

async def _fetch_deadtime(self) -> None:
self.dead_time = await self._drv._fetch_deadtime()

async def arm(
self,
num: int = 0,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
if num == 0:
image_mode = ImageMode.continuous
else:
image_mode = ImageMode.multiple
if exposure is not None:
await self._drv.acquire_time.set(exposure)

trigger_mode, trigger_source = self._get_trigger_info(trigger)
# trigger mode must be set first and on it's own!
await self._drv.trigger_mode.set(trigger_mode)

await asyncio.gather(
self._drv.trigger_source.set(trigger_source),
self._drv.num_images.set(num),
self._drv.image_mode.set(image_mode),
)

status = await set_and_wait_for_value(self._drv.acquire, True)
return status

def _get_trigger_info(
self, trigger: DetectorTrigger
) -> Tuple[ADAravisTriggerMode, ADAravisTriggerSource]:
supported_trigger_types = (
DetectorTrigger.constant_gate,
DetectorTrigger.edge_trigger,
)
if trigger not in supported_trigger_types:
raise ValueError(
f"{self.__class__.__name__} only supports the following trigger "
f"types: {supported_trigger_types} but was asked to "
f"use {trigger}"
)
if trigger == DetectorTrigger.internal:
return ADAravisTriggerMode.off, ADAravisTriggerSource.fixed_rate
else:
return (
ADAravisTriggerMode.on,
ADAravisTriggerSource(f"line_{self.gpio_number}"),
)

async def disarm(self):
await stop_busy_record(self._drv.acquire, False, timeout=1)
156 changes: 156 additions & 0 deletions src/ophyd_async/epics/areadetector/drivers/aravis_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
from enum import Enum
from math import nan
from typing import Callable, Dict, Tuple

from ophyd_async.epics.areadetector.drivers import ADBase
from ophyd_async.epics.areadetector.utils import (
ad_r,
ad_rw,
)


class ADAravisTriggerMode(str, Enum):
"""GigEVision GenICAM standard: on=externally triggered"""

on = "On"
off = "Off"


class ADAravisTriggerSource(str, Enum):
# While not all enum elements may be physically supported by the hardware,
# DB record templates suggest they are valid options for underlying record
# cite: https://github.com/areaDetector/ADGenICam/tree/master/GenICamApp/Db
# (e.g. Mako G-125B TriggerSource has Line1-4)
freerun = "Freerun"
fixed_rate = "FixedRate"
software = "Software"
action_0 = "Action0"
action_1 = "Action1"
# Externally triggered on GPIO N
line_1 = "Line1"
line_2 = "Line2"
line_3 = "Line3"
line_4 = "Line4"


def _reverse_lookup(model_deadtimes: Dict[float, Tuple[str, ...]]) -> Callable[[str], float]:
def inner(pixel_format: str) -> float:
for deadtime, pixel_formats in model_deadtimes.items():
if pixel_format in pixel_formats:
return deadtime
return nan

return inner


_deadtimes: Dict[str, Callable[[str], float]] = {
# cite: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/Manta/techman/Manta_TechMan.pdf retrieved 2024-04-05
"Manta G-125": lambda _: 63e-6,
"Manta G-145": lambda _: 106e-6,
"Manta G-235": _reverse_lookup(
{
118e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12",
"BayerRG12Packed",
"YUV411Packed",
),
256e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
390e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
"Manta G-895": _reverse_lookup(
{
404e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
542e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
822e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
"Manta G-2460": _reverse_lookup(
{
979e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
1304e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
1961e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
# cite: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/various/appnote/GigE/GigE-Cameras_AppNote_PIV-Min-Time-Between-Exposures.pdf retrieved 2024-04-05
"Manta G-609": lambda _: 47e-6,
# cite: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/Mako/techman/Mako_TechMan_en.pdf retrieved 2024-04-05
"Mako G-040": _reverse_lookup(
{
101e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
140e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
217e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
"Mako G-125": lambda _: 70e-6,
# Assume 12 bits: 10 bits = 275e-6
"Mako G-234": _reverse_lookup(
{
356e-6: (
"Mono8",
"BayerRG8",
"BayerRG12",
"BayerRG12Packed",
"YUV411Packed",
"YUV422Packed",
),
# Assume 12 bits: 10 bits = 563e-6
726e-6: ("RGB8Packed", "BRG8Packed", "YUV444Packed"),
}
),
"Mako G-507": _reverse_lookup(
{
270e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
363e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
554e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
}


class ADAravisDriver(ADBase):
# If instantiating a new instance, ensure it is supported in the _deadtimes dict
"""Generic Driver supporting the Manta and Mako drivers.
Requires driver firmware up to date such that the Model_RBV is of the form "^(Mako|Manta) (model)$"
Fetches deadtime prior to use in a Streaming scan.
"""

def __init__(self, prefix: str, name: str = "") -> None:
self.trigger_mode = ad_rw(ADAravisTriggerMode, prefix + "TriggerMode")
self.trigger_source = ad_rw(ADAravisTriggerSource, prefix + "TriggerSource")
self.model = ad_r(str, prefix + "Model")
self.pixel_format = ad_rw(str, prefix + "PixelFormat")
super().__init__(prefix, name=name)

async def _fetch_deadtime(self) -> float:
# All known in-use version B/C have same deadtime as non-B/C
model: str = (await self.model.get_value()).removesuffix("B").removesuffix("C")
pixel_format: str = await self.pixel_format.get_value()
return _deadtimes.get(model, lambda _: nan)(pixel_format)
Loading

0 comments on commit 4758850

Please sign in to comment.