diff --git a/src/ophyd_async/core/__init__.py b/src/ophyd_async/core/__init__.py index 103638019d..d595335b67 100644 --- a/src/ophyd_async/core/__init__.py +++ b/src/ophyd_async/core/__init__.py @@ -30,6 +30,8 @@ SignalRW, SignalW, SignalX, + assert_reading, + assert_value, observe_value, set_and_wait_for_value, set_sim_callback, @@ -73,6 +75,8 @@ "set_sim_put_proceeds", "set_sim_value", "wait_for_value", + assert_value, + assert_reading, "AsyncStatus", "DirectoryInfo", "DirectoryProvider", diff --git a/src/ophyd_async/core/signal.py b/src/ophyd_async/core/signal.py index 927faaf402..32698bebef 100644 --- a/src/ophyd_async/core/signal.py +++ b/src/ophyd_async/core/signal.py @@ -2,7 +2,16 @@ import asyncio import functools -from typing import AsyncGenerator, Callable, Dict, Generic, Optional, Union +from typing import ( + Any, + AsyncGenerator, + Callable, + Dict, + Generic, + Optional, + Sequence, + Union, +) from bluesky.protocols import ( Descriptor, @@ -253,6 +262,37 @@ def set_sim_callback(signal: Signal[T], callback: ReadingValueCallback[T]) -> No return _sim_backends[signal].set_callback(callback) +async def verify_readable( + func: Readable | Dict[str, Any], + expected: Dict[str, Any], +) -> None: + """verify readable""" + expectation = expected + for signal in expectation: + for field in expectation[signal]: + if field == "timestamp": + assert isinstance(func["sim_signal"]["timestamp"], float) + else: + assert func[signal][field] == expectation[signal][field] + + +async def assert_value(signal: SignalR[T], value: T) -> None: + """assert value""" + assert await signal.get_value() == value + + +async def assert_reading(readable: Readable, reading: Reading) -> None: + """assert reading""" + await verify_readable(readable, reading) + + +async def assert_configuration( + readable: Readable, configuration: Sequence[SignalR] +) -> None: + """assert configuration""" + await verify_readable(readable, configuration) + + async def observe_value(signal: SignalR[T], timeout=None) -> AsyncGenerator[T, None]: """Subscribe to the value of a signal so it can be iterated from. diff --git a/tests/core/test_signal.py b/tests/core/test_signal.py index 5393cb1a78..3c1d79e866 100644 --- a/tests/core/test_signal.py +++ b/tests/core/test_signal.py @@ -8,6 +8,8 @@ Signal, SignalRW, SimSignalBackend, + assert_reading, + assert_value, set_and_wait_for_value, set_sim_put_proceeds, set_sim_value, @@ -119,3 +121,17 @@ async def test_set_and_wait_for_value(): assert not st.done set_sim_put_proceeds(sim_signal, True) assert await time_taken_by(st) < 0.1 + + +async def test_helper_funtion(): + sim_signal = SignalRW(SimSignalBackend(int, "test")) + sim_signal.set_name("sim_signal") + await sim_signal.connect(sim=True) + set_sim_value(sim_signal, 168) + await assert_value(sim_signal, 168) + + dummy_readable = { + "sim_signal": {"alarm_severity": 0, "timestamp": 46709394.28, "value": 168} + } + reading = await sim_signal.read() + await assert_reading(reading, dummy_readable)