diff --git a/src/dodal/devices/smargon.py b/src/dodal/devices/smargon.py index bed39f8883..603c8f9ce9 100644 --- a/src/dodal/devices/smargon.py +++ b/src/dodal/devices/smargon.py @@ -1,8 +1,43 @@ +from enum import Enum + from ophyd import Component as Cpt -from ophyd import EpicsMotor, EpicsSignal +from ophyd import Device, EpicsMotor from ophyd.epics_motor import MotorBundle +from ophyd.status import StatusBase from dodal.devices.motors import MotorLimitHelper, XYZLimitBundle +from dodal.devices.status import await_approx_value +from dodal.devices.utils import SetWhenEnabled + + +class StubPosition(Enum): + CURRENT_AS_CENTER = 0 + RESET_TO_ROBOT_LOAD = 1 + + +class StubOffsets(Device): + """Stub offsets are used to change the internal co-ordinate system of the smargon by + adding an offset to x, y, z. + This is useful as the smargon's centre of rotation is around (0, 0, 0). As such + changing stub offsets will change the centre of rotation. + In practice we don't need to change them manually, instead there are helper PVs to + set them so that the current position is zero or to pre-defined positions. + """ + + parent: "Smargon" + + center_at_current_position: SetWhenEnabled = Cpt(SetWhenEnabled, "CENTER_CS") + to_robot_load: SetWhenEnabled = Cpt(SetWhenEnabled, "SET_STUBS_TO_RL") + + def set(self, pos: StubPosition) -> StatusBase: + if pos == StubPosition.CURRENT_AS_CENTER: + status = self.center_at_current_position.set(1) + status &= await_approx_value(self.parent.x, 0.0, deadband=0.1) + status &= await_approx_value(self.parent.y, 0.0, deadband=0.1) + status &= await_approx_value(self.parent.z, 0.0, deadband=0.1) + return status + else: + return self.to_robot_load.set(1) class Smargon(MotorBundle): @@ -20,10 +55,6 @@ class Smargon(MotorBundle): phi: EpicsMotor = Cpt(EpicsMotor, "PHI") omega: EpicsMotor = Cpt(EpicsMotor, "OMEGA") - stub_offset_set: EpicsSignal = Cpt(EpicsSignal, "SET_STUBS_TO_RL.PROC") - """Stub offsets are calibration values that are required to move between calibration - pin position and spine pins. These are set in EPICS and applied via the proc.""" - real_x1: EpicsMotor = Cpt(EpicsMotor, "MOTOR_3") real_x2: EpicsMotor = Cpt(EpicsMotor, "MOTOR_4") real_y: EpicsMotor = Cpt(EpicsMotor, "MOTOR_1") @@ -31,6 +62,8 @@ class Smargon(MotorBundle): real_phi: EpicsMotor = Cpt(EpicsMotor, "MOTOR_5") real_chi: EpicsMotor = Cpt(EpicsMotor, "MOTOR_6") + stub_offsets: StubOffsets = Cpt(StubOffsets, "") + def get_xyz_limits(self) -> XYZLimitBundle: """Get the limits for the x, y and z axes. diff --git a/src/dodal/devices/status.py b/src/dodal/devices/status.py index d619d18d56..46ab1b6534 100644 --- a/src/dodal/devices/status.py +++ b/src/dodal/devices/status.py @@ -14,10 +14,12 @@ def value_is(value, **_): return SubscriptionStatus(subscribable, value_is, timeout=timeout) -# Returns a status which is completed when the subscriptable contains a value within the expected_value list def await_value_in_list( subscribable: Any, expected_value: list, timeout: Union[None, int] = None ) -> SubscriptionStatus: + """Returns a status which is completed when the subscriptable contains a value + within the expected_value list""" + def value_is(value, **_): return value in expected_value @@ -25,3 +27,15 @@ def value_is(value, **_): raise TypeError(f"expected value {expected_value} is not a list") else: return SubscriptionStatus(subscribable, value_is, timeout=timeout) + + +def await_approx_value( + subscribable: Any, + expected_value: T, + deadband: float = 1e-09, + timeout: Union[None, int] = None, +) -> SubscriptionStatus: + def value_is_approx(value, **_): + return abs(value - expected_value) <= deadband + + return SubscriptionStatus(subscribable, value_is_approx, timeout=timeout) diff --git a/src/dodal/devices/utils.py b/src/dodal/devices/utils.py index d7115bb560..e0bdcd8d02 100644 --- a/src/dodal/devices/utils.py +++ b/src/dodal/devices/utils.py @@ -4,6 +4,7 @@ from ophyd import Component, Device, EpicsSignal from ophyd.status import Status, StatusBase +from dodal.devices.status import await_value from dodal.log import LOGGER @@ -105,3 +106,18 @@ def set_global_exception_and_log(status: Status): # Initiate the chain of functions wrap_func(starting_status, functions_to_chain[0], wrapped_funcs[-1]) return full_status + + +class SetWhenEnabled(Device): + """A device that sets the proc field of a PV when it becomes enabled.""" + + proc: EpicsSignal = Component(EpicsSignal, ".PROC") + disp: EpicsSignal = Component(EpicsSignal, ".DISP") + + def set(self, proc: int) -> Status: + return run_functions_without_blocking( + [ + lambda: await_value(self.disp, 0), + lambda: self.proc.set(proc), + ] + ) diff --git a/tests/devices/unit_tests/test_smargon.py b/tests/devices/unit_tests/test_smargon.py new file mode 100644 index 0000000000..03c60d89d9 --- /dev/null +++ b/tests/devices/unit_tests/test_smargon.py @@ -0,0 +1,66 @@ +from time import sleep +from typing import Tuple + +import pytest +from ophyd.sim import make_fake_device + +from dodal.devices.smargon import Smargon, StubPosition + + +@pytest.fixture +def smargon() -> Smargon: + return make_fake_device(Smargon)(name="smargon") + + +def set_smargon_pos(smargon: Smargon, pos: Tuple[float, float, float]): + smargon.x.user_readback.sim_put(pos[0]) + smargon.y.user_readback.sim_put(pos[1]) + smargon.z.user_readback.sim_put(pos[2]) + + +def test_given_to_robot_disp_low_when_stub_offsets_set_to_robot_load_then_proc_set( + smargon: Smargon, +): + smargon.stub_offsets.to_robot_load.disp.sim_put(0) + + status = smargon.stub_offsets.set(StubPosition.RESET_TO_ROBOT_LOAD) + status.wait() + + assert smargon.stub_offsets.to_robot_load.proc.get() == 1 + assert smargon.stub_offsets.center_at_current_position.proc.get() == 0 + + +def test_given_center_disp_low_and_at_centre_when_stub_offsets_set_to_center_then_proc_set( + smargon: Smargon, +): + smargon.stub_offsets.center_at_current_position.disp.sim_put(0) + set_smargon_pos(smargon, (0, 0, 0)) + + status = smargon.stub_offsets.set(StubPosition.CURRENT_AS_CENTER) + status.wait() + + assert smargon.stub_offsets.to_robot_load.proc.get() == 0 + assert smargon.stub_offsets.center_at_current_position.proc.get() == 1 + + +def test_given_center_disp_low_when_stub_offsets_set_to_center_and_moved_to_0_0_0_then_proc_set( + smargon: Smargon, +): + smargon.stub_offsets.center_at_current_position.disp.sim_put(0) + + set_smargon_pos(smargon, (1.5, 0.5, 3.4)) + + status = smargon.stub_offsets.set(StubPosition.CURRENT_AS_CENTER) + + sleep(0.01) + + assert smargon.stub_offsets.center_at_current_position.proc.get() == 1 + + assert not status.done + + set_smargon_pos(smargon, (0, 0, 0)) + + status.wait() + + assert smargon.stub_offsets.to_robot_load.proc.get() == 0 + assert smargon.stub_offsets.center_at_current_position.proc.get() == 1 diff --git a/tests/devices/unit_tests/test_utils.py b/tests/devices/unit_tests/test_utils.py index 6a831c206e..0868569f97 100644 --- a/tests/devices/unit_tests/test_utils.py +++ b/tests/devices/unit_tests/test_utils.py @@ -1,9 +1,10 @@ from unittest.mock import MagicMock, patch import pytest +from ophyd.sim import make_fake_device from ophyd.status import Status -from dodal.devices.utils import run_functions_without_blocking +from dodal.devices.utils import SetWhenEnabled, run_functions_without_blocking from dodal.log import LOGGER, GELFTCPHandler, logging, set_up_logging_handlers @@ -105,3 +106,15 @@ def test_status_points_to_provided_device_object(): ) returned_status.wait(0.1) assert returned_status.obj == expected_obj + + +def test_given_disp_high_when_set_SetWhenEnabled_then_proc_not_set_until_disp_low(): + signal: SetWhenEnabled = make_fake_device(SetWhenEnabled)(name="test") + signal.disp.sim_put(1) + signal.proc.set = MagicMock(return_value=Status(done=True, success=True)) + + status = signal.set(1) + signal.proc.set.assert_not_called() + signal.disp.sim_put(0) + status.wait() + signal.proc.set.assert_called_once()