-
Notifications
You must be signed in to change notification settings - Fork 4
/
caches.py
295 lines (251 loc) · 9.61 KB
/
caches.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
"""caches module
This module contains functions and classes to help with accessing and updating
the prediction cache. This cache holds the previously requested prediction
results.
"""
import abc
import json
import threading as t

try:
    import http.client as httplib
except ImportError:
    import httplib

import errors
import logger
from utils import get_arg
def factory():
    """Return a concrete instance of a `Cache` store chosen by configuration.

    The values ``CACHE_TYPE``, ``CACHE_HOST``, ``CACHE_PORT`` and
    ``CACHE_NAME`` are read through `get_arg`. A ``CACHE_TYPE`` of ``jdg``
    or ``infinispan`` selects an `InfinispanCache`; any other value falls
    back to a `MemoryCache`.

    :return: A concrete instance of a `Cache` store.
    :rtype: Cache
    """
    cache_type = get_arg('CACHE_TYPE', 'memory')
    cache_host = get_arg('CACHE_HOST', '')
    cache_port = get_arg('CACHE_PORT', '')
    cache_name = get_arg('CACHE_NAME', '')
    if cache_type in ('jdg', 'infinispan'):
        # An unset port ('') is not a valid TCP port. Passing None lets
        # HTTPConnection use its default port, or a port embedded in the
        # host string ("host:port").
        port = int(cache_port) if cache_port else None
        return InfinispanCache(host=cache_host, name=cache_name, port=port)
    return MemoryCache()
def updater(response_q, storage):
    """Apply prediction responses from a queue to a cache until told to stop.

    Intended as a thread target. Each message taken from ``response_q`` is
    handled as follows:

    - ``'stop'`` ends the loop and the function returns.
    - ``'invalidate'`` clears the whole cache via ``storage.invalidate()``.
    - Anything else is treated as a prediction and passed to
      ``storage.update()``.

    Arguments:
    response_q -- A queue of prediction responses
    storage -- A Cache object for storing predictions
    """
    while True:
        message = response_q.get()
        if message == 'stop':
            return
        if message == 'invalidate':
            storage.invalidate()
            continue
        storage.update(message)
# A ``__metaclass__`` class attribute is only honoured by Python 2. Creating
# the base class through ``abc.ABCMeta`` directly makes the abstract-method
# checks below work on both Python 2 and Python 3.
_AbstractBase = abc.ABCMeta('_AbstractBase', (object,), {})


class Cache(_AbstractBase):
    """an abstract base for storage caches

    Children of this class need to be thread safe. A subclass cannot be
    instantiated until it implements every abstract method below.
    """

    @abc.abstractmethod
    def store(self, prediction):
        """store a new record

        raises PredictionExists if the id is already in the cache
        :param prediction: prediction data
        :type prediction: Dict
        """

    @abc.abstractmethod
    def get(self, p_id):
        """get a record by id

        raises PredictionNotFound if the id is not in the cache
        :param p_id: unique prediction id
        :type p_id: string
        :return: The cached prediction data
        :rtype: Dict
        """

    @abc.abstractmethod
    def update(self, prediction):
        """update an existing record

        raises PredictionNotFound if the id is not in the cache
        :param prediction: prediction data
        :type prediction: Dict
        """

    @abc.abstractmethod
    def invalidate(self):
        """invalidates the entire cache (e.g. when a new model is loaded)
        """
class MemoryCache(Cache):
    """a memory backed cache

    Predictions are held in a dict keyed by ``prediction['id']``. Every
    access to that dict is guarded by a single lock. This cache will not
    retain information on restart, please use it responsibly.
    """

    def __init__(self):
        self.data = {}
        self.lock = t.Lock()
        self._logger = logger.get_logger()
        self._logger.debug("Initializing memory cache store.")

    def store(self, prediction):
        """Insert a new prediction.

        :param prediction: prediction data; must contain an ``'id'`` key
        :type prediction: Dict
        :raises errors.PredictionExists: if the id is already cached
        """
        # ``with`` releases the lock even if reading the id raises (e.g.
        # KeyError for a record without 'id'). A lock leaked that way would
        # deadlock every later cache call.
        with self.lock:
            exists = prediction['id'] in self.data
            if not exists:
                self.data[prediction['id']] = prediction
        # Log and raise outside the lock so other threads are not blocked.
        if exists:
            self._logger.error(
                "Prediction id={} already exists in the cache.".format(
                    prediction['id']))
            raise errors.PredictionExists

    def get(self, p_id):
        """Return the cached prediction stored under ``p_id``.

        :param p_id: unique prediction id
        :return: the cached prediction data
        :raises errors.PredictionNotFound: if ``p_id`` is not cached
        """
        with self.lock:
            ret = self.data.get(p_id)
        if ret is None:
            self._logger.error("Prediction id={} not found.".format(p_id))
            raise errors.PredictionNotFound
        return ret

    def update(self, prediction):
        """Replace an existing prediction with the same ``'id'``.

        :param prediction: prediction data; must contain an ``'id'`` key
        :type prediction: Dict
        :raises errors.PredictionNotFound: if the id is not cached
        """
        with self.lock:
            exists = prediction['id'] in self.data
            if exists:
                self.data[prediction['id']] = prediction
        if not exists:
            self._logger.error(
                "Prediction id={} not found.".format(prediction['id']))
            raise errors.PredictionNotFound

    def invalidate(self):
        """Drop every cached prediction by swapping in a fresh dict."""
        with self.lock:
            self.data = {}
class InfinispanCache(Cache):
    """A JDG/Infinispan backend cache store (using the JDG REST API)

    Each operation opens a new HTTP connection, issues a single REST request
    and closes the connection on every path. ``httplib.HTTPException``
    failures are logged and swallowed rather than propagated.
    """

    def __init__(self, host, name, port):
        """
        Initialize JDG cache manager by providing connection details

        :param host: The JDG server host
        :type host: str
        :param name: The cache name (e.g. `namedCache`)
        :type name: str
        :param port: The JDG server port
        :type port: int
        """
        self._host = host
        self._name = name
        self._port = port
        self._logger = logger.get_logger()
        self._logger.debug(
            "Initializing a JDG cache store (at {}:{}/{}).".format(host,
                                                                    port,
                                                                    name))
        # invalidate cache, just in case we have stale persisted JDG entries
        self.invalidate()

    def _connect(self):
        """
        Creates a HTTP connection to the JDG server

        :return: JDG HTTP connection
        :rtype: HTTPConnection
        """
        return httplib.HTTPConnection(self._host, self._port)

    def _request(self, method, url, payload=None):
        """Send one REST request to JDG and return ``(status, body)``.

        When ``payload`` is given it is serialized with ``json.dumps`` and
        sent with a ``Content-Type: application/json`` header. The response
        body is read before the connection is closed. The connection is
        closed even if sending the request or reading the response raises.

        :param method: HTTP method, e.g. ``"GET"``
        :param url: request path on the JDG server
        :param payload: optional JSON-serializable request body
        :return: the HTTP status code and the raw response body
        :rtype: tuple
        """
        conn = self._connect()
        try:
            if payload is None:
                conn.request(method=method, url=url)
            else:
                conn.request(method=method, url=url,
                             body=json.dumps(payload),
                             headers={"Content-Type": "application/json"})
            response = conn.getresponse()
            return response.status, response.read()
        finally:
            conn.close()

    def store(self, prediction):
        """Create a new cache entry with a POST request.

        :raises errors.PredictionExists: if JDG answers 409
        :raises errors.CacheError: for any other non-200 status
        """
        try:
            status, _ = self._request("POST", self._format(prediction['id']),
                                      prediction)
            # if a prediction with this id already exists, raise an error
            if status == 409:
                self._logger.error(
                    "Prediction id={} already exists in the cache.".format(
                        prediction['id']))
                raise errors.PredictionExists
            # raise a cache error if a status other than OK is returned by JDG
            elif status != 200:
                self._logger.error(
                    "JDG could not store prediction id={}.".format(
                        prediction['id']))
                raise errors.CacheError
        except httplib.HTTPException:
            self._logger.error("Error connecting to JDG cache store.")

    def get(self, p_id):
        """Fetch and JSON-decode the cache entry for ``p_id`` (GET request).

        :raises errors.PredictionNotFound: if JDG answers 404
        :raises errors.CacheError: for any other non-200 status
        :return: the decoded prediction, or None if an HTTPException occurred
        """
        try:
            status, body = self._request("GET", self._format(p_id))
            # raise an error if trying to get a prediction that isn't cached
            if status == 404:
                self._logger.error("Prediction id={} not found.".format(p_id))
                raise errors.PredictionNotFound
            # raise a cache error if a status other than OK is returned by JDG
            elif status != 200:
                self._logger.error(
                    "JDG could not get prediction id={}.".format(p_id))
                raise errors.CacheError
            # Decode explicitly: json.loads only accepts bytes from Python 3.6.
            return json.loads(body.decode('utf-8'))
        except httplib.HTTPException:
            self._logger.error("Error connecting to JDG cache store.")
        # NOTE(review): callers receive None on HTTP-level failures, as before.
        return None

    def update(self, prediction):
        """Overwrite an existing cache entry with a PUT request.

        :raises errors.PredictionNotFound: if JDG answers 404
        :raises errors.CacheError: for any other non-200 status
        """
        try:
            # _request reads the response before closing the connection. The
            # previous code closed first, so getresponse() always failed and
            # the status was never checked.
            status, _ = self._request("PUT", self._format(prediction['id']),
                                      prediction)
            # raise an error if trying to update an entry that doesn't exist
            if status == 404:
                self._logger.error(
                    "Prediction id={} not found.".format(prediction['id']))
                raise errors.PredictionNotFound
            # raise a cache error if a status other than OK is returned by JDG
            elif status != 200:
                self._logger.error(
                    "JDG could not update prediction id={}.".format(
                        prediction['id']))
                raise errors.CacheError
        except httplib.HTTPException:
            self._logger.error("Error connecting to JDG cache store.")

    def _format(self, p_id):
        """
        Create a JDG server REST URL from the provided prediction key

        :param p_id: prediction id
        :return: JDG server REST URL
        :rtype: str
        """
        return "/rest/{}/{}".format(self._name, p_id)

    def invalidate(self):
        """Clear the whole named cache with a DELETE on ``/rest/<name>``.

        :raises errors.CacheError: if JDG answers with a non-200 status
        """
        try:
            status, _ = self._request("DELETE", "/rest/{}".format(self._name))
            # raise a cache error if a status other than OK is returned by JDG
            if status != 200:
                self._logger.error(
                    "JDG could not invalidate cache '{}'.".format(self._name))
                raise errors.CacheError
        except httplib.HTTPException:
            self._logger.error("Error connecting to JDG cache store.")