Skip to content

Commit

Permalink
Move threshold parameters to subcontroller
Browse files Browse the repository at this point in the history
Copy partition util from fastcs_odin
  • Loading branch information
jsouter committed Oct 3, 2024
1 parent 13339d5 commit dea79c4
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 51 deletions.
200 changes: 149 additions & 51 deletions src/fastcs_eiger/eiger_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from collections.abc import Coroutine
from dataclasses import dataclass
from io import BytesIO
from itertools import product
from typing import Any, Literal
from typing import Any, Callable, Literal

import numpy as np
from fastcs.attributes import Attribute, AttrR, AttrRW, AttrW
Expand All @@ -13,6 +12,7 @@
from PIL import Image

from fastcs_eiger.http_connection import HTTPConnection, HTTPRequestError
from fastcs_eiger.util import partition

# Keys to be ignored when introspecting the detector to create parameters
IGNORED_KEYS = [
Expand Down Expand Up @@ -82,7 +82,7 @@ async def put(self, controller: "EigerController", _: AttrW, value: Any) -> None
f"Fetching parameters after setting {self.uri}: {parameters_to_update}"
)

await controller.queue_update(parameters_to_update)
await controller.queue_update(parameters_to_update) # TODO: this should be the top level controller??

async def update(self, controller: "EigerController", attr: AttrR) -> None:
try:
Expand Down Expand Up @@ -151,6 +151,7 @@ def uri(self) -> str:
EIGER_PARAMETER_MODES = EigerParameter.__annotations__["mode"].__args__


# Flatten nested uri keys - e.g. threshold/1/mode -> threshold_1_mode
def _key_to_attribute_name(key: str):
return key.replace("/", "_")

Expand Down Expand Up @@ -195,9 +196,13 @@ async def initialise(self) -> None:

