Skip to content

Commit

Permalink
fix(robot-server): maintain correct order of protocol analyses (#14762)
Browse files Browse the repository at this point in the history
Closes AUTH-229

# Overview

Updates the `/protocols` endpoints to always keep the list of analyses ordered
with the most-recently-started analysis last, and to verify whether a new
analysis needs to be triggered because of new run-time-parameter values for a
previously uploaded protocol.

# Risk assessment

Medium. Performs a database update and fixes the analysis ordering that was
broken by #14688.

---------

Co-authored-by: Max Marrone <[email protected]>
  • Loading branch information
sanni-t and SyntaxColoring authored Apr 3, 2024
1 parent 0f7d1ff commit 3d34031
Show file tree
Hide file tree
Showing 19 changed files with 1,331 additions and 73 deletions.
1 change: 1 addition & 0 deletions api/src/opentrons/protocol_engine/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,6 +847,7 @@ def from_hw_state(cls, state: HwTipStateType) -> "TipPresenceStatus":
}[state]


# TODO (spp, 2024-04-02): move all RTP types to runner
class RTPBase(BaseModel):
"""Parameters defined in a protocol."""

Expand Down
52 changes: 52 additions & 0 deletions robot-server/robot_server/persistence/_migrations/v3_to_v4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Migrate the persistence directory from schema 3 to 4.
Summary of changes from schema 3:
- Adds a new "run_time_parameter_values_and_defaults" column to analysis table
"""

from pathlib import Path
from contextlib import ExitStack
import shutil
from typing import Any

import sqlalchemy

from ..database import sql_engine_ctx
from ..tables import schema_4
from .._folder_migrator import Migration

_DB_FILE = "robot_server.db"


class Migration3to4(Migration):  # noqa: D101
    def migrate(self, source_dir: Path, dest_dir: Path) -> None:
        """Migrate the persistence directory from schema 3 to 4."""
        # Mirror every entry of the old versioned directory into the new one.
        for entry in source_dir.iterdir():
            destination = dest_dir / entry.name
            if entry.is_dir():
                shutil.copytree(src=entry, dst=destination)
            else:
                shutil.copy(src=entry, dst=destination)

        dest_db_file = dest_dir / _DB_FILE

        # Open the copied database and append the new v4 column to the
        # pre-existing analysis rows.
        with sql_engine_ctx(dest_db_file) as dest_engine:
            schema_4.metadata.create_all(dest_engine)

            def add_column(
                engine: sqlalchemy.engine.Engine,
                table_name: str,
                column: Any,
            ) -> None:
                # SQLAlchemy has no portable "ALTER TABLE" helper, so render
                # the column's type for this dialect and issue the DDL by hand.
                column_type = column.type.compile(engine.dialect)
                engine.execute(
                    f"ALTER TABLE {table_name} ADD COLUMN {column.key} {column_type}"
                )

            add_column(
                dest_engine,
                schema_4.analysis_table.name,
                schema_4.analysis_table.c.run_time_parameter_values_and_defaults,
            )
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from anyio import Path as AsyncPath, to_thread

from ._folder_migrator import MigrationOrchestrator
from ._migrations import up_to_3
from ._migrations import up_to_3, v3_to_v4


_TEMP_PERSISTENCE_DIR_PREFIX: Final = "opentrons-robot-server-"
Expand Down Expand Up @@ -48,7 +48,10 @@ async def prepare_active_subdirectory(prepared_root: Path) -> Path:
"""Return the active persistence subdirectory after preparing it, if necessary."""
migration_orchestrator = MigrationOrchestrator(
root=prepared_root,
migrations=[up_to_3.MigrationUpTo3(subdirectory="3")],
migrations=[
up_to_3.MigrationUpTo3(subdirectory="3"),
v3_to_v4.Migration3to4(subdirectory="4"),
],
temp_file_prefix="temp-",
)

Expand Down
2 changes: 1 addition & 1 deletion robot-server/robot_server/persistence/tables/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""SQL database schemas."""

# Re-export the latest schema.
from .schema_3 import (
from .schema_4 import (
metadata,
protocol_table,
analysis_table,
Expand Down
130 changes: 130 additions & 0 deletions robot-server/robot_server/persistence/tables/schema_4.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""v4 of our SQLite schema."""

import sqlalchemy

from robot_server.persistence._utc_datetime import UTCDateTime

metadata = sqlalchemy.MetaData()

protocol_table = sqlalchemy.Table(
"protocol",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column(
"created_at",
UTCDateTime,
nullable=False,
),
sqlalchemy.Column("protocol_key", sqlalchemy.String, nullable=True),
)

analysis_table = sqlalchemy.Table(
"analysis",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column(
"protocol_id",
sqlalchemy.String,
sqlalchemy.ForeignKey("protocol.id"),
index=True,
nullable=False,
),
sqlalchemy.Column(
"analyzer_version",
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"completed_analysis",
# Stores a JSON string. See CompletedAnalysisStore.
sqlalchemy.String,
nullable=False,
),
# column added in schema v4
sqlalchemy.Column(
"run_time_parameter_values_and_defaults",
sqlalchemy.String,
nullable=True,
),
)

run_table = sqlalchemy.Table(
"run",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column(
"created_at",
UTCDateTime,
nullable=False,
),
sqlalchemy.Column(
"protocol_id",
sqlalchemy.String,
sqlalchemy.ForeignKey("protocol.id"),
nullable=True,
),
# column added in schema v1
sqlalchemy.Column(
"state_summary",
sqlalchemy.String,
nullable=True,
),
# column added in schema v1
sqlalchemy.Column("engine_status", sqlalchemy.String, nullable=True),
# column added in schema v1
sqlalchemy.Column("_updated_at", UTCDateTime, nullable=True),
)

action_table = sqlalchemy.Table(
"action",
metadata,
sqlalchemy.Column(
"id",
sqlalchemy.String,
primary_key=True,
),
sqlalchemy.Column("created_at", UTCDateTime, nullable=False),
sqlalchemy.Column("action_type", sqlalchemy.String, nullable=False),
sqlalchemy.Column(
"run_id",
sqlalchemy.String,
sqlalchemy.ForeignKey("run.id"),
nullable=False,
),
)

run_command_table = sqlalchemy.Table(
"run_command",
metadata,
sqlalchemy.Column("row_id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column(
"run_id", sqlalchemy.String, sqlalchemy.ForeignKey("run.id"), nullable=False
),
sqlalchemy.Column("index_in_run", sqlalchemy.Integer, nullable=False),
sqlalchemy.Column("command_id", sqlalchemy.String, nullable=False),
sqlalchemy.Column("command", sqlalchemy.String, nullable=False),
sqlalchemy.Index(
"ix_run_run_id_command_id", # An arbitrary name for the index.
"run_id",
"command_id",
unique=True,
),
sqlalchemy.Index(
"ix_run_run_id_index_in_run", # An arbitrary name for the index.
"run_id",
"index_in_run",
unique=True,
),
)
9 changes: 8 additions & 1 deletion robot-server/robot_server/protocols/analysis_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentrons.protocol_engine.types import RunTimeParameter
from opentrons_shared_data.robot.dev_types import RobotType
from pydantic import BaseModel, Field
from typing import List, Optional, Union
from typing import List, Optional, Union, NamedTuple
from typing_extensions import Literal

from opentrons.protocol_engine import (
Expand Down Expand Up @@ -150,4 +150,11 @@ class CompletedAnalysis(BaseModel):
)


class RunTimeParameterAnalysisData(NamedTuple):
    """A run-time parameter's analyzed value paired with its declared default."""

    # The value the parameter held in the analysis.
    value: Union[float, bool, str]
    # The default value declared for the parameter.
    default: Union[float, bool, str]

ProtocolAnalysis = Union[PendingAnalysis, CompletedAnalysis]
103 changes: 101 additions & 2 deletions robot-server/robot_server/protocols/analysis_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
LoadedModule,
Liquid,
)
from opentrons.protocol_engine.types import RunTimeParamValuesType

from .analysis_models import (
AnalysisSummary,
Expand All @@ -27,6 +28,7 @@
CompletedAnalysis,
AnalysisResult,
AnalysisStatus,
RunTimeParameterAnalysisData,
)

from .completed_analysis_store import CompletedAnalysisStore, CompletedAnalysisResource
Expand Down Expand Up @@ -71,6 +73,14 @@ def __init__(self, analysis_id: str) -> None:
super().__init__(f'Analysis "{analysis_id}" not found.')


class AnalysisIsPendingError(RuntimeError):
    """Raised when an operation needs a completed analysis but it is still pending."""

    def __init__(self, analysis_id: str) -> None:
        """Build the error message from the offending analysis ID."""
        message = f'Analysis "{analysis_id}" is still pending.'
        super().__init__(message)


# TODO(sf, 2023-05-05): Like for protocols and runs, there's an in-memory cache for
# elements of this store. Unlike for protocols and runs, it isn't just an lru_cache
# on the top-level store's access methods, because those access methods have to be
Expand All @@ -93,10 +103,14 @@ class AnalysisStore:
so they're only kept in-memory, and lost when the store instance is destroyed.
"""

def __init__(self, sql_engine: sqlalchemy.engine.Engine) -> None:
    def __init__(
        self,
        sql_engine: sqlalchemy.engine.Engine,
        completed_store: Optional[CompletedAnalysisStore] = None,
    ) -> None:
        """Initialize the `AnalysisStore`.

        Params:
            sql_engine: Engine for the database that persists completed analyses.
            completed_store: Optional pre-built completed-analysis store to use
                instead of the default SQL-backed one — presumably a test/DI
                injection point; TODO confirm with callers.
        """
        # Pending analyses are kept in memory only; completed analyses are
        # persisted via the (possibly injected) completed-analysis store.
        self._pending_store = _PendingAnalysisStore()
        self._completed_store = completed_store or CompletedAnalysisStore(
            sql_engine=sql_engine,
            memory_cache=MemoryCache(_CACHE_MAX_SIZE, str, CompletedAnalysisResource),
            current_analyzer_version=_CURRENT_ANALYZER_VERSION,
        )
Expand Down Expand Up @@ -180,6 +194,9 @@ async def update(
protocol_id=protocol_id,
analyzer_version=_CURRENT_ANALYZER_VERSION,
completed_analysis=completed_analysis,
run_time_parameter_values_and_defaults=self._extract_run_time_param_values_and_defaults(
completed_analysis
),
)
await self._completed_store.add(
completed_analysis_resource=completed_analysis_resource
Expand Down Expand Up @@ -258,6 +275,88 @@ async def get_by_protocol(self, protocol_id: str) -> List[ProtocolAnalysis]:
else:
return completed_analyses + [pending_analysis]

@staticmethod
def _extract_run_time_param_values_and_defaults(
completed_analysis: CompletedAnalysis,
) -> Dict[str, RunTimeParameterAnalysisData]:
"""Extract the Run Time Parameters with current value and default value of each.
We do this in order to save the RTP data separately, outside the analysis
in the database. This saves us from having to de-serialize the entire analysis
to read just the RTP values.
"""
rtp_list = completed_analysis.runTimeParameters

rtp_values_and_defaults = {}
for param_spec in rtp_list:
rtp_values_and_defaults.update(
{
param_spec.variableName: RunTimeParameterAnalysisData(
value=param_spec.value, default=param_spec.default
)
}
)
return rtp_values_and_defaults

async def matching_rtp_values_in_analysis(
self, analysis_summary: AnalysisSummary, new_rtp_values: RunTimeParamValuesType
) -> bool:
"""Return whether the last analysis of the given protocol used the mentioned RTP values.
It is not sufficient to just check the values of provided parameters against the
corresponding parameter values in analysis because a previous request could have
composed of some extra parameters that are not in the current list.
Similarly, it is not enough to only compare the current parameter values from
the client with the previous values from the client because a previous param
might have been assigned a default value by the client while the current request
doesn't include that param because it can rely on the API to assign the default
value to that param.
So, we check that the Run Time Parameters in the previous analysis has params
with the values provided in the current request, and also verify that rest of the
parameters in the analysis use default values.
"""
if analysis_summary.status == AnalysisStatus.PENDING:
raise AnalysisIsPendingError(analysis_summary.id)

rtp_values_and_defaults_in_last_analysis = (
await self._completed_store.get_rtp_values_and_defaults_by_analysis_id(
analysis_summary.id
)
)
# We already make sure that the protocol has an analysis associated with before
# checking the RTP values so this assert should never raise.
# It is only added for type checking.
assert (
rtp_values_and_defaults_in_last_analysis is not None
), "This protocol has no analysis associated with it."

if not set(new_rtp_values.keys()).issubset(
set(rtp_values_and_defaults_in_last_analysis.keys())
):
# Since the RTP keys in analysis represent all params defined in the protocol,
# if the client passes a parameter that's not present in the analysis,
# it means that the client is sending incorrect parameters.
# We will let this request trigger an analysis using the incorrect params
# and have the analysis raise an appropriate error instead of giving an
# error response to the protocols request.
# This makes the behavior of robot server consistent regardless of whether
# the client is sending a protocol for the first time or for the nth time.
return False
for (
parameter,
prev_value_and_default,
) in rtp_values_and_defaults_in_last_analysis.items():
if (
new_rtp_values.get(parameter, prev_value_and_default.default)
== prev_value_and_default.value
):
continue
else:
return False
return True


class _PendingAnalysisStore:
"""An in-memory store of protocol analyses that are pending.
Expand Down
Loading

0 comments on commit 3d34031

Please sign in to comment.