Skip to content

Commit

Permalink
fix: refactor bull queue config to common module in content-publishin…
Browse files Browse the repository at this point in the history
…g service
  • Loading branch information
JoeCap08055 committed Jul 31, 2024
1 parent ec49704 commit 221d8aa
Show file tree
Hide file tree
Showing 60 changed files with 778 additions and 921 deletions.
86 changes: 10 additions & 76 deletions services/content-publishing/apps/api/src/api.module.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
import { Module } from '@nestjs/common';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { BullModule } from '@nestjs/bullmq';
import { ScheduleModule } from '@nestjs/schedule';
import { RedisModule } from '@songkeys/nestjs-redis';
import { BullBoardModule } from '@bull-board/nestjs';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { MulterModule } from '@nestjs/platform-express';
import { DevelopmentController } from './development.controller';
import * as QueueConstants from '../../../libs/common/src';
import { DevelopmentControllerV1 } from './controllers/v1/development.controller.v1';
import { ConfigModule, ConfigService } from '#libs/config';
import { QueuesModule, QueueConstants } from '#libs/queues';
import { IpfsService } from '#libs/utils/ipfs.client';
import { ApiService } from './api.service';
import { IpfsService } from '../../../libs/common/src/utils/ipfs.client';
import { ConfigModule } from '../../../libs/common/src/config/config.module';
import { ConfigService } from '../../../libs/common/src/config/config.service';
import { AssetController } from './asset.controller';
import { ContentController } from './content.controller';
import { HealthController } from './health.controller';
import { ProfileController } from './profile.controller';
import { HealthController } from './controllers/health.controller';
import { AssetControllerV1, ContentControllerV1, ProfileControllerV1 } from './controllers/v1';

