-
Notifications
You must be signed in to change notification settings - Fork 4
/
caching.py
197 lines (150 loc) · 6.07 KB
/
caching.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import collections
import collections.abc
import datetime
import functools
import hashlib
import os
import pickle
import threading

import cachetools.keys
own_dir = os.path.abspath(os.path.dirname(__file__))
default_cache_dir = os.path.join(own_dir, '.cache', 'dora')
class FilesystemCache:
    '''
    Base class which implements a basic filesystem cache using pickle. Cache items are addressed
    by their filepath. This implementation does _not_ take care of clearing the cache, e.g. if it
    reaches a certain size.

    NOTE: `pickle.load` must only ever be used on trusted cache files, as unpickling can execute
    arbitrary code.
    '''
    def __getitem__(self, filepath: str):
        '''
        Returns the unpickled value stored at `filepath`. If no cache file exists, delegates to
        `__missing__`, which raises `KeyError`.
        '''
        # EAFP: open directly instead of an exists-check to avoid a race between check and open;
        # the context manager ensures the file handle is always closed (the original leaked it)
        try:
            with open(filepath, 'rb') as f:
                return pickle.load(f)
        except FileNotFoundError:
            return self.__missing__(filepath)

    def __setitem__(self, filepath: str, value):
        '''
        Pickles `value` and stores it at `filepath`, creating parent directories as needed.
        '''
        cache_dir = os.path.dirname(filepath)
        if not os.path.isdir(cache_dir):
            os.makedirs(name=cache_dir, exist_ok=True)
        # context manager ensures the written file is flushed and closed
        with open(filepath, 'wb') as f:
            pickle.dump(value, f)

    def __missing__(self, filepath: str):
        # mirror the dict protocol: absent keys raise KeyError
        raise KeyError(filepath)
class LFUFilesystemCache(FilesystemCache):
    '''
    Implements a Least-Frequently-Used filesystem cache. If `max_total_size_mib` is reached, the
    least frequently used items are removed from the cache accordingly until enough space is
    available again to store new items.

    @param max_total_size_mib:
        the maximum allowed total cache size in MiB, if `None`, LFU cache clearing is disabled
    '''
    def __init__(self, max_total_size_mib: int | None=None):
        # convert MiB -> bytes
        self._max_total_size = max_total_size_mib * 1024 * 1024 if max_total_size_mib else None
        self._item_sizes = {}  # filepath -> size of the pickled item in bytes
        self._total_size = 0  # sum of all tracked item sizes in bytes
        # counts are _decremented_ on each access, so the entry with the highest (i.e. least
        # negative) count is the least frequently used one, as returned by `most_common(1)`
        self._ref_counters = collections.Counter()
        self._item_sizes_lock = threading.Lock()
        self._ref_counters_lock = threading.Lock()

    def __getitem__(self, filepath: str):
        '''
        Returns the cached item at `filepath` (raises `KeyError` if absent) and records the
        access for the LFU bookkeeping.
        '''
        item = super().__getitem__(filepath)

        if self._max_total_size:
            with self._ref_counters_lock:
                self._ref_counters[filepath] -= 1

        return item

    def __setitem__(self, filepath: str, value):
        '''
        Stores `value` at `filepath`, first evicting least-frequently-used items if the size
        limit would otherwise be exceeded.

        @raises ValueError: if the pickled `value` alone already exceeds the size limit
        '''
        if not self._max_total_size:
            # LFU clearing disabled -> plain filesystem cache behaviour, no bookkeeping
            return super().__setitem__(filepath, value)

        cache_dir = os.path.dirname(filepath)
        if not os.path.isdir(cache_dir):
            os.makedirs(name=cache_dir, exist_ok=True)

        pickled_value = pickle.dumps(value)
        item_size = len(pickled_value)

        if item_size > self._max_total_size:
            raise ValueError(f'value too large ({item_size=})')

        while self._total_size + item_size > self._max_total_size:
            if not self.pop_item():
                break  # nothing left to evict; avoid spinning forever

        with self._item_sizes_lock:
            # when overwriting an existing item, subtract its previous size first, otherwise
            # the item would be counted twice in `_total_size`
            self._total_size -= self._item_sizes.get(filepath, 0)
            self._item_sizes[filepath] = item_size
            self._total_size += item_size

        with self._ref_counters_lock:
            self._ref_counters[filepath] -= 1

        with open(filepath, 'wb') as f:
            f.write(pickled_value)

    def pop_item(self):
        '''
        Removes the least frequently used item from the cache and returns it as a
        `(filepath, value)` tuple, or `None` if the cache is empty.
        '''
        with self._ref_counters_lock:
            if not self._ref_counters:
                return None  # guard: original raised an opaque ValueError from unpacking
            ((filepath, _),) = self._ref_counters.most_common(1)

        value = self.__getitem__(filepath)

        try:
            os.remove(filepath)
        except OSError:
            pass  # best effort; the bookkeeping below is cleaned up regardless

        with self._item_sizes_lock:
            # `filepath` may only be known from reads (never stored via this instance), in
            # which case no size was ever tracked for it -> default to 0
            self._total_size -= self._item_sizes.pop(filepath, 0)

        with self._ref_counters_lock:
            self._ref_counters.pop(filepath, None)

        return (filepath, value)
