Skip to content

Commit

Permalink
feat: use create a basic producer to send messages over to memphis cloud
Browse files Browse the repository at this point in the history
  • Loading branch information
orig committed Sep 22, 2023
1 parent 129496f commit 27a3564
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 38 deletions.
4 changes: 3 additions & 1 deletion .example.env
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ JWT_REFRESH_SECRET=abc1234
# NOVU - You don't need this when running locally (just verify your email from the database)
NOVU_API_KEY=Get it from https://novu.co/

# Memphis

# Memphis (Event streaming service)
MEMPHIS_ENABLE=false # Set this to true if you want to use the event streaming service
MEMPHIS_HOST=Get it from https://memphis.dev/
MEMPHIS_USERNAME=Get it from https://memphis.dev/
MEMPHIS_PASSWORD=Get it from https://memphis.dev/
Expand Down
7 changes: 7 additions & 0 deletions apps/backend/src/shared/decorators/ip/ip.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { createParamDecorator, ExecutionContext } from '@nestjs/common';

export const Ip = createParamDecorator((data: unknown, ctx: ExecutionContext) => {
const request = ctx.switchToHttp().getRequest();
const ip = (request.headers['x-forwarded-for'] as string) || request.socket.remoteAddress;
return ip;
});
12 changes: 12 additions & 0 deletions apps/backend/src/shortener/producer/shortener.producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { Injectable } from '@nestjs/common';
import { ProducerService } from '@reduced.to/queue-manager';

const SHORTENER_PRODUCER_NAME = 'shortener';
const SHORTENER_QUEUE_NAME = 'stats';

@Injectable()
export class ShortenerProducer extends ProducerService {
constructor() {
super(SHORTENER_PRODUCER_NAME, SHORTENER_QUEUE_NAME);
}
}
18 changes: 16 additions & 2 deletions apps/backend/src/shortener/shortener.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,34 @@ import { ShortenerService } from './shortener.service';
import { UserContext } from '../auth/interfaces/user-context';
import { OptionalJwtAuthGuard } from '../auth/guards/optional-jwt-auth.guard';
import { AppLoggerSerivce } from '@reduced.to/logger';
import { ShortenerProducer } from './producer/shortener.producer';
import { Ip } from '../shared/decorators/ip/ip.decorator';

