diff --git a/apps/api/migrations/preference-centralization/preference-centralization.migration.spec.ts b/apps/api/migrations/preference-centralization/preference-centralization.migration.spec.ts new file mode 100644 index 00000000000..72111ffdc4e --- /dev/null +++ b/apps/api/migrations/preference-centralization/preference-centralization.migration.spec.ts @@ -0,0 +1,111 @@ +import { expect } from 'chai'; +import { NestFactory } from '@nestjs/core'; +import { + SubscriberPreferenceRepository, + NotificationTemplateRepository, + PreferenceLevelEnum, + PreferencesRepository, +} from '@novu/dal'; +import { UpsertPreferences } from '@novu/application-generic'; +import { PreferencesTypeEnum } from '@novu/shared'; +import { UserSession } from '@novu/testing'; +import { INestApplication } from '@nestjs/common'; +import { AppModule } from '../../src/app.module'; +import { preferenceCentralization } from './preference-centralization.migration'; + +describe('Preference Centralization Migration', function () { + let app: INestApplication; + let session: UserSession; + let subscriberPreferenceRepository: SubscriberPreferenceRepository; + let notificationTemplateRepository: NotificationTemplateRepository; + let preferenceRepository: PreferencesRepository; + let upsertPreferences: UpsertPreferences; + + before(async () => { + app = await NestFactory.create(AppModule, { logger: false }); + subscriberPreferenceRepository = app.get(SubscriberPreferenceRepository); + notificationTemplateRepository = app.get(NotificationTemplateRepository); + preferenceRepository = app.get(PreferencesRepository); + upsertPreferences = app.get(UpsertPreferences); + }); + + beforeEach(async () => { + session = new UserSession(); + await session.initialize(); + + // Clean up the repositories before each test + await subscriberPreferenceRepository._model.deleteMany({}); + await notificationTemplateRepository._model.deleteMany({}); + await preferenceRepository._model.deleteMany({}); + }); + + after(async () => { + await app.close(); + }); + + it('should migrate subscriber global preferences', async function () { + await subscriberPreferenceRepository.create({ + _subscriberId: session.subscriberId, + _environmentId: session.environment._id, + _organizationId: session.organization._id, + level: PreferenceLevelEnum.GLOBAL, + channels: ['email', 'sms'], + }); + + await preferenceCentralization(); + + const updatedPreferences = await preferenceRepository.find({ + _environmentId: session.environment._id, + _organizationId: session.organization._id, + }); + expect(updatedPreferences.length).to.equal(1); + expect(updatedPreferences[0].type).to.equal(PreferencesTypeEnum.SUBSCRIBER_GLOBAL); + }); + + it('should migrate subscriber workflow preferences', async function () { + await subscriberPreferenceRepository.create({ + _subscriberId: session.subscriberId, + _templateId: NotificationTemplateRepository.createObjectId(), + _environmentId: session.environment._id, + _organizationId: session.organization._id, + level: PreferenceLevelEnum.TEMPLATE, + channels: ['push'], + }); + + await preferenceCentralization(); + + const updatedPreferences = await preferenceRepository.find({ + _environmentId: session.environment._id, + _organizationId: session.organization._id, + }); + expect(updatedPreferences.length).to.equal(1); + expect(updatedPreferences[0].type).to.equal(PreferencesTypeEnum.SUBSCRIBER_WORKFLOW); + }); + + it('should migrate workflow preferences', async function () { + await notificationTemplateRepository.create({ + _creatorId: session.user._id, + _environmentId: session.environment._id, + _organizationId: session.organization._id, + critical: true, + preferenceSettings: { email: true, sms: false, push: true, in_app: true, chat: true }, + }); + + await preferenceCentralization(); + + const updatedPreferences = await preferenceRepository.find({ + _environmentId: session.environment._id, + _organizationId: session.organization._id, + }); + expect(updatedPreferences.length).to.equal(2); + const workflowPreference = updatedPreferences.find( + (preference) => preference.type === PreferencesTypeEnum.WORKFLOW_RESOURCE + ); + const userPreference = updatedPreferences.find( + (preference) => preference.type === PreferencesTypeEnum.USER_WORKFLOW + ); + + expect(workflowPreference).to.exist; + expect(userPreference).to.exist; + }); +}); diff --git a/apps/api/migrations/preference-centralization/preference-centralization.migration.ts b/apps/api/migrations/preference-centralization/preference-centralization.migration.ts new file mode 100644 index 00000000000..b2486d06994 --- /dev/null +++ b/apps/api/migrations/preference-centralization/preference-centralization.migration.ts @@ -0,0 +1,146 @@ +import '../../src/config'; + +import { NestFactory } from '@nestjs/core'; +import { SubscriberPreferenceRepository, NotificationTemplateRepository, PreferenceLevelEnum } from '@novu/dal'; +import { + UpsertPreferences, + UpsertWorkflowPreferencesCommand, + UpsertUserWorkflowPreferencesCommand, + UpsertSubscriberGlobalPreferencesCommand, + UpsertSubscriberWorkflowPreferencesCommand, +} from '@novu/application-generic'; +import { buildWorkflowPreferencesFromPreferenceChannels, DEFAULT_WORKFLOW_PREFERENCES } from '@novu/shared'; + +import { AppModule } from '../../src/app.module'; + +const BATCH_SIZE = 100; + +const sleep = (ms: number) => { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +}; + +/** + * Migration to centralize workflow and subscriber preferences + */ +export async function preferenceCentralization() { + const app = await NestFactory.create(AppModule, { + logger: false, + }); + console.log('start migration - preference centralization'); + + const upsertPreferences = app.get(UpsertPreferences); + + const subscriberPreferenceRepository = app.get(SubscriberPreferenceRepository); + const subscriberPreferenceCursor = await subscriberPreferenceRepository._model + .find({}) + .batchSize(BATCH_SIZE) + .cursor(); + + const counter = { + subscriberGlobal: { + success: 0, + error: 0, + }, + subscriberWorkflow: { + success: 0, + error: 0, + }, + subscriberUnknown: { + success: 0, + error: 0, + }, + workflow: { + success: 0, + error: 0, + }, + }; + console.log('start workflow preference migration'); + + const workflowPreferenceRepository = app.get(NotificationTemplateRepository); + const workflowPreferenceCursor = await workflowPreferenceRepository._model.find({}).batchSize(BATCH_SIZE).cursor(); + + for await (const workflowPreference of workflowPreferenceCursor) { + try { + const userWorkflowPreferenceToUpsert = UpsertUserWorkflowPreferencesCommand.create({ + userId: workflowPreference._creatorId.toString(), + templateId: workflowPreference._id.toString(), + environmentId: workflowPreference._environmentId.toString(), + organizationId: workflowPreference._organizationId.toString(), + preferences: buildWorkflowPreferencesFromPreferenceChannels( + workflowPreference.critical, + workflowPreference.preferenceSettings + ), + }); + + await upsertPreferences.upsertUserWorkflowPreferences(userWorkflowPreferenceToUpsert); + + const workflowPreferenceToUpsert = UpsertWorkflowPreferencesCommand.create({ + templateId: workflowPreference._id.toString(), + environmentId: workflowPreference._environmentId.toString(), + organizationId: workflowPreference._organizationId.toString(), + preferences: DEFAULT_WORKFLOW_PREFERENCES, + }); + + await upsertPreferences.upsertWorkflowPreferences(workflowPreferenceToUpsert); + + counter.workflow.success += 1; + } catch (error) { + console.error(error); + counter.workflow.error += 1; + } + } + + console.log('end workflow preference migration'); + console.log({ counter }); + console.log('start subscriber preference migration'); + + for await (const subscriberPreference of subscriberPreferenceCursor) { + await sleep(10); + if (subscriberPreference.level === PreferenceLevelEnum.GLOBAL) { + try { + const preferenceToUpsert = UpsertSubscriberGlobalPreferencesCommand.create({ + _subscriberId: subscriberPreference._subscriberId.toString(), + environmentId: subscriberPreference._environmentId.toString(), + organizationId: subscriberPreference._organizationId.toString(), + preferences: buildWorkflowPreferencesFromPreferenceChannels(false, subscriberPreference.channels), + }); + + await upsertPreferences.upsertSubscriberGlobalPreferences(preferenceToUpsert); + + counter.subscriberGlobal.success += 1; + } catch (error) { + console.error(error); + counter.subscriberGlobal.error += 1; + } + } else if (subscriberPreference.level === PreferenceLevelEnum.TEMPLATE) { + try { + const preferenceToUpsert = UpsertSubscriberWorkflowPreferencesCommand.create({ + _subscriberId: subscriberPreference._subscriberId.toString(), + templateId: subscriberPreference._templateId.toString(), + environmentId: subscriberPreference._environmentId.toString(), + organizationId: subscriberPreference._organizationId.toString(), + preferences: buildWorkflowPreferencesFromPreferenceChannels(false, subscriberPreference.channels), + }); + + await upsertPreferences.upsertSubscriberWorkflowPreferences(preferenceToUpsert); + + counter.subscriberWorkflow.success += 1; + } catch (error) { + console.error(error); + counter.subscriberWorkflow.error += 1; + } + } else { + console.error(`Invalid preference level ${subscriberPreference.level}`); + counter.subscriberUnknown.error += 1; + } + } + console.log('end subscriber preference migration'); + console.log({ counter }); + console.log('end migration - preference centralization'); + + app.close(); +} + +preferenceCentralization();