Skip to content

Commit

Permalink
Move queue_update logic to new Subsystem class
Browse files Browse the repository at this point in the history
  • Loading branch information
jsouter committed Oct 3, 2024
1 parent dea79c4 commit 1a4ee6c
Showing 1 changed file with 106 additions and 72 deletions.
178 changes: 106 additions & 72 deletions src/fastcs_eiger/eiger_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,30 @@ class EigerHandler:
uri: str
update_period: float = 0.2

async def put(self, controller: "EigerController", _: AttrW, value: Any) -> None:
async def put(
self,
controller: "EigerSubsystemController | EigerSubController",
_: AttrW,
value: Any,
) -> None:
parameters_to_update = await controller.connection.put(self.uri, value)
if not parameters_to_update:
parameters_to_update = [self.uri.split("/", 4)[-1]]
print(f"Manually fetching parameter {parameters_to_update}")
elif "difference_mode" in parameters_to_update:
parameters_to_update[
parameters_to_update.index("difference_mode")
] = "threshold/difference/mode"
parameters_to_update[parameters_to_update.index("difference_mode")] = (
"threshold/difference/mode"
)
print(
f"Fetching parameters after setting {self.uri}: {parameters_to_update},"
" replacing incorrect key 'difference_mode'")
" replacing incorrect key 'difference_mode'"
)
else:
print(
f"Fetching parameters after setting {self.uri}: {parameters_to_update}"
)

await controller.queue_update(parameters_to_update) # TODO: this should be the top level controller??
await controller.subsystem.queue_update(parameters_to_update)

async def update(self, controller: "EigerController", attr: AttrR) -> None:
try:
Expand Down Expand Up @@ -176,9 +182,9 @@ def __init__(self, ip: str, port: int) -> None:
self._ip = ip
self._port = port
self.connection = HTTPConnection(self._ip, self._port)

self.subsystems: dict[str, Subsystem] = {}
# Parameter update logic
self._parameter_update_lock = asyncio.Lock()
self.parameter_update_lock = asyncio.Lock()

