-
Notifications
You must be signed in to change notification settings - Fork 3
/
tasks.py
317 lines (275 loc) · 13.5 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# -*- coding: utf-8 -*-
'''
Copyright 2014 FreshPlanet (http://freshplanet.com | [email protected])
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import cPickle
import importlib
import logging
import random
import types
from google.appengine.api import taskqueue, modules
from google.appengine.ext import ndb
from google.appengine.ext.deferred import deferred
import webapp2
# List of task queue names we should use as default queues to execute the tasks
# (used by addTask() whenever the caller does not pass an explicit list of queues).
DEFAULT_QUEUES = ['default']
# Name of the module to import to load your application code.
# That's where you should have implemented your /_warmup handler.
# Use this to make sure all your code is available by the time we de-serialize the task.
# Alternatively you can use the appengine_config.py module to make sure all your modules are always loaded.
# Ex: "myapp.warmup"
WARMUP_MODULE = None
# If you want to execute your tasks on another module than the default one, specify the module name here.
# This feature is meant to be used this way:
# - You use two modules with the same code, with different settings (scheduler, instance class...)
# to optimize how you serve user-facing requests versus how you serve things that run in the background like your tasks.
# - Both modules use versions with the same names, versions with the same name sharing the same code.
# Then we can redirect the tasks you enqueue to the right module and version,
# avoiding compatibility issues when you deploy new code to a new version.
BACKGROUND_MODULE = None
@ndb.tasklet
def addTask(queues, func, *args, **kwargs):
    """Enqueue a task to execute the specified function call later from the task queue.

    Handles exceptions and dispatching to the right queue.

    @param queues: List of queue names. We will randomly select one to push the task into.
        Can be 'default' to use default queues.
    @param func: The function to execute later.
    @param _countdown: seconds to wait before calling the function
    @param _eta: timestamp defining when to call the function
    @param _name: Name to give the Task; if not specified, a name will be
        auto-generated when added to a queue and assigned to this object.
        Must match the _TASK_NAME_PATTERN regular expression: ^[a-zA-Z0-9_-]{1,500}$
    @param _target: specific version and/or module the task should execute on
    @param _raiseIfExists: if set to True, we re-raise the eventual TaskAlreadyExistsError
    @param _transactional: to make sure the task is enqueued within the current transaction
    @param _retry_options: task queue retry options
    @param _parent: ndb Key instance; if provided, the function payload will be stored in a
        datastore entity under this parent when the size of the payload exceeds 100KB.
        Max size of payload is 1MB; beyond that the datastore will raise an error.
    @return: A Future that will yield True if the task could be enqueued.
    @rtype: ndb.Future
    """
    if not isinstance(queues, list):
        queues = DEFAULT_QUEUES

    # Pop all our control keyword arguments so they are not forwarded to func.
    _raiseIfExists = kwargs.pop('_raiseIfExists', False)
    taskName = kwargs.pop('_name', None)
    countdown = kwargs.pop('_countdown', None)
    eta = kwargs.pop('_eta', None)
    target = kwargs.pop('_target', None)
    transactional = kwargs.pop('_transactional', False)
    retry_options = kwargs.pop('_retry_options', None)
    parent = kwargs.pop('_parent', None)

    if not target and BACKGROUND_MODULE:
        # Tasks from the default module are executed into the background module.
        # Tasks from other modules (stage, background) stay inside their module.
        if modules.get_current_module_name() == 'default':
            # Target the mirror of the current version to avoid compatibility issues.
            # If that version does not exist, it will fall back to the default background version.
            target = modules.get_current_version_name() + '.' + BACKGROUND_MODULE

    success = False
    try:
        yield _defer(queues, func, args, kwargs, countdown, eta, taskName, target, transactional, retry_options, parent)
        success = True
    except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
        # TaskAlreadyExistsError: a task with the same name is in the queue.
        # TombstonedTaskError: a task with the same name has been in the queue recently.
        if taskName:
            # If we specified a name it's to avoid duplicates, so this error is expected.
            logging.info("TaskAlreadyExistsError: task with name %s already enqueued.", taskName)
            if _raiseIfExists:
                raise
        else:
            logging.exception("Could not enqueue the task")
    except Exception:
        # Was a bare 'except:'; narrowed to Exception so system-exiting
        # exceptions (KeyboardInterrupt, GeneratorExit...) are not swallowed.
        logging.exception("Could not enqueue the task")
    raise ndb.Return(success)
class _DeferredTaskEntity(ndb.Model):
    """Datastore representation of a deferred task.

    This is used in cases when the deferred task is too big to be included as
    payload with the task queue entry. _defer() stores one of these and
    enqueues a small _run_from_datastore task pointing at it instead.
    """
    # Pickled (callable, args, kwargs) tuple produced by _serialize().
    data = ndb.BlobProperty(required=True)
def _run_from_datastore(key):
    """Fetch a deferred task payload from the datastore and execute it.

    Args:
        key: The datastore key of a _DeferredTaskEntity storing the task.
    Raises:
        deferred.PermanentTaskFailure: if the entity no longer exists, or
            re-raised from the task itself; the entity is deleted on success
            and on permanent failure (so the task is never replayed).
    """
    logging.info('Retrieving function payload from data store')
    taskEntity = key.get()
    if taskEntity is None:
        logging.error("Datastore entity not found for key: %s", key)
        raise deferred.PermanentTaskFailure
    try:
        _run(taskEntity.data)
    except deferred.PermanentTaskFailure:
        # Permanent failure: clean up the stored payload before propagating.
        taskEntity.key.delete()
        raise
    else:
        # Success: the payload is consumed, remove it.
        taskEntity.key.delete()
def isFromTaskQueue(request=None):
    """Return True when the given (or current) request comes from a task queue.

    As stated in the doc (https://developers.google.com/appengine/docs/python/taskqueue/overview-push#Task_Request_Headers)
    the 'X-AppEngine-*' task headers are set internally by Google App Engine and
    are stripped from external user requests (the exception being requests from
    logged-in administrators of the application, who are allowed to set the
    headers for testing purposes). So the presence of the task-name header can
    be trusted as proof the request is a Task Queue request.

    @param request: optional request object; defaults to the current webapp2 request.
    """
    if not request:
        request = webapp2.get_request()
    taskNameHeader = request.headers.get('X-Appengine-TaskName')
    return bool(taskNameHeader)
def getRetryCount():
    """Return how many times the current task has already been retried (0 on the first attempt)."""
    retryHeader = webapp2.get_request().headers.get('X-Appengine-TaskRetryCount', 0)
    return int(retryHeader)
def logAsRetried(message, *args, **kwargs):
    """Log *message*, escalating the logging level with the task retry count.

    Inside a task queue request the level grows from INFO (first attempt)
    through WARN and ERROR up to CRITICAL (third retry and beyond), so
    repeated failures become increasingly visible. Outside a task queue
    request we log at ERROR.
    """
    if isFromTaskQueue():
        escalation = (logging.INFO, logging.WARN, logging.ERROR)
        retries = getRetryCount()
        level = escalation[retries] if retries < len(escalation) else logging.CRITICAL
    else:
        level = logging.ERROR
    logging.log(level, message, *args, **kwargs)
@ndb.tasklet
def _defer(queues, func, funcArgs, funcKwargs, countdown=None, eta=None,
           taskName=None, target=None, transactional=False, retry_options=None, parent=None):
    """
    Our own implementation of deferred.defer.
    This allows:
    - using webapp2 as deferred handler and applying our middlewares
    - using task asynchronous API
    - using cPickle instead of pickle
    - logging headers at DEBUG level instead of INFO

    @return: A Future yielding the result of Task.add_async.
    """
    payload = _serialize(func, funcArgs, funcKwargs)
    queueName = random.choice(queues)
    # We track which function is called so that it appears clearly in the App admin dash-board.
    # Note: if it's a class method, we only track the method name and not the class name.
    url = "/_cb/deferred/%s/%s" % (getattr(func, '__module__', ''), getattr(func, '__name__', ''))
    headers = {"Content-Type": "application/octet-stream"}

    def _makeTask(taskPayload):
        # Factored out: the Task was previously built identically in two places
        # (direct payload and datastore-backed payload).
        return taskqueue.Task(payload=taskPayload, target=target, url=url, headers=headers,
                              countdown=countdown, eta=eta, name=taskName, retry_options=retry_options)

    try:
        task = _makeTask(payload)
    except taskqueue.TaskTooLargeError:
        # Payload exceeds the task queue size limit: store it in the datastore
        # and enqueue a small task that will fetch and run it from there.
        logging.info('Task Too Large. Storing payload in the data store')
        key = yield _DeferredTaskEntity(data=payload, parent=parent).put_async()
        task = _makeTask(_serialize(_run_from_datastore, [key], {}))
    ret = yield task.add_async(queueName, transactional=transactional)
    raise ndb.Return(ret)
class DeferredHandler(webapp2.RequestHandler):
    """Webapp2 handler that executes deferred tasks POSTed by the task queue."""

    # Queue & task name are already set in the request log.
    # We don't care about country and name-space.
    _SKIP_HEADERS = {'x-appengine-country', 'x-appengine-queuename', 'x-appengine-taskname',
                     'x-appengine-current-namespace'}

    def post(self, *args, **kwargs):
        """ Executes a deferred task """
        # Add some task debug information.
        debugParts = []
        for name, value in self.request.headers.items():
            lowered = name.lower()
            if lowered.startswith("x-appengine-") and lowered not in self._SKIP_HEADERS:
                debugParts.append("%s:%s" % (name, value))
        logging.debug(", ".join(debugParts))

        # Make sure all modules are loaded
        if WARMUP_MODULE:
            importlib.import_module(WARMUP_MODULE)

        # Make sure we are called from the Task Queue (security)
        if not isFromTaskQueue(self.request):
            logging.critical('Detected an attempted XSRF attack: we are not executing from a task queue.')
            self.response.set_status(403)
            return

        try:
            _run(self.request.body)
        except deferred.SingularTaskFailure as e:
            # 408 asks the task queue to retry this single run.
            msg = "Failure executing task, task retry forced"
            if e.message:
                msg += ": %s" % e.message
            logging.debug(msg)
            self.response.set_status(408)
        except deferred.PermanentTaskFailure:
            # Logged but not re-raised, so the task queue will not retry it.
            logging.exception("Permanent failure attempting to execute task")
def _run(data):
    """Unpickles and executes a task.

    Args:
        data: A pickled tuple of (function, args, kwargs) to execute.
    Returns:
        The return value of the function invocation.
    Raises:
        deferred.PermanentTaskFailure: if the payload cannot be unpickled
            (retrying could never succeed in that case).
    """
    try:
        func, args, kwds = cPickle.loads(data)
    except Exception as e:
        # 'except Exception, e' rewritten with the 'as' form: identical
        # behavior, valid on Python 2.6+ and forward-compatible with Python 3.
        raise deferred.PermanentTaskFailure(e)
    try:
        # 'return' added to honor the documented "Returns" contract; existing
        # callers ignore the value, so this is backward-compatible.
        return func(*args, **kwds)
    except TypeError:
        # Help debug signature mismatches between enqueue time and execution time.
        logging.debug("Deferred function arguments: %s %s", args, kwds)
        raise
# ===========================================================================
# From google.appengine.ext.deferred.defer lib
# ===========================================================================
def _invokeMember(obj, memberName, *args, **kwargs):
"""Retrieves a member of an object, then calls it with the provided arguments.
Args:
obj: The object to operate on.
membername: The name of the member to retrieve from ojb.
args: Positional arguments to pass to the method.
kwargs: Keyword arguments to pass to the method.
Returns:
The return value of the method invocation.
"""
return getattr(obj, memberName)(*args, **kwargs)
def _curry_callable(obj, args, kwargs):
    """Takes a callable and arguments and returns a task queue tuple.

    The returned tuple consists of (callable, args, kwargs), and can be pickled
    and unpickled safely.

    NOTE: this dispatch relies on Python 2 types (im_self/im_func,
    types.ClassType, types.ObjectType, types.UnboundMethodType); it is not
    Python 3 compatible.

    Args:
        obj: The callable to curry. See the module docstring for restrictions.
        args: Positional arguments to call the callable with.
        kwargs: Keyword arguments to call the callable with.
    Returns:
        A tuple consisting of (callable, args, kwargs) that can be evaluated by
        run() with equivalent effect of executing the function directly.
    Raises:
        ValueError: If the passed in object is not of a valid callable type.
    """
    if isinstance(obj, types.MethodType):
        # Bound instance methods can't be pickled directly on Python 2, so we
        # pickle the instance + method name and rebuild the call at execution
        # time through _invokeMember.
        return (_invokeMember, (obj.im_self, obj.im_func.__name__) + args, kwargs)
    elif isinstance(obj, types.BuiltinMethodType):
        if not obj.__self__:
            # Unbound built-in: picklable by reference as-is.
            return (obj, args, kwargs)
        else:
            # Bound built-in method: same _invokeMember indirection as above.
            return (_invokeMember, (obj.__self__, obj.__name__) + args, kwargs)
    elif isinstance(obj, types.ObjectType) and hasattr(obj, "__call__"):
        # Any instance implementing __call__ (types.ObjectType is 'object' on Python 2).
        return (obj, args, kwargs)
    elif isinstance(obj, (types.FunctionType, types.BuiltinFunctionType,
                          types.ClassType, types.UnboundMethodType)):
        # Plain/built-in functions, old-style classes and unbound methods are
        # picklable by reference.
        return (obj, args, kwargs)
    else:
        raise ValueError("obj must be callable")
def _serialize(obj, args, kwargs):
    """Serializes a callable into a format recognized by the deferred executor.

    Args:
        obj: The callable to serialize. See module docstring for restrictions.
        args: Positional arguments to call the callable with.
        kwargs: Keyword arguments to call the callable with.
    Returns:
        A serialized (pickled) representation of the (callable, args, kwargs) tuple.
    """
    return cPickle.dumps(_curry_callable(obj, args, kwargs), protocol=cPickle.HIGHEST_PROTOCOL)