-
Notifications
You must be signed in to change notification settings - Fork 4
/
ctx_util.py
69 lines (53 loc) · 1.76 KB
/
ctx_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import functools
import json
import os
import threading
import watchdog.events
import watchdog.observers.polling
import ci.util
import model
def _cfg_factory_from_secret(path: str) -> model.ConfigFactory:
    '''
    Build a `model.ConfigFactory` from the JSON document stored in the file at `path`.

    The path is first passed through `ci.util.existing_file` (presumably this rejects
    missing files - verify against `ci.util`); the value it returns is the one opened.
    '''
    secret_file = ci.util.existing_file(path)

    # binary mode: the json module detects the encoding of the raw bytes itself
    with open(secret_file, 'rb') as handle:
        raw_cfg = json.load(handle)

    return model.ConfigFactory.from_dict(raw_cfg)
class FileChangeEventHandler(watchdog.events.FileSystemEventHandler):
    '''
    Event handler which invalidates the cached cfg factory on every received event.
    '''

    def dispatch(self, event):
        '''
        Drop the cached result of `cfg_factory`, regardless of the kind of `event`.
        '''
        # Invalidate instead of rebuilding right away: the cfg factory is re-created
        # from the new cfg lazily, the next time `cfg_factory` is called
        cfg_factory.cache_clear()
@functools.cache
def watch_for_file_changes(
    path: str,
    event_handler: FileChangeEventHandler = None,
):
    '''
    Start a polling observer which forwards changes below `path` to `event_handler`.

    If no (truthy) `event_handler` is passed, a `FileChangeEventHandler` is created.
    The function is memoised, so repeated calls with the same arguments start the
    observer only once.
    '''
    handler = event_handler or FileChangeEventHandler()

    # the observer is created with a timeout of 60 (seconds between polls, as far as
    # the watchdog API goes - confirm against the installed version)
    file_observer = watchdog.observers.polling.PollingObserver(timeout=60)
    file_observer.schedule(handler, path)
    file_observer.start()
class RepeatTimer(threading.Timer):
    '''
    Timer which keeps calling its function every `interval` seconds until cancelled,
    instead of firing only once.
    '''

    def run(self):
        '''
        Call `function(*args, **kwargs)` after each elapsed interval until `cancel()`.
        '''
        while True:
            # `finished` is the event set by `cancel()`; waiting on it doubles as an
            # interruptible sleep between two invocations
            cancelled = self.finished.wait(self.interval)
            if cancelled:
                break

            self.function(*self.args, **self.kwargs)
@functools.cache
def refresh_periodically(
    refresh_interval_seconds: float,
):
    '''
    Start a daemon timer which clears the `cfg_factory` cache every
    `refresh_interval_seconds` seconds.

    The function is memoised (like `watch_for_file_changes`): `cfg_factory` calls it
    on every cache miss, i.e. again after each periodic refresh. Without memoisation
    every refresh would leave one more never-ending timer thread behind. With it,
    repeated calls using the same arguments start the timer only once.

    Note that the memoisation key distinguishes positional from keyword calls, so
    callers should stick to one calling convention.
    '''
    timer = RepeatTimer(
        refresh_interval_seconds,
        cfg_factory.cache_clear,
    )
    # daemon thread: the timer must never keep the interpreter from exiting
    timer.daemon = True
    timer.start()
@functools.cache
def cfg_factory() -> model.ConfigFactory:
    '''
    Return the (cached) cfg factory.

    If the environment variable `CFG_FACTORY_SECRET_PATH` is set to a non-empty value,
    the cfg factory is read from that file (k8s secret) and the file is watched so
    that changes invalidate the cache. Otherwise, the default cfg factory from
    `ci.util.ctx()` is returned and the cache is cleared periodically.
    '''
    secret_path = os.environ.get('CFG_FACTORY_SECRET_PATH')

    if secret_path:
        # cfg factory creation from k8s secret
        watch_for_file_changes(secret_path)
        return _cfg_factory_from_secret(secret_path)

    # fallback to default cfg factory creation, refreshed every 12h
    refresh_periodically(refresh_interval_seconds=60 * 60 * 12)
    return ci.util.ctx().cfg_factory()