async def initialise(self) -> None:
"""Create attributes by introspecting detector.
Expand All @@ -195,14 +201,16 @@ async def initialise(self) -> None:
await self.initialize()

try:
for subsystem in EIGER_PARAMETER_SUBSYSTEMS:
subsystem_cls = (
EigerDetectorController if subsystem == "detector"
for subsystem_name in EIGER_PARAMETER_SUBSYSTEMS:
subsystem = Subsystem(subsystem_name, self.parameter_update_lock)
self.subsystems[subsystem_name] = subsystem
subsystem_controller_cls = (
EigerDetectorController
if subsystem_name == "detector"
else EigerSubsystemController
)
controller = subsystem_cls(
subsystem, self.connection, self._parameter_update_lock)
self.register_sub_controller(subsystem.upper(), controller)
controller = subsystem_controller_cls(subsystem, self.connection)
self.register_sub_controller(subsystem_name.upper(), controller)
await controller.initialise()
except HTTPRequestError:
print("\nAn HTTP request failed while introspecting detector:\n")
Expand Down Expand Up @@ -242,7 +250,8 @@ async def cancel(self):
async def update(self):
"""Periodically check for parameters that need updating from the detector."""
await self.stale_parameters.set(
any(c.stale_parameters.get() for c in self.get_sub_controllers().values()))
any(c.stale_parameters.get() for c in self.get_sub_controllers().values())
)
controller_updates = [c.update() for c in self.get_sub_controllers().values()]
await asyncio.gather(*controller_updates)

Expand All @@ -261,9 +270,11 @@ async def handle_monitor(self):
print(np.array(image))


def _create_attributes(parameters: list[EigerParameter],
attr_namer: Callable[[str], str] | None = None,
group_namer: Callable[[EigerParameter], str] | None = None):
def _create_attributes(
parameters: list[EigerParameter],
attr_namer: Callable[[str], str] | None = None,
group_namer: Callable[[EigerParameter], str] | None = None,
):
"""Create ``Attribute``s from ``EigerParameter``s.
Args:
Expand Down Expand Up @@ -305,79 +316,87 @@ def _create_attributes(parameters: list[EigerParameter],
return attributes


class EigerSubsystemController(SubController):

stale_parameters = AttrR(Bool())
_subcontroller_mapping: dict[str, "EigerSubController"]

def __init__(self, subsystem: str, connection: HTTPConnection, lock: asyncio.Lock):
self._subsystem = subsystem
self.connection = connection
self._parameter_update_lock = lock
self._parameter_updates: set[str] = set()
self._subcontroller_mapping = {}
super().__init__()
class Subsystem:
def __init__(
self, name: Literal["detector", "stream", "monitor"], lock: asyncio.Lock
):
self.name = name
self.parameter_update_lock = lock
self.parameter_updates: set[str] = set()
self.stale = False

async def __introspect_detector_subsystem(self) -> list[EigerParameter]:
async def introspect_detector_subsystem(
self, connection: HTTPConnection
) -> list[EigerParameter]:
parameters = []
for mode in EIGER_PARAMETER_MODES:
subsystem_keys = [
parameter
for parameter in await self.connection.get(
f"{self._subsystem}/api/1.8.0/{mode}/keys"
for parameter in await connection.get(
f"{self.name}/api/1.8.0/{mode}/keys"
)
if parameter not in IGNORED_KEYS
] + MISSING_KEYS[self._subsystem][mode]
] + MISSING_KEYS[self.name][mode]
requests = [
self.connection.get(f"{self._subsystem}/api/1.8.0/{mode}/{key}")
connection.get(f"{self.name}/api/1.8.0/{mode}/{key}")
for key in subsystem_keys
]
responses = await asyncio.gather(*requests)

parameters.extend(
[
EigerParameter(
key=key, subsystem=self._subsystem, mode=mode, response=response
key=key, subsystem=self.name, mode=mode, response=response
)
for key, response in zip(subsystem_keys, responses, strict=False)
]
)

return parameters

async def initialise(self) -> None:
parameters = await self.__introspect_detector_subsystem()
parameters = await self._create_subcontrollers(parameters)
attributes = _create_attributes(parameters, _key_to_attribute_name)

for name, attribute in attributes.items():
setattr(self, name, attribute)

async def _create_subcontrollers(self, parameters: list[EigerParameter]):
# TODO: write proper docstring with args and returns
return parameters

async def queue_update(self, parameters: list[str]):
"""Add the given parameters to the list of parameters to update.
Args:
parameters: Parameters to be updated
"""
async with self._parameter_update_lock:
async with self.parameter_update_lock:
for parameter in parameters:
self._parameter_updates.add(parameter)
self.parameter_updates.add(parameter)

self.stale = True


class EigerSubsystemController(SubController):

stale_parameters = AttrR(Bool())
_subcontroller_mapping: dict[str, "EigerSubController"]

def __init__(self, subsystem: Subsystem, connection: HTTPConnection):
self.subsystem = subsystem
self.connection = connection
self._subcontroller_mapping = {}
super().__init__()

async def initialise(self) -> None:
parameters = await self.subsystem.introspect_detector_subsystem(self.connection)
parameters = await self._create_subcontrollers(parameters)
attributes = _create_attributes(parameters, _key_to_attribute_name)

await self.stale_parameters.set(True)
for name, attribute in attributes.items():
setattr(self, name, attribute)

async def update(self):
if not self._parameter_updates:
if self.stale_parameters.get():
await self.stale_parameters.set(False)
if not self.subsystem.parameter_updates:
if self.subsystem.stale:
self.subsystem.stale = False

async with self._parameter_update_lock:
parameters = self._parameter_updates.copy()
self._parameter_updates.clear()
await self.stale_parameters.set(self.subsystem.stale)

async with self.subsystem.parameter_update_lock:
parameters = self.subsystem.parameter_updates.copy()
self.subsystem.parameter_updates.clear()

# Release lock while fetching parameters - this may be slow
parameter_updates: list[Coroutine] = []
Expand All @@ -393,6 +412,21 @@ async def update(self):
print(f"Failed to handle update for {parameter}: {attr}")
await asyncio.gather(*parameter_updates)

async def _create_subcontrollers(self, parameters: list[EigerParameter]):
"""Create and register subcontrollers to logically group attributes within
the subsystem.
Args:
parameters: list of ``EigerParameter``s to be filtered and passed into
``EigerSubController``s
Returns:
a list of ``EigerParameter``s which is the same as that passed in as
argument with the parameters moved to an ``EigerSubController`` removed
"""
return parameters

def get_attribute(self, key: str):
attr_name = _key_to_attribute_name(key)
if attr := getattr(self, attr_name, None):
Expand All @@ -409,12 +443,11 @@ async def _create_subcontrollers(self, parameters: list[EigerParameter]):
def __threshold_parameter(parameter: EigerParameter):
return "threshold" in parameter.key

threshold_parameters, parameters = partition(
parameters, __threshold_parameter
)

threshold_parameters, parameters = partition(parameters, __threshold_parameter)

threshold_controller = EigerThresholdController(threshold_parameters, self)
threshold_controller = EigerThresholdController(
threshold_parameters, self.connection, self.subsystem
)
self.register_sub_controller("THRESHOLD", threshold_controller)
await threshold_controller.initialise()

Expand All @@ -425,10 +458,15 @@ def __threshold_parameter(parameter: EigerParameter):


class EigerSubController(SubController): # for smaller parts of submodules
def __init__(self, parameters: list[EigerParameter], parent: EigerSubsystemController):
def __init__(
self,
parameters: list[EigerParameter],
connection: HTTPConnection,
subsystem: Subsystem,
):
self._parameters = parameters
self.__parent = parent
self.connection = parent.connection
self.subsystem = subsystem
self.connection = connection
self._attribute_mapping: dict[str, AttrR] = {}
super().__init__()

Expand All @@ -437,9 +475,6 @@ async def initialise(self):
for name, attribute in attributes.items():
setattr(self, name, attribute)

async def queue_update(self, parameters: list[str]):
await self.__parent.queue_update(parameters)

def get_attribute(self, key: str):
if key in self._attribute_mapping:
return self._attribute_mapping[key]
Expand All @@ -455,16 +490,15 @@ def __is_index(parameter: EigerParameter):
parts = parameter.key.split("/")
return len(parts) == 3 and parts[1].isnumeric()

index_parameters, other_parameters = partition(
self._parameters, __is_index
)
index_parameters, other_parameters = partition(self._parameters, __is_index)

def __idx_group_name(parameter: EigerParameter) -> str:
return "Threshold" + parameter.key.split("/")[1]
# return parameter.key

index_attributes = _create_attributes(
index_parameters, group_namer=__idx_group_name)
index_parameters, group_namer=__idx_group_name
)
for name, attribute in index_attributes.items():
_, index, field = name.split("/")
attr_name = f"{field}_{index}"
Expand Down

0 comments on commit 1a4ee6c

Please sign in to comment.