-
Notifications
You must be signed in to change notification settings - Fork 1
/
health.py
51 lines (41 loc) · 1.33 KB
/
health.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import logging
import multiprocessing
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import requests
import lib_mpex
from log import LOG_DEFAULT_FMT
@dataclass
class HealthPing:
url: Optional[str]
at_t: datetime
@dataclass
class HealthPingerConfig:
log_level: int = logging.INFO
req_timeout_s: int = 30
class HealthPinger(lib_mpex.ChildProcess):
def __init__(
self,
config: HealthPingerConfig,
input_queue: multiprocessing.Queue, # of HealthPing
share_ns,
):
self._config = config
self._input_queue = input_queue
self._share_ns = share_ns
def _run(self):
logger = logging.getLogger(__name__)
logging.basicConfig(level=self._config.log_level, format=LOG_DEFAULT_FMT)
self._share_ns.last_health_ping_at = None
logger.info("starting health pinger")
while True:
p: HealthPing = self._input_queue.get()
self._share_ns.last_health_ping_at = p.at_t
if not p.url:
continue
logger.debug(f"processing ping request for {p.url}")
try:
requests.get(p.url, timeout=self._config.req_timeout_s)
except requests.RequestException as e:
logger.error(f"error pinging {p.url}: {e}")