@Controller({
path: 'shortener',
version: '1',
})
export class ShortenerController {
constructor(private readonly logger: AppLoggerSerivce, private readonly shortenerService: ShortenerService) {}
constructor(
private readonly logger: AppLoggerSerivce,
private readonly shortenerService: ShortenerService,
private readonly shortenerProducer: ShortenerProducer
) {}

@Get(':shortenedUrl')
async findOne(@Param('shortenedUrl') shortenedUrl: string): Promise<string> {
async findOne(@Ip() ip: string, @Param('shortenedUrl') shortenedUrl: string): Promise<string> {
const originalUrl = await this.shortenerService.getOriginalUrl(shortenedUrl);
if (!originalUrl) {
throw new BadRequestException('Shortened url is wrong or expired');
}

// Send an event to the queue to update the shortened url's stats
await this.shortenerProducer.publish({
ip,
shortenedUrl,
originalUrl: originalUrl,
});

return originalUrl;
}

Expand Down
6 changes: 4 additions & 2 deletions apps/backend/src/shortener/shortener.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import { Module } from '@nestjs/common';
import { ShortenerController } from './shortener.controller';
import { ShortenerService } from './shortener.service';
import { PrismaModule } from '@reduced.to/prisma';
import { ShortenerProducer } from './producer/shortener.producer';
import { QueueManagerModule, QueueManagerService } from '@reduced.to/queue-manager';

@Module({
imports: [PrismaModule],
imports: [PrismaModule, QueueManagerModule],
controllers: [ShortenerController],
providers: [ShortenerService],
providers: [ShortenerService, ShortenerProducer, QueueManagerService],
exports: [ShortenerService],
})
export class ShortenerModule {}
5 changes: 5 additions & 0 deletions jest.preset.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
const nxPreset = require('@nx/jest/preset').default;

module.exports = { ...nxPreset };

// Set the NODE_ENV to test
process.env = Object.assign(process.env, {
NODE_ENV: 'test',
});
2 changes: 2 additions & 0 deletions libs/config/src/lib/config.factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export const configFactory: ConfigFactory<{ config: Configuration }> = () => {
apiKey: process.env.NOVU_API_KEY,
},
memphis: {
enable: process.env.MEMPHIS_ENABLE === 'true' || false,
host: process.env.MEMPHIS_HOST,
username: process.env.MEMPHIS_USERNAME,
password: process.env.MEMPHIS_PASSWORD,
Expand Down Expand Up @@ -88,6 +89,7 @@ export interface NovuConfig {
}

export interface MemphisConfig {
enable: boolean;
host: string;
username: string;
password: string;
Expand Down
5 changes: 2 additions & 3 deletions libs/queue-manager/src/lib/__mocks__/queue-manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Mock implementation for the queue manager service (memphis-dev)
export class MemphisMock {
produce(payload: any) {
produce(payload: { stationName: string; producerName: string; message: any }) {
console.log(`Producing message to ${payload.stationName}`);
}
}
Expand All @@ -13,7 +12,7 @@ export class QueueManagerService {
this.queueManager = new MemphisMock();
}

getQueueManager() {
get client() {
// Mock implementation for getting the queue manager
return this.queueManager;
}
Expand Down
34 changes: 19 additions & 15 deletions libs/queue-manager/src/lib/producer/producer.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,57 @@
import { Inject } from '@nestjs/common';
import { Test, TestingModule } from '@nestjs/testing';
import { ProducerService, QUEUE_MANAGER_INJECTION_TOKEN, QueueManagerModule, QueueManagerService } from '@reduced.to/queue-manager';
import { Memphis } from 'memphis-dev';
import { ProducerService, QueueManagerModule, QueueManagerService } from '@reduced.to/queue-manager';
import { AppConfigModule } from '@reduced.to/config';
import { Injectable } from '@nestjs/common';
import { AppLoggerModule } from '@reduced.to/logger';

jest.mock('../queue-manager.service');

describe('ProducerService', () => {

const TEST_PRODUCER_NAME = 'test-producer';
const TEST_QUEUE_NAME = 'test-queue';

@Injectable()
class TestProducerService extends ProducerService {
constructor(@Inject(QUEUE_MANAGER_INJECTION_TOKEN) queueManager: Memphis) {
super(queueManager, TEST_PRODUCER_NAME);
constructor() {
super(TEST_PRODUCER_NAME, TEST_QUEUE_NAME);
}
}

let service: TestProducerService;
let queueManager: QueueManagerService;

beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [AppConfigModule, QueueManagerModule],
imports: [AppConfigModule, AppLoggerModule, QueueManagerModule],
providers: [TestProducerService],
}).compile();

service = module.get<TestProducerService>(TestProducerService);
queueManager = module.get<QueueManagerService>(QueueManagerService);
});

it('should be defined', () => {
expect(service).toBeDefined();
});

it('should get the queue manager', () => {
expect(service.getQueueManager()).toBeDefined();
it('should get the producer name', () => {
expect(service.name).toBe(TEST_PRODUCER_NAME);
});

it('should get the producer name', () => {
expect(service.getName()).toBe(TEST_PRODUCER_NAME);
it('should get the queue name', () => {
expect(service.queueName).toBe(TEST_QUEUE_NAME);
});

it('should publish a message to the queue', async () => {
const queueManagerSpy = jest.spyOn(service.getQueueManager(), 'produce');
console.log(queueManager);
const queueManagerSpy = jest.spyOn(queueManager.client, 'produce');

const QUEUE_NAME = 'test-queue';
const PAYLOAD = { message: 'test', 1: 2 };

await service.publish(QUEUE_NAME, PAYLOAD);
await service.publish(PAYLOAD);
expect(queueManagerSpy).toBeCalledTimes(1);
expect(queueManagerSpy).toBeCalledWith({
stationName: QUEUE_NAME,
stationName: TEST_QUEUE_NAME,
producerName: TEST_PRODUCER_NAME,
message: PAYLOAD,
});
Expand Down
37 changes: 25 additions & 12 deletions libs/queue-manager/src/lib/producer/producer.service.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
import { Inject } from '@nestjs/common';
import { QUEUE_MANAGER_INJECTION_TOKEN, QueueManagerService } from '@reduced.to/queue-manager';
import { Memphis } from 'memphis-dev/*';
import { Inject, Injectable } from '@nestjs/common';
import { QueueManagerService } from '@reduced.to/queue-manager';
import { AppConfigService } from '@reduced.to/config';
import { AppLoggerSerivce } from '@reduced.to/logger';

export abstract class ProducerService extends QueueManagerService {
constructor(@Inject(QUEUE_MANAGER_INJECTION_TOKEN) queueManager: Memphis, private readonly name: string) {
super(queueManager);
export abstract class ProducerService {
@Inject(AppLoggerSerivce) private readonly logger: AppLoggerSerivce;
@Inject(AppConfigService) private readonly config: AppConfigService;
@Inject(QueueManagerService) private readonly queueManager: QueueManagerService;

constructor(private readonly producerName: string, private readonly queue: string) {}

get name() {
return this.producerName;
}

getName() {
return this.name;
get queueName() {
return this.queue;
}

async publish(queueName: string, data: any) {
return this.getQueueManager().produce({
stationName: queueName,
async publish(message: any) {
// Do not publish if Memphis is disabled
if (!this.config.getConfig().memphis.enable) {
return;
}

this.logger.log(`Publishing message to ${this.queueName} with producer ${this.producerName}`);
return this.queueManager.client.produce({
stationName: this.queue,
producerName: this.name,
message: data,
message,
});
}
}
10 changes: 9 additions & 1 deletion libs/queue-manager/src/lib/queue-manager.module.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
import { Module } from '@nestjs/common';
import { Global, Module } from '@nestjs/common';
import { AppConfigService } from '@reduced.to/config';
import { memphis } from 'memphis-dev';
import { QueueManagerService } from './queue-manager.service';

export const QUEUE_MANAGER_INJECTION_TOKEN = 'QUEUE_MANAGER';

const queueManagerFactory = {
provide: QUEUE_MANAGER_INJECTION_TOKEN,
useFactory: async (config: AppConfigService) => {
const memphisConfig = config.getConfig().memphis;

// Do not connect to Memphis in test environment / if Memphis is disabled
if (config.getConfig().general.env === 'test' || memphisConfig.enable === false) {
return;
}

const memphisConnection = await memphis.connect(memphisConfig);
return memphisConnection;
},
inject: [AppConfigService],
};

@Global()
@Module({
providers: [queueManagerFactory],
exports: [queueManagerFactory],
Expand Down
4 changes: 2 additions & 2 deletions libs/queue-manager/src/lib/queue-manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { Inject, Injectable } from '@nestjs/common';
import { QUEUE_MANAGER_INJECTION_TOKEN } from '@reduced.to/queue-manager';
import { Memphis } from 'memphis-dev/*';
import { Memphis } from 'memphis-dev';

@Injectable()
export class QueueManagerService {
constructor(@Inject(QUEUE_MANAGER_INJECTION_TOKEN) private readonly queueManager: Memphis) {}

getQueueManager() {
get client() {
return this.queueManager;
}
}

0 comments on commit 27a3564

Please sign in to comment.