try:
for subsystem in EIGER_PARAMETER_SUBSYSTEMS:
controller = EigerSubsystemController(
subsystem_cls = (
EigerDetectorController if subsystem == "detector"
else EigerSubsystemController
)
controller = subsystem_cls(
subsystem, self.connection, self._parameter_update_lock)
self.register_sub_controller(subsystem, controller)
self.register_sub_controller(subsystem.upper(), controller)
await controller.initialise()
except HTTPRequestError:
print("\nAn HTTP request failed while introspecting detector:\n")
Expand Down Expand Up @@ -256,18 +261,64 @@ async def handle_monitor(self):
print(np.array(image))


def _create_attributes(parameters: list[EigerParameter],
attr_namer: Callable[[str], str] | None = None,
group_namer: Callable[[EigerParameter], str] | None = None):
"""Create ``Attribute``s from ``EigerParameter``s.
Args:
parameters: ``EigerParameter``s to create ``Attributes`` from
"""
attributes: dict[str, Attribute] = {}
for parameter in parameters:
group = group_namer(parameter) if group_namer else parameter.mode.capitalize()
match parameter.response["value_type"]:
case "float":
datatype = Float()
case "int" | "uint":
datatype = Int()
case "bool":
datatype = Bool()
case "string" | "datetime" | "State" | "string[]":
datatype = String()
case _:
print(f"Failed to handle {parameter}")

attribute_name = attr_namer(parameter.key) if attr_namer else parameter.key

match parameter.response["access_mode"]:
case "r":
attributes[attribute_name] = AttrR(
datatype,
handler=EIGER_HANDLERS[parameter.mode](parameter.uri),
group=group,
)
case "rw":
attributes[attribute_name] = AttrRW(
datatype,
handler=EIGER_HANDLERS[parameter.mode](parameter.uri),
group=group,
allowed_values=parameter.response.get("allowed_values", None),
)

return attributes


class EigerSubsystemController(SubController):

stale_parameters = AttrR(Bool())
_subcontroller_mapping: dict[str, "EigerSubController"]

def __init__(self, subsystem: str, connection: HTTPConnection, lock: asyncio.Lock):
self._subsystem = subsystem
self.connection = connection
self._parameter_update_lock = lock
self._parameter_updates: set[str] = set()
self._subcontroller_mapping = {}
super().__init__()

async def _introspect_detector_subsystem(self) -> list[EigerParameter]:
async def __introspect_detector_subsystem(self) -> list[EigerParameter]:
parameters = []
for mode in EIGER_PARAMETER_MODES:
subsystem_keys = [
Expand Down Expand Up @@ -295,53 +346,16 @@ async def _introspect_detector_subsystem(self) -> list[EigerParameter]:
return parameters

async def initialise(self) -> None:
parameters = await self._introspect_detector_subsystem()
attributes = self._create_attributes(parameters)
parameters = await self.__introspect_detector_subsystem()
parameters = await self._create_subcontrollers(parameters)
attributes = _create_attributes(parameters, _key_to_attribute_name)

for name, attribute in attributes.items():
setattr(self, name, attribute)

def _create_attributes(self, parameters: list[EigerParameter]):
"""Create ``Attribute``s from ``EigerParameter``s.
Args:
parameters: ``EigerParameter``s to create ``Attributes`` from
"""
attributes: dict[str, Attribute] = {}
for parameter in parameters:
group = f"{parameter.subsystem.capitalize()}{parameter.mode.capitalize()}"
match parameter.response["value_type"]:
case "float":
datatype = Float()
case "int" | "uint":
datatype = Int()
case "bool":
datatype = Bool()
case "string" | "datetime" | "State" | "string[]":
datatype = String()
case _:
print(f"Failed to handle {parameter}")

# Flatten nested uri keys - e.g. threshold/1/mode -> threshold_1_mode
attribute_name = _key_to_attribute_name(parameter.key)

match parameter.response["access_mode"]:
case "r":
attributes[attribute_name] = AttrR(
datatype,
handler=EIGER_HANDLERS[parameter.mode](parameter.uri),
group=group,
)
case "rw":
attributes[attribute_name] = AttrRW(
datatype,
handler=EIGER_HANDLERS[parameter.mode](parameter.uri),
group=group,
allowed_values=parameter.response.get("allowed_values", None),
)

return attributes
async def _create_subcontrollers(self, parameters: list[EigerParameter]):
# TODO: write proper docstring with args and returns
return parameters

async def queue_update(self, parameters: list[str]):
"""Add the given parameters to the list of parameters to update.
Expand Down Expand Up @@ -370,12 +384,96 @@ async def update(self):
for parameter in parameters:
if parameter in IGNORED_KEYS:
continue
attr_name = _key_to_attribute_name(parameter)
match getattr(self, attr_name, None):
match self.get_attribute(parameter):
# TODO: mypy doesn't understand AttrR as a type for some reason:
# `error: Expected type in class pattern; found "Any" [misc]`
case AttrR(updater=EigerConfigHandler() as updater) as attr: # type: ignore [misc]
parameter_updates.append(updater.config_update(self, attr))
case _ as attr:
print(f"Failed to handle update for {parameter}: {attr}")
await asyncio.gather(*parameter_updates)

def get_attribute(self, key: str):
attr_name = _key_to_attribute_name(key)
if attr := getattr(self, attr_name, None):
# attribute belongs to controller
return attr
elif key in self._subcontroller_mapping:
controller = self._subcontroller_mapping[key]
return controller.get_attribute(key)


class EigerDetectorController(EigerSubsystemController):
async def _create_subcontrollers(self, parameters: list[EigerParameter]):

def __threshold_parameter(parameter: EigerParameter):
return "threshold" in parameter.key

threshold_parameters, parameters = partition(
parameters, __threshold_parameter
)


threshold_controller = EigerThresholdController(threshold_parameters, self)
self.register_sub_controller("THRESHOLD", threshold_controller)
await threshold_controller.initialise()

for parameter in threshold_parameters:
self._subcontroller_mapping[parameter.key] = threshold_controller

return parameters


class EigerSubController(SubController): # for smaller parts of submodules
def __init__(self, parameters: list[EigerParameter], parent: EigerSubsystemController):
self._parameters = parameters
self.__parent = parent
self.connection = parent.connection
self._attribute_mapping: dict[str, AttrR] = {}
super().__init__()

async def initialise(self):
attributes = _create_attributes(self._parameters, _key_to_attribute_name)
for name, attribute in attributes.items():
setattr(self, name, attribute)

async def queue_update(self, parameters: list[str]):
await self.__parent.queue_update(parameters)

def get_attribute(self, key: str):
if key in self._attribute_mapping:
return self._attribute_mapping[key]
else:
attr_name = _key_to_attribute_name(key)
return getattr(self, attr_name, None)


class EigerThresholdController(EigerSubController):
async def initialise(self):

def __is_index(parameter: EigerParameter):
parts = parameter.key.split("/")
return len(parts) == 3 and parts[1].isnumeric()

index_parameters, other_parameters = partition(
self._parameters, __is_index
)

def __idx_group_name(parameter: EigerParameter) -> str:
return "Threshold" + parameter.key.split("/")[1]
# return parameter.key

index_attributes = _create_attributes(
index_parameters, group_namer=__idx_group_name)
for name, attribute in index_attributes.items():
_, index, field = name.split("/")
attr_name = f"{field}_{index}"
setattr(self, attr_name, attribute)
self._attribute_mapping[name] = attribute

other_attributes = _create_attributes(other_parameters)
for name, attribute in other_attributes.items():
attr_name = _key_to_attribute_name(name)
attr_name = attr_name.removeprefix("threshold_")
setattr(self, attr_name, attribute)
self._attribute_mapping[name] = attribute
30 changes: 30 additions & 0 deletions src/fastcs_eiger/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Callable, TypeVar

T = TypeVar("T")


def partition(
elements: list[T], predicate: Callable[[T], bool]
) -> tuple[list[T], list[T]]:
"""Split a list of elements in two based on predicate.
If the predicate returns ``True``, the element will be placed in the truthy list,
if it does not, it will be placed in the falsy list.
Args:
elements: List of T
predicate: Predicate to filter the list with
Returns:
(truthy, falsy)
"""
truthy: list[T] = []
falsy: list[T] = []
for parameter in elements:
if predicate(parameter):
truthy.append(parameter)
else:
falsy.append(parameter)

return truthy, falsy

0 comments on commit dea79c4

Please sign in to comment.