Skip to content

Commit

Permalink
Merge pull request #10724 from uditijmehta/debug/async-email-502
Browse files Browse the repository at this point in the history
Add logging to post-commit handlers
  • Loading branch information
uditijmehta authored Aug 30, 2024
2 parents 6f4c7b8 + 5391b55 commit ef74fec
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
2 changes: 1 addition & 1 deletion framework/auth/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,7 @@ def resend_confirmation_post(auth):
if user:
if throttle_period_expired(user.email_last_sent, settings.SEND_EMAIL_THROTTLE):
try:
send_confirm_email(user, clean_email, renew=True)
send_confirm_email_async(user, clean_email, renew=True)
except KeyError:
# already confirmed, redirect to dashboard
status_message = f'This email {clean_email} has already been confirmed.'
Expand Down
16 changes: 16 additions & 0 deletions framework/postcommit_tasks/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,31 @@

def postcommit_queue():
if not hasattr(_local, 'postcommit_queue'):
logger.warning('Initializing postcommit_queue.')
_local.postcommit_queue = OrderedDict()
return _local.postcommit_queue

def postcommit_celery_queue():
if not hasattr(_local, 'postcommit_celery_queue'):
logger.warning('Initializing postcommit_celery_queue.')
_local.postcommit_celery_queue = OrderedDict()
return _local.postcommit_celery_queue

def postcommit_before_request():
logger.warning('Running postcommit_before_request.')
_local.postcommit_queue = OrderedDict()
_local.postcommit_celery_queue = OrderedDict()

def postcommit_after_request(response, base_status_error_code=500):
logger.warning('Running postcommit_after_request.')
if response.status_code >= base_status_error_code:
logger.warning(f'Response status {response.status_code} indicates an error; clearing queues.')
_local.postcommit_queue = OrderedDict()
_local.postcommit_celery_queue = OrderedDict()
return response
try:
if postcommit_queue():
logger.warning(f'Executing postcommit_queue with {len(postcommit_queue())} tasks.')
number_of_threads = 30 # one db connection per greenlet, let's share
pool = Pool(number_of_threads)
for func in postcommit_queue().values():
Expand All @@ -46,10 +52,12 @@ def postcommit_after_request(response, base_status_error_code=500):

if postcommit_celery_queue():
if settings.USE_CELERY:
logger.warning(f'Executing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks.')
for task_dict in postcommit_celery_queue().values():
task = Signature.from_dict(task_dict)
task.apply_async()
else:
logger.warning(f'Executing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks synchronously.')
for task in postcommit_celery_queue().values():
task()

Expand All @@ -62,17 +70,22 @@ def get_task_from_postcommit_queue(name, predicate, celery=True):
queue = postcommit_celery_queue() if celery else postcommit_queue()
matches = [task for key, task in queue.items() if task.type.name == name and predicate(task)]
if len(matches) == 1:
logger.warning(f'Found 1 match for task {name} in postcommit queue.')
return matches[0]
elif len(matches) > 1:
logger.warning(f'Found multiple matches for task {name} in postcommit queue.')
raise ValueError()
logger.warning(f'No matches found for task {name} in postcommit queue.')
return False

def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=True):
"""
Any task queued with this function where celery=True will be run asynchronously.
"""
logger.warning(f'Enqueuing postcommit task {fn.__name__}.')
if has_app_context() and current_app.config.get('TESTING', False):
# For testing purposes only: run fn directly
logger.warning(f'Running {fn.__name__} directly due to testing environment.')
fn(*args, **kwargs)
else:
# make a hash of the pertinent data
Expand All @@ -86,8 +99,10 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru
key = f'{key}:{binascii.hexlify(os.urandom(8))}'

if celery and isinstance(fn, PromiseProxy):
logger.warning(f'Enqueued task {fn.__name__} to postcommit_celery_queue.')
postcommit_celery_queue().update({key: fn.si(*args, **kwargs)})
else:
logger.warning(f'Enqueued task {fn.__name__} to postcommit_queue.')
postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)})

handlers = {
Expand All @@ -110,6 +125,7 @@ def wrapper(func):

@functools.wraps(func)
def wrapped(*args, **kwargs):
logger.warning(f'Wrapping function {func.__name__} for postcommit.')
enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request)
return wrapped
return wrapper

0 comments on commit ef74fec

Please sign in to comment.