diff --git a/conftest.py b/conftest.py index 34c9bdfc036..edf96d46ba9 100644 --- a/conftest.py +++ b/conftest.py @@ -90,8 +90,8 @@ def fake(): 'mark': 'enable_search', 'replacement': mock.MagicMock() }, - 'website.search.elastic_search': { - 'mark': 'enable_search', + 'osf.external.messages.celery_publishers._publish_user_status_change': { + 'mark': 'enable_account_status_messaging', 'replacement': mock.MagicMock() } } diff --git a/framework/auth/signals.py b/framework/auth/signals.py index 2387ea2b4d0..15f279520e5 100644 --- a/framework/auth/signals.py +++ b/framework/auth/signals.py @@ -6,6 +6,8 @@ user_registered = signals.signal('user-registered') user_confirmed = signals.signal('user-confirmed') user_email_removed = signals.signal('user-email-removed') -user_merged = signals.signal('user-merged') +user_account_merged = signals.signal('user-account-merged') +user_account_deactivated = signals.signal('user-account-deactivated') +user_account_reactivated = signals.signal('user-account-reactivated') unconfirmed_user_created = signals.signal('unconfirmed-user-created') diff --git a/framework/celery_tasks/routers.py b/framework/celery_tasks/routers.py index 5335012dbf9..10696c32655 100644 --- a/framework/celery_tasks/routers.py +++ b/framework/celery_tasks/routers.py @@ -14,6 +14,8 @@ def match_by_module(task_path): return CeleryConfig.task_high_queue if task_subpath in CeleryConfig.remote_computing_modules: return CeleryConfig.task_remote_computing_queue + if task_subpath in CeleryConfig.task_account_status_changes_queue: + return CeleryConfig.task_account_status_changes_queue return CeleryConfig.task_default_queue diff --git a/osf/external/messages/__init__.py b/osf/external/messages/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/osf/external/messages/celery_publishers.py b/osf/external/messages/celery_publishers.py new file mode 100644 index 00000000000..3c3c90f9d37 --- /dev/null +++ b/osf/external/messages/celery_publishers.py @@ -0,0 +1,45 @@ +import waffle +from kombu import Exchange +from framework.celery_tasks import app as celery_app +from website import settings +from osf import features +from osf.utils.requests import get_current_request + + +def publish_deactivated_user(user): + _publish_user_status_change( + body={ + 'action': 'deactivate', + 'user_uri': user.get_semantic_iri(), + } + ) + + +def publish_reactivate_user(user): + _publish_user_status_change( + body={ + 'action': 'reactivate', + 'user_uri': user.get_semantic_iri(), + }, + ) + + +def publish_merged_user(user): + assert user.merged_by, 'User received merge signal, but has no `merged_by` reference.' + _publish_user_status_change( + body={ + 'action': 'merge', + 'into_user_uri': user.merged_by.get_semantic_iri(), + 'from_user_uri': user.get_semantic_iri(), + }, + ) + + +def _publish_user_status_change(body: dict): + if settings.USE_CELERY and waffle.flag_is_active(get_current_request(), features.ENABLE_GV): + with celery_app.producer_pool.acquire() as producer: + producer.publish( + body=body, + exchange=Exchange(celery_app.conf.task_account_status_changes_queue), + serializer='json' + ) diff --git a/osf/management/commands/publish_account_changes.py b/osf/management/commands/publish_account_changes.py new file mode 100644 index 00000000000..577558e8ae6 --- /dev/null +++ b/osf/management/commands/publish_account_changes.py @@ -0,0 +1,40 @@ +from django.core.management.base import BaseCommand +from osf.models import OSFUser +from osf.external.messages.celery_publishers import ( + publish_deactivated_user, + publish_reactivate_user, + publish_merged_user, +) + +actions_to_functions = { + 'deactivate': publish_deactivated_user, + 'reactivate': publish_reactivate_user, + 'merge': publish_merged_user, +} + + +class Command(BaseCommand): + help = 'Sends a message to manage a user state, for test purposes only.' + + def add_arguments(self, parser): + parser.add_argument('user_guid', type=str, help='use the guid of the user to post.') + # Adding a new argument to specify the action to perform + parser.add_argument( + 'action', + type=str, + help='The action to perform on the user (deactivate, reactivate, merge).', + choices=['deactivate', 'reactivate', 'merge'] + ) + + def handle(self, *args, **options): + user_guid = options['user_guid'] + user = OSFUser.load(user_guid) + action = options['action'] + + # Using a mapping of action to function to simplify the control flow + + if action in actions_to_functions: + actions_to_functions[action](user) # Call the appropriate function + self.stdout.write(self.style.SUCCESS(f'Successfully {action} message for user: {user._id}')) + else: + self.stdout.write(self.style.ERROR('Invalid action specified.')) diff --git a/osf/models/user.py b/osf/models/user.py index 3d1351df51a..d9d7d5db68e 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -746,15 +746,6 @@ def merge_user(self, user): notifications_configured = user.notifications_configured.copy() notifications_configured.update(self.notifications_configured) self.notifications_configured = notifications_configured - if not website_settings.RUNNING_MIGRATION: - for key, value in user.mailchimp_mailing_lists.items(): - # subscribe to each list if either user was subscribed - subscription = value or self.mailchimp_mailing_lists.get(key) - signals.user_merged.send(self, list_name=key, subscription=subscription) - - # clear subscriptions for merged user - signals.user_merged.send(user, list_name=key, subscription=False, send_goodbye=False) - for target_id, timestamp in user.comments_viewed_timestamp.items(): if not self.comments_viewed_timestamp.get(target_id): self.comments_viewed_timestamp[target_id] = timestamp @@ -872,6 +863,7 @@ def merge_user(self, user): user.merged_by = self user.save() + signals.user_account_merged.send(user) def _merge_users_preprints(self, user): """ @@ -986,6 +978,7 @@ def deactivate_account(self): # Call to `unsubscribe` above saves, and can lead to stale data self.reload() self.is_disabled = True + signals.user_account_deactivated.send(self) # we must call both methods to ensure the current session is cleared and all existing # sessions are revoked. @@ -1002,6 +995,7 @@ def reactivate_account(self): self.requested_deactivation = False from website.mailchimp_utils import subscribe_on_confirm subscribe_on_confirm(self) + signals.user_account_reactivated.send(self) def update_is_active(self): """Update ``is_active`` to be consistent with the fields that diff --git a/osf_tests/test_user.py b/osf_tests/test_user.py index 2c4d961f77b..a2b1912f548 100644 --- a/osf_tests/test_user.py +++ b/osf_tests/test_user.py @@ -17,7 +17,7 @@ from importlib import import_module from framework.auth.exceptions import ExpiredTokenError, InvalidTokenError, ChangePasswordError -from framework.auth.signals import user_merged +from framework.auth.signals import user_account_merged from framework.analytics import get_total_activity_count from framework.exceptions import PermissionsError from framework.celery_tasks import handlers @@ -1507,7 +1507,7 @@ def test_send_user_merged_signal(self, mock_get_mailchimp_api, dupe, merge_dupe) with capture_signals() as mock_signals: merge_dupe() - assert mock_signals.signals_sent() == set([user_merged]) + assert mock_signals.signals_sent() == set([user_account_merged]) @pytest.mark.enable_enqueue_task @mock.patch('website.mailchimp_utils.get_mailchimp_api') diff --git a/tests/test_utils.py b/tests/test_utils.py index 60528d2176e..689f937e7ce 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -6,6 +6,8 @@ import time import unittest from django.utils import timezone +from django.dispatch import receiver + from flask import Flask from nose.tools import * # noqa (PEP8 asserts) @@ -13,12 +15,18 @@ from tests.base import OsfTestCase, DbTestCase from osf_tests.factories import RegistrationFactory, UserFactory, fake_email +from framework.auth.signals import ( + user_account_deactivated, + user_account_reactivated, + user_account_merged +) from framework.auth.utils import generate_csl_given_name from framework.routing import Rule, json_renderer from framework.utils import secure_filename, throttle_period_expired from api.base.utils import waterbutler_api_url_for from osf.utils.functional import rapply +from waffle.testutils import override_flag from website.routes import process_rules, OsfWebRenderer from website import settings from website.util import paths @@ -26,6 +34,10 @@ from website.project import utils as project_utils from website.profile import utils as profile_utils +from osf import features + +from kombu import Exchange + try: import magic # noqa LIBMAGIC_AVAILABLE = True @@ -453,3 +465,118 @@ def test_build_create_user_time_conflict(self): user_one_create = UserFactory() user_two_create = UserFactory() assert user_one_create.username != user_two_create.username + + +@pytest.mark.django_db +class TestUserSignals: + + @pytest.fixture + def user(self, db): + return UserFactory() + + @pytest.fixture + def old_user(self, db): + return UserFactory() + + @pytest.fixture + def deactivated_user(self, db): + user = UserFactory() + user.deactivate_account() + return user + + @pytest.fixture + def account_status_changes_exchange(self): + return Exchange('account_status_changes') + + @mock.patch('osf.external.messages.celery_publishers.publish_deactivated_user') + def test_user_account_deactivated_signal(self, mock_publish_deactivated_user, user): + # Connect a mock receiver to the signal for testing + @receiver(user_account_deactivated) + def mock_receiver(user, **kwargs): + return mock_publish_deactivated_user(user) + + # Trigger the signal + user.deactivate_account() + + # Verify that the mock receiver was called + mock_publish_deactivated_user.assert_called_once_with(user) + + @mock.patch('osf.external.messages.celery_publishers.publish_merged_user') + def test_user_account_merged_signal(self, mock_publish_merged_user, user, old_user): + # Connect a mock receiver to the signal for testing + @receiver(user_account_merged) + def mock_receiver(user, **kwargs): + return mock_publish_merged_user(user) + + # Trigger the signal + user.merge_user(old_user) + + # Verify that the mock receiver was called + mock_publish_merged_user.assert_called_once_with(old_user) + + @mock.patch('osf.external.messages.celery_publishers.publish_reactivate_user') + def test_user_account_deactivate_signal(self, mock_publish_reactivate_user, deactivated_user): + # Connect a mock receiver to the signal for testing + @receiver(user_account_reactivated) + def mock_receiver(user, **kwargs): + return mock_publish_reactivate_user(user) + + # Trigger the signal + deactivated_user.reactivate_account() + + # Verify that the mock receiver was called + mock_publish_reactivate_user.assert_called_once_with(deactivated_user) + + @pytest.mark.enable_account_status_messaging + @mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire') + def test_publish_body_on_deactivation(self, mock_publish_user_status_change, user, account_status_changes_exchange): + with mock.patch.object(settings, 'USE_CELERY', True): + with override_flag(features.ENABLE_GV, active=True): + user.deactivate_account() + + mock_publish_user_status_change().__enter__().publish.assert_called_once_with( + body={'action': 'deactivate', 'user_uri': f'http://localhost:5000/{user._id}'}, + exchange=account_status_changes_exchange, + serializer='json', + ) + + @pytest.mark.enable_account_status_messaging + @mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire') + def test_publish_body_on_reactivation( + self, + mock_publish_user_status_change, + deactivated_user, + account_status_changes_exchange + ): + with mock.patch.object(settings, 'USE_CELERY', True): + with override_flag(features.ENABLE_GV, active=True): + deactivated_user.reactivate_account() + + mock_publish_user_status_change().__enter__().publish.assert_called_once_with( + body={'action': 'reactivate', 'user_uri': f'http://localhost:5000/{deactivated_user._id}'}, + exchange=account_status_changes_exchange, + serializer='json', + ) + + @pytest.mark.enable_account_status_messaging + @mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire') + def test_publish_body_on_merger( + self, + mock_publish_user_status_change, + user, + old_user, + account_status_changes_exchange + ): + with mock.patch.object(settings, 'USE_CELERY', True): + with override_flag(features.ENABLE_GV, active=True): + user.merge_user(old_user) + + mock_publish_user_status_change().__enter__().publish.assert_called_once_with( + body={ + 'action': 'merge', + 'into_user_uri': f'http://localhost:5000/{user._id}', + 'from_user_uri': f'http://localhost:5000/{old_user._id}' + }, + exchange=account_status_changes_exchange, + serializer='json', + ) \ No newline at end of file diff --git a/website/mailchimp_utils.py b/website/mailchimp_utils.py index 2a9aa32ec54..58cad24e70d 100644 --- a/website/mailchimp_utils.py +++ b/website/mailchimp_utils.py @@ -74,7 +74,7 @@ def subscribe_mailchimp(list_name, user_id): user.save() -def unsubscribe_mailchimp(list_name, user_id, username=None, send_goodbye=True): +def unsubscribe_mailchimp(list_name, user_id, username=None): """Unsubscribe a user from a mailchimp mailing list given its name. :param str list_name: mailchimp mailing list name @@ -113,10 +113,10 @@ def unsubscribe_mailchimp(list_name, user_id, username=None, send_goodbye=True): @queued_task @app.task @transaction.atomic -def unsubscribe_mailchimp_async(list_name, user_id, username=None, send_goodbye=True): +def unsubscribe_mailchimp_async(list_name, user_id, username=None): """ Same args as unsubscribe_mailchimp, used to have the task be run asynchronously """ - unsubscribe_mailchimp(list_name=list_name, user_id=user_id, username=username, send_goodbye=send_goodbye) + unsubscribe_mailchimp(list_name=list_name, user_id=user_id, username=username) @user_confirmed.connect def subscribe_on_confirm(user): diff --git a/website/profile/views.py b/website/profile/views.py index 2007a3db21c..fe93f8219da 100644 --- a/website/profile/views.py +++ b/website/profile/views.py @@ -16,7 +16,11 @@ from framework.auth.decorators import must_be_confirmed from framework.auth.exceptions import ChangePasswordError from framework.auth.views import send_confirm_email -from framework.auth.signals import user_merged +from framework.auth.signals import ( + user_account_merged, + user_account_deactivated, + user_account_reactivated, +) from framework.exceptions import HTTPError, PermissionsError from framework.flask import redirect # VOL-aware redirect from framework.status import push_status_message @@ -36,6 +40,11 @@ from website.util import api_v2_url, web_url_for, paths from website.util.sanitize import escape_html from addons.base import utils as addon_utils +from osf.external.messages.celery_publishers import ( + publish_reactivate_user, + publish_deactivated_user, + publish_merged_user +) from api.waffle.utils import storage_i18n_flag_active @@ -506,8 +515,7 @@ def user_choose_mailing_lists(auth, **kwargs): return {'message': 'Successfully updated mailing lists', 'result': all_mailing_lists}, 200 -@user_merged.connect -def update_mailchimp_subscription(user, list_name, subscription, send_goodbye=True): +def update_mailchimp_subscription(user, list_name, subscription): """ Update mailing list subscription in mailchimp. :param obj user: current user @@ -521,12 +529,40 @@ def update_mailchimp_subscription(user, list_name, subscription, send_goodbye=Tr pass else: try: - mailchimp_utils.unsubscribe_mailchimp_async(list_name, user._id, username=user.username, send_goodbye=send_goodbye) + mailchimp_utils.unsubscribe_mailchimp_async(list_name, user._id, username=user.username) except (MailChimpError, OSFError): # User has already unsubscribed, so nothing to do pass +@user_account_merged.connect +def send_account_merged_message(user): + """ Sends a message using Celery messaging to alert other services that an osf.io user has been merged.""" + publish_merged_user(user) + + +@user_account_merged.connect +def unsubscribe_old_merged_account_from_mailchimp(user): + """ This is a merged account (an old account that was merged into an active one) so it needs to be unsubscribed + from mailchimp.""" + for key, value in user.mailchimp_mailing_lists.items(): + subscription = value or user.merged_by.mailchimp_mailing_lists.get(key) + update_mailchimp_subscription(user.merged_by, list_name=key, subscription=subscription) + update_mailchimp_subscription(user, list_name=key, subscription=False) + + +@user_account_deactivated.connect +def send_account_deactivation_message(user): + """ Sends a message using Celery messaging to alert other services that an osf.io user has been deactivated.""" + publish_deactivated_user(user) + + +@user_account_reactivated.connect +def send_account_reactivation_message(user): + """ Sends a message using Celery messaging to alert other services that an osf.io user has been reactivated.""" + publish_reactivate_user(user) + + def mailchimp_get_endpoint(**kwargs): """Endpoint that the mailchimp webhook hits to check that the OSF is responding""" return {}, http_status.HTTP_200_OK diff --git a/website/search/search.py b/website/search/search.py index 2e5c54f5d01..9195f2ea4c2 100644 --- a/website/search/search.py +++ b/website/search/search.py @@ -14,7 +14,7 @@ def requires_search(func): def wrapped(*args, **kwargs): - if search_engine is not None and not settings.RUNNING_MIGRATION: + if search_engine is not None: return func(*args, **kwargs) return wrapped diff --git a/website/settings/defaults.py b/website/settings/defaults.py index 001e52f0a3c..882a5713592 100644 --- a/website/settings/defaults.py +++ b/website/settings/defaults.py @@ -411,6 +411,7 @@ class CeleryConfig: task_med_queue = 'med' task_high_queue = 'high' task_remote_computing_queue = 'remote' + task_account_status_changes_queue = 'account_status_changes' remote_computing_modules = { 'addons.boa.tasks.submit_to_boa', @@ -488,6 +489,7 @@ class CeleryConfig: routing_key=task_med_queue, consumer_arguments={'x-priority': 1}), Queue(task_high_queue, Exchange(task_high_queue), routing_key=task_high_queue, consumer_arguments={'x-priority': 10}), + Queue(task_account_status_changes_queue, Exchange(task_account_status_changes_queue), routing_key=task_account_status_changes_queue) ) task_default_exchange_type = 'direct' @@ -765,10 +767,6 @@ class CeleryConfig: # Used for gathering meta information about the current build GITHUB_API_TOKEN = None -# switch for disabling things that shouldn't happen during -# the modm to django migration -RUNNING_MIGRATION = False - # External Identity Provider EXTERNAL_IDENTITY_PROFILE = { 'OrcidProfile': 'ORCID', diff --git a/website/signals.py b/website/signals.py index ff4988c3885..c1b8660dcd4 100644 --- a/website/signals.py +++ b/website/signals.py @@ -6,7 +6,8 @@ from website.conferences import signals as conference from website.reviews import signals as reviews -ALL_SIGNALS = [ + +ALL_SIGNALS = [ # TODO: Fix project.comment_added, project.mention_added, project.unreg_contributor_added, @@ -17,7 +18,9 @@ auth.user_confirmed, auth.user_email_removed, auth.user_registered, - auth.user_merged, + auth.user_account_deactivated, + auth.user_account_reactivated, + auth.user_account_merged, auth.unconfirmed_user_created, event.file_updated, conference.osf4m_user_created,