Skip to content

Commit

Permalink
(DiamondLightSource/hyperion#794) Tidy up stub offsets and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
DominicOram committed Nov 9, 2023
1 parent 3d7d056 commit f5e7954
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 25 deletions.
38 changes: 15 additions & 23 deletions src/dodal/devices/smargon.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
from enum import Enum

from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignal
from ophyd import Device, EpicsMotor
from ophyd.epics_motor import MotorBundle
from ophyd.status import Status
from ophyd.status import StatusBase

from dodal.devices.motors import MotorLimitHelper, XYZLimitBundle
from dodal.devices.status import await_value
from dodal.devices.utils import run_functions_without_blocking
from dodal.devices.status import await_approx_value
from dodal.devices.utils import SetWhenEnabled


class StubPosition(Enum):
CURRENT_AS_CENTER = 0
RESEET_TO_ROBOT_LOAD = 1
RESET_TO_ROBOT_LOAD = 1


class StubOffsets(Device):
Expand All @@ -24,28 +24,20 @@ class StubOffsets(Device):
set them so that the current position is zero or to pre-defined positions.
"""

center_at_current_position: EpicsSignal = Cpt(EpicsSignal, "CENTER_CS.PROC")
center_at_current_position_enabled: EpicsSignal = Cpt(EpicsSignal, "CENTER_CS.DISP")
parent: "Smargon"

to_robot_load: EpicsSignal = Cpt(EpicsSignal, "SET_STUBS_TO_RL.PROC")
to_robot_load_enabled: EpicsSignal = Cpt(EpicsSignal, "SET_STUBS_TO_RL.DISP")
center_at_current_position: SetWhenEnabled = Cpt(SetWhenEnabled, "CENTER_CS")
to_robot_load: SetWhenEnabled = Cpt(SetWhenEnabled, "SET_STUBS_TO_RL")

def set(self, pos: StubPosition) -> Status:
def set(self, pos: StubPosition) -> StatusBase:
if pos == StubPosition.CURRENT_AS_CENTER:
status = run_functions_without_blocking(
[
lambda: await_value(self.center_at_current_position_enabled, 0),
lambda: self.center_at_current_position.set(1),
]
)
status = self.center_at_current_position.set(1)
status &= await_approx_value(self.parent.x, 0.0, deadband=0.1)
status &= await_approx_value(self.parent.y, 0.0, deadband=0.1)
status &= await_approx_value(self.parent.z, 0.0, deadband=0.1)
return status

Check warning on line 38 in src/dodal/devices/smargon.py

View check run for this annotation

Codecov / codecov/patch

src/dodal/devices/smargon.py#L33-L38

Added lines #L33 - L38 were not covered by tests
else:
status = run_functions_without_blocking(
[
lambda: await_value(self.to_robot_load_enabled, 0),
lambda: self.to_robot_load.set(1),
]
)
return status
return self.to_robot_load.set(1)

Check warning on line 40 in src/dodal/devices/smargon.py

View check run for this annotation

Codecov / codecov/patch

src/dodal/devices/smargon.py#L40

Added line #L40 was not covered by tests


class Smargon(MotorBundle):
Expand Down
16 changes: 15 additions & 1 deletion src/dodal/devices/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,28 @@ def value_is(value, **_):
return SubscriptionStatus(subscribable, value_is, timeout=timeout)


# Returns a status which is completed when the subscriptable contains a value within the expected_value list
def await_value_in_list(
subscribable: Any, expected_value: list, timeout: Union[None, int] = None
) -> SubscriptionStatus:
"""Returns a status which is completed when the subscriptable contains a value
within the expected_value list"""

def value_is(value, **_):
return value in expected_value

if not isinstance(expected_value, list):
raise TypeError(f"expected value {expected_value} is not a list")
else:
return SubscriptionStatus(subscribable, value_is, timeout=timeout)


def await_approx_value(
subscribable: Any,
expected_value: T,
deadband: float = 1e-09,
timeout: Union[None, int] = None,
) -> SubscriptionStatus:
def value_is_approx(value, **_):
return abs(value - expected_value) <= deadband

Check warning on line 39 in src/dodal/devices/status.py

View check run for this annotation

Codecov / codecov/patch

src/dodal/devices/status.py#L38-L39

Added lines #L38 - L39 were not covered by tests

return SubscriptionStatus(subscribable, value_is_approx, timeout=timeout)

Check warning on line 41 in src/dodal/devices/status.py

View check run for this annotation

Codecov / codecov/patch

src/dodal/devices/status.py#L41

Added line #L41 was not covered by tests
16 changes: 16 additions & 0 deletions src/dodal/devices/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ophyd import Component, Device, EpicsSignal
from ophyd.status import Status, StatusBase

from dodal.devices.status import await_value
from dodal.log import LOGGER


Expand Down Expand Up @@ -105,3 +106,18 @@ def set_global_exception_and_log(status: Status):
# Initiate the chain of functions
wrap_func(starting_status, functions_to_chain[0], wrapped_funcs[-1])
return full_status


class SetWhenEnabled(Device):
"""A device that sets the proc field of a PV when it becomes enabled."""

proc: EpicsSignal = Component(EpicsSignal, ".PROC")
disp: EpicsSignal = Component(EpicsSignal, ".DISP")

def set(self, proc: int) -> Status:
return run_functions_without_blocking(
[
lambda: await_value(self.disp, 0),
lambda: self.proc.set(proc),
]
)
15 changes: 14 additions & 1 deletion tests/devices/unit_tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from unittest.mock import MagicMock, patch

import pytest
from ophyd.sim import make_fake_device
from ophyd.status import Status

from dodal.devices.utils import run_functions_without_blocking
from dodal.devices.utils import SetWhenEnabled, run_functions_without_blocking
from dodal.log import LOGGER, GELFTCPHandler, logging, set_up_logging_handlers


Expand Down Expand Up @@ -105,3 +106,15 @@ def test_status_points_to_provided_device_object():
)
returned_status.wait(0.1)
assert returned_status.obj == expected_obj


def test_given_disp_high_when_set_SetWhenEnabled_then_proc_not_set_until_disp_low():
signal: SetWhenEnabled = make_fake_device(SetWhenEnabled)(name="test")
signal.disp.sim_put(1)
signal.proc.set = MagicMock(return_value=Status(done=True, success=True))

status = signal.set(1)
signal.proc.set.assert_not_called()
signal.disp.sim_put(0)
status.wait()
signal.proc.set.assert_called_once()

0 comments on commit f5e7954

Please sign in to comment.