@Module({
imports: [
Expand All @@ -31,68 +27,7 @@ import { ProfileController } from './profile.controller';
},
true, // isGlobal
),
BullModule.forRootAsync({
imports: [ConfigModule],
useFactory: (configService: ConfigService) => {
// Note: BullMQ doesn't honor a URL for the Redis connection, and
// JS URL doesn't parse 'redis://' as a valid protocol, so we fool
// it by changing the URL to use 'http://' in order to parse out
// the host, port, username, password, etc.
// We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
// trying to keep the # of environment variables from proliferating
const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
const { hostname, port, username, password, pathname } = url;
return {
connection: {
host: hostname || undefined,
port: port ? Number(port) : undefined,
username: username || undefined,
password: password || undefined,
db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
},
};
},
inject: [ConfigService],
}),
BullModule.registerQueue(
{
name: QueueConstants.ASSET_QUEUE_NAME,
},
{
name: QueueConstants.REQUEST_QUEUE_NAME,
},
{
name: QueueConstants.BROADCAST_QUEUE_NAME,
},
{
name: QueueConstants.REPLY_QUEUE_NAME,
},
{
name: QueueConstants.REACTION_QUEUE_NAME,
},
{
name: QueueConstants.TOMBSTONE_QUEUE_NAME,
},
{
name: QueueConstants.UPDATE_QUEUE_NAME,
},
{
name: QueueConstants.PROFILE_QUEUE_NAME,
},
{
name: QueueConstants.BATCH_QUEUE_NAME,
},
{
name: QueueConstants.PUBLISH_QUEUE_NAME,
},
{
name: QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME,
},
{
name: QueueConstants.STATUS_QUEUE_NAME,
},
),

QueuesModule,
// Bullboard UI
BullBoardModule.forRoot({
route: '/queues',
Expand Down Expand Up @@ -166,7 +101,6 @@ import { ProfileController } from './profile.controller';
}),
ScheduleModule.forRoot(),
MulterModule.registerAsync({
imports: [ConfigModule],
useFactory: async (configService: ConfigService) => ({
limits: {
fileSize: configService.fileUploadMaxSizeInBytes,
Expand All @@ -175,13 +109,13 @@ import { ProfileController } from './profile.controller';
inject: [ConfigService],
}),
],
providers: [ConfigService, ApiService, IpfsService],
providers: [ApiService, IpfsService],
// Controller order determines the order of display for docs
// v[Desc first][ABC Second], Health, and then Dev only last
controllers:
process.env?.ENVIRONMENT === 'dev'
? [AssetController, ContentController, ProfileController, HealthController, DevelopmentController]
: [AssetController, ContentController, ProfileController, HealthController],
? [AssetControllerV1, ContentControllerV1, ProfileControllerV1, HealthController, DevelopmentControllerV1]
: [AssetControllerV1, ContentControllerV1, ProfileControllerV1, HealthController],
exports: [],
})
export class ApiModule {}
40 changes: 26 additions & 14 deletions services/content-publishing/apps/api/src/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,17 @@ import { InjectRedis } from '@songkeys/nestjs-redis';
import Redis from 'ioredis';
import { HttpErrorByCode } from '@nestjs/common/utils/http-error-by-code.util';
import {
AnnouncementResponseDto,
AnnouncementTypeDto,
ASSET_QUEUE_NAME,
RequestTypeDto,
AnnouncementResponseDto,
AssetIncludedRequestDto,
IRequestJob,
isImage,
REQUEST_QUEUE_NAME,
RequestTypeDto,
UploadResponseDto,
} from '../../../libs/common/src';
import { calculateIpfsCID } from '../../../libs/common/src/utils/ipfs';
import { IAssetJob, IAssetMetadata } from '../../../libs/common/src/interfaces/asset-job.interface';
import { getAssetDataKey, getAssetMetadataKey, STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '../../../libs/common/src/utils/redis';
} from '#libs/dtos';
import { IRequestJob, IAssetMetadata, IAssetJob } from '#libs/interfaces';
import { REQUEST_QUEUE_NAME, ASSET_QUEUE_NAME } from '#libs/queues/queue.constants';
import { calculateIpfsCID } from '#libs/utils/ipfs';
import { getAssetMetadataKey, getAssetDataKey, STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '#libs/utils/redis';

@Injectable()
export class ApiService {
Expand Down Expand Up @@ -51,7 +49,11 @@ export class ApiService {
// not used in id calculation since the order in map might not be deterministic
data.assetToMimeType = assetToMimeType;
}
const job = await this.requestQueue.add(`Request Job - ${data.id}`, data, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); // TODO: should come from queue configs
const job = await this.requestQueue.add(`Request Job - ${data.id}`, data, {
jobId: data.id,
removeOnFail: false,
removeOnComplete: 2000,
}); // TODO: should come from queue configs
this.logger.debug('Enqueue Request Job: ', job);
return {
referenceId: data.id,
Expand All @@ -61,7 +63,9 @@ export class ApiService {
async validateAssetsAndFetchMetadata(content: AssetIncludedRequestDto): Promise<Map<string, string> | undefined> {
const checkingList: { onlyImage: boolean; referenceId: string }[] = [];
if (content.profile) {
content.profile.icon?.forEach((reference) => checkingList.push({ onlyImage: true, referenceId: reference.referenceId }));
content.profile.icon?.forEach((reference) =>
checkingList.push({ onlyImage: true, referenceId: reference.referenceId }),
);
} else if (content.content) {
content.content.assets?.forEach((asset) =>
asset.references?.forEach((reference) =>
Expand All @@ -73,12 +77,16 @@ export class ApiService {
);
}

const redisResults = await Promise.all(checkingList.map((obj) => this.redis.get(getAssetMetadataKey(obj.referenceId))));
const redisResults = await Promise.all(
checkingList.map((obj) => this.redis.get(getAssetMetadataKey(obj.referenceId))),
);
const errors: string[] = [];
const map = new Map();
redisResults.forEach((res, index) => {
if (res === null) {
errors.push(`${content.profile ? 'profile.icon' : 'content.assets'}.referenceId ${checkingList[index].referenceId} does not exist!`);
errors.push(
`${content.profile ? 'profile.icon' : 'content.assets'}.referenceId ${checkingList[index].referenceId} does not exist!`,
);
} else {
const metadata: IAssetMetadata = JSON.parse(res);
map[checkingList[index].referenceId] = metadata.mimeType;
Expand Down Expand Up @@ -106,7 +114,11 @@ export class ApiService {
const jobs: any[] = [];
files.forEach((f, index) => {
// adding data and metadata to the transaction
dataTransaction = dataTransaction.setex(getAssetDataKey(references[index]), STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, f.buffer);
dataTransaction = dataTransaction.setex(
getAssetDataKey(references[index]),
STORAGE_EXPIRE_UPPER_LIMIT_SECONDS,
f.buffer,
);
metadataTransaction = metadataTransaction.setex(
getAssetMetadataKey(references[index]),
STORAGE_EXPIRE_UPPER_LIMIT_SECONDS,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
import { Controller, HttpCode, HttpStatus, Logger, ParseFilePipeBuilder, Put, UploadedFiles, UseInterceptors } from '@nestjs/common';
import { FilesUploadDto, UploadResponseDto } from '#libs/dtos/common.dto';
import { DSNP_VALID_MIME_TYPES } from '#libs/dtos/validation.dto';
import {
Controller,
HttpCode,
HttpStatus,
Logger,
ParseFilePipeBuilder,
Put,
UploadedFiles,
UseInterceptors,
} from '@nestjs/common';
import { FilesInterceptor } from '@nestjs/platform-express';
import { ApiBody, ApiConsumes, ApiOperation, ApiTags } from '@nestjs/swagger';
import { ApiService } from './api.service';
import { DSNP_VALID_MIME_TYPES, FilesUploadDto, UploadResponseDto } from '../../../libs/common/src';
import { ApiService } from '../../api.service';

@Controller('v1/asset')
@ApiTags('v1/asset')
export class AssetController {
export class AssetControllerV1 {
private readonly logger: Logger;

constructor(private apiService: ApiService) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
import { Body, Controller, Delete, HttpCode, Logger, Param, Post, Put, UploadedFiles, UseInterceptors } from '@nestjs/common';
import { Body, Controller, Delete, HttpCode, Logger, Param, Post, Put } from '@nestjs/common';
import { ApiOperation, ApiTags } from '@nestjs/swagger';
import { ApiService } from './api.service';
import { ApiService } from '../../api.service';
import {
DsnpUserIdParam,
BroadcastDto,
AnnouncementResponseDto,
AnnouncementTypeDto,
AssetIncludedRequestDto,
BroadcastDto,
DsnpUserIdParam,
ReactionDto,
AnnouncementTypeDto,
ReplyDto,
TombstoneDto,
ReactionDto,
UpdateDto,
} from '../../../libs/common/src';
TombstoneDto,
} from '#libs/dtos';

@Controller('v1/content')
@ApiTags('v1/content')
export class ContentController {
export class ContentControllerV1 {
private readonly logger: Logger;

constructor(private apiService: ApiService) {
Expand All @@ -25,7 +25,10 @@ export class ContentController {
@Post(':userDsnpId/broadcast')
@ApiOperation({ summary: 'Create DSNP Broadcast for User' })
@HttpCode(202)
async broadcast(@Param() userDsnpId: DsnpUserIdParam, @Body() broadcastDto: BroadcastDto): Promise<AnnouncementResponseDto> {
async broadcast(
@Param() userDsnpId: DsnpUserIdParam,
@Body() broadcastDto: BroadcastDto,
): Promise<AnnouncementResponseDto> {
const metadata = await this.apiService.validateAssetsAndFetchMetadata(broadcastDto as AssetIncludedRequestDto);
return this.apiService.enqueueRequest(AnnouncementTypeDto.BROADCAST, userDsnpId.userDsnpId, broadcastDto, metadata);
}
Expand All @@ -41,7 +44,10 @@ export class ContentController {
@Post(':userDsnpId/reaction')
@ApiOperation({ summary: 'Create DSNP Reaction for User' })
@HttpCode(202)
async reaction(@Param() userDsnpId: DsnpUserIdParam, @Body() reactionDto: ReactionDto): Promise<AnnouncementResponseDto> {
async reaction(
@Param() userDsnpId: DsnpUserIdParam,
@Body() reactionDto: ReactionDto,
): Promise<AnnouncementResponseDto> {
return this.apiService.enqueueRequest(AnnouncementTypeDto.REACTION, userDsnpId.userDsnpId, reactionDto);
}

Expand All @@ -56,7 +62,10 @@ export class ContentController {
@Delete(':userDsnpId')
@ApiOperation({ summary: 'Delete DSNP Content for User' })
@HttpCode(202)
async delete(@Param() userDsnpId: DsnpUserIdParam, @Body() tombstoneDto: TombstoneDto): Promise<AnnouncementResponseDto> {
async delete(
@Param() userDsnpId: DsnpUserIdParam,
@Body() tombstoneDto: TombstoneDto,
): Promise<AnnouncementResponseDto> {
return this.apiService.enqueueRequest(AnnouncementTypeDto.TOMBSTONE, userDsnpId.userDsnpId, tombstoneDto);
}
}
Loading

0 comments on commit 221d8aa

Please sign in to comment.