Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pin tip centring now takes a median of values #242

Merged
merged 6 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions src/dodal/devices/areadetector/plugins/MXSC.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,47 @@
from typing import List, Tuple

import numpy as np
from ophyd import Component, Device, EpicsSignal, EpicsSignalRO, Kind, Signal
from ophyd.status import Status, SubscriptionStatus
from ophyd.status import StableSubscriptionStatus, Status

from dodal.log import LOGGER

Pixel = Tuple[int, int]


def statistics_of_positions(
positions: List[Pixel],
) -> Tuple[Pixel, Tuple[float, float]]:
"""Get the median and standard deviation from a list of readings.

Note that x/y are treated separately so the median position is not guaranteed to be
a position that was actually read.

Args:
positions (List[Pixel]): A list of tip positions.

Returns:
Tuple[Pixel, Tuple[float, float]]: The median tip position and the standard
deviation in x/y
"""
x_coords, y_coords = np.array(positions).T

median = (int(np.median(x_coords)), int(np.median(y_coords)))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we want these to be integers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. They're pixel values so we assumed they were integers later on for doing things like drawing grids and putting values into ispyb. Maybe I should do this conversion at the point we definitely need integers though

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, there's so many places this could potentially break that I think conversion here makes sense, I will add a comment though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I added type hinting, which makes it clearer that these things are valid Pixel locations and so must be ints. Doesn't really answer your question 100% but hopefully makes it clearer?

std = (np.std(x_coords, dtype=float), np.std(y_coords, dtype=float))

return median, std


class PinTipDetect(Device):
"""This will read the pin tip location from the MXSC plugin.

If the plugin finds no tip it will return {INVALID_POSITION}. However, it will also
occassionally give false negatives and return this value when there is a pin tip
there. Therefore, it is recommended that you trigger this device, which will cause a
subsequent read to return a valid pin immediatedly if one is found or wait
{validity_timeout} seconds if one is not, at which point a subsequent read will give
you {INVALID_POSITION}.
occassionally give incorrect data. Therefore, it is recommended that you trigger
this device, which will set {triggered_tip} to a median of the valid points taken
for {settle_time_s} seconds.

If no valid points are found within {validity_timeout} seconds a {triggered_tip}
will be set to {INVALID_POSITION}.
"""

INVALID_POSITION = (-1, -1)
Expand All @@ -19,16 +50,36 @@ class PinTipDetect(Device):

triggered_tip: Signal = Component(Signal, kind=Kind.hinted, value=INVALID_POSITION)
validity_timeout: Signal = Component(Signal, value=5)
settle_time_s: Signal = Component(Signal, value=0.5)

def log_tips_and_statistics(self, _):
median, standard_deviation = statistics_of_positions(self.tip_positions)
LOGGER.info(
f"Found tips {self.tip_positions} with median {median} and standard deviation {standard_deviation}"
)

def update_tip_if_valid(self, value, **_):
current_value = (value, self.tip_y.get())
def update_tip_if_valid(self, value: int, **_):
current_value = (value, int(self.tip_y.get()))
if current_value != self.INVALID_POSITION:
self.triggered_tip.put(current_value)
self.tip_positions.append(current_value)

(
median_tip_location,
__,
) = statistics_of_positions(self.tip_positions)

self.triggered_tip.put(median_tip_location)
return True
return False

