Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XBPM feedback for i03 #160

Merged
merged 10 commits into from
Sep 15, 2023
8 changes: 4 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ classifiers = [
]
description = "Ophyd devices and other utils that could be used across DLS beamlines"
dependencies = [
"ophyd@git+https://github.com/Tom-Willemsen/ophyd@40e4b72c582e65e63d13a1650f76de709e5c70bb", # Switch back to just "ophyd" once <relevant PR> merged.
"ophyd@git+https://github.com/DominicOram/ophyd@31a1762435bcb275d2555068233549885eab02e7", # Switch back to just "ophyd" once ophyd#1148, ophyd#1155, ophyd#1140 merged.
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
"bluesky",
"pyepics",
"dataclasses-json",
"pillow",
"requests",
"graypy",
"pydantic<2.0",
"opencv-python-headless", # For pin-tip detection.
"aioca", # Required for CA support with Ophyd V2.
"p4p", # Required for PVA support with Ophyd V2.
"opencv-python-headless", # For pin-tip detection.
"aioca", # Required for CA support with Ophyd V2.
"p4p", # Required for PVA support with Ophyd V2.
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
]
dynamic = ["version"]
license.file = "LICENSE"
Expand Down
18 changes: 17 additions & 1 deletion src/dodal/beamlines/i03.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from dodal.devices.smargon import Smargon
from dodal.devices.synchrotron import Synchrotron
from dodal.devices.undulator import Undulator
from dodal.devices.xbpm_feedback import XBPMFeedback
from dodal.devices.xspress3_mini.xspress3_mini import Xspress3Mini
from dodal.devices.zebra import Zebra
from dodal.log import set_beamline as set_log_beamline
Expand Down Expand Up @@ -280,7 +281,22 @@ def flux(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) ->
return device_instantiation(
Flux,
"flux",
"-MO-MAPT-01:Y:",
"-MO-FLUX-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def xbpm_feedback(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> Flux:
"""Get the i03 XBPM feeback device, instantiate it if it hasn't already been.
If this is called when already instantiated in i03, it will return the existing object.
"""
return device_instantiation(
XBPMFeedback,
"xbpm_feedback",
"",
wait_for_connection,
fake_with_ophyd_sim,
)
177 changes: 177 additions & 0 deletions src/dodal/beamlines/i04.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
from dodal.beamlines.beamline_utils import device_instantiation
from dodal.beamlines.beamline_utils import set_beamline as set_utils_beamline
from dodal.devices.attenuator import Attenuator
from dodal.devices.beamstop import BeamStop
from dodal.devices.DCM import DCM
from dodal.devices.flux import Flux
from dodal.devices.i04.transfocator import Transfocator
from dodal.devices.ipin import IPin
from dodal.devices.motors import XYZPositioner
from dodal.devices.sample_shutter import SampleShutter
from dodal.devices.smargon import Smargon
from dodal.devices.xbpm_feedback import XBPMFeedback
from dodal.log import set_beamline as set_log_beamline
from dodal.utils import get_beamline_name

BL = get_beamline_name("s04")
set_log_beamline(BL)
set_utils_beamline(BL)


def smargon(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> Smargon:
"""Get the i04 Smargon device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
Smargon,
"smargon",
"-MO-SGON-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def gonio_positioner(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> XYZPositioner:
"""Get the i04 lower_gonio_stages device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
XYZPositioner,
"lower_gonio_stages",
"-MO-GONIO-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def sample_delivery_system(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> XYZPositioner:
"""Get the i04 sample_delivery_system device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
XYZPositioner,
"sample_delivery_system",
"-MO-SDE-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def ipin(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) -> IPin:
"""Get the i04 ipin device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
IPin,
"ipin",
"-EA-PIN-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def beamstop(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> BeamStop:
"""Get the i04 beamstop device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
BeamStop,
"beamstop",
"",
wait_for_connection,
fake_with_ophyd_sim,
)


def sample_shutter(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> SampleShutter:
"""Get the i04 sample shutter device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
SampleShutter,
"sample_shutter",
"-EA-SHTR-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def attenuator(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> Attenuator:
"""Get the i04 attenuator device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
Attenuator,
"attenuator",
"-EA-ATTN-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def transfocator(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> Transfocator:
"""Get the i04 transfocator device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
Transfocator,
"transfocator",
"-MO-FSWT-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def xbpm_feedback(
wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False
) -> XBPMFeedback:
"""Get the i04 xbpm_feedback device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
XBPMFeedback,
"xbpm_feedback",
"",
wait_for_connection,
fake_with_ophyd_sim,
)


def flux(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) -> Flux:
"""Get the i04 xbpm_feedback device, instantiate it if it hasn't already been.
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
Flux,
"flux",
"-MO-FLUX-01:",
wait_for_connection,
fake_with_ophyd_sim,
)


def dcm(wait_for_connection: bool = True, fake_with_ophyd_sim: bool = False) -> DCM:
"""Get the i04 DCM device, instantiate it if it hasn't already been.
If this is called when already instantiated in i04, it will return the existing object.
"""
return device_instantiation(
DCM,
"dcm",
"",
wait_for_connection,
fake_with_ophyd_sim,
)
4 changes: 2 additions & 2 deletions src/dodal/devices/DCM.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from ophyd import Component as Cpt
from ophyd import Device, EpicsMotor, EpicsSignalRO
from ophyd import Device, EpicsMotor, EpicsSignalRO, Kind


