Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XBPM feedback for i03 #160

Merged
merged 10 commits into from
Sep 15, 2023
12 changes: 4 additions & 8 deletions src/dodal/devices/xbpm_feedback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO
from ophyd.status import StableSubscriptionStatus, StatusBase
from ophyd.status import StatusBase, SubscriptionStatus


class XBPMFeedback(Device):
Expand All @@ -11,18 +11,14 @@ class XBPMFeedback(Device):
STABILITY_TIME = 3

pos_ok: EpicsSignalRO = Component(EpicsSignalRO, "-EA-FDBK-01:XBPM2POSITION_OK")
pos_stable: EpicsSignalRO = Component(EpicsSignalRO, "-EA-FDBK-01:XBPM2_STABLE")
pause_feedback: EpicsSignal = Component(EpicsSignal, "-EA-FDBK-01:FB_PAUSE")
x: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosX:MeanValue_RBV")
y: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosY:MeanValue_RBV")

def trigger(self) -> StatusBase:
status = StableSubscriptionStatus(
self.pos_ok,
return SubscriptionStatus(
self.pos_stable,
lambda value, *args, **kwargs: value == 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit, ignorable]

Signature should be f(*, old_value, value, **kwargs)

I know this is technically correct but maybe it would be clearer to use the exact signature advised in the ophyd docs?

self.STABILITY_TIME,
timeout=60,
)

if self.pos_ok.get() == 1:
status.set_finished()
return status
13 changes: 6 additions & 7 deletions tests/devices/unit_tests/test_xbpm_feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,23 @@ def fake_xbpm_feedback():
return FakeXBPMFeedback(name="xbpm")


def test_given_pos_ok_when_xbpm_feedback_kickoff_then_return_immediately(
def test_given_pos_stable_when_xbpm_feedback_kickoff_then_return_immediately(
fake_xbpm_feedback: XBPMFeedback,
):
fake_xbpm_feedback.pos_ok.sim_put(1)
fake_xbpm_feedback.pos_stable.sim_put(1)
status = fake_xbpm_feedback.trigger()
status.wait(0.1)
assert status.done and status.success


def test_given_pos_not_ok_and_goes_ok_for_stability_time_when_xbpm_feedback_kickoff_then_return_immediately(
def test_given_pos_not_stable_and_goes_stable_when_xbpm_feedback_kickoff_then_return(
fake_xbpm_feedback: XBPMFeedback,
):
fake_xbpm_feedback.STABILITY_TIME = 0.1
fake_xbpm_feedback.pos_ok.sim_put(0)
fake_xbpm_feedback.pos_stable.sim_put(0)
status = fake_xbpm_feedback.trigger()

assert not status.done

fake_xbpm_feedback.pos_ok.sim_put(1)
status.wait(0.2)
fake_xbpm_feedback.pos_stable.sim_put(1)
status.wait(0.1)
assert status.done and status.success