Skip to content

Commit

Permalink
Add Aravis AD implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DiamondJoseph committed Apr 22, 2024
1 parent 5e4e2a7 commit 84aa40a
Show file tree
Hide file tree
Showing 5 changed files with 547 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/ophyd_async/epics/areadetector/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .aravis import AravisDetector
from .pilatus import PilatusDetector
from .single_trigger_det import SingleTriggerDet
from .utils import (
Expand All @@ -10,6 +11,7 @@
)

__all__ = [
"AravisDetector",
"SingleTriggerDet",
"FileWriteMode",
"ImageMode",
Expand Down
69 changes: 69 additions & 0 deletions src/ophyd_async/epics/areadetector/aravis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from typing import get_args

from bluesky.protocols import HasHints, Hints

from ophyd_async.core import DirectoryProvider, StandardDetector, TriggerInfo
from ophyd_async.epics.areadetector.controllers.aravis_controller import (
AravisController,
)
from ophyd_async.epics.areadetector.drivers import ADBaseShapeProvider
from ophyd_async.epics.areadetector.drivers.aravis_driver import AravisDriver
from ophyd_async.epics.areadetector.writers import HDFWriter, NDFileHDF


class AravisDetector(StandardDetector, HasHints):
"""
Ophyd-async implementation of an ADAravis Detector.
The detector may be configured for an external trigger on a GPIO port,
which must be done prior to preparing the detector
"""

_controller: AravisController
_writer: HDFWriter

def __init__(
self,
name: str,
directory_provider: DirectoryProvider,
driver: AravisDriver,
hdf: NDFileHDF,
gpio_number: AravisController.GPIO_NUMBER = 1,
**scalar_sigs: str,
):
# Must be child of Detector to pick up connect()
self.drv = driver
self.hdf = hdf

super().__init__(
AravisController(self.drv, gpio_number=gpio_number),
HDFWriter(
self.hdf,
directory_provider,
lambda: self.name,
ADBaseShapeProvider(self.drv),
**scalar_sigs,
),
config_sigs=(self.drv.acquire_time, self.drv.acquire),
name=name,
)

async def _prepare(self, value: TriggerInfo) -> None:
await self.drv.fetch_deadtime()
await super()._prepare(value)

def get_external_trigger_gpio(self):
return self._controller.gpio_number

def set_external_trigger_gpio(self, gpio_number: AravisController.GPIO_NUMBER):
supported_gpio_numbers = get_args(AravisController.GPIO_NUMBER)
if gpio_number not in supported_gpio_numbers:
raise ValueError(
f"{self.__class__.__name__} only supports the following GPIO "
f"indices: {supported_gpio_numbers} but was asked to "
f"use {gpio_number}"
)
self._controller.gpio_number = gpio_number

@property
def hints(self) -> Hints:
return self._writer.hints
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import asyncio
from typing import Literal, Optional, Tuple

from ophyd_async.core import (
AsyncStatus,
DetectorControl,
DetectorTrigger,
set_and_wait_for_value,
)
from ophyd_async.epics.areadetector.drivers.aravis_driver import (
AravisDriver,
AravisTriggerMode,
AravisTriggerSource,
)
from ophyd_async.epics.areadetector.utils import ImageMode, stop_busy_record


class AravisController(DetectorControl):
GPIO_NUMBER = Literal[1, 2, 3, 4]

def __init__(self, driver: AravisDriver, gpio_number: GPIO_NUMBER) -> None:
self._drv = driver
self.gpio_number = gpio_number

def get_deadtime(self, exposure: float) -> float:
return self._drv.dead_time or 0

async def arm(
self,
num: int = 0,
trigger: DetectorTrigger = DetectorTrigger.internal,
exposure: Optional[float] = None,
) -> AsyncStatus:
if num == 0:
image_mode = ImageMode.continuous
else:
image_mode = ImageMode.multiple
if exposure is not None:
await self._drv.acquire_time.set(exposure)

trigger_mode, trigger_source = self._get_trigger_info(trigger)
# trigger mode must be set first and on it's own!
await self._drv.trigger_mode.set(trigger_mode)

await asyncio.gather(
self._drv.trigger_source.set(trigger_source),
self._drv.num_images.set(num),
self._drv.image_mode.set(image_mode),
)

status = await set_and_wait_for_value(self._drv.acquire, True)
return status

def _get_trigger_info(
self, trigger: DetectorTrigger
) -> Tuple[AravisTriggerMode, AravisTriggerSource]:
supported_trigger_types = (
DetectorTrigger.constant_gate,
DetectorTrigger.edge_trigger,
)
if trigger not in supported_trigger_types:
raise ValueError(
f"{self.__class__.__name__} only supports the following trigger "
f"types: {supported_trigger_types} but was asked to "
f"use {trigger}"
)
if trigger == DetectorTrigger.internal:
return AravisTriggerMode.off, AravisTriggerSource.fixed_rate
else:
return (
AravisTriggerMode.on,
AravisTriggerSource[f"line_{self.gpio_number}"],
)

async def disarm(self):
await stop_busy_record(self._drv.acquire, False, timeout=1)
165 changes: 165 additions & 0 deletions src/ophyd_async/epics/areadetector/drivers/aravis_driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
from enum import Enum
from typing import Callable, Dict, Optional, Tuple

from ophyd_async.epics.areadetector.drivers import ADBase
from ophyd_async.epics.areadetector.utils import ad_r, ad_rw


class AravisTriggerMode(str, Enum):
"""GigEVision GenICAM standard: on=externally triggered"""

on = "On"
off = "Off"


class AravisTriggerSource(str, Enum):
"""A minimal set of TriggerSources that must be supported by the underlying record.
To enable hardware triggered scanning, line_N must support each N in GPIO_NUMBER.
To enable software triggered scanning, freerun must be supported.
Other enumerated values may or may not be preset.
To prevent requiring one Enum class per possible configuration, we set as this Enum
but read from the underlying signal as a str.
"""

freerun = "Freerun"
# While not all enum elements may be physically supported by the hardware,
# DB record templates suggest they are valid options for underlying record
# cite: https://github.com/areaDetector/ADGenICam/tree/master/GenICamApp/Db
# (e.g. Mako G-125B TriggerSource has Line1-4)
# Externally triggered on GPIO N
line_1 = "Line1"
line_2 = "Line2"
line_3 = "Line3"
line_4 = "Line4"


def _reverse_lookup(
model_deadtimes: Dict[float, Tuple[str, ...]],
) -> Callable[[str], float]:
def inner(pixel_format: str, model_name: str) -> float:
for deadtime, pixel_formats in model_deadtimes.items():
if pixel_format in pixel_formats:
return deadtime
raise ValueError(
f"Model {model_name} does not have a defined deadtime "
f"for pixel format {pixel_format}"
)

return inner


_deadtimes: Dict[str, Callable[[str, str], float]] = {
# cite: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/Manta/techman/Manta_TechMan.pdf retrieved 2024-04-05 # noqa: E501
"Manta G-125": lambda _, __: 63e-6,
"Manta G-145": lambda _, __: 106e-6,
"Manta G-235": _reverse_lookup(
{
118e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12",
"BayerRG12Packed",
"YUV411Packed",
),
256e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
390e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
"Manta G-895": _reverse_lookup(
{
404e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
542e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
822e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
"Manta G-2460": _reverse_lookup(
{
979e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
1304e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
1961e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
# cite: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/various/appnote/GigE/GigE-Cameras_AppNote_PIV-Min-Time-Between-Exposures.pdf retrieved 2024-04-05 # noqa: E501
"Manta G-609": lambda _, __: 47e-6,
# cite: https://cdn.alliedvision.com/fileadmin/content/documents/products/cameras/Mako/techman/Mako_TechMan_en.pdf retrieved 2024-04-05 # noqa: E501
"Mako G-040": _reverse_lookup(
{
101e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
140e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
217e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
"Mako G-125": lambda _, __: 70e-6,
# Assume 12 bits: 10 bits = 275e-6
"Mako G-234": _reverse_lookup(
{
356e-6: (
"Mono8",
"BayerRG8",
"BayerRG12",
"BayerRG12Packed",
"YUV411Packed",
"YUV422Packed",
),
# Assume 12 bits: 10 bits = 563e-6
726e-6: ("RGB8Packed", "BRG8Packed", "YUV444Packed"),
}
),
"Mako G-507": _reverse_lookup(
{
270e-6: (
"Mono8",
"Mono12Packed",
"BayerRG8",
"BayerRG12Packed",
"YUV411Packed",
),
363e-6: ("Mono12", "BayerRG12", "YUV422Packed"),
554e-6: ("RGB8Packed", "BGR8Packed", "YUV444Packed"),
}
),
}


class AravisDriver(ADBase):
# If instantiating a new instance, ensure it is supported in the _deadtimes dict
"""Generic Driver supporting the Manta and Mako drivers.
Fetches deadtime prior to use in a Streaming scan.
Requires driver firmware up to date:
- Model_RBV must be of the form "^(Mako|Manta) (model)$"
"""

def __init__(self, prefix: str, name: str = "") -> None:
self.trigger_mode = ad_rw(AravisTriggerMode, prefix + "TriggerMode")
self.trigger_source = ad_rw(AravisTriggerSource, prefix + "TriggerSource", str)
self.model = ad_r(str, prefix + "Model")
self.pixel_format = ad_rw(str, prefix + "PixelFormat")
self.dead_time: Optional[float] = None
super().__init__(prefix, name=name)

async def fetch_deadtime(self) -> None:
# All known in-use version B/C have same deadtime as non-B/C
model: str = (await self.model.get_value()).removesuffix("B").removesuffix("C")
if model not in _deadtimes:
raise ValueError(f"Model {model} does not have defined deadtimes")
pixel_format: str = await self.pixel_format.get_value()
self.dead_time = _deadtimes.get(model)(pixel_format, model)
Loading

0 comments on commit 84aa40a

Please sign in to comment.