def trigger(self) -> Status:
subscription_status = SubscriptionStatus(
self.tip_x, self.update_tip_if_valid, run=True
self.tip_positions: List[Pixel] = []

subscription_status = StableSubscriptionStatus(
self.tip_x,
self.update_tip_if_valid,
stability_time=self.settle_time_s.get(),
run=True,
)

def set_to_default_and_finish(timeout_status: Status):
Expand All @@ -44,6 +95,7 @@ def set_to_default_and_finish(timeout_status: Status):
self._timeout_status = Status(self, timeout=self.validity_timeout.get())
self._timeout_status.add_callback(set_to_default_and_finish)
subscription_status.add_callback(lambda _: self._timeout_status.set_finished())
subscription_status.add_callback(self.log_tips_and_statistics)

return subscription_status

Expand Down
92 changes: 85 additions & 7 deletions tests/devices/unit_tests/test_pin_tip_detect.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
from typing import List, Tuple

import bluesky.plan_stubs as bps
import pytest
from bluesky import RunEngine
from ophyd.sim import make_fake_device

from dodal.devices.areadetector.plugins.MXSC import PinTipDetect
from dodal.devices.areadetector.plugins.MXSC import (
PinTipDetect,
statistics_of_positions,
)


@pytest.fixture
def fake_pin_tip_detect() -> PinTipDetect:
FakePinTipDetect = make_fake_device(PinTipDetect)
pin_tip_detect: PinTipDetect = FakePinTipDetect(name="pin_tip")
pin_tip_detect.settle_time_s.set(0).wait()
yield pin_tip_detect


def trigger_and_read(fake_pin_tip_detect, value_to_set_during_trigger=None):
def trigger_and_read(
fake_pin_tip_detect, values_to_set_during_trigger: List[Tuple] = None
):
yield from bps.trigger(fake_pin_tip_detect)
if value_to_set_during_trigger:
fake_pin_tip_detect.tip_y.sim_put(value_to_set_during_trigger[1])
fake_pin_tip_detect.tip_x.sim_put(value_to_set_during_trigger[0])
if values_to_set_during_trigger:
for position in values_to_set_during_trigger:
fake_pin_tip_detect.tip_y.sim_put(position[1])
fake_pin_tip_detect.tip_x.sim_put(position[0])
yield from bps.wait()
return (yield from bps.rd(fake_pin_tip_detect))

Expand All @@ -39,7 +48,7 @@ def test_given_pin_tip_invalid_when_triggered_and_set_then_rd_returns_value(
fake_pin_tip_detect: PinTipDetect,
):
RE = RunEngine(call_returns_result=True)
result = RE(trigger_and_read(fake_pin_tip_detect, (200, 100)))
result = RE(trigger_and_read(fake_pin_tip_detect, [(200, 100)]))

assert result.plan_result == (200, 100)

Expand All @@ -48,8 +57,77 @@ def test_given_pin_tip_found_before_timeout_then_timeout_status_cleaned_up_and_t
fake_pin_tip_detect: PinTipDetect,
):
RE = RunEngine(call_returns_result=True)
RE(trigger_and_read(fake_pin_tip_detect, (100, 200)))
RE(trigger_and_read(fake_pin_tip_detect, [(100, 200)]))
# A success should clear up the timeout status but it may clear it up slightly later
# so we need the small timeout to avoid the race condition
fake_pin_tip_detect._timeout_status.wait(0.01)
assert fake_pin_tip_detect.triggered_tip.get() == (100, 200)


def test_median_of_positions_calculated_correctly():
test = [(1, 2), (1, 5), (3, 3)]

actual_med, _ = statistics_of_positions(test)

assert actual_med == (1, 3)


def test_standard_dev_of_positions_calculated_correctly():
test = [(1, 2), (1, 3)]

_, actual_std = statistics_of_positions(test)

assert actual_std == (0, 0.5)


def test_given_multiple_tips_found_then_running_median_calculated(
fake_pin_tip_detect: PinTipDetect,
):
fake_pin_tip_detect.settle_time_s.set(0.1).wait()

RE = RunEngine(call_returns_result=True)
RE(trigger_and_read(fake_pin_tip_detect, [(100, 200), (50, 60), (400, 800)]))

assert fake_pin_tip_detect.triggered_tip.get() == (100, 200)


def trigger_and_read_twice(
fake_pin_tip_detect: PinTipDetect, first_values: List[Tuple], second_value: Tuple
):
yield from trigger_and_read(fake_pin_tip_detect, first_values)
fake_pin_tip_detect.tip_y.sim_put(second_value[1])
fake_pin_tip_detect.tip_x.sim_put(second_value[0])
return (yield from trigger_and_read(fake_pin_tip_detect, []))


def test_given_median_previously_calculated_when_triggered_again_then_only_calculated_on_new_values(
fake_pin_tip_detect: PinTipDetect,
):
fake_pin_tip_detect.settle_time_s.set(0.1).wait()

RE = RunEngine(call_returns_result=True)

def my_plan():
tip_pos = yield from trigger_and_read_twice(
fake_pin_tip_detect, [(10, 20), (1, 3), (4, 8)], (100, 200)
)
assert tip_pos == (100, 200)

RE(my_plan())


def test_given_previous_tip_found_when_this_tip_not_found_then_returns_invalid(
fake_pin_tip_detect: PinTipDetect,
):
fake_pin_tip_detect.settle_time_s.set(0.1).wait()
fake_pin_tip_detect.validity_timeout.set(0.5).wait()

RE = RunEngine(call_returns_result=True)

def my_plan():
tip_pos = yield from trigger_and_read_twice(
fake_pin_tip_detect, [(10, 20), (1, 3), (4, 8)], (-1, -1)
)
assert tip_pos == (-1, -1)

RE(my_plan())