Add logging to post-commit handlers
Uditi Mehta committed Aug 27, 2024
1 parent 14216c1 commit 464aa73
Showing 2 changed files with 15 additions and 3 deletions.
2 changes: 1 addition & 1 deletion framework/auth/views.py
@@ -993,7 +993,7 @@ def resend_confirmation_post(auth):
         if user:
             if throttle_period_expired(user.email_last_sent, settings.SEND_EMAIL_THROTTLE):
                 try:
-                    send_confirm_email(user, clean_email, renew=True)
+                    send_confirm_email_async(user, clean_email, renew=True)
                 except KeyError:
                     # already confirmed, redirect to dashboard
                     status_message = f'This email {clean_email} has already been confirmed.'
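Note: the only change in views.py swaps the synchronous send_confirm_email call for send_confirm_email_async with the same arguments. As a rough sketch of how such an async variant could sit on top of the post-commit machinery touched by this commit (the decorator factory name run_postcommit and the import path are assumptions, not part of the diff), it might look like:

# Illustrative sketch only; `run_postcommit` and its import path are assumed,
# and `send_confirm_email` is the synchronous helper the diff replaces
# (assumed to be importable in this module).
from framework.postcommit_tasks.handlers import run_postcommit

@run_postcommit(once_per_request=False, celery=False)
def send_confirm_email_async(user, email, renew=False):
    # Defers the actual send until postcommit_after_request flushes the queue.
    send_confirm_email(user, email, renew=renew)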
16 changes: 14 additions & 2 deletions framework/postcommit_tasks/handlers.py
@@ -28,34 +28,43 @@ def postcommit_celery_queue():
     return _local.postcommit_celery_queue

 def postcommit_before_request():
+    logger.debug("Initializing postcommit queues before request")
     _local.postcommit_queue = OrderedDict()
     _local.postcommit_celery_queue = OrderedDict()

 def postcommit_after_request(response, base_status_error_code=500):
+    logger.debug(f"Postcommit after request triggered with status code {response.status_code}")
     if response.status_code >= base_status_error_code:
+        logger.debug("Clearing postcommit queues due to error response")
         _local.postcommit_queue = OrderedDict()
         _local.postcommit_celery_queue = OrderedDict()
         return response
     try:
         if postcommit_queue():
+            logger.debug(f"Processing postcommit_queue with {len(postcommit_queue())} tasks")
             number_of_threads = 30  # one db connection per greenlet, let's share
             pool = Pool(number_of_threads)
             for func in postcommit_queue().values():
+                logger.debug(f"Spawning task {func}")
                 pool.spawn(func)
             pool.join(timeout=5.0, raise_error=True)  # 5 second timeout and reraise exceptions

         if postcommit_celery_queue():
+            logger.debug(f"Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks")
             if settings.USE_CELERY:
                 for task_dict in postcommit_celery_queue().values():
                     task = Signature.from_dict(task_dict)
+                    logger.debug(f"Applying async task {task}")
                     task.apply_async()
             else:
                 for task in postcommit_celery_queue().values():
+                    logger.debug(f"Executing task {task}")
                     task()

     except AttributeError as ex:
-        if not settings.DEBUG_MODE:
-            logger.error(f'Post commit task queue not initialized: {ex}')
+        logger.error(f'Post commit task queue not initialized: {ex}')
+    except Exception as ex:
+        logger.error(f"Exception during postcommit processing: {ex}")
     return response

 def get_task_from_postcommit_queue(name, predicate, celery=True):
@@ -86,8 +95,10 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=True):
         key = f'{key}:{binascii.hexlify(os.urandom(8))}'

     if celery and isinstance(fn, PromiseProxy):
+        logger.debug(f"Enqueuing celery task {fn.__name__} with key {key}")
         postcommit_celery_queue().update({key: fn.si(*args, **kwargs)})
     else:
+        logger.debug(f"Enqueuing task {fn.__name__} with key {key}")
         postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)})

 handlers = {
@@ -110,6 +121,7 @@ def wrapper(func):

         @functools.wraps(func)
         def wrapped(*args, **kwargs):
+            logger.debug(f"Wrapping function {func.__name__} for postcommit")
             enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request)
         return wrapped
     return wrapper

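Note: the new messages in handlers.py go through the module-level logger at DEBUG level, so they only appear when that logger is enabled. A minimal sketch of exercising the changed handlers by hand with logging turned up, assuming the module is importable as framework.postcommit_tasks.handlers and that its logger follows the usual logging.getLogger(__name__) convention (both assumptions, not shown in the diff):

import logging
from framework.postcommit_tasks import handlers  # assumed import path

# Surface the debug output added in this commit.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger('framework.postcommit_tasks.handlers').setLevel(logging.DEBUG)

class FakeResponse:
    # Stand-in for a response object; only status_code is read by the handler.
    status_code = 200

# Simulate the request lifecycle: initialize the queues, enqueue a plain
# (non-Celery) task, then flush the queue as if the request had committed.
handlers.postcommit_before_request()
handlers.enqueue_postcommit_task(print, ('post-commit task ran',), {}, celery=False)
handlers.postcommit_after_request(FakeResponse())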