Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stub offsets to smargon #230

Merged
merged 4 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 38 additions & 5 deletions src/dodal/devices/smargon.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,43 @@
from enum import Enum

from ophyd import Component as Cpt
from ophyd import EpicsMotor, EpicsSignal
from ophyd import Device, EpicsMotor
from ophyd.epics_motor import MotorBundle
from ophyd.status import StatusBase

from dodal.devices.motors import MotorLimitHelper, XYZLimitBundle
from dodal.devices.status import await_approx_value
from dodal.devices.utils import SetWhenEnabled


class StubPosition(Enum):
CURRENT_AS_CENTER = 0
RESET_TO_ROBOT_LOAD = 1


class StubOffsets(Device):
"""Stub offsets are used to change the internal co-ordinate system of the smargon by
adding an offset to x, y, z.
This is useful as the smargon's centre of rotation is around (0, 0, 0). As such
changing stub offsets will change the centre of rotation.
In practice we don't need to change them manually, instead there are helper PVs to
set them so that the current position is zero or to pre-defined positions.
"""

parent: "Smargon"

center_at_current_position: SetWhenEnabled = Cpt(SetWhenEnabled, "CENTER_CS")
to_robot_load: SetWhenEnabled = Cpt(SetWhenEnabled, "SET_STUBS_TO_RL")

def set(self, pos: StubPosition) -> StatusBase:
if pos == StubPosition.CURRENT_AS_CENTER:
status = self.center_at_current_position.set(1)
status &= await_approx_value(self.parent.x, 0.0, deadband=0.1)
status &= await_approx_value(self.parent.y, 0.0, deadband=0.1)
status &= await_approx_value(self.parent.z, 0.0, deadband=0.1)
return status
else:
return self.to_robot_load.set(1)


class Smargon(MotorBundle):
Expand All @@ -20,17 +55,15 @@ class Smargon(MotorBundle):
phi: EpicsMotor = Cpt(EpicsMotor, "PHI")
omega: EpicsMotor = Cpt(EpicsMotor, "OMEGA")

stub_offset_set: EpicsSignal = Cpt(EpicsSignal, "SET_STUBS_TO_RL.PROC")
"""Stub offsets are calibration values that are required to move between calibration
pin position and spine pins. These are set in EPICS and applied via the proc."""

real_x1: EpicsMotor = Cpt(EpicsMotor, "MOTOR_3")
real_x2: EpicsMotor = Cpt(EpicsMotor, "MOTOR_4")
real_y: EpicsMotor = Cpt(EpicsMotor, "MOTOR_1")
real_z: EpicsMotor = Cpt(EpicsMotor, "MOTOR_2")
real_phi: EpicsMotor = Cpt(EpicsMotor, "MOTOR_5")
real_chi: EpicsMotor = Cpt(EpicsMotor, "MOTOR_6")

stub_offsets: StubOffsets = Cpt(StubOffsets, "")