class DCM(Device):
Expand All @@ -16,7 +16,7 @@ class DCM(Device):
roll: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:ROLL")
offset: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:OFFSET")
perp: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:PERP")
energy: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:ENERGY")
energy: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:ENERGY", kind=Kind.hinted)
pitch: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:PITCH")
wavelength: EpicsMotor = Cpt(EpicsMotor, "-MO-DCM-01:WAVELENGTH")

Expand Down
4 changes: 2 additions & 2 deletions src/dodal/devices/attenuator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Optional

from ophyd import Component, Device, EpicsSignal, EpicsSignalRO
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import Status, SubscriptionStatus

from dodal.devices.detector import DetectorParams
Expand Down Expand Up @@ -78,7 +78,7 @@ def set(self, transmission: float) -> SubscriptionStatus:
EpicsSignal, "E2WL:USECURRENTENERGY.PROC"
)
change: EpicsSignal = Component(EpicsSignal, "FANOUT")
actual_transmission: EpicsSignal = Component(EpicsSignal, "MATCH")
actual_transmission: EpicsSignal = Component(EpicsSignal, "MATCH", kind=Kind.hinted)

detector_params: Optional[DetectorParams] = None

Expand Down
4 changes: 2 additions & 2 deletions src/dodal/devices/flux.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ophyd import Component, Device, EpicsSignalRO
from ophyd import Component, Device, EpicsSignalRO, Kind


class Flux(Device):
"""Simple device to get the flux reading"""

flux_reading: EpicsSignalRO = Component(EpicsSignalRO, "FLUX")
flux_reading: EpicsSignalRO = Component(EpicsSignalRO, "SAMP", kind=Kind.hinted)
58 changes: 58 additions & 0 deletions src/dodal/devices/i04/transfocator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
from time import sleep

from ophyd import Component as Cpt
from ophyd import Device, EpicsSignal, EpicsSignalRO, Kind
from ophyd.status import DeviceStatus

from dodal.log import LOGGER


class Transfocator(Device):
beamsize_set: EpicsSignal = Cpt(EpicsSignal, "VERT_REQ", kind=Kind.hinted)
predict_vertical: EpicsSignal = Cpt(EpicsSignal, "LENS_PRED")

number_filters_sp: EpicsSignal = Cpt(EpicsSignal, "NUM_FILTERS")

start: EpicsSignal = Cpt(EpicsSignal, "START.PROC")
start_rbv: EpicsSignalRO = Cpt(EpicsSignalRO, "START_RBV")

vertical_lens_rbv: EpicsSignalRO = Cpt(EpicsSignalRO, "VER", kind=Kind.hinted)

TIMEOUT = 120
_POLLING_WAIT = 0.01

def polling_wait_on_stat_rbv(self, for_value):
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
# For some reason couldn't get monitors working on START_RBV
# (See https://github.com/DiamondLightSource/dodal/issues/152)
for _ in range(int(self.TIMEOUT / self._POLLING_WAIT)):
RBV_value = self.start_rbv.get()
sleep(self._POLLING_WAIT)
if RBV_value == for_value:
return
raise TimeoutError()
DominicOram marked this conversation as resolved.
Show resolved Hide resolved

def set(self, beamsize: float) -> DeviceStatus:
subscriber: int
status = DeviceStatus(self, timeout=self.TIMEOUT)

def set_based_on_predicition(old_value, value, *args, **kwargs):
if old_value != value:
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
self.predict_vertical.unsubscribe(subscriber)

value = round(value)
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.info(f"Transfocator setting {value} filters")
self.number_filters_sp.set(value).wait()
self.start.set(1).wait()
self.polling_wait_on_stat_rbv(1)
self.polling_wait_on_stat_rbv(0)
status.set_finished()
Tom-Willemsen marked this conversation as resolved.
Show resolved Hide resolved

LOGGER.info(f"Transfocator setting {beamsize} beamsize")
if self.beamsize_set.get() != beamsize:
subscriber = self.predict_vertical.subscribe(
set_based_on_predicition, run=False
)
self.beamsize_set.set(beamsize)
else:
status.set_finished()
return status
DominicOram marked this conversation as resolved.
Show resolved Hide resolved
8 changes: 8 additions & 0 deletions src/dodal/devices/ipin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from ophyd import Component as Cpt
from ophyd import Device, EpicsSignalRO, Kind


class IPin(Device):
"""Simple device to get the ipin reading"""

reading: EpicsSignalRO = Cpt(EpicsSignalRO, "I", kind=Kind.hinted)
8 changes: 7 additions & 1 deletion src/dodal/devices/motors.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from dataclasses import dataclass

import numpy as np
from ophyd import EpicsMotor
from ophyd import Component, Device, EpicsMotor


class XYZPositioner(Device):
x: EpicsMotor = Component(EpicsMotor, "X")
y: EpicsMotor = Component(EpicsMotor, "Y")
z: EpicsMotor = Component(EpicsMotor, "Z")


@dataclass
Expand Down
9 changes: 6 additions & 3 deletions src/dodal/devices/sample_shutter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from enum import Enum
from typing import Union

from ophyd import Component, Device, EpicsSignal, EpicsSignalRO

Expand All @@ -16,7 +17,9 @@ class SampleShutter(Device):
pos: EpicsSignal = Component(EpicsSignal, "CTRL2")
pos_rbv: EpicsSignalRO = Component(EpicsSignalRO, "STA")

def set(self, open: int):
sp_status = self.pos.set(open)
rbv_status = await_value(self.pos_rbv, open)
def set(self, open_val: Union[int, OpenState]):
if isinstance(open_val, OpenState):
open_val = open_val.value
sp_status = self.pos.set(open_val)
rbv_status = await_value(self.pos_rbv, open_val)
return sp_status & rbv_status
Loading