-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'main' into signal_names
- Loading branch information
Showing
6 changed files
with
181 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from .sim_motor import SimMotor | ||
|
||
__all__ = ["SimMotor"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,116 @@ | ||
import asyncio | ||
import time | ||
from typing import Callable, List, Optional | ||
|
||
from bluesky.protocols import Movable, Stoppable | ||
|
||
from ophyd_async.core import StandardReadable | ||
from ophyd_async.core.async_status import AsyncStatus | ||
from ophyd_async.core.signal import soft_signal_r_and_backend, soft_signal_rw | ||
|
||
|
||
class SimMotor(StandardReadable, Movable, Stoppable): | ||
def __init__(self, name="", instant=True) -> None: | ||
""" | ||
Simulated motor device | ||
args: | ||
- prefix: str: Signal names prefix | ||
- name: str: name of device | ||
- instant: bool: whether to move instantly, or with a delay | ||
""" | ||
self._instant = instant | ||
self._move_task: Optional[asyncio.Task] = None | ||
|
||
# Define some signals | ||
self.user_setpoint = soft_signal_rw(float, 0) | ||
self.user_readback, self._user_readback = soft_signal_r_and_backend(float, 0) | ||
self.velocity = soft_signal_rw(float, 1.0) | ||
self.egu = soft_signal_rw(float, "mm") | ||
|
||
# Set name and signals for read() and read_configuration() | ||
self.set_readable_signals( | ||
read=[self.user_readback], | ||
config=[self.velocity, self.egu], | ||
) | ||
super().__init__(name=name) | ||
|
||
# Whether set() should complete successfully or not | ||
self._set_success = True | ||
|
||
def stop(self, success=False): | ||
""" | ||
Stop the motor if it is moving | ||
""" | ||
if self._move_task: | ||
self._move_task.cancel() | ||
self._move_task = None | ||
|
||
self._set_success = success | ||
|
||
def set(self, new_position: float, timeout: Optional[float] = None) -> AsyncStatus: # noqa: F821 | ||
""" | ||
Asynchronously move the motor to a new position. | ||
""" | ||
watchers: List[Callable] = [] | ||
coro = asyncio.wait_for(self._move(new_position, watchers), timeout=timeout) | ||
return AsyncStatus(coro, watchers) | ||
|
||
async def _move(self, new_position: float, watchers: List[Callable] = []): | ||
""" | ||
Start the motor moving to a new position. | ||
If the motor is already moving, it will stop first. | ||
If this is an instant motor the move will be instantaneous. | ||
""" | ||
self.stop() | ||
start = time.monotonic() | ||
|
||
current_position = await self.user_readback.get_value() | ||
distance = abs(new_position - current_position) | ||
travel_time = 0 if self._instant else distance / await self.velocity.get_value() | ||
|
||
old_position, units = await asyncio.gather( | ||
self.user_setpoint.get_value(), | ||
self.egu.get_value(), | ||
) | ||
|
||
async def update_position(): | ||
while True: | ||
time_elapsed = round(time.monotonic() - start, 2) | ||
|
||
# update position based on time elapsed | ||
if time_elapsed >= travel_time: | ||
# successfully reached our target position | ||
await self._user_readback.put(new_position) | ||
self._set_success = True | ||
break | ||
else: | ||
current_position = ( | ||
old_position + distance * time_elapsed / travel_time | ||
) | ||
|
||
await self._user_readback.put(current_position) | ||
|
||
# notify watchers of the new position | ||
for watcher in watchers: | ||
watcher( | ||
name=self.name, | ||
current=current_position, | ||
initial=old_position, | ||
target=new_position, | ||
unit=units, | ||
time_elapsed=time.monotonic() - start, | ||
) | ||
|
||
# 10hz update loop | ||
await asyncio.sleep(0.1) | ||
|
||
# set up a task that updates the motor position at 10hz | ||
self._move_task = asyncio.create_task(update_position()) | ||
|
||
try: | ||
await self._move_task | ||
finally: | ||
if not self._set_success: | ||
raise RuntimeError("Motor was stopped") |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
import asyncio | ||
import time | ||
|
||
from bluesky.plans import spiral_square | ||
from bluesky.run_engine import RunEngine | ||
|
||
from ophyd_async.core.device import DeviceCollector | ||
from ophyd_async.sim.demo.sim_motor import SimMotor | ||
|
||
|
||
async def test_move_sim_in_plan(): | ||
RE = RunEngine() | ||
|
||
async with DeviceCollector(): | ||
m1 = SimMotor("M1", "sim_motor1") | ||
m2 = SimMotor("M2", "sim_motor2") | ||
|
||
my_plan = spiral_square([], m1, m2, 0, 0, 4, 4, 10, 10) | ||
|
||
RE(my_plan) | ||
|
||
assert await m1.user_readback.get_value() == -2 | ||
assert await m2.user_readback.get_value() == -2 | ||
|
||
|
||
async def test_slow_move(): | ||
async with DeviceCollector(): | ||
m1 = SimMotor("M1", instant=False) | ||
|
||
await m1.velocity.set(20) | ||
|
||
start = time.monotonic() | ||
await m1.set(10) | ||
elapsed = time.monotonic() - start | ||
|
||
assert await m1.user_readback.get_value() == 10 | ||
assert elapsed >= 0.5 | ||
assert elapsed < 1 | ||
|
||
|
||
async def test_stop(): | ||
async with DeviceCollector(): | ||
m1 = SimMotor("M1", instant=False) | ||
|
||
# this move should take 10 seconds but we will stop it after 0.2 | ||
move_status = m1.set(10) | ||
await asyncio.sleep(0.2) | ||
m1.stop() | ||
|
||
new_pos = await m1.user_readback.get_value() | ||
|
||
assert move_status.done | ||
# move should not be successful as we stopped it | ||
assert not move_status.success | ||
assert new_pos < 10 | ||
assert new_pos >= 0.1 |