Skip to content

Commit

Permalink
[ENG-5352][ENG-5267][ENG-5144] Messaging for Account Status Changes (…
Browse files Browse the repository at this point in the history
…OSF-side) (CenterForOpenScience#10565)

* Add Rabbit Queue for OSF->Addons Service communication and send user account status changes
* Refactor some mailchimp stuff on the way

---------

Co-authored-by: John Tordoff <>
  • Loading branch information
Johnetordoff authored Apr 11, 2024
1 parent 36a9e47 commit 3448bba
Show file tree
Hide file tree
Showing 14 changed files with 275 additions and 28 deletions.
4 changes: 2 additions & 2 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ def fake():
'mark': 'enable_search',
'replacement': mock.MagicMock()
},
'website.search.elastic_search': {
'mark': 'enable_search',
'osf.external.messages.celery_publishers._publish_user_status_change': {
'mark': 'enable_account_status_messaging',
'replacement': mock.MagicMock()
}
}
Expand Down
4 changes: 3 additions & 1 deletion framework/auth/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
user_registered = signals.signal('user-registered')
user_confirmed = signals.signal('user-confirmed')
user_email_removed = signals.signal('user-email-removed')
user_merged = signals.signal('user-merged')
user_account_merged = signals.signal('user-account-merged')
user_account_deactivated = signals.signal('user-account-deactivated')
user_account_reactivated = signals.signal('user-account-reactivated')

unconfirmed_user_created = signals.signal('unconfirmed-user-created')
2 changes: 2 additions & 0 deletions framework/celery_tasks/routers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def match_by_module(task_path):
return CeleryConfig.task_high_queue
if task_subpath in CeleryConfig.remote_computing_modules:
return CeleryConfig.task_remote_computing_queue
if task_subpath in CeleryConfig.task_account_status_changes_queue:
return CeleryConfig.task_account_status_changes_queue
return CeleryConfig.task_default_queue


Expand Down
Empty file.
45 changes: 45 additions & 0 deletions osf/external/messages/celery_publishers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import waffle
from kombu import Exchange
from framework.celery_tasks import app as celery_app
from website import settings
from osf import features
from osf.utils.requests import get_current_request


def publish_deactivated_user(user):
_publish_user_status_change(
body={
'action': 'deactivate',
'user_uri': user.get_semantic_iri(),
}
)


def publish_reactivate_user(user):
_publish_user_status_change(
body={
'action': 'reactivate',
'user_uri': user.get_semantic_iri(),
},
)


def publish_merged_user(user):
assert user.merged_by, 'User received merge signal, but has no `merged_by` reference.'
_publish_user_status_change(
body={
'action': 'merge',
'into_user_uri': user.merged_by.get_semantic_iri(),
'from_user_uri': user.get_semantic_iri(),
},
)


def _publish_user_status_change(body: dict):
if settings.USE_CELERY and waffle.flag_is_active(get_current_request(), features.ENABLE_GV):
with celery_app.producer_pool.acquire() as producer:
producer.publish(
body=body,
exchange=Exchange(celery_app.conf.task_account_status_changes_queue),
serializer='json'
)
40 changes: 40 additions & 0 deletions osf/management/commands/publish_account_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from django.core.management.base import BaseCommand
from osf.models import OSFUser
from osf.external.messages.celery_publishers import (
publish_deactivated_user,
publish_reactivate_user,
publish_merged_user,
)

actions_to_functions = {
'deactivate': publish_deactivated_user,
'reactivate': publish_reactivate_user,
'merge': publish_merged_user,
}


class Command(BaseCommand):
help = 'Sends a message to manage a user state, for test purposes only.'

def add_arguments(self, parser):
parser.add_argument('user_guid', type=str, help='use the guid of the user to post.')
# Adding a new argument to specify the action to perform
parser.add_argument(
'action',
type=str,
help='The action to perform on the user (deactivate, reactivate, merge).',
choices=['deactivate', 'reactivate', 'merge']
)

def handle(self, *args, **options):
user_guid = options['user_guid']
user = OSFUser.load(user_guid)
action = options['action']

# Using a mapping of action to function to simplify the control flow

if action in actions_to_functions:
actions_to_functions[action](user) # Call the appropriate function
self.stdout.write(self.style.SUCCESS(f'Successfully {action} message for user: {user._id}'))
else:
self.stdout.write(self.style.ERROR('Invalid action specified.'))
12 changes: 3 additions & 9 deletions osf/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -746,15 +746,6 @@ def merge_user(self, user):
notifications_configured = user.notifications_configured.copy()
notifications_configured.update(self.notifications_configured)
self.notifications_configured = notifications_configured
if not website_settings.RUNNING_MIGRATION:
for key, value in user.mailchimp_mailing_lists.items():
# subscribe to each list if either user was subscribed
subscription = value or self.mailchimp_mailing_lists.get(key)
signals.user_merged.send(self, list_name=key, subscription=subscription)

# clear subscriptions for merged user
signals.user_merged.send(user, list_name=key, subscription=False, send_goodbye=False)

for target_id, timestamp in user.comments_viewed_timestamp.items():
if not self.comments_viewed_timestamp.get(target_id):
self.comments_viewed_timestamp[target_id] = timestamp
Expand Down Expand Up @@ -872,6 +863,7 @@ def merge_user(self, user):
user.merged_by = self

user.save()
signals.user_account_merged.send(user)

def _merge_users_preprints(self, user):
"""
Expand Down Expand Up @@ -986,6 +978,7 @@ def deactivate_account(self):
# Call to `unsubscribe` above saves, and can lead to stale data
self.reload()
self.is_disabled = True
signals.user_account_deactivated.send(self)

# we must call both methods to ensure the current session is cleared and all existing
# sessions are revoked.
Expand All @@ -1002,6 +995,7 @@ def reactivate_account(self):
self.requested_deactivation = False
from website.mailchimp_utils import subscribe_on_confirm
subscribe_on_confirm(self)
signals.user_account_reactivated.send(self)

def update_is_active(self):
"""Update ``is_active`` to be consistent with the fields that
Expand Down
4 changes: 2 additions & 2 deletions osf_tests/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from importlib import import_module

from framework.auth.exceptions import ExpiredTokenError, InvalidTokenError, ChangePasswordError
from framework.auth.signals import user_merged
from framework.auth.signals import user_account_merged
from framework.analytics import get_total_activity_count
from framework.exceptions import PermissionsError
from framework.celery_tasks import handlers
Expand Down Expand Up @@ -1507,7 +1507,7 @@ def test_send_user_merged_signal(self, mock_get_mailchimp_api, dupe, merge_dupe)

with capture_signals() as mock_signals:
merge_dupe()
assert mock_signals.signals_sent() == set([user_merged])
assert mock_signals.signals_sent() == set([user_account_merged])

@pytest.mark.enable_enqueue_task
@mock.patch('website.mailchimp_utils.get_mailchimp_api')
Expand Down
127 changes: 127 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,38 @@
import time
import unittest
from django.utils import timezone
from django.dispatch import receiver


from flask import Flask
from nose.tools import * # noqa (PEP8 asserts)
import blinker

from tests.base import OsfTestCase, DbTestCase
from osf_tests.factories import RegistrationFactory, UserFactory, fake_email
from framework.auth.signals import (
user_account_deactivated,
user_account_reactivated,
user_account_merged
)

from framework.auth.utils import generate_csl_given_name
from framework.routing import Rule, json_renderer
from framework.utils import secure_filename, throttle_period_expired
from api.base.utils import waterbutler_api_url_for
from osf.utils.functional import rapply
from waffle.testutils import override_flag
from website.routes import process_rules, OsfWebRenderer
from website import settings
from website.util import paths
from website.util import web_url_for, api_url_for, is_json_request, conjunct, api_v2_url
from website.project import utils as project_utils
from website.profile import utils as profile_utils

from osf import features

from kombu import Exchange

try:
import magic # noqa
LIBMAGIC_AVAILABLE = True
Expand Down Expand Up @@ -453,3 +465,118 @@ def test_build_create_user_time_conflict(self):
user_one_create = UserFactory()
user_two_create = UserFactory()
assert user_one_create.username != user_two_create.username


@pytest.mark.django_db
class TestUserSignals:

@pytest.fixture
def user(self, db):
return UserFactory()

@pytest.fixture
def old_user(self, db):
return UserFactory()

@pytest.fixture
def deactivated_user(self, db):
user = UserFactory()
user.deactivate_account()
return user

@pytest.fixture
def account_status_changes_exchange(self):
return Exchange('account_status_changes')

@mock.patch('osf.external.messages.celery_publishers.publish_deactivated_user')
def test_user_account_deactivated_signal(self, mock_publish_deactivated_user, user):
# Connect a mock receiver to the signal for testing
@receiver(user_account_deactivated)
def mock_receiver(user, **kwargs):
return mock_publish_deactivated_user(user)

# Trigger the signal
user.deactivate_account()

# Verify that the mock receiver was called
mock_publish_deactivated_user.assert_called_once_with(user)

@mock.patch('osf.external.messages.celery_publishers.publish_merged_user')
def test_user_account_merged_signal(self, mock_publish_merged_user, user, old_user):
# Connect a mock receiver to the signal for testing
@receiver(user_account_merged)
def mock_receiver(user, **kwargs):
return mock_publish_merged_user(user)

# Trigger the signal
user.merge_user(old_user)

# Verify that the mock receiver was called
mock_publish_merged_user.assert_called_once_with(old_user)

@mock.patch('osf.external.messages.celery_publishers.publish_reactivate_user')
def test_user_account_deactivate_signal(self, mock_publish_reactivate_user, deactivated_user):
# Connect a mock receiver to the signal for testing
@receiver(user_account_reactivated)
def mock_receiver(user, **kwargs):
return mock_publish_reactivate_user(user)

# Trigger the signal
deactivated_user.reactivate_account()

# Verify that the mock receiver was called
mock_publish_reactivate_user.assert_called_once_with(deactivated_user)

@pytest.mark.enable_account_status_messaging
@mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire')
def test_publish_body_on_deactivation(self, mock_publish_user_status_change, user, account_status_changes_exchange):
with mock.patch.object(settings, 'USE_CELERY', True):
with override_flag(features.ENABLE_GV, active=True):
user.deactivate_account()

mock_publish_user_status_change().__enter__().publish.assert_called_once_with(
body={'action': 'deactivate', 'user_uri': f'http://localhost:5000/{user._id}'},
exchange=account_status_changes_exchange,
serializer='json',
)

@pytest.mark.enable_account_status_messaging
@mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire')
def test_publish_body_on_reactivation(
self,
mock_publish_user_status_change,
deactivated_user,
account_status_changes_exchange
):
with mock.patch.object(settings, 'USE_CELERY', True):
with override_flag(features.ENABLE_GV, active=True):
deactivated_user.reactivate_account()

mock_publish_user_status_change().__enter__().publish.assert_called_once_with(
body={'action': 'reactivate', 'user_uri': f'http://localhost:5000/{deactivated_user._id}'},
exchange=account_status_changes_exchange,
serializer='json',
)

@pytest.mark.enable_account_status_messaging
@mock.patch('osf.external.messages.celery_publishers.celery_app.producer_pool.acquire')
def test_publish_body_on_merger(
self,
mock_publish_user_status_change,
user,
old_user,
account_status_changes_exchange
):
with mock.patch.object(settings, 'USE_CELERY', True):
with override_flag(features.ENABLE_GV, active=True):
user.merge_user(old_user)

mock_publish_user_status_change().__enter__().publish.assert_called_once_with(
body={
'action': 'merge',
'into_user_uri': f'http://localhost:5000/{user._id}',
'from_user_uri': f'http://localhost:5000/{old_user._id}'
},
exchange=account_status_changes_exchange,
serializer='json',
)
6 changes: 3 additions & 3 deletions website/mailchimp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def subscribe_mailchimp(list_name, user_id):
user.save()


def unsubscribe_mailchimp(list_name, user_id, username=None, send_goodbye=True):
def unsubscribe_mailchimp(list_name, user_id, username=None):
"""Unsubscribe a user from a mailchimp mailing list given its name.
:param str list_name: mailchimp mailing list name
Expand Down Expand Up @@ -113,10 +113,10 @@ def unsubscribe_mailchimp(list_name, user_id, username=None, send_goodbye=True):
@queued_task
@app.task
@transaction.atomic
def unsubscribe_mailchimp_async(list_name, user_id, username=None, send_goodbye=True):
def unsubscribe_mailchimp_async(list_name, user_id, username=None):
""" Same args as unsubscribe_mailchimp, used to have the task be run asynchronously
"""
unsubscribe_mailchimp(list_name=list_name, user_id=user_id, username=username, send_goodbye=send_goodbye)
unsubscribe_mailchimp(list_name=list_name, user_id=user_id, username=username)

@user_confirmed.connect
def subscribe_on_confirm(user):
Expand Down
Loading

0 comments on commit 3448bba

Please sign in to comment.