From ac1362746db5d3535ad28acb1cc7dc9150de19a7 Mon Sep 17 00:00:00 2001
From: Chris Dzombak
Date: Wed, 25 Sep 2024 10:09:03 -0400
Subject: [PATCH] health endpoint checks model health

The model's liveness tick now always enqueues a HealthPing stamped with
the tick time; the ping URL becomes optional and is only requested when
it is set. HealthPinger publishes the time of the latest ping through a
shared Manager namespace, and the web server's /health endpoint answers
503 when no ping has been recorded yet or the latest one is at least
2 * liveness_tick_s old.

/health reads the shared timestamp once per request and treats a missing
attribute as "no pings yet", because the attribute is only created once
the HealthPinger process has started running.
---
Review notes (text in this section is not part of the commit message):
- NOTE(review): the stored copy of this patch had every line break and
  all indentation collapsed. Blank lines were restored from the hunk
  counts; indentation of context lines was inferred from the code
  structure. Verify against the tree (or regenerate the patch) before
  applying.
- The last web.py hunk header and the diffstat were updated for the
  /health change; the index hashes are those of the original commit.

 config.py |  1 +
 health.py | 11 ++++++++++-
 main.py   | 12 ++++++++++--
 track.py  |  8 +++++---
 web.py    | 33 ++++++++++++++++++++++++++++++++-
 5 files changed, 58 insertions(+), 7 deletions(-)

diff --git a/config.py b/config.py
index 2d2e8f8..df661f4 100644
--- a/config.py
+++ b/config.py
@@ -236,6 +236,7 @@ def config_from_file(
             "will not work with web interface or iOS app! consider "
             "using HTTPS for web.external_base_url."
         )
+    cfg.web.liveness_tick_s = cfg.model.liveness_tick_s
 
     # health:
     cfg.health_pinger.req_timeout_s = int(cfg.model.liveness_tick_s - 1.0)
diff --git a/health.py b/health.py
index 5fe5d8a..434ea09 100644
--- a/health.py
+++ b/health.py
@@ -1,6 +1,8 @@
 import logging
 import multiprocessing
 from dataclasses import dataclass
+from datetime import datetime
+from typing import Optional
 
 import requests
 
@@ -10,7 +12,8 @@
 
 @dataclass
 class HealthPing:
-    url: str
+    url: Optional[str]
+    at_t: datetime
 
 
 @dataclass
@@ -24,17 +27,23 @@ def __init__(
         self,
         config: HealthPingerConfig,
         input_queue: multiprocessing.Queue,  # of HealthPing
+        share_ns,
     ):
         self._config = config
         self._input_queue = input_queue
+        self._share_ns = share_ns
 
     def _run(self):
         logger = logging.getLogger(__name__)
         logging.basicConfig(level=self._config.log_level, format=LOG_DEFAULT_FMT)
+        self._share_ns.last_health_ping_at = None
         logger.info("starting health pinger")
 
         while True:
             p: HealthPing = self._input_queue.get()
+            self._share_ns.last_health_ping_at = p.at_t
+            if not p.url:
+                continue
             logger.debug(f"processing ping request for {p.url}")
             try:
                 requests.get(p.url, timeout=self._config.req_timeout_s)
diff --git a/main.py b/main.py
index 89a682b..2874b9b 100644
--- a/main.py
+++ b/main.py
@@ -54,6 +54,8 @@ def main():
     ntfy_web_share_manager = multiprocessing.Manager()
     ntfy_web_records_dict = ntfy_web_share_manager.dict()
     ntfy_web_share_ns = ntfy_web_share_manager.Namespace()
+    health_share_manager = multiprocessing.Manager()
+    health_share_ns = health_share_manager.Namespace()
 
     model = PredModel(args.video, config.model, tracks_queue, health_ping_queue)
     model_proc = multiprocessing.Process(target=model.run, args=(exit_queue,))
@@ -71,12 +73,18 @@ def main():
         ntfy_web_records_dict,
     )
     notifier_proc = multiprocessing.Process(target=notifier.run, args=(exit_queue,))
-    health_pinger = HealthPinger(config.health_pinger, health_ping_queue)
+    health_pinger = HealthPinger(
+        config.health_pinger, health_ping_queue, health_share_ns
+    )
     health_pinger_proc = multiprocessing.Process(
         target=health_pinger.run, args=(exit_queue,)
     )
     ws = WebServer(
-        config.web, ntfy_web_share_ns, ntfy_web_records_dict, notifications_queue
+        config.web,
+        ntfy_web_share_ns,
+        ntfy_web_records_dict,
+        notifications_queue,
+        health_share_ns,
     )
     ws_proc = multiprocessing.Process(target=ws.run, args=(exit_queue,))
 
diff --git a/track.py b/track.py
index e2b2279..833cce1 100644
--- a/track.py
+++ b/track.py
@@ -300,10 +300,12 @@ def _run_capture_loop(self, dev, half, logger, model):
                     )
                     last_liveness_tick_at = utcnow
                     frames_since_last_liveness_tick = 0
-                    if self._config.healthcheck_ping_url:
-                        self._health_ping_queue.put_nowait(
-                            HealthPing(self._config.healthcheck_ping_url)
+                    self._health_ping_queue.put_nowait(
+                        HealthPing(
+                            at_t=utcnow,
+                            url=self._config.healthcheck_ping_url,
                         )
+                    )
             else:
                 logger.info(f"video source {self._in_fname} ended")
                 raise VideoEnded
diff --git a/web.py b/web.py
index 095a91c..45befa6 100644
--- a/web.py
+++ b/web.py
@@ -4,7 +4,7 @@
 import multiprocessing
 import os
 from dataclasses import dataclass
-from typing import Optional, Dict
+from typing import Optional, Dict, Final
 
 import waitress
 from flask import Flask, jsonify, request, make_response
@@ -20,6 +20,7 @@ class WebConfig:
     log_level: Optional[int] = logging.INFO
     port: int = 5550
     bind_to: str = "*"
+    liveness_tick_s: float = 30.0
 
 
 class WebServer(lib_mpex.ChildProcess):
@@ -29,11 +30,13 @@ def __init__(
         ntfy_share_ns,
         ntfy_records: Dict[str, NtfyRecord],
         ntfy_queue: multiprocessing.Queue,
+        health_share_ns,
     ):
         self._config = config
         self._ntfy_share_ns = ntfy_share_ns
         self._ntfy_records = ntfy_records
         self._ntfy_queue = ntfy_queue
+        self._health_share_ns = health_share_ns
 
     def _run(self):
         logging.getLogger("waitress").setLevel(self._config.log_level + 10)
@@ -41,12 +44,40 @@ def _run(self):
         logging.basicConfig(level=self._config.log_level, format=LOG_DEFAULT_FMT)
         logger.info("configuring web server")
 
+        unhealthy_t: Final = datetime.timedelta(
+            seconds=2 * self._config.liveness_tick_s
+        )
+
         app = Flask(__name__)
         CORS(app)
 
         @app.route("/health", methods=["GET"])
         def health():
             logger.debug(f"{request.remote_addr} {request.method} {request.path}")
+
+            # The pinger process creates last_health_ping_at only once it is
+            # running, so the attribute may not exist yet; treat that like
+            # "no pings". Read it once so both checks use the same value.
+            last_ping_at = getattr(
+                self._health_share_ns, "last_health_ping_at", None
+            )
+            if last_ping_at is None:
+                return jsonify(
+                    {
+                        "error": "no model health pings yet",
+                        "status": "unhealthy",
+                    }
+                ), 503
+
+            age = datetime.datetime.now(datetime.UTC) - last_ping_at
+            if age >= unhealthy_t:
+                return jsonify(
+                    {
+                        "error": f"no model health pings in {unhealthy_t}",
+                        "status": "unhealthy",
+                    }
+                ), 503
+
             return jsonify({"status": "ok"})
 
         @app.route("/mute", methods=["POST"])