def get_xyz_limits(self) -> XYZLimitBundle:
"""Get the limits for the x, y and z axes.

Expand Down
16 changes: 15 additions & 1 deletion src/dodal/devices/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,28 @@ def value_is(value, **_):
return SubscriptionStatus(subscribable, value_is, timeout=timeout)


# Returns a status which is completed when the subscriptable contains a value within the expected_value list
def await_value_in_list(
subscribable: Any, expected_value: list, timeout: Union[None, int] = None
) -> SubscriptionStatus:
"""Returns a status which is completed when the subscriptable contains a value
within the expected_value list"""

def value_is(value, **_):
return value in expected_value

if not isinstance(expected_value, list):
raise TypeError(f"expected value {expected_value} is not a list")
else:
return SubscriptionStatus(subscribable, value_is, timeout=timeout)


def await_approx_value(
subscribable: Any,
expected_value: T,
deadband: float = 1e-09,
timeout: Union[None, int] = None,
) -> SubscriptionStatus:
def value_is_approx(value, **_):
return abs(value - expected_value) <= deadband

return SubscriptionStatus(subscribable, value_is_approx, timeout=timeout)
16 changes: 16 additions & 0 deletions src/dodal/devices/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from ophyd import Component, Device, EpicsSignal
from ophyd.status import Status, StatusBase

from dodal.devices.status import await_value
from dodal.log import LOGGER


Expand Down Expand Up @@ -105,3 +106,18 @@ def set_global_exception_and_log(status: Status):
# Initiate the chain of functions
wrap_func(starting_status, functions_to_chain[0], wrapped_funcs[-1])
return full_status


class SetWhenEnabled(Device):
"""A device that sets the proc field of a PV when it becomes enabled."""

proc: EpicsSignal = Component(EpicsSignal, ".PROC")
disp: EpicsSignal = Component(EpicsSignal, ".DISP")

def set(self, proc: int) -> Status:
return run_functions_without_blocking(
[
lambda: await_value(self.disp, 0),
lambda: self.proc.set(proc),
]
)
66 changes: 66 additions & 0 deletions tests/devices/unit_tests/test_smargon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from time import sleep
from typing import Tuple

import pytest
from ophyd.sim import make_fake_device

from dodal.devices.smargon import Smargon, StubPosition


@pytest.fixture
def smargon() -> Smargon:
return make_fake_device(Smargon)(name="smargon")


def set_smargon_pos(smargon: Smargon, pos: Tuple[float, float, float]):
smargon.x.user_readback.sim_put(pos[0])
smargon.y.user_readback.sim_put(pos[1])
smargon.z.user_readback.sim_put(pos[2])


def test_given_to_robot_disp_low_when_stub_offsets_set_to_robot_load_then_proc_set(
smargon: Smargon,
):
smargon.stub_offsets.to_robot_load.disp.sim_put(0)

status = smargon.stub_offsets.set(StubPosition.RESET_TO_ROBOT_LOAD)
status.wait()

assert smargon.stub_offsets.to_robot_load.proc.get() == 1
assert smargon.stub_offsets.center_at_current_position.proc.get() == 0


def test_given_center_disp_low_and_at_centre_when_stub_offsets_set_to_center_then_proc_set(
smargon: Smargon,
):
smargon.stub_offsets.center_at_current_position.disp.sim_put(0)
set_smargon_pos(smargon, (0, 0, 0))

status = smargon.stub_offsets.set(StubPosition.CURRENT_AS_CENTER)
status.wait()

assert smargon.stub_offsets.to_robot_load.proc.get() == 0
assert smargon.stub_offsets.center_at_current_position.proc.get() == 1


def test_given_center_disp_low_when_stub_offsets_set_to_center_and_moved_to_0_0_0_then_proc_set(
smargon: Smargon,
):
smargon.stub_offsets.center_at_current_position.disp.sim_put(0)

set_smargon_pos(smargon, (1.5, 0.5, 3.4))

status = smargon.stub_offsets.set(StubPosition.CURRENT_AS_CENTER)

sleep(0.01)

assert smargon.stub_offsets.center_at_current_position.proc.get() == 1

assert not status.done

set_smargon_pos(smargon, (0, 0, 0))

status.wait()

assert smargon.stub_offsets.to_robot_load.proc.get() == 0
assert smargon.stub_offsets.center_at_current_position.proc.get() == 1
15 changes: 14 additions & 1 deletion tests/devices/unit_tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from unittest.mock import MagicMock, patch

import pytest
from ophyd.sim import make_fake_device
from ophyd.status import Status

from dodal.devices.utils import run_functions_without_blocking
from dodal.devices.utils import SetWhenEnabled, run_functions_without_blocking
from dodal.log import LOGGER, GELFTCPHandler, logging, set_up_logging_handlers


Expand Down Expand Up @@ -105,3 +106,15 @@ def test_status_points_to_provided_device_object():
)
returned_status.wait(0.1)
assert returned_status.obj == expected_obj


def test_given_disp_high_when_set_SetWhenEnabled_then_proc_not_set_until_disp_low():
signal: SetWhenEnabled = make_fake_device(SetWhenEnabled)(name="test")
signal.disp.sim_put(1)
signal.proc.set = MagicMock(return_value=Status(done=True, success=True))

status = signal.set(1)
signal.proc.set.assert_not_called()
signal.disp.sim_put(0)
status.wait()
signal.proc.set.assert_called_once()