class TTLFilesystemCache(LFUFilesystemCache):
    '''
    Implements a Time-To-Live filesystem cache. If an item is older than `ttl`, it is removed from
    the cache. If `max_total_size_mib` is reached, the least frequently used items are removed from
    the cache accordingly until enough space is available again to store new items.

    @param ttl:
        the maximum allowed time a cache item is valid in seconds
    @param max_total_size_mib:
        the maximum allowed total cache size in MiB, if `None`, LFU cache clearing is disabled
    '''
    def __init__(self, ttl: int, max_total_size_mib: int):
        super().__init__(max_total_size_mib)
        self._ttl = ttl

    def __getitem__(self, filepath: str):
        '''
        Returns the cached item at `filepath` if its file is younger than the configured TTL,
        otherwise delegates to `__missing__` (which raises `KeyError`).
        '''
        # guard clauses: missing file or expired item -> cache miss
        if not os.path.exists(filepath):
            return self.__missing__(filepath)

        # the file's modification time marks when the item was last written
        modified_on = datetime.datetime.fromtimestamp(os.path.getmtime(filepath))
        age = datetime.datetime.now() - modified_on

        if age.total_seconds() >= self._ttl:
            return self.__missing__(filepath)

        return super().__getitem__(filepath)

    def __setitem__(self, filepath: str, value):
        # explicit delegation to the LFU parent's size-aware store
        super().__setitem__(filepath, value)
def cached(
    cache: FilesystemCache,
    key_func: collections.abc.Callable=cachetools.keys.hashkey,
    cache_dir: str=default_cache_dir,
):
    '''
    Decorator to wrap a function with a callable that saves results to a defined `FilesystemCache`.

    @param cache:
        the `FilesystemCache` instance results are stored in and read from
    @param key_func:
        callable mapping the call's `(*args, **kwargs)` to an iterable of hashable key parts
    @param cache_dir:
        directory the cache files are stored in
    '''
    def decorator(func):
        # `functools.wraps` preserves the wrapped function's metadata (__name__, __doc__, ...),
        # which the original implementation lost
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # derive a stable cache filename by hashing the stringified key parts; sha1 is used
            # for cache addressing only, not for security
            key = hashlib.sha1(usedforsecurity=False)
            for key_part in key_func(*args, **kwargs):
                key.update(str(key_part).encode('utf-8'))
            filepath = os.path.join(cache_dir, key.hexdigest())

            try:
                return cache[filepath]
            except KeyError:
                pass  # cache miss -> compute and store below

            result = func(*args, **kwargs)
            cache[filepath] = result
            return result
        return wrapper
    return decorator
def async_cached(
    cache: FilesystemCache,
    key_func: collections.abc.Callable=cachetools.keys.hashkey,
    cache_dir: str=default_cache_dir,
):
    '''
    Decorator to wrap an async function with a callable that saves results to a defined
    `FilesystemCache`.

    @param cache:
        the `FilesystemCache` instance results are stored in and read from
    @param key_func:
        callable mapping the call's `(*args, **kwargs)` to an iterable of hashable key parts
    @param cache_dir:
        directory the cache files are stored in
    '''
    def decorator(func):
        # `functools.wraps` preserves the wrapped coroutine function's metadata (__name__,
        # __doc__, ...), which the original implementation lost
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # derive a stable cache filename by hashing the stringified key parts; sha1 is used
            # for cache addressing only, not for security
            key = hashlib.sha1(usedforsecurity=False)
            for key_part in key_func(*args, **kwargs):
                key.update(str(key_part).encode('utf-8'))
            filepath = os.path.join(cache_dir, key.hexdigest())

            try:
                return cache[filepath]
            except KeyError:
                pass  # cache miss -> compute and store below

            result = await func(*args, **kwargs)
            cache[filepath] = result
            return result
        return wrapper
    return decorator