From 4b64ba9f437352ca57413eeb43ad65f04cbe58c0 Mon Sep 17 00:00:00 2001 From: Ryan howard Date: Mon, 11 Mar 2024 14:24:27 -0400 Subject: [PATCH] Class-ify the algorithms. --- .../scripts/lld_data_script.py | 288 +++++++++++------- 1 file changed, 176 insertions(+), 112 deletions(-) diff --git a/hardware/opentrons_hardware/scripts/lld_data_script.py b/hardware/opentrons_hardware/scripts/lld_data_script.py index 52ef401dded..f13e14f8795 100644 --- a/hardware/opentrons_hardware/scripts/lld_data_script.py +++ b/hardware/opentrons_hardware/scripts/lld_data_script.py @@ -2,122 +2,190 @@ import csv import os import argparse -from typing import List, Optional, Tuple, Any, Callable +from typing import List, Optional, Tuple, Any import matplotlib.pyplot as plot import numpy - +from abc import ABC, abstractmethod impossible_pressure = 9001.0 -# ----present day threshold based---- -def tick_th(pressure: float) -> Tuple[bool, float]: - """Simulate firmware motor interrupt tick.""" - return (pressure < -150, pressure) +class LLDAlgoABC(ABC): + """An instance of an lld algorithm.""" + @staticmethod + @abstractmethod + def name() -> str: + """Name of this algorithm.""" + ... -def reset_th() -> None: - """Reset simulator between runs.""" - pass + @abstractmethod + def tick(self, pressure: float) -> Tuple[bool, float]: + """Simulate firmware motor interrupt tick.""" + ... + @abstractmethod + def reset(self) -> None: + """Reset simulator between runs.""" + ... -# -----Simple moving average derivative--- -samples_n_smad = 10 -running_samples_smad: List[float] = [impossible_pressure] * samples_n_smad -derivative_threshold_smad = -2.5 +class LLDPresThresh(LLDAlgoABC): + """present day threshold based.""" -def reset_smad() -> None: - """Reset simulator between runs.""" - global running_samples_smad - running_samples_smad = [impossible_pressure] * samples_n_smad + threshold: float + def __init__(self, thresh: float = -150) -> None: + """Init.""" + self.threshold = thresh -def tick_smad(pressure: float) -> Tuple[bool, float]: - """Simulate firmware motor interrupt tick.""" - global running_samples_smad - try: - next_ind = running_samples_smad.index(impossible_pressure) - # if no exception we're still filling the minimum samples - running_samples_smad[next_ind] = pressure - return (False, impossible_pressure) - except ValueError: # the array has been filled - pass - # store old running average - prev_running_avg = sum(running_samples_smad) / samples_n_smad - # left shift old samples - for i in range(samples_n_smad - 1): - running_samples_smad[i] = running_samples_smad[i + 1] - running_samples_smad[samples_n_smad - 1] = pressure - new_running_avg = sum(running_samples_smad) / samples_n_smad - return ( - (new_running_avg - prev_running_avg) < derivative_threshold_smad, - new_running_avg, - ) + @staticmethod + def name() -> str: + """Name of this algorithm.""" + return "threshold" + def tick(self, pressure: float) -> Tuple[bool, float]: + """Simulate firmware motor interrupt tick.""" + return (pressure < self.threshold, pressure) -# -----weighted moving average derivative--- -samples_n_wmad = 10 -weights_wmad: numpy.ndarray[Any, numpy.dtype[numpy.float32]] = numpy.array( - [0.19, 0.17, 0.15, 0.13, 0.11, 0.09, 0.07, 0.05, 0.03, 0.01] -) -running_samples_wmad = numpy.full(samples_n_wmad, impossible_pressure) -derivative_threshold_wmad = -2 - - -def reset_wmad() -> None: - """Reset simulator between runs.""" - global running_samples_wmad - assert numpy.sum(weights_wmad) == 1 - running_samples_wmad = numpy.full(samples_n_wmad, impossible_pressure) - - -def tick_wmad(pressure: float) -> Tuple[bool, float]: - """Simulate firmware motor interrupt tick.""" - global running_samples_wmad - if numpy.isin(impossible_pressure, running_samples_wmad): - next_ind = numpy.where(running_samples_wmad == impossible_pressure)[0][0] - # if no exception we're still filling the minimum samples - running_samples_wmad[next_ind] = pressure - return (False, impossible_pressure) - # store old running average - prev_running_avg = numpy.sum(numpy.multiply(running_samples_wmad, weights_wmad)) - # left shift old samples - for i in range(samples_n_wmad - 1): - running_samples_wmad[i] = running_samples_wmad[i + 1] - running_samples_wmad[samples_n_wmad - 1] = pressure - new_running_avg = numpy.sum(numpy.multiply(running_samples_wmad, weights_wmad)) - return ( - (new_running_avg - prev_running_avg) < derivative_threshold_wmad, - new_running_avg, - ) - + def reset(self) -> None: + """Reset simulator between runs.""" + pass -# -----exponential moving average derivative--- -current_average_emad: float = impossible_pressure -smoothing_factor = 0.1 -derivative_threshold_emad = -2.5 +class LLDSMAD(LLDAlgoABC): + """Simple moving average derivative.""" + + samples_n_smad: int + running_samples_smad: List[float] + derivative_threshold_smad: float + + def __init__(self, samples: int = 10, thresh: float = -2.5) -> None: + """Init.""" + self.samples_n_smad = samples + self.derivative_threshold_smad = thresh + self.reset() + + @staticmethod + def name() -> str: + """Name of this algorithm.""" + return "simple moving avg der" + + def reset(self) -> None: + """Reset simulator between runs.""" + self.running_samples_smad = [impossible_pressure] * self.samples_n_smad + + def tick(self, pressure: float) -> Tuple[bool, float]: + """Simulate firmware motor interrupt tick.""" + try: + next_ind = self.running_samples_smad.index(impossible_pressure) + # if no exception we're still filling the minimum samples + self.running_samples_smad[next_ind] = pressure + return (False, impossible_pressure) + except ValueError: # the array has been filled + pass + # store old running average + prev_running_avg = sum(self.running_samples_smad) / self.samples_n_smad + # left shift old samples + for i in range(self.samples_n_smad - 1): + self.running_samples_smad[i] = self.running_samples_smad[i + 1] + self.running_samples_smad[self.samples_n_smad - 1] = pressure + new_running_avg = sum(self.running_samples_smad) / self.samples_n_smad + return ( + (new_running_avg - prev_running_avg) < self.derivative_threshold_smad, + new_running_avg, + ) -def reset_emad() -> None: - """Reset simulator between runs.""" - global current_average_emad - current_average_emad = impossible_pressure +class LLDWMAD(LLDAlgoABC): + """Weighted moving average derivative.""" -def tick_emad(pressure: float) -> Tuple[bool, float]: - """Simulate firmware motor interrupt tick.""" - global current_average_emad - if current_average_emad == impossible_pressure: - current_average_emad = pressure - return (False, impossible_pressure) - else: - new_average = (pressure * smoothing_factor) + ( - current_average_emad * (1 - smoothing_factor) + samples_n_wmad: int + weights_wmad: numpy.ndarray[Any, numpy.dtype[numpy.float32]] = numpy.array( + [0.19, 0.17, 0.15, 0.13, 0.11, 0.09, 0.07, 0.05, 0.03, 0.01] + ) + running_samples_wmad: numpy.ndarray[Any, numpy.dtype[numpy.float32]] + derivative_threshold_wmad: float + + def __init__(self, samples: int = 10, thresh: float = -2) -> None: + """Init.""" + self.samples_n_wmad = samples + self.derivative_threshold_wmad = thresh + self.reset() + + @staticmethod + def name() -> str: + """Name of this algorithm.""" + return "weighted moving avg der" + + def reset(self) -> None: + """Reset simulator between runs.""" + assert numpy.sum(self.weights_wmad) == 1 + self.running_samples_wmad = numpy.full(self.samples_n_wmad, impossible_pressure) + + def tick(self, pressure: float) -> Tuple[bool, float]: + """Simulate firmware motor interrupt tick.""" + if numpy.isin(impossible_pressure, self.running_samples_wmad): + next_ind = numpy.where(self.running_samples_wmad == impossible_pressure)[0][ + 0 + ] + # if no exception we're still filling the minimum samples + self.running_samples_wmad[next_ind] = pressure + return (False, impossible_pressure) + # store old running average + prev_running_avg = numpy.sum( + numpy.multiply(self.running_samples_wmad, self.weights_wmad) + ) + # left shift old samples + for i in range(self.samples_n_wmad - 1): + self.running_samples_wmad[i] = self.running_samples_wmad[i + 1] + self.running_samples_wmad[self.samples_n_wmad - 1] = pressure + new_running_avg = numpy.sum( + numpy.multiply(self.running_samples_wmad, self.weights_wmad) ) - derivative = new_average - current_average_emad - current_average_emad = new_average - return (derivative < derivative_threshold_emad, current_average_emad) + return ( + (new_running_avg - prev_running_avg) < self.derivative_threshold_wmad, + new_running_avg, + ) + + +class LLDEMAD(LLDAlgoABC): + """Exponential moving average derivative.""" + + current_average_emad: float = impossible_pressure + smoothing_factor: float + derivative_threshold_emad: float + + def __init__(self, s_factor: float = 0.1, thresh: float = -2.5) -> None: + """Init.""" + self.smoothing_factor = s_factor + self.derivative_threshold_emad = thresh + self.reset() + + @staticmethod + def name() -> str: + """Name of this algorithm.""" + return "exponential moving avg der" + + def reset(self) -> None: + """Reset simulator between runs.""" + self.current_average_emad = impossible_pressure + + def tick(self, pressure: float) -> Tuple[bool, float]: + """Simulate firmware motor interrupt tick.""" + if self.current_average_emad == impossible_pressure: + self.current_average_emad = pressure + return (False, impossible_pressure) + else: + new_average = (pressure * self.smoothing_factor) + ( + self.current_average_emad * (1 - self.smoothing_factor) + ) + derivative = new_average - self.current_average_emad + self.current_average_emad = new_average + return ( + derivative < self.derivative_threshold_emad, + self.current_average_emad, + ) def _running_avg( @@ -126,11 +194,10 @@ def _running_avg( z_travel: List[float], p_travel: List[float], no_plot: bool, - reset_func: Callable[[], None], - tick_func: Callable[[float], Tuple[bool, float]], + algorithm: LLDAlgoABC, plot_name: str, ) -> Optional[Tuple[float, float, float]]: - reset_func() + algorithm.reset() average = float(0) running_time = [] running_derivative = [] @@ -138,7 +205,7 @@ def _running_avg( return_val = None for i in range(1, len(time)): prev_avg = average - found, average = tick_func(float(pressure[i])) + found, average = algorithm.tick(float(pressure[i])) if found: # if average < running_avg_threshold: # print(f"found z height = {z_travel[i]}") @@ -179,9 +246,7 @@ def _running_avg( def run( args: argparse.Namespace, - reset_func: Callable[[], None], - tick_func: Callable[[float], Tuple[bool, float]], - name: str, + algorithm: LLDAlgoABC, ) -> None: """Run the test with a given algorithm on all the data.""" path = args.filepath + "/" @@ -229,9 +294,8 @@ def run( z_travel, p_travel, args.no_plot, - reset_func, - tick_func, - f"{name} trial: {trial+1}", + algorithm, + f"{algorithm.name()} trial: {trial+1}", ) if threshold_data: # threshold_time = threshold_data[0] @@ -265,15 +329,15 @@ def main() -> None: parser.add_argument("--no-plot", action="store_true") args = parser.parse_args() - function_pairs = [ - ("threshold", reset_th, tick_th), - ("simple moving avg der", reset_smad, tick_smad), - ("weighted moving avg der", reset_wmad, tick_wmad), - ("exponential moving avg der", reset_emad, tick_emad), + algorithms: List[LLDAlgoABC] = [ + LLDPresThresh(), + LLDSMAD(), + LLDWMAD(), + LLDEMAD(), ] - for name, reset_func, tick_func in function_pairs: - print(f"Algorithm {name}") - run(args, reset_func, tick_func, name) + for algorithm in algorithms: + print(f"Algorithm {algorithm.name()}") + run(args, algorithm) if __name__ == "__main__":