-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
102 lines (78 loc) · 3.13 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import logging
import logging.config
import time
from datetime import datetime, timedelta

import yaml

from lib.api import DiveraConnector
from lib.config import Config
from lib.exceptions import ValidationError, ReadTimeout
from lib.parser import Parser
from lib.reader import Boss925Reader
# Paths of the logging and project configuration files (relative paths, so
# they are resolved against the current working directory).
LOGGING_CONF = "conf/logging.yml"
PROJECT_CONF = "conf/config.yml"

# Configure logging at import time so every logger created afterwards is
# already set up. The file is only held open while its text is read.
with open(LOGGING_CONF, "r") as stream:
    raw_logging_conf = stream.read()
logging_conf = yaml.safe_load(raw_logging_conf)
logging.config.dictConfig(logging_conf)

# Logger used by everything in this module.
log = logging.getLogger("main")
class AlertController:
    """Read transmissions from the reader and forward alarms via the connector.

    The controller also acts as a watchdog: transmissions that the reader
    classifies as pings refresh a "last ping" timestamp, and when no ping has
    been seen for longer than ``PING_TIMEOUT`` a single alert is raised for
    the RICs configured under ``"ping"``.
    """

    # Longest silence tolerated between two pings before the watchdog alert
    # fires. Kept as a class attribute so it can be overridden without
    # touching the method bodies.
    PING_TIMEOUT = timedelta(minutes=15)

    def __init__(self, config) -> None:
        """Create the connector, reader and parser from *config*.

        Args:
            config: Project configuration object; it is passed unchanged to
                ``DiveraConnector``, ``Boss925Reader`` and ``Parser`` and is
                later queried through ``get_rics``.
        """
        self.config = config
        self.divera = DiveraConnector(config)
        self.reader = Boss925Reader(config)
        self.parser = Parser(config)
        # Wall-clock time of the last ping; used for display in messages only.
        self._last_ping = datetime.now()
        # Monotonic timestamp of the last ping; all elapsed-time arithmetic
        # uses this so DST switches or system clock corrections cannot cause
        # a false alert or hide a real outage.
        self._last_ping_monotonic = time.monotonic()
        # True once the watchdog alert has been sent; reset by the next ping.
        self._last_ping_alarmed = False

    def run(self) -> None:
        """Main loop: read lines forever and dispatch complete transmissions.

        A ``ReadTimeout`` from the reader triggers the ping watchdog check.
        After a successful read, a complete transmission is processed and the
        reader's store is reset. Any other exception propagates to the caller.
        """
        log.info("Listening for alarms ...")
        while True:
            try:
                self.reader.read_line()
            except ReadTimeout:
                self._process_timeout()
            else:
                if self.reader.is_complete():
                    self._process_transmission(self.reader.get_transmission())
                    self.reader.reset_store()

    def _process_transmission(self, transmission: dict) -> None:
        """Process an incoming transmission after it was received completely.

        Args:
            transmission: Mapping that provides at least the keys ``"ric"``,
                ``"subric"`` and ``"message"`` (the ones read here).
        """
        if self.reader.is_ping():
            self._process_ping()
            return

        log.info("Incoming alarm detected!")
        log.info("  RIC = %s%s", transmission["ric"], transmission["subric"])
        # Only the first 64 characters of the message are written to the log.
        log.info("  MSG = %s...", transmission["message"][:64])
        rics = self.config.get_rics(transmission["ric"])
        data = self.parser.parse_transmission(transmission)
        self.divera.alert(rics, data)

    def _ping_delay(self) -> timedelta:
        """Return the time elapsed since the last ping (monotonic clock)."""
        return timedelta(seconds=time.monotonic() - self._last_ping_monotonic)

    def _process_timeout(self) -> None:
        """Handle a read timeout: check the watchdog and alert once if overdue.

        The error is logged on every timeout while the delay exceeds
        ``PING_TIMEOUT``; the alert itself is sent only once until the next
        ping clears the flag.
        """
        log.debug("Read timeout ...")
        # Compute the delay once so the threshold check and the reported
        # value can never disagree.
        delay = self._ping_delay()
        if delay <= self.PING_TIMEOUT:
            return

        log.error("Ping delay too high! Delay is %s", delay)
        if self._last_ping_alarmed:
            return

        # Set the flag before alerting, matching the original ordering.
        self._last_ping_alarmed = True
        rics = self.config.get_rics("ping")
        data = {
            "title": "Keine Meldungen!",
            "text": f"Keine Rückmeldung vom DME!\n\nLetzter Ping: {self._last_ping}\nDelay: {delay}",
            "priority": False,
        }
        self.divera.alert(rics, data)

    def _process_ping(self) -> None:
        """Record a received ping and re-arm the watchdog alert."""
        log.debug("Received ping (Delay was %s)", self._ping_delay())
        self._last_ping = datetime.now()
        self._last_ping_monotonic = time.monotonic()
        self._last_ping_alarmed = False
if __name__ == "__main__":
    # Script entry point: load the project configuration, start the
    # controller and translate the expected failure modes into log messages.
    try:
        project_config = Config.from_file(filename=PROJECT_CONF)
        AlertController(project_config).run()
    except ValidationError as err:
        # The configuration file did not pass validation.
        log.error("Config is not valid!")
        log.error(err)
    except NotImplementedError:
        log.error("Parser not compatible!")
    except KeyboardInterrupt:
        # Ctrl+C is the regular way to stop the loop; nothing to report.
        pass

    log.info("Shutting down ...")