diff --git a/.example.env b/.example.env index feb4bef6..9bfd870e 100644 --- a/.example.env +++ b/.example.env @@ -33,7 +33,9 @@ JWT_REFRESH_SECRET=abc1234 # NOVU - You don't need this when running locally (just verify your email from the database) NOVU_API_KEY=Get it from https://novu.co/ -# Memphis + +# Memphis (Event streaming service) +MEMPHIS_ENABLE=false # Set this to true if you want to use the event streaming service MEMPHIS_HOST=Get it from https://memphis.dev/ MEMPHIS_USERNAME=Get it from https://memphis.dev/ MEMPHIS_PASSWORD=Get it from https://memphis.dev/ diff --git a/apps/backend/src/shared/decorators/ip/ip.decorator.ts b/apps/backend/src/shared/decorators/ip/ip.decorator.ts new file mode 100644 index 00000000..93a879bc --- /dev/null +++ b/apps/backend/src/shared/decorators/ip/ip.decorator.ts @@ -0,0 +1,7 @@ +import { createParamDecorator, ExecutionContext } from '@nestjs/common'; + +export const Ip = createParamDecorator((data: unknown, ctx: ExecutionContext) => { + const request = ctx.switchToHttp().getRequest(); + const ip = (request.headers['x-forwarded-for'] as string) || request.socket.remoteAddress; + return ip; +}); diff --git a/apps/backend/src/shortener/producer/shortener.producer.ts b/apps/backend/src/shortener/producer/shortener.producer.ts new file mode 100644 index 00000000..9255fb54 --- /dev/null +++ b/apps/backend/src/shortener/producer/shortener.producer.ts @@ -0,0 +1,12 @@ +import { Injectable } from '@nestjs/common'; +import { ProducerService } from '@reduced.to/queue-manager'; + +const SHORTENER_PRODUCER_NAME = 'shortener'; +const SHORTENER_QUEUE_NAME = 'stats'; + +@Injectable() +export class ShortenerProducer extends ProducerService { + constructor() { + super(SHORTENER_PRODUCER_NAME, SHORTENER_QUEUE_NAME); + } +} diff --git a/apps/backend/src/shortener/shortener.controller.ts b/apps/backend/src/shortener/shortener.controller.ts index 85a06087..31b2bc03 100644 --- a/apps/backend/src/shortener/shortener.controller.ts +++ b/apps/backend/src/shortener/shortener.controller.ts @@ -5,20 +5,34 @@ import { ShortenerService } from './shortener.service'; import { UserContext } from '../auth/interfaces/user-context'; import { OptionalJwtAuthGuard } from '../auth/guards/optional-jwt-auth.guard'; import { AppLoggerSerivce } from '@reduced.to/logger'; +import { ShortenerProducer } from './producer/shortener.producer'; +import { Ip } from '../shared/decorators/ip/ip.decorator'; @Controller({ path: 'shortener', version: '1', }) export class ShortenerController { - constructor(private readonly logger: AppLoggerSerivce, private readonly shortenerService: ShortenerService) {} + constructor( + private readonly logger: AppLoggerSerivce, + private readonly shortenerService: ShortenerService, + private readonly shortenerProducer: ShortenerProducer + ) {} @Get(':shortenedUrl') - async findOne(@Param('shortenedUrl') shortenedUrl: string): Promise { + async findOne(@Ip() ip: string, @Param('shortenedUrl') shortenedUrl: string): Promise { const originalUrl = await this.shortenerService.getOriginalUrl(shortenedUrl); if (!originalUrl) { throw new BadRequestException('Shortened url is wrong or expired'); } + + // Send an event to the queue to update the shortened url's stats + await this.shortenerProducer.publish({ + ip, + shortenedUrl, + originalUrl: originalUrl, + }); + return originalUrl; } diff --git a/apps/backend/src/shortener/shortener.module.ts b/apps/backend/src/shortener/shortener.module.ts index 89b0d244..47422f86 100644 --- a/apps/backend/src/shortener/shortener.module.ts +++ b/apps/backend/src/shortener/shortener.module.ts @@ -2,11 +2,13 @@ import { Module } from '@nestjs/common'; import { ShortenerController } from './shortener.controller'; import { ShortenerService } from './shortener.service'; import { PrismaModule } from '@reduced.to/prisma'; +import { ShortenerProducer } from './producer/shortener.producer'; +import { QueueManagerModule, QueueManagerService } from '@reduced.to/queue-manager'; @Module({ - imports: [PrismaModule], + imports: [PrismaModule, QueueManagerModule], controllers: [ShortenerController], - providers: [ShortenerService], + providers: [ShortenerService, ShortenerProducer, QueueManagerService], exports: [ShortenerService], }) export class ShortenerModule {} diff --git a/jest.preset.js b/jest.preset.js index f078ddce..61e2da2a 100644 --- a/jest.preset.js +++ b/jest.preset.js @@ -1,3 +1,8 @@ const nxPreset = require('@nx/jest/preset').default; module.exports = { ...nxPreset }; + +// Set the NODE_ENV to test +process.env = Object.assign(process.env, { + NODE_ENV: 'test', +}); diff --git a/libs/config/src/lib/config.factory.ts b/libs/config/src/lib/config.factory.ts index 99223c40..d38ee45c 100644 --- a/libs/config/src/lib/config.factory.ts +++ b/libs/config/src/lib/config.factory.ts @@ -38,6 +38,7 @@ export const configFactory: ConfigFactory<{ config: Configuration }> = () => { apiKey: process.env.NOVU_API_KEY, }, memphis: { + enable: process.env.MEMPHIS_ENABLE === 'true' || false, host: process.env.MEMPHIS_HOST, username: process.env.MEMPHIS_USERNAME, password: process.env.MEMPHIS_PASSWORD, @@ -88,6 +89,7 @@ export interface NovuConfig { } export interface MemphisConfig { + enable: boolean; host: string; username: string; password: string; diff --git a/libs/queue-manager/src/lib/__mocks__/queue-manager.service.ts b/libs/queue-manager/src/lib/__mocks__/queue-manager.service.ts index 1c55ea34..6a5a2556 100644 --- a/libs/queue-manager/src/lib/__mocks__/queue-manager.service.ts +++ b/libs/queue-manager/src/lib/__mocks__/queue-manager.service.ts @@ -1,6 +1,5 @@ -// Mock implementation for the queue manager service (memphis-dev) export class MemphisMock { - produce(payload: any) { + produce(payload: { stationName: string; producerName: string; message: any }) { console.log(`Producing message to ${payload.stationName}`); } } @@ -13,7 +12,7 @@ export class QueueManagerService { this.queueManager = new MemphisMock(); } - getQueueManager() { + get client() { // Mock implementation for getting the queue manager return this.queueManager; } diff --git a/libs/queue-manager/src/lib/producer/producer.service.spec.ts b/libs/queue-manager/src/lib/producer/producer.service.spec.ts index adf5b507..ef755b6f 100644 --- a/libs/queue-manager/src/lib/producer/producer.service.spec.ts +++ b/libs/queue-manager/src/lib/producer/producer.service.spec.ts @@ -1,53 +1,57 @@ -import { Inject } from '@nestjs/common'; import { Test, TestingModule } from '@nestjs/testing'; -import { ProducerService, QUEUE_MANAGER_INJECTION_TOKEN, QueueManagerModule, QueueManagerService } from '@reduced.to/queue-manager'; -import { Memphis } from 'memphis-dev'; +import { ProducerService, QueueManagerModule, QueueManagerService } from '@reduced.to/queue-manager'; import { AppConfigModule } from '@reduced.to/config'; +import { Injectable } from '@nestjs/common'; +import { AppLoggerModule } from '@reduced.to/logger'; jest.mock('../queue-manager.service'); describe('ProducerService', () => { - const TEST_PRODUCER_NAME = 'test-producer'; + const TEST_QUEUE_NAME = 'test-queue'; + + @Injectable() class TestProducerService extends ProducerService { - constructor(@Inject(QUEUE_MANAGER_INJECTION_TOKEN) queueManager: Memphis) { - super(queueManager, TEST_PRODUCER_NAME); + constructor() { + super(TEST_PRODUCER_NAME, TEST_QUEUE_NAME); } } let service: TestProducerService; + let queueManager: QueueManagerService; beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ - imports: [AppConfigModule, QueueManagerModule], + imports: [AppConfigModule, AppLoggerModule, QueueManagerModule], providers: [TestProducerService], }).compile(); service = module.get(TestProducerService); + queueManager = module.get(QueueManagerService); }); it('should be defined', () => { expect(service).toBeDefined(); }); - it('should get the queue manager', () => { - expect(service.getQueueManager()).toBeDefined(); + it('should get the producer name', () => { + expect(service.name).toBe(TEST_PRODUCER_NAME); }); - it('should get the producer name', () => { - expect(service.getName()).toBe(TEST_PRODUCER_NAME); + it('should get the queue name', () => { + expect(service.queueName).toBe(TEST_QUEUE_NAME); }); it('should publish a message to the queue', async () => { - const queueManagerSpy = jest.spyOn(service.getQueueManager(), 'produce'); + console.log(queueManager); + const queueManagerSpy = jest.spyOn(queueManager.client, 'produce'); - const QUEUE_NAME = 'test-queue'; const PAYLOAD = { message: 'test', 1: 2 }; - await service.publish(QUEUE_NAME, PAYLOAD); + await service.publish(PAYLOAD); expect(queueManagerSpy).toBeCalledTimes(1); expect(queueManagerSpy).toBeCalledWith({ - stationName: QUEUE_NAME, + stationName: TEST_QUEUE_NAME, producerName: TEST_PRODUCER_NAME, message: PAYLOAD, }); diff --git a/libs/queue-manager/src/lib/producer/producer.service.ts b/libs/queue-manager/src/lib/producer/producer.service.ts index b64d875e..3b9c0f6b 100644 --- a/libs/queue-manager/src/lib/producer/producer.service.ts +++ b/libs/queue-manager/src/lib/producer/producer.service.ts @@ -1,21 +1,34 @@ -import { Inject } from '@nestjs/common'; -import { QUEUE_MANAGER_INJECTION_TOKEN, QueueManagerService } from '@reduced.to/queue-manager'; -import { Memphis } from 'memphis-dev/*'; +import { Inject, Injectable } from '@nestjs/common'; +import { QueueManagerService } from '@reduced.to/queue-manager'; +import { AppConfigService } from '@reduced.to/config'; +import { AppLoggerSerivce } from '@reduced.to/logger'; -export abstract class ProducerService extends QueueManagerService { - constructor(@Inject(QUEUE_MANAGER_INJECTION_TOKEN) queueManager: Memphis, private readonly name: string) { - super(queueManager); +export abstract class ProducerService { + @Inject(AppLoggerSerivce) private readonly logger: AppLoggerSerivce; + @Inject(AppConfigService) private readonly config: AppConfigService; + @Inject(QueueManagerService) private readonly queueManager: QueueManagerService; + + constructor(private readonly producerName: string, private readonly queue: string) {} + + get name() { + return this.producerName; } - getName() { - return this.name; + get queueName() { + return this.queue; } - async publish(queueName: string, data: any) { - return this.getQueueManager().produce({ - stationName: queueName, + async publish(message: any) { + // Do not publish if Memphis is disabled + if (!this.config.getConfig().memphis.enable) { + return; + } + + this.logger.log(`Publishing message to ${this.queueName} with producer ${this.producerName}`); + return this.queueManager.client.produce({ + stationName: this.queue, producerName: this.name, - message: data, + message, }); } } diff --git a/libs/queue-manager/src/lib/queue-manager.module.ts b/libs/queue-manager/src/lib/queue-manager.module.ts index bd7780d2..202da029 100644 --- a/libs/queue-manager/src/lib/queue-manager.module.ts +++ b/libs/queue-manager/src/lib/queue-manager.module.ts @@ -1,6 +1,7 @@ -import { Module } from '@nestjs/common'; +import { Global, Module } from '@nestjs/common'; import { AppConfigService } from '@reduced.to/config'; import { memphis } from 'memphis-dev'; +import { QueueManagerService } from './queue-manager.service'; export const QUEUE_MANAGER_INJECTION_TOKEN = 'QUEUE_MANAGER'; @@ -8,12 +9,19 @@ const queueManagerFactory = { provide: QUEUE_MANAGER_INJECTION_TOKEN, useFactory: async (config: AppConfigService) => { const memphisConfig = config.getConfig().memphis; + + // Do not connect to Memphis in test environment / if Memphis is disabled + if (config.getConfig().general.env === 'test' || memphisConfig.enable === false) { + return; + } + const memphisConnection = await memphis.connect(memphisConfig); return memphisConnection; }, inject: [AppConfigService], }; +@Global() @Module({ providers: [queueManagerFactory], exports: [queueManagerFactory], diff --git a/libs/queue-manager/src/lib/queue-manager.service.ts b/libs/queue-manager/src/lib/queue-manager.service.ts index fa358146..1dc64b3d 100644 --- a/libs/queue-manager/src/lib/queue-manager.service.ts +++ b/libs/queue-manager/src/lib/queue-manager.service.ts @@ -1,12 +1,12 @@ import { Inject, Injectable } from '@nestjs/common'; import { QUEUE_MANAGER_INJECTION_TOKEN } from '@reduced.to/queue-manager'; -import { Memphis } from 'memphis-dev/*'; +import { Memphis } from 'memphis-dev'; @Injectable() export class QueueManagerService { constructor(@Inject(QUEUE_MANAGER_INJECTION_TOKEN) private readonly queueManager: Memphis) {} - getQueueManager() { + get client() { return this.queueManager; } }