diff --git a/src/ophyd_async/epics/signal/signal.py b/src/ophyd_async/epics/signal/signal.py index d4259c66c8..c8f7a0d46a 100644 --- a/src/ophyd_async/epics/signal/signal.py +++ b/src/ophyd_async/epics/signal/signal.py @@ -41,7 +41,10 @@ def _make_backend( def epics_signal_rw( - datatype: Type[T], read_pv: str, write_pv: Optional[str] = None + datatype: Type[T], + read_pv: str, + write_pv: Optional[str] = None, + read_suffix: str = "", ) -> SignalRW[T]: """Create a `SignalRW` backed by 1 or 2 EPICS PVs @@ -53,11 +56,32 @@ def epics_signal_rw( The PV to read and monitor write_pv: If given, use this PV to write to, otherwise use read_pv + read_suffix: + Append this suffix to read_pv """ - backend = _make_backend(datatype, read_pv, write_pv or read_pv) + backend = _make_backend(datatype, f"{read_pv}{read_suffix}", write_pv or read_pv) return SignalRW(backend) +def epics_signal_rw_rbv( + datatype: Type[T], + read_pv: str, + write_pv: Optional[str] = None, +) -> SignalRW[T]: + """Create a `SignalRW` backed by 1 or 2 EPICS PVs, with read_suffix as "_RBV" + + Parameters + ---------- + datatype: + Check that the PV is of this type + read_pv: + The PV to read and monitor + write_pv: + If given, use this PV to write to, otherwise use read_pv + """ + return epics_signal_rw(datatype, read_pv, write_pv or read_pv, read_suffix="_RBV") + + def epics_signal_r(datatype: Type[T], read_pv: str) -> SignalR[T]: """Create a `SignalR` backed by 1 EPICS PV diff --git a/tests/core/test_signal.py b/tests/core/test_signal.py index 5393cb1a78..ee8850895b 100644 --- a/tests/core/test_signal.py +++ b/tests/core/test_signal.py @@ -14,6 +14,7 @@ wait_for_value, ) from ophyd_async.core.utils import DEFAULT_TIMEOUT +from ophyd_async.epics.signal.signal import epics_signal_r, epics_signal_rw, epics_signal_rw_rbv, epics_signal_w, epics_signal_x class MySignal(Signal): @@ -119,3 +120,26 @@ async def test_set_and_wait_for_value(): assert not st.done set_sim_put_proceeds(sim_signal, True) assert await time_taken_by(st) < 0.1 + + +def test_signal_helpers(): + read_write = epics_signal_rw(int, "ReadWrite") + assert read_write._backend.read_pv == "ReadWrite" + assert read_write._backend.write_pv == "ReadWrite" + + read_write_rbv_manual = epics_signal_rw(int, "ReadWrite", read_suffix="_RBV") + assert read_write_rbv_manual._backend.read_pv == "ReadWrite_RBV" + assert read_write_rbv_manual._backend.write_pv == "ReadWrite" + + read_write_rbv = epics_signal_rw_rbv(int, "ReadWrite") + assert read_write_rbv._backend.read_pv == "ReadWrite_RBV" + assert read_write_rbv._backend.write_pv == "ReadWrite" + + read = epics_signal_r(int, "Read") + assert read._backend.read_pv == "Read" + + write = epics_signal_w(int, "Write") + assert write._backend.write_pv == "Write" + + execute = epics_signal_x("Execute") + assert execute._backend.write_pv == "Execute"