From 464aa73b235e659d431ae8f9cf7b51dad6db9942 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Tue, 27 Aug 2024 16:05:20 -0400 Subject: [PATCH 1/8] Add logging to post-commit handlers --- framework/auth/views.py | 2 +- framework/postcommit_tasks/handlers.py | 16 ++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/framework/auth/views.py b/framework/auth/views.py index e398a6db0a5..ac099b48796 100644 --- a/framework/auth/views.py +++ b/framework/auth/views.py @@ -993,7 +993,7 @@ def resend_confirmation_post(auth): if user: if throttle_period_expired(user.email_last_sent, settings.SEND_EMAIL_THROTTLE): try: - send_confirm_email(user, clean_email, renew=True) + send_confirm_email_async(user, clean_email, renew=True) except KeyError: # already confirmed, redirect to dashboard status_message = f'This email {clean_email} has already been confirmed.' diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 0af90a3b30f..bd7014d1eef 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -28,34 +28,43 @@ def postcommit_celery_queue(): return _local.postcommit_celery_queue def postcommit_before_request(): + logger.debug("Initializing postcommit queues before request") _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() def postcommit_after_request(response, base_status_error_code=500): + logger.debug(f"Postcommit after request triggered with status code {response.status_code}") if response.status_code >= base_status_error_code: + logger.debug("Clearing postcommit queues due to error response") _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() return response try: if postcommit_queue(): + logger.debug(f"Processing postcommit_queue with {len(postcommit_queue())} tasks") number_of_threads = 30 # one db connection per greenlet, let's share pool = Pool(number_of_threads) for func in postcommit_queue().values(): + logger.debug(f"Spawning task {func}") pool.spawn(func) pool.join(timeout=5.0, raise_error=True) # 5 second timeout and reraise exceptions if postcommit_celery_queue(): + logger.debug(f"Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks") if settings.USE_CELERY: for task_dict in postcommit_celery_queue().values(): task = Signature.from_dict(task_dict) + logger.debug(f"Applying async task {task}") task.apply_async() else: for task in postcommit_celery_queue().values(): + logger.debug(f"Executing task {task}") task() except AttributeError as ex: - if not settings.DEBUG_MODE: - logger.error(f'Post commit task queue not initialized: {ex}') + logger.error(f'Post commit task queue not initialized: {ex}') + except Exception as ex: + logger.error(f"Exception during postcommit processing: {ex}") return response def get_task_from_postcommit_queue(name, predicate, celery=True): @@ -86,8 +95,10 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): + logger.debug(f"Enqueuing celery task {fn.__name__} with key {key}") postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) else: + logger.debug(f"Enqueuing task {fn.__name__} with key {key}") postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { @@ -110,6 +121,7 @@ def wrapper(func): @functools.wraps(func) def wrapped(*args, **kwargs): + logger.debug(f"Wrapping function {func.__name__} for postcommit") enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped return wrapper From a519f4f6d3d55544739f77171bab10252517e5e9 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Tue, 27 Aug 2024 16:27:14 -0400 Subject: [PATCH 2/8] Update commit to warning level --- framework/postcommit_tasks/handlers.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index bd7014d1eef..0253f69d598 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -28,37 +28,37 @@ def postcommit_celery_queue(): return _local.postcommit_celery_queue def postcommit_before_request(): - logger.debug("Initializing postcommit queues before request") + logger.debug('Initializing postcommit queues before request') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() def postcommit_after_request(response, base_status_error_code=500): - logger.debug(f"Postcommit after request triggered with status code {response.status_code}") + logger.debug(f'Postcommit after request triggered with status code {response.status_code}') if response.status_code >= base_status_error_code: - logger.debug("Clearing postcommit queues due to error response") + logger.debug('Clearing postcommit queues due to error response') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() return response try: if postcommit_queue(): - logger.debug(f"Processing postcommit_queue with {len(postcommit_queue())} tasks") + logger.debug(f'Processing postcommit_queue with {len(postcommit_queue())} tasks') number_of_threads = 30 # one db connection per greenlet, let's share pool = Pool(number_of_threads) for func in postcommit_queue().values(): - logger.debug(f"Spawning task {func}") + logger.debug(f'Spawning task {func}') pool.spawn(func) pool.join(timeout=5.0, raise_error=True) # 5 second timeout and reraise exceptions if postcommit_celery_queue(): - logger.debug(f"Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks") + logger.debug(f'Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks') if settings.USE_CELERY: for task_dict in postcommit_celery_queue().values(): task = Signature.from_dict(task_dict) - logger.debug(f"Applying async task {task}") + logger.debug(f'Applying async task {task}') task.apply_async() else: for task in postcommit_celery_queue().values(): - logger.debug(f"Executing task {task}") + logger.debug(f'Executing task {task}') task() except AttributeError as ex: @@ -95,10 +95,10 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): - logger.debug(f"Enqueuing celery task {fn.__name__} with key {key}") + logger.debug(f'Enqueuing celery task {fn.__name__} with key {key}') postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) else: - logger.debug(f"Enqueuing task {fn.__name__} with key {key}") + logger.debug(f'Enqueuing task {fn.__name__} with key {key}') postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { @@ -121,7 +121,7 @@ def wrapper(func): @functools.wraps(func) def wrapped(*args, **kwargs): - logger.debug(f"Wrapping function {func.__name__} for postcommit") + logger.debug(f'Wrapping function {func.__name__} for postcommit') enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped return wrapper From aebef61787a66ffba083626af0ce90d132f3a387 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Tue, 27 Aug 2024 16:25:20 -0400 Subject: [PATCH 3/8] Update commit to warning level --- framework/postcommit_tasks/handlers.py | 48 +++++++------------------- 1 file changed, 12 insertions(+), 36 deletions(-) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 0253f69d598..578cbdfaa55 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -28,77 +28,61 @@ def postcommit_celery_queue(): return _local.postcommit_celery_queue def postcommit_before_request(): - logger.debug('Initializing postcommit queues before request') + logger.warning('Initializing postcommit queues before request') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() def postcommit_after_request(response, base_status_error_code=500): - logger.debug(f'Postcommit after request triggered with status code {response.status_code}') + logger.warning(f'Postcommit after request triggered with status code {response.status_code}') if response.status_code >= base_status_error_code: - logger.debug('Clearing postcommit queues due to error response') + logger.warning('Clearing postcommit queues due to error response') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() return response try: if postcommit_queue(): - logger.debug(f'Processing postcommit_queue with {len(postcommit_queue())} tasks') + logger.warning(f'Processing postcommit_queue with {len(postcommit_queue())} tasks') number_of_threads = 30 # one db connection per greenlet, let's share pool = Pool(number_of_threads) for func in postcommit_queue().values(): - logger.debug(f'Spawning task {func}') + logger.warning(f'Spawning task {func}') pool.spawn(func) pool.join(timeout=5.0, raise_error=True) # 5 second timeout and reraise exceptions if postcommit_celery_queue(): - logger.debug(f'Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks') + logger.warning(f'Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks') if settings.USE_CELERY: for task_dict in postcommit_celery_queue().values(): task = Signature.from_dict(task_dict) - logger.debug(f'Applying async task {task}') + logger.warning(f'Applying async task {task}') task.apply_async() else: for task in postcommit_celery_queue().values(): - logger.debug(f'Executing task {task}') - task() + logger.warning(f'Executing task {task}') except AttributeError as ex: logger.error(f'Post commit task queue not initialized: {ex}') except Exception as ex: - logger.error(f"Exception during postcommit processing: {ex}") + logger.error(f'Exception during postcommit processing: {ex}') return response -def get_task_from_postcommit_queue(name, predicate, celery=True): - queue = postcommit_celery_queue() if celery else postcommit_queue() - matches = [task for key, task in queue.items() if task.type.name == name and predicate(task)] - if len(matches) == 1: - return matches[0] - elif len(matches) > 1: - raise ValueError() - return False - def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=True): - """ - Any task queued with this function where celery=True will be run asynchronously. - """ if has_app_context() and current_app.config.get('TESTING', False): - # For testing purposes only: run fn directly fn(*args, **kwargs) else: - # make a hash of the pertinent data raw = [fn.__name__, fn.__module__, args, kwargs] m = hashlib.md5() m.update('-'.join([x.__repr__() for x in raw]).encode()) key = m.hexdigest() if not once_per_request: - # we want to run it once for every occurrence, add a random string key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): - logger.debug(f'Enqueuing celery task {fn.__name__} with key {key}') + logger.warning(f'Enqueuing celery task {fn.__name__} with key {key}') postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) else: - logger.debug(f'Enqueuing task {fn.__name__} with key {key}') + logger.warning(f'Enqueuing task {fn.__name__} with key {key}') postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { @@ -107,21 +91,13 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru } def run_postcommit(once_per_request=True, celery=False): - """ - Delays function execution until after the request's transaction has been committed. - If you set the celery kwarg to True args and kwargs must be JSON serializable - Tasks will only be run if the response's status code is < 500. - Any task queued with this function where celery=True will be run asynchronously. - :return: - """ def wrapper(func): - # if we're local dev or running unit tests, run without queueing if settings.DEBUG_MODE: return func @functools.wraps(func) def wrapped(*args, **kwargs): - logger.debug(f'Wrapping function {func.__name__} for postcommit') + logger.warning(f'Wrapping function {func.__name__} for postcommit') enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped return wrapper From 05dec43d61914f43af8acd29297b693ae9cafe60 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Tue, 27 Aug 2024 16:30:48 -0400 Subject: [PATCH 4/8] Update commit to warning level --- framework/postcommit_tasks/handlers.py | 28 ++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 578cbdfaa55..1d4aad00d14 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -59,23 +59,39 @@ def postcommit_after_request(response, base_status_error_code=500): else: for task in postcommit_celery_queue().values(): logger.warning(f'Executing task {task}') + task() except AttributeError as ex: logger.error(f'Post commit task queue not initialized: {ex}') except Exception as ex: - logger.error(f'Exception during postcommit processing: {ex}') + logger.error(f"Exception during postcommit processing: {ex}") return response +def get_task_from_postcommit_queue(name, predicate, celery=True): + queue = postcommit_celery_queue() if celery else postcommit_queue() + matches = [task for key, task in queue.items() if task.type.name == name and predicate(task)] + if len(matches) == 1: + return matches[0] + elif len(matches) > 1: + raise ValueError() + return False + def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=True): + """ + Any task queued with this function where celery=True will be run asynchronously. + """ if has_app_context() and current_app.config.get('TESTING', False): + # For testing purposes only: run fn directly fn(*args, **kwargs) else: + # make a hash of the pertinent data raw = [fn.__name__, fn.__module__, args, kwargs] m = hashlib.md5() m.update('-'.join([x.__repr__() for x in raw]).encode()) key = m.hexdigest() if not once_per_request: + # we want to run it once for every occurrence, add a random string key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): @@ -91,8 +107,16 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru } def run_postcommit(once_per_request=True, celery=False): + """ + Delays function execution until after the request's transaction has been committed. + If you set the celery kwarg to True args and kwargs must be JSON serializable + Tasks will only be run if the response's status code is < 500. + Any task queued with this function where celery=True will be run asynchronously. + :return: + """ def wrapper(func): - if settings.DEBUG_MODE: + # if we're local dev or running unit tests, run without queueing + if settings.warning_MODE: return func @functools.wraps(func) From 370c9794a17d564a8148011a180aa0c5be715d81 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Tue, 27 Aug 2024 16:30:48 -0400 Subject: [PATCH 5/8] Update commit to warning level --- framework/postcommit_tasks/handlers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 1d4aad00d14..7aec0f58a73 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -60,6 +60,7 @@ def postcommit_after_request(response, base_status_error_code=500): for task in postcommit_celery_queue().values(): logger.warning(f'Executing task {task}') task() + task() except AttributeError as ex: logger.error(f'Post commit task queue not initialized: {ex}') @@ -116,7 +117,7 @@ def run_postcommit(once_per_request=True, celery=False): """ def wrapper(func): # if we're local dev or running unit tests, run without queueing - if settings.warning_MODE: + if settings.DEBUG_MODE: return func @functools.wraps(func) From a3f47474fbce6f29ba822dc4dfdf860b1cecafa6 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Thu, 29 Aug 2024 13:44:37 -0400 Subject: [PATCH 6/8] remove debugging --- framework/postcommit_tasks/handlers.py | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 2ef9b94bdf9..6883745ee48 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -28,38 +28,29 @@ def postcommit_celery_queue(): return _local.postcommit_celery_queue def postcommit_before_request(): - logger.warning('Initializing postcommit queues before request') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() def postcommit_after_request(response, base_status_error_code=500): - logger.warning(f'Postcommit after request triggered with status code {response.status_code}') if response.status_code >= base_status_error_code: - logger.warning('Clearing postcommit queues due to error response') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() return response try: if postcommit_queue(): - logger.warning(f'Processing postcommit_queue with {len(postcommit_queue())} tasks') number_of_threads = 30 # one db connection per greenlet, let's share pool = Pool(number_of_threads) for func in postcommit_queue().values(): - logger.warning(f'Spawning task {func}') pool.spawn(func) pool.join(timeout=5.0, raise_error=True) # 5 second timeout and reraise exceptions if postcommit_celery_queue(): - logger.warning(f'Processing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks') if settings.USE_CELERY: for task_dict in postcommit_celery_queue().values(): task = Signature.from_dict(task_dict) - logger.warning(f'Applying async task {task}') task.apply_async() else: for task in postcommit_celery_queue().values(): - logger.warning(f'Executing task {task}') - task() task() except AttributeError as ex: @@ -95,10 +86,8 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): - logger.warning(f'Enqueuing celery task {fn.__name__} with key {key}') postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) else: - logger.warning(f'Enqueuing task {fn.__name__} with key {key}') postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { @@ -121,7 +110,6 @@ def wrapper(func): @functools.wraps(func) def wrapped(*args, **kwargs): - logger.warning(f'Wrapping function {func.__name__} for postcommit') enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped - return wrapper + return wrapper \ No newline at end of file From b2b68b5a45bd37e70d2b49b089347733d1232429 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Thu, 29 Aug 2024 13:49:28 -0400 Subject: [PATCH 7/8] remove debugging --- framework/postcommit_tasks/handlers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 6883745ee48..0af90a3b30f 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -112,4 +112,4 @@ def wrapper(func): def wrapped(*args, **kwargs): enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped - return wrapper \ No newline at end of file + return wrapper From 5391b55cf9a91775e1233f7b822c5c138156dbd5 Mon Sep 17 00:00:00 2001 From: Uditi Mehta Date: Thu, 29 Aug 2024 14:33:49 -0400 Subject: [PATCH 8/8] add debugging --- framework/postcommit_tasks/handlers.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/framework/postcommit_tasks/handlers.py b/framework/postcommit_tasks/handlers.py index 0af90a3b30f..1bee56cdc2c 100644 --- a/framework/postcommit_tasks/handlers.py +++ b/framework/postcommit_tasks/handlers.py @@ -19,25 +19,31 @@ def postcommit_queue(): if not hasattr(_local, 'postcommit_queue'): + logger.warning('Initializing postcommit_queue.') _local.postcommit_queue = OrderedDict() return _local.postcommit_queue def postcommit_celery_queue(): if not hasattr(_local, 'postcommit_celery_queue'): + logger.warning('Initializing postcommit_celery_queue.') _local.postcommit_celery_queue = OrderedDict() return _local.postcommit_celery_queue def postcommit_before_request(): + logger.warning('Running postcommit_before_request.') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() def postcommit_after_request(response, base_status_error_code=500): + logger.warning('Running postcommit_after_request.') if response.status_code >= base_status_error_code: + logger.warning(f'Response status {response.status_code} indicates an error; clearing queues.') _local.postcommit_queue = OrderedDict() _local.postcommit_celery_queue = OrderedDict() return response try: if postcommit_queue(): + logger.warning(f'Executing postcommit_queue with {len(postcommit_queue())} tasks.') number_of_threads = 30 # one db connection per greenlet, let's share pool = Pool(number_of_threads) for func in postcommit_queue().values(): @@ -46,10 +52,12 @@ def postcommit_after_request(response, base_status_error_code=500): if postcommit_celery_queue(): if settings.USE_CELERY: + logger.warning(f'Executing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks.') for task_dict in postcommit_celery_queue().values(): task = Signature.from_dict(task_dict) task.apply_async() else: + logger.warning(f'Executing postcommit_celery_queue with {len(postcommit_celery_queue())} tasks synchronously.') for task in postcommit_celery_queue().values(): task() @@ -62,17 +70,22 @@ def get_task_from_postcommit_queue(name, predicate, celery=True): queue = postcommit_celery_queue() if celery else postcommit_queue() matches = [task for key, task in queue.items() if task.type.name == name and predicate(task)] if len(matches) == 1: + logger.warning(f'Found 1 match for task {name} in postcommit queue.') return matches[0] elif len(matches) > 1: + logger.warning(f'Found multiple matches for task {name} in postcommit queue.') raise ValueError() + logger.warning(f'No matches found for task {name} in postcommit queue.') return False def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=True): """ Any task queued with this function where celery=True will be run asynchronously. """ + logger.warning(f'Enqueuing postcommit task {fn.__name__}.') if has_app_context() and current_app.config.get('TESTING', False): # For testing purposes only: run fn directly + logger.warning(f'Running {fn.__name__} directly due to testing environment.') fn(*args, **kwargs) else: # make a hash of the pertinent data @@ -86,8 +99,10 @@ def enqueue_postcommit_task(fn, args, kwargs, celery=False, once_per_request=Tru key = f'{key}:{binascii.hexlify(os.urandom(8))}' if celery and isinstance(fn, PromiseProxy): + logger.warning(f'Enqueued task {fn.__name__} to postcommit_celery_queue.') postcommit_celery_queue().update({key: fn.si(*args, **kwargs)}) else: + logger.warning(f'Enqueued task {fn.__name__} to postcommit_queue.') postcommit_queue().update({key: functools.partial(fn, *args, **kwargs)}) handlers = { @@ -110,6 +125,7 @@ def wrapper(func): @functools.wraps(func) def wrapped(*args, **kwargs): + logger.warning(f'Wrapping function {func.__name__} for postcommit.') enqueue_postcommit_task(func, args, kwargs, celery=celery, once_per_request=once_per_request) return wrapped return wrapper