Skip to content

Commit

Permalink
Class-ify the algorithms.
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanthecoder committed Mar 11, 2024
1 parent dac9b04 commit 4b64ba9
Showing 1 changed file with 176 additions and 112 deletions.
288 changes: 176 additions & 112 deletions hardware/opentrons_hardware/scripts/lld_data_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,122 +2,190 @@
import csv
import os
import argparse
from typing import List, Optional, Tuple, Any, Callable
from typing import List, Optional, Tuple, Any
import matplotlib.pyplot as plot
import numpy

from abc import ABC, abstractmethod

Check warning on line 8 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L2-L8

Added lines #L2 - L8 were not covered by tests

impossible_pressure = 9001.0

Check warning on line 10 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L10

Added line #L10 was not covered by tests


# ----present day threshold based----
def tick_th(pressure: float) -> Tuple[bool, float]:
"""Simulate firmware motor interrupt tick."""
return (pressure < -150, pressure)
class LLDAlgoABC(ABC):

Check warning on line 13 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L13

Added line #L13 was not covered by tests
"""An instance of an lld algorithm."""

@staticmethod
@abstractmethod
def name() -> str:

Check warning on line 18 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L16-L18

Added lines #L16 - L18 were not covered by tests
"""Name of this algorithm."""
...

Check warning on line 20 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L20

Added line #L20 was not covered by tests

def reset_th() -> None:
"""Reset simulator between runs."""
pass
@abstractmethod
def tick(self, pressure: float) -> Tuple[bool, float]:

Check warning on line 23 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L22-L23

Added lines #L22 - L23 were not covered by tests
"""Simulate firmware motor interrupt tick."""
...

Check warning on line 25 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L25

Added line #L25 was not covered by tests

@abstractmethod
def reset(self) -> None:

Check warning on line 28 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L27-L28

Added lines #L27 - L28 were not covered by tests
"""Reset simulator between runs."""
...

Check warning on line 30 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L30

Added line #L30 was not covered by tests

# -----Simple moving average derivative---
samples_n_smad = 10
running_samples_smad: List[float] = [impossible_pressure] * samples_n_smad
derivative_threshold_smad = -2.5

class LLDPresThresh(LLDAlgoABC):

Check warning on line 33 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L33

Added line #L33 was not covered by tests
"""present day threshold based."""

def reset_smad() -> None:
"""Reset simulator between runs."""
global running_samples_smad
running_samples_smad = [impossible_pressure] * samples_n_smad
threshold: float

Check warning on line 36 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L36

Added line #L36 was not covered by tests

def __init__(self, thresh: float = -150) -> None:

Check warning on line 38 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L38

Added line #L38 was not covered by tests
"""Init."""
self.threshold = thresh

Check warning on line 40 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L40

Added line #L40 was not covered by tests

def tick_smad(pressure: float) -> Tuple[bool, float]:
"""Simulate firmware motor interrupt tick."""
global running_samples_smad
try:
next_ind = running_samples_smad.index(impossible_pressure)
# if no exception we're still filling the minimum samples
running_samples_smad[next_ind] = pressure
return (False, impossible_pressure)
except ValueError: # the array has been filled
pass
# store old running average
prev_running_avg = sum(running_samples_smad) / samples_n_smad
# left shift old samples
for i in range(samples_n_smad - 1):
running_samples_smad[i] = running_samples_smad[i + 1]
running_samples_smad[samples_n_smad - 1] = pressure
new_running_avg = sum(running_samples_smad) / samples_n_smad
return (
(new_running_avg - prev_running_avg) < derivative_threshold_smad,
new_running_avg,
)
@staticmethod
def name() -> str:

Check warning on line 43 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L42-L43

Added lines #L42 - L43 were not covered by tests
"""Name of this algorithm."""
return "threshold"

Check warning on line 45 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L45

Added line #L45 was not covered by tests

def tick(self, pressure: float) -> Tuple[bool, float]:

Check warning on line 47 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L47

Added line #L47 was not covered by tests
"""Simulate firmware motor interrupt tick."""
return (pressure < self.threshold, pressure)

Check warning on line 49 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L49

Added line #L49 was not covered by tests

# -----weighted moving average derivative---
samples_n_wmad = 10
weights_wmad: numpy.ndarray[Any, numpy.dtype[numpy.float32]] = numpy.array(
[0.19, 0.17, 0.15, 0.13, 0.11, 0.09, 0.07, 0.05, 0.03, 0.01]
)
running_samples_wmad = numpy.full(samples_n_wmad, impossible_pressure)
derivative_threshold_wmad = -2


def reset_wmad() -> None:
"""Reset simulator between runs."""
global running_samples_wmad
assert numpy.sum(weights_wmad) == 1
running_samples_wmad = numpy.full(samples_n_wmad, impossible_pressure)


def tick_wmad(pressure: float) -> Tuple[bool, float]:
"""Simulate firmware motor interrupt tick."""
global running_samples_wmad
if numpy.isin(impossible_pressure, running_samples_wmad):
next_ind = numpy.where(running_samples_wmad == impossible_pressure)[0][0]
# if no exception we're still filling the minimum samples
running_samples_wmad[next_ind] = pressure
return (False, impossible_pressure)
# store old running average
prev_running_avg = numpy.sum(numpy.multiply(running_samples_wmad, weights_wmad))
# left shift old samples
for i in range(samples_n_wmad - 1):
running_samples_wmad[i] = running_samples_wmad[i + 1]
running_samples_wmad[samples_n_wmad - 1] = pressure
new_running_avg = numpy.sum(numpy.multiply(running_samples_wmad, weights_wmad))
return (
(new_running_avg - prev_running_avg) < derivative_threshold_wmad,
new_running_avg,
)

def reset(self) -> None:

Check warning on line 51 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L51

Added line #L51 was not covered by tests
"""Reset simulator between runs."""
pass

Check warning on line 53 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L53

Added line #L53 was not covered by tests

# -----exponential moving average derivative---
current_average_emad: float = impossible_pressure
smoothing_factor = 0.1
derivative_threshold_emad = -2.5

class LLDSMAD(LLDAlgoABC):

Check warning on line 56 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L56

Added line #L56 was not covered by tests
"""Simple moving average derivative."""

samples_n_smad: int
running_samples_smad: List[float]
derivative_threshold_smad: float

Check warning on line 61 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L59-L61

Added lines #L59 - L61 were not covered by tests

def __init__(self, samples: int = 10, thresh: float = -2.5) -> None:

Check warning on line 63 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L63

Added line #L63 was not covered by tests
"""Init."""
self.samples_n_smad = samples
self.derivative_threshold_smad = thresh
self.reset()

Check warning on line 67 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L65-L67

Added lines #L65 - L67 were not covered by tests

@staticmethod
def name() -> str:

Check warning on line 70 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L69-L70

Added lines #L69 - L70 were not covered by tests
"""Name of this algorithm."""
return "simple moving avg der"

Check warning on line 72 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L72

Added line #L72 was not covered by tests

def reset(self) -> None:

Check warning on line 74 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L74

Added line #L74 was not covered by tests
"""Reset simulator between runs."""
self.running_samples_smad = [impossible_pressure] * self.samples_n_smad

Check warning on line 76 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L76

Added line #L76 was not covered by tests

def tick(self, pressure: float) -> Tuple[bool, float]:

Check warning on line 78 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L78

Added line #L78 was not covered by tests
"""Simulate firmware motor interrupt tick."""
try:
next_ind = self.running_samples_smad.index(impossible_pressure)

Check warning on line 81 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L80-L81

Added lines #L80 - L81 were not covered by tests
# if no exception we're still filling the minimum samples
self.running_samples_smad[next_ind] = pressure
return (False, impossible_pressure)
except ValueError: # the array has been filled
pass

Check warning on line 86 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L83-L86

Added lines #L83 - L86 were not covered by tests
# store old running average
prev_running_avg = sum(self.running_samples_smad) / self.samples_n_smad

Check warning on line 88 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L88

Added line #L88 was not covered by tests
# left shift old samples
for i in range(self.samples_n_smad - 1):
self.running_samples_smad[i] = self.running_samples_smad[i + 1]
self.running_samples_smad[self.samples_n_smad - 1] = pressure
new_running_avg = sum(self.running_samples_smad) / self.samples_n_smad
return (

Check warning on line 94 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L90-L94

Added lines #L90 - L94 were not covered by tests
(new_running_avg - prev_running_avg) < self.derivative_threshold_smad,
new_running_avg,
)

def reset_emad() -> None:
"""Reset simulator between runs."""
global current_average_emad
current_average_emad = impossible_pressure

class LLDWMAD(LLDAlgoABC):

Check warning on line 100 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L100

Added line #L100 was not covered by tests
"""Weighted moving average derivative."""

def tick_emad(pressure: float) -> Tuple[bool, float]:
"""Simulate firmware motor interrupt tick."""
global current_average_emad
if current_average_emad == impossible_pressure:
current_average_emad = pressure
return (False, impossible_pressure)
else:
new_average = (pressure * smoothing_factor) + (
current_average_emad * (1 - smoothing_factor)
samples_n_wmad: int
weights_wmad: numpy.ndarray[Any, numpy.dtype[numpy.float32]] = numpy.array(

Check warning on line 104 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L103-L104

Added lines #L103 - L104 were not covered by tests
[0.19, 0.17, 0.15, 0.13, 0.11, 0.09, 0.07, 0.05, 0.03, 0.01]
)
running_samples_wmad: numpy.ndarray[Any, numpy.dtype[numpy.float32]]
derivative_threshold_wmad: float

Check warning on line 108 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L107-L108

Added lines #L107 - L108 were not covered by tests

def __init__(self, samples: int = 10, thresh: float = -2) -> None:

Check warning on line 110 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L110

Added line #L110 was not covered by tests
"""Init."""
self.samples_n_wmad = samples
self.derivative_threshold_wmad = thresh
self.reset()

Check warning on line 114 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L112-L114

Added lines #L112 - L114 were not covered by tests

@staticmethod
def name() -> str:

Check warning on line 117 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L116-L117

Added lines #L116 - L117 were not covered by tests
"""Name of this algorithm."""
return "weighted moving avg der"

Check warning on line 119 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L119

Added line #L119 was not covered by tests

def reset(self) -> None:

Check warning on line 121 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L121

Added line #L121 was not covered by tests
"""Reset simulator between runs."""
assert numpy.sum(self.weights_wmad) == 1
self.running_samples_wmad = numpy.full(self.samples_n_wmad, impossible_pressure)

Check warning on line 124 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L123-L124

Added lines #L123 - L124 were not covered by tests

def tick(self, pressure: float) -> Tuple[bool, float]:

Check warning on line 126 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L126

Added line #L126 was not covered by tests
"""Simulate firmware motor interrupt tick."""
if numpy.isin(impossible_pressure, self.running_samples_wmad):
next_ind = numpy.where(self.running_samples_wmad == impossible_pressure)[0][

Check warning on line 129 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L128-L129

Added lines #L128 - L129 were not covered by tests
0
]
# if no exception we're still filling the minimum samples
self.running_samples_wmad[next_ind] = pressure
return (False, impossible_pressure)

Check warning on line 134 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L133-L134

Added lines #L133 - L134 were not covered by tests
# store old running average
prev_running_avg = numpy.sum(

Check warning on line 136 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L136

Added line #L136 was not covered by tests
numpy.multiply(self.running_samples_wmad, self.weights_wmad)
)
# left shift old samples
for i in range(self.samples_n_wmad - 1):
self.running_samples_wmad[i] = self.running_samples_wmad[i + 1]
self.running_samples_wmad[self.samples_n_wmad - 1] = pressure
new_running_avg = numpy.sum(

Check warning on line 143 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L140-L143

Added lines #L140 - L143 were not covered by tests
numpy.multiply(self.running_samples_wmad, self.weights_wmad)
)
derivative = new_average - current_average_emad
current_average_emad = new_average
return (derivative < derivative_threshold_emad, current_average_emad)
return (

Check warning on line 146 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L146

Added line #L146 was not covered by tests
(new_running_avg - prev_running_avg) < self.derivative_threshold_wmad,
new_running_avg,
)


class LLDEMAD(LLDAlgoABC):

Check warning on line 152 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L152

Added line #L152 was not covered by tests
"""Exponential moving average derivative."""

current_average_emad: float = impossible_pressure
smoothing_factor: float
derivative_threshold_emad: float

Check warning on line 157 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L155-L157

Added lines #L155 - L157 were not covered by tests

def __init__(self, s_factor: float = 0.1, thresh: float = -2.5) -> None:

Check warning on line 159 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L159

Added line #L159 was not covered by tests
"""Init."""
self.smoothing_factor = s_factor
self.derivative_threshold_emad = thresh
self.reset()

Check warning on line 163 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L161-L163

Added lines #L161 - L163 were not covered by tests

@staticmethod
def name() -> str:

Check warning on line 166 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L165-L166

Added lines #L165 - L166 were not covered by tests
"""Name of this algorithm."""
return "exponential moving avg der"

Check warning on line 168 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L168

Added line #L168 was not covered by tests

def reset(self) -> None:

Check warning on line 170 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L170

Added line #L170 was not covered by tests
"""Reset simulator between runs."""
self.current_average_emad = impossible_pressure

Check warning on line 172 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L172

Added line #L172 was not covered by tests

def tick(self, pressure: float) -> Tuple[bool, float]:

Check warning on line 174 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L174

Added line #L174 was not covered by tests
"""Simulate firmware motor interrupt tick."""
if self.current_average_emad == impossible_pressure:
self.current_average_emad = pressure
return (False, impossible_pressure)

Check warning on line 178 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L176-L178

Added lines #L176 - L178 were not covered by tests
else:
new_average = (pressure * self.smoothing_factor) + (

Check warning on line 180 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L180

Added line #L180 was not covered by tests
self.current_average_emad * (1 - self.smoothing_factor)
)
derivative = new_average - self.current_average_emad
self.current_average_emad = new_average
return (

Check warning on line 185 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L183-L185

Added lines #L183 - L185 were not covered by tests
derivative < self.derivative_threshold_emad,
self.current_average_emad,
)


def _running_avg(

Check warning on line 191 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L191

Added line #L191 was not covered by tests
Expand All @@ -126,19 +194,18 @@ def _running_avg(
z_travel: List[float],
p_travel: List[float],
no_plot: bool,
reset_func: Callable[[], None],
tick_func: Callable[[float], Tuple[bool, float]],
algorithm: LLDAlgoABC,
plot_name: str,
) -> Optional[Tuple[float, float, float]]:
reset_func()
algorithm.reset()
average = float(0)
running_time = []
running_derivative = []
running_avg = []
return_val = None
for i in range(1, len(time)):
prev_avg = average
found, average = tick_func(float(pressure[i]))
found, average = algorithm.tick(float(pressure[i]))
if found:

Check warning on line 209 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L200-L209

Added lines #L200 - L209 were not covered by tests
# if average < running_avg_threshold:
# print(f"found z height = {z_travel[i]}")
Expand Down Expand Up @@ -179,9 +246,7 @@ def _running_avg(

def run(

Check warning on line 247 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L247

Added line #L247 was not covered by tests
args: argparse.Namespace,
reset_func: Callable[[], None],
tick_func: Callable[[float], Tuple[bool, float]],
name: str,
algorithm: LLDAlgoABC,
) -> None:
"""Run the test with a given algorithm on all the data."""
path = args.filepath + "/"
Expand Down Expand Up @@ -229,9 +294,8 @@ def run(
z_travel,
p_travel,
args.no_plot,
reset_func,
tick_func,
f"{name} trial: {trial+1}",
algorithm,
f"{algorithm.name()} trial: {trial+1}",
)
if threshold_data:

Check warning on line 300 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L300

Added line #L300 was not covered by tests
# threshold_time = threshold_data[0]
Expand Down Expand Up @@ -265,15 +329,15 @@ def main() -> None:
parser.add_argument("--no-plot", action="store_true")
args = parser.parse_args()

Check warning on line 330 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L329-L330

Added lines #L329 - L330 were not covered by tests

function_pairs = [
("threshold", reset_th, tick_th),
("simple moving avg der", reset_smad, tick_smad),
("weighted moving avg der", reset_wmad, tick_wmad),
("exponential moving avg der", reset_emad, tick_emad),
algorithms: List[LLDAlgoABC] = [

Check warning on line 332 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L332

Added line #L332 was not covered by tests
LLDPresThresh(),
LLDSMAD(),
LLDWMAD(),
LLDEMAD(),
]
for name, reset_func, tick_func in function_pairs:
print(f"Algorithm {name}")
run(args, reset_func, tick_func, name)
for algorithm in algorithms:
print(f"Algorithm {algorithm.name()}")
run(args, algorithm)

Check warning on line 340 in hardware/opentrons_hardware/scripts/lld_data_script.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/lld_data_script.py#L338-L340

Added lines #L338 - L340 were not covered by tests


if __name__ == "__main__":
Expand Down

0 comments on commit 4b64ba9

Please sign in to comment.