diff --git a/framework/auth/views.py b/framework/auth/views.py index e398a6db0a5..ac099b48796 100644 --- a/framework/auth/views.py +++ b/framework/auth/views.py @@ -993,7 +993,7 @@ def resend_confirmation_post(auth): if user: if throttle_period_expired(user.email_last_sent, settings.SEND_EMAIL_THROTTLE): try: - send_confirm_email(user, clean_email, renew=True) + send_confirm_email_async(user, clean_email, renew=True) except KeyError: # already confirmed, redirect to dashboard status_message = f'This email {clean_email} has already been confirmed.' diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 0af90a3b30f..1bee56cdc2c 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -19,25 +19,31 @@ def postcommit_queue(): if not hasattr(_local, 'postcommit_queue'): + logger.warning('Initializing postcommit_queue.') _local.postcommit_queue = OrderedDict() return _local.postcommit_queue def postcommit_celery_queue(): if not hasattr(_local, 'postcommit_celery_queue'): + logger.warning('Initializing postcommit_celery_queue.') _local.postcommit_celery_queue = OrderedDict() return _local.postcommit_celery_queue def postcommit_before_request(): + logger.warning('Running postcommit_before_request.') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() def postcommit_after_request(response, base_status_error_code=500): + logger.warning('Running postcommit_after_request.') if response.status_code >= base_status_error_code: + logger.warning(f'Response status {response.status_code} indicates an error; clearing queues.') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() return response try: if postcommit_queue(): + logger.warning(f'Executing postcommit_queue with {len(postcommit_queue())} tasks.') number_of_threads = 30 # one db connection per greenlet, let's share pool = Pool(number_of_threads) for func in postcommit_queue().values(): @@ -46,10 +52,12 @@ def postcommit_after_request(response, base_status_error_code=500): if postcommit_celery_queue(): if settings.USE_CELERY: + logger.warning(f'Executing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks.') for task_dict in postcommit_celery_queue().values(): task = Signature.from_dict(task_dict) task.apply_async() else: + logger.warning(f'Executing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks synchronously.') for task in postcommit_celery_queue().values(): task() @@ -62,17 +70,22 @@ def get_task_from_postcommit_queue(name, predicate, celery=True): queue = postcommit_celery_queue() if celery else postcommit_queue() matches = [task for key, task in queue.items() if task.type.name == name and predicate(task)] if len(matches) == 1: + logger.warning(f'Found 1 match for task {name} in postcommit queue.') return matches[0] elif len(matches) > 1: + logger.warning(f'Found multiple matches for task {name} in postcommit queue.') raise ValueError() + logger.warning(f'No matches found for task {name} in postcommit queue.') return False def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=True): """ Any task queued with this function where celery=True will be run asynchronously. """ + logger.warning(f'Enqueuing postcommit task {fn.__name__}.') if has_app_context() and current_app.config.get('TESTING', False): # For testing purposes only: run fn directly + logger.warning(f'Running {fn.__name__} directly due to testing environment.') fn(*args, **kwargs) else: # make a hash of the pertinent data @@ -86,8 +99,10 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): + logger.warning(f'Enqueued task {fn.__name__} to postcommit_celery_queue.') postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) else: + logger.warning(f'Enqueued task {fn.__name__} to postcommit_queue.') postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { @@ -110,6 +125,7 @@ def wrapper(func): @functools.wraps(func) def wrapped(*args, **kwargs): + logger.warning(f'Wrapping function {func.__name__} for postcommit.') enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped return wrapper