Skip to content

Commit

Permalink
Add SignalRW helpers
Browse files Browse the repository at this point in the history
Add tests for all signal creation helpers
  • Loading branch information
GDYendell committed Apr 16, 2024
1 parent feaf884 commit ca84049
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
28 changes: 26 additions & 2 deletions src/ophyd_async/epics/signal/signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ def _make_backend(


def epics_signal_rw(
datatype: Type[T], read_pv: str, write_pv: Optional[str] = None
datatype: Type[T],
read_pv: str,
write_pv: Optional[str] = None,
read_suffix: str = "",
) -> SignalRW[T]:
"""Create a `SignalRW` backed by 1 or 2 EPICS PVs
Expand All @@ -53,11 +56,32 @@ def epics_signal_rw(
The PV to read and monitor
write_pv:
If given, use this PV to write to, otherwise use read_pv
read_suffix:
Append this suffix to read_pv
"""
backend = _make_backend(datatype, read_pv, write_pv or read_pv)
backend = _make_backend(datatype, f"{read_pv}{read_suffix}", write_pv or read_pv)
return SignalRW(backend)


def epics_signal_rw_rbv(
datatype: Type[T],
read_pv: str,
write_pv: Optional[str] = None,
) -> SignalRW[T]:
"""Create a `SignalRW` backed by 1 or 2 EPICS PVs, with read_suffix as "_RBV"
Parameters
----------
datatype:
Check that the PV is of this type
read_pv:
The PV to read and monitor
write_pv:
If given, use this PV to write to, otherwise use read_pv
"""
return epics_signal_rw(datatype, read_pv, write_pv or read_pv, read_suffix="_RBV")


def epics_signal_r(datatype: Type[T], read_pv: str) -> SignalR[T]:
"""Create a `SignalR` backed by 1 EPICS PV
Expand Down
24 changes: 24 additions & 0 deletions tests/core/test_signal.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
wait_for_value,
)
from ophyd_async.core.utils import DEFAULT_TIMEOUT
from ophyd_async.epics.signal.signal import epics_signal_r, epics_signal_rw, epics_signal_rw_rbv, epics_signal_w, epics_signal_x


class MySignal(Signal):
Expand Down Expand Up @@ -119,3 +120,26 @@ async def test_set_and_wait_for_value():
assert not st.done
set_sim_put_proceeds(sim_signal, True)
assert await time_taken_by(st) < 0.1


def test_signal_helpers():
read_write = epics_signal_rw(int, "ReadWrite")
assert read_write._backend.read_pv == "ReadWrite"
assert read_write._backend.write_pv == "ReadWrite"

read_write_rbv_manual = epics_signal_rw(int, "ReadWrite", read_suffix="_RBV")
assert read_write_rbv_manual._backend.read_pv == "ReadWrite_RBV"
assert read_write_rbv_manual._backend.write_pv == "ReadWrite"

read_write_rbv = epics_signal_rw_rbv(int, "ReadWrite")
assert read_write_rbv._backend.read_pv == "ReadWrite_RBV"
assert read_write_rbv._backend.write_pv == "ReadWrite"

read = epics_signal_r(int, "Read")
assert read._backend.read_pv == "Read"

write = epics_signal_w(int, "Write")
assert write._backend.write_pv == "Write"

execute = epics_signal_x("Execute")
assert execute._backend.write_pv == "Execute"

0 comments on commit ca84049

Please sign in to comment.