diff --git a/src/dodal/beamlines/i03.py b/src/dodal/beamlines/i03.py index 8f77e8451f..549b8468d0 100644 --- a/src/dodal/beamlines/i03.py +++ b/src/dodal/beamlines/i03.py @@ -17,6 +17,7 @@ from dodal.devices.smargon import Smargon from dodal.devices.synchrotron import Synchrotron from dodal.devices.undulator import Undulator +from dodal.devices.xbpm_feedback import XBPMFeedback from dodal.devices.xspress3_mini.xspress3_mini import Xspress3Mini from dodal.devices.zebra import Zebra from dodal.log import set_beamline as set_log_beamline @@ -284,3 +285,18 @@ def flux(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) -> wait_for_connection, fake_with_ophyd_sim, ) + + +def xbpm_feedback( + wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False +) -> Flux: + """Get the i03 XBPM feeback device, instantiate it if it hasn't already been. + If this is called when already instantiated in i03, it will return the existing object. + """ + return device_instantiation( + XBPMFeedback, + "xbpm_feedback", + "", + wait_for_connection, + fake_with_ophyd_sim, + ) diff --git a/src/dodal/devices/xbpm_feedback.py b/src/dodal/devices/xbpm_feedback.py index 6c67bc91f8..fd30363019 100644 --- a/src/dodal/devices/xbpm_feedback.py +++ b/src/dodal/devices/xbpm_feedback.py @@ -1,11 +1,27 @@ from ophyd import Component, Device, EpicsSignal, EpicsSignalRO +from ophyd.status import StableSubscriptionStatus, StatusBase class XBPMFeedback(Device): """The XBPM feedback device is an IOC that moves the DCM, HFM and VFM to automatically hold the beam into place, as measured by the XBPM sensor.""" + # The time that the feedback needs to be high to be considered stable + STABILITY_TIME = 3 + pos_ok: EpicsSignalRO = Component(EpicsSignalRO, "-EA-FDBK-01:XBPM2POSITION_OK") pause_feedback: EpicsSignal = Component(EpicsSignal, "-EA-FDBK-01:FB_PAUSE") x: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosX:MeanValue_RBV") y: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosY:MeanValue_RBV") + + def trigger(self) -> StatusBase: + status = StableSubscriptionStatus( + self.pos_ok, + lambda value, *args, **kwargs: value == 1, + self.STABILITY_TIME, + timeout=60, + ) + + if self.pos_ok.get() == 1: + status.set_finished() + return status diff --git a/tests/devices/unit_tests/test_xbpm_feedback.py b/tests/devices/unit_tests/test_xbpm_feedback.py new file mode 100644 index 0000000000..4a90ce7d4b --- /dev/null +++ b/tests/devices/unit_tests/test_xbpm_feedback.py @@ -0,0 +1,33 @@ +import pytest +from ophyd.sim import make_fake_device + +from dodal.devices.xbpm_feedback import XBPMFeedback + + +@pytest.fixture +def fake_xbpm_feedback(): + FakeXBPMFeedback = make_fake_device(XBPMFeedback) + return FakeXBPMFeedback(name="xbpm") + + +def test_given_pos_ok_when_xbpm_feedback_kickoff_then_return_immediately( + fake_xbpm_feedback: XBPMFeedback, +): + fake_xbpm_feedback.pos_ok.sim_put(1) + status = fake_xbpm_feedback.trigger() + status.wait(0.1) + assert status.done and status.success + + +def test_given_pos_not_ok_and_goes_ok_for_stability_time_when_xbpm_feedback_kickoff_then_return_immediately( + fake_xbpm_feedback: XBPMFeedback, +): + fake_xbpm_feedback.STABILITY_TIME = 0.1 + fake_xbpm_feedback.pos_ok.sim_put(0) + status = fake_xbpm_feedback.trigger() + + assert not status.done + + fake_xbpm_feedback.pos_ok.sim_put(1) + status.wait(0.2) + assert status.done and status.success