Skip to content

Commit

Permalink
health endpoint checks model health
Browse files Browse the repository at this point in the history
  • Loading branch information
cdzombak committed Sep 25, 2024
1 parent 7f44552 commit ac13627
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 7 deletions.
1 change: 1 addition & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def config_from_file(
"will not work with web interface or iOS app! consider "
"using HTTPS for web.external_base_url."
)
cfg.web.liveness_tick_s = cfg.model.liveness_tick_s

# health:
cfg.health_pinger.req_timeout_s = int(cfg.model.liveness_tick_s - 1.0)
Expand Down
11 changes: 10 additions & 1 deletion health.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import multiprocessing
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import requests

Expand All @@ -10,7 +12,8 @@

@dataclass
class HealthPing:
url: str
url: Optional[str]
at_t: datetime


@dataclass
Expand All @@ -24,17 +27,23 @@ def __init__(
self,
config: HealthPingerConfig,
input_queue: multiprocessing.Queue, # of HealthPing
share_ns,
):
self._config = config
self._input_queue = input_queue
self._share_ns = share_ns

def _run(self):
logger = logging.getLogger(__name__)
logging.basicConfig(level=self._config.log_level, format=LOG_DEFAULT_FMT)
self._share_ns.last_health_ping_at = None
logger.info("starting health pinger")

while True:
p: HealthPing = self._input_queue.get()
self._share_ns.last_health_ping_at = p.at_t
if not p.url:
continue
logger.debug(f"processing ping request for {p.url}")
try:
requests.get(p.url, timeout=self._config.req_timeout_s)
Expand Down
12 changes: 10 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ def main():
ntfy_web_share_manager = multiprocessing.Manager()
ntfy_web_records_dict = ntfy_web_share_manager.dict()
ntfy_web_share_ns = ntfy_web_share_manager.Namespace()
health_share_manager = multiprocessing.Manager()
health_share_ns = health_share_manager.Namespace()

model = PredModel(args.video, config.model, tracks_queue, health_ping_queue)
model_proc = multiprocessing.Process(target=model.run, args=(exit_queue,))
Expand All @@ -71,12 +73,18 @@ def main():
ntfy_web_records_dict,
)
notifier_proc = multiprocessing.Process(target=notifier.run, args=(exit_queue,))
health_pinger = HealthPinger(config.health_pinger, health_ping_queue)
health_pinger = HealthPinger(
config.health_pinger, health_ping_queue, health_share_ns
)
health_pinger_proc = multiprocessing.Process(
target=health_pinger.run, args=(exit_queue,)
)
ws = WebServer(
config.web, ntfy_web_share_ns, ntfy_web_records_dict, notifications_queue
config.web,
ntfy_web_share_ns,
ntfy_web_records_dict,
notifications_queue,
health_share_ns,
)
ws_proc = multiprocessing.Process(target=ws.run, args=(exit_queue,))

Expand Down
8 changes: 5 additions & 3 deletions track.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,10 +300,12 @@ def _run_capture_loop(self, dev, half, logger, model):
)
last_liveness_tick_at = utcnow
frames_since_last_liveness_tick = 0
if self._config.healthcheck_ping_url:
self._health_ping_queue.put_nowait(
HealthPing(self._config.healthcheck_ping_url)
self._health_ping_queue.put_nowait(
HealthPing(
at_t=utcnow,
url=self._config.healthcheck_ping_url,
)
)
else:
logger.info(f"video source {self._in_fname} ended")
raise VideoEnded
Expand Down
30 changes: 29 additions & 1 deletion web.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import multiprocessing
import os
from dataclasses import dataclass
from typing import Optional, Dict
from typing import Optional, Dict, Final

import waitress
from flask import Flask, jsonify, request, make_response
Expand All @@ -20,6 +20,7 @@ class WebConfig:
log_level: Optional[int] = logging.INFO
port: int = 5550
bind_to: str = "*"
liveness_tick_s: float = 30.0


class WebServer(lib_mpex.ChildProcess):
Expand All @@ -29,24 +30,51 @@ def __init__(
ntfy_share_ns,
ntfy_records: Dict[str, NtfyRecord],
ntfy_queue: multiprocessing.Queue,
health_share_ns,
):
self._config = config
self._ntfy_share_ns = ntfy_share_ns
self._ntfy_records = ntfy_records
self._ntfy_queue = ntfy_queue
self._health_share_ns = health_share_ns

def _run(self):
logging.getLogger("waitress").setLevel(self._config.log_level + 10)
logger = logging.getLogger(__name__)
logging.basicConfig(level=self._config.log_level, format=LOG_DEFAULT_FMT)
logger.info("configuring web server")

unhealthy_t: Final = datetime.timedelta(
seconds=2 * self._config.liveness_tick_s
)

app = Flask(__name__)
CORS(app)

@app.route("/health", methods=["GET"])
def health():
logger.debug(f"{request.remote_addr} {request.method} {request.path}")

if not self._health_share_ns.last_health_ping_at:
return jsonify(
{
"error": "no model health pings yet",
"status": "unhealthy",
}
), 503

if (
datetime.datetime.now(datetime.UTC)
- self._health_share_ns.last_health_ping_at
>= unhealthy_t
):
return jsonify(
{
"error": f"no model health pings in {unhealthy_t}",
"status": "unhealthy",
}
), 503

return jsonify({"status": "ok"})

@app.route("/mute", methods=["POST"])
Expand Down

0 comments on commit ac13627

Please sign in to comment.