Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XBPM feedback for i03 #160

Merged
merged 10 commits into from
Sep 15, 2023
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ classifiers = [
]
description = "Ophyd devices and other utils that could be used across DLS beamlines"
dependencies = [
"ophyd@git+https://github.com/Tom-Willemsen/ophyd@40e4b72c582e65e63d13a1650f76de709e5c70bb", # Switch back to just "ophyd" once <relevant PR> merged.
"ophyd@git+https://github.com/DominicOram/ophyd@31a1762435bcb275d2555068233549885eab02e7", # Switch back to just "ophyd" once ophyd#1148, ophyd#1155, ophyd#1140 merged.
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
"bluesky",
"pyepics",
"dataclasses-json",
"pillow",
"requests",
"graypy",
"pydantic<2.0",
"opencv-python-headless", # For pin-tip detection.
"aioca", # Required for CA support with Ophyd V2.
"p4p", # Required for PVA support with Ophyd V2.
"opencv-python-headless", # For pin-tip detection.
"aioca", # Required for CA support with Ophyd V2.
"p4p", # Required for PVA support with Ophyd V2.
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
16 changes: 16 additions & 0 deletions src/dodal/beamlines/i03.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dodal.devices.smargon import Smargon
from dodal.devices.synchrotron import Synchrotron
from dodal.devices.undulator import Undulator
from dodal.devices.xbpm_feedback import XBPMFeedback
from dodal.devices.xspress3_mini.xspress3_mini import Xspress3Mini
from dodal.devices.zebra import Zebra
from dodal.log import set_beamline as set_log_beamline
Expand Down Expand Up @@ -284,3 +285,18 @@ def flux(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) ->
wait_for_connection,
fake_with_ophyd_sim,
)


def xbpm_feedback(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> Flux:
"""Get the i03 XBPM feeback device, instantiate it if it hasn't already been.
If this is called when already instantiated in i03, it will return the existing object.
"""
return device_instantiation(
XBPMFeedback,
"xbpm_feedback",
"",
wait_for_connection,
fake_with_ophyd_sim,
)
17 changes: 17 additions & 0 deletions src/dodal/devices/xbpm_feedback.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,28 @@
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO
from ophyd.status import StableSubscriptionStatus, StatusBase


class XBPMFeedback(Device):
"""The XBPM feedback device is an IOC that moves the DCM, HFM and VFM to automatically
hold the beam into place, as measured by the XBPM sensor."""

# We need to wait for the beam to be locked into position for this amount of time
# until we are certain that it is stable.
STABILITY_TIME = 3

pos_ok: EpicsSignalRO = Component(EpicsSignalRO, "-EA-FDBK-01:XBPM2POSITION_OK")
pause_feedback: EpicsSignal = Component(EpicsSignal, "-EA-FDBK-01:FB_PAUSE")
x: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosX:MeanValue_RBV")
y: EpicsSignalRO = Component(EpicsSignalRO, "-EA-XBPM-02:PosY:MeanValue_RBV")

def trigger(self) -> StatusBase:
status = StableSubscriptionStatus(
self.pos_ok,
lambda value, *args, **kwargs: value == 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit, ignorable]

Signature should be f(*, old_value, value, **kwargs)

I know this is technically correct but maybe it would be clearer to use the exact signature advised in the ophyd docs?

self.STABILITY_TIME,
timeout=60,
)

if self.pos_ok.get() == 1:
status.set_finished()
return status
33 changes: 33 additions & 0 deletions tests/devices/unit_tests/test_xbpm_feedback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest
from ophyd.sim import make_fake_device

from dodal.devices.xbpm_feedback import XBPMFeedback


@pytest.fixture
def fake_xbpm_feedback():
FakeXBPMFeedback = make_fake_device(XBPMFeedback)
return FakeXBPMFeedback(name="xbpm")


def test_given_pos_ok_when_xbpm_feedback_kickoff_then_return_immediately(
fake_xbpm_feedback: XBPMFeedback,
):
fake_xbpm_feedback.pos_ok.sim_put(1)
status = fake_xbpm_feedback.trigger()
status.wait(0.1)
assert status.done and status.success


def test_given_pos_not_ok_and_goes_ok_for_stability_time_when_xbpm_feedback_kickoff_then_return_immediately(
fake_xbpm_feedback: XBPMFeedback,
):
fake_xbpm_feedback.STABILITY_TIME = 0.1
fake_xbpm_feedback.pos_ok.sim_put(0)
status = fake_xbpm_feedback.trigger()

assert not status.done

fake_xbpm_feedback.pos_ok.sim_put(1)
status.wait(0.2)
assert status.done and status.success
Loading