-
Notifications
You must be signed in to change notification settings - Fork 79
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ENH: add a "busy" device #1028
base: master
Are you sure you want to change the base?
ENH: add a "busy" device #1028
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
import time | ||
import threading | ||
|
||
from ophyd.device import Device | ||
from ophyd.status import DeviceStatus | ||
|
||
|
||
class BusyStatus(DeviceStatus): | ||
""" | ||
A "busy" device that takes a fixed amount of time in seconds to report complete. | ||
|
||
The clock starts as soon as the device is created (so do not hold onto these things!) | ||
|
||
Parameters | ||
---------- | ||
device : Device | ||
The object this status object belongs to | ||
|
||
delay : float | ||
Total delay in seconds | ||
|
||
tick : float, default=0.1 | ||
Time between updating the | ||
|
||
""" | ||
|
||
def __init__(self, device, delay, *, tick=0.1, **kwargs): | ||
super().__init__(device, **kwargs) | ||
start = time.monotonic() | ||
deadline = start + delay | ||
|
||
def busy_status_loop(): | ||
current = time.monotonic() | ||
while current < deadline: | ||
elapsed = current - start | ||
for w in self._watchers: | ||
w( | ||
name=self.device.name, | ||
current=elapsed, | ||
initial=0, | ||
target=delay, | ||
unit="s", | ||
fraction=(1 - elapsed / delay), | ||
time_elapsed=elapsed, | ||
time_remaining=delay - elapsed, | ||
) | ||
time.sleep(tick) | ||
current = time.monotonic() | ||
w( | ||
name=self.device.name, | ||
current=delay, | ||
initial=0, | ||
target=delay, | ||
unit="s", | ||
fraction=0, | ||
time_elapsed=current - start, | ||
time_remaining=0, | ||
) | ||
self.set_finished() | ||
|
||
threading.Thread(target=busy_status_loop).start() | ||
|
||
|
||
class Busy(Device): | ||
""" | ||
A "busy" device that takes a fixed amount of time in seconds to report complete. | ||
|
||
|
||
""" | ||
|
||
def set(self, delay): | ||
return BusyStatus(self, delay, tick=max(1, min(0.1, delay / 100))) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this Did you have another corner case in mind? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I messed the logic up here. The goal was to never tick faster that 10hz (people can not see much faster and no need to stress the system) and never slower than 1hz (1s is about as long as I want to when I watch something to see if it is dead or not) and to get as close as possible to 100 updates as possible in between (because that felt like a "nice" number). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incomplete description here