From 221d8aa4eac7d50e16f643131aeede23f3ddf1ef Mon Sep 17 00:00:00 2001
From: Joe Caputo
Date: Wed, 31 Jul 2024 12:47:41 -0400
Subject: [PATCH] fix: refactor bull queue config to common module in content-publishing service

---
 .../apps/api/src/api.module.ts | 86 +-------
 .../apps/api/src/api.service.ts | 40 ++--
 .../{ => controllers}/health.controller.ts | 0
 .../v1/asset.controller.v1.ts} | 18 +-
 .../v1/content.controller.v1.ts} | 33 ++--
 .../v1/development.controller.v1.ts} | 95 ++++++---
 .../apps/api/src/controllers/v1/index.ts | 4 +
 .../v1/profile.controller.v1.ts} | 17 +-
 .../content-publishing/apps/api/src/main.ts | 4 +-
 .../apps/api/src/metadata.ts | 116 +---------
 .../apps/worker/src/BaseConsumer.ts | 2 +-
 .../asset_processor/asset.processor.module.ts | 41 +---
 .../asset.processor.service.ts | 20 +-
 .../batch_announcer/batch.announcer.module.ts | 78 +-------
 .../batch.announcer.service.ts | 23 +--
 .../batch_announcer/batch.announcer.spec.ts | 8 +-
 .../src/batch_announcer/batch.announcer.ts | 11 +-
 .../batching.processor.module.ts | 85 ++------
 .../batching.processor.service.ts | 45 +++--
 .../workers/broadcast.worker.ts | 10 +-
 .../src/batching_processor/workers/index.ts | 6 +
 .../workers/profile.worker.ts | 6 +-
 .../workers/reaction.worker.ts | 10 +-
 .../workers/reply.worker.ts | 6 +-
 .../workers/tombstone.worker.ts | 6 +-
 .../workers/update.worker.ts | 10 +-
 .../batch-announcer.job.interface.ts | 2 +-
 .../apps/worker/src/interfaces/index.ts | 3 +
 .../src/monitor/status.monitor.module.ts | 64 +-----
 .../src/monitor/tx.status.monitor.service.ts | 32 +--
 .../worker/src/publisher/ipfs.publisher.ts | 20 +-
 .../worker/src/publisher/nonce.service.ts | 8 +-
 .../worker/src/publisher/publisher.module.ts | 76 +------
 .../src/publisher/publishing.service.ts | 32 +--
 .../dsnp.announcement.processor.spec.ts | 7 +-
 .../dsnp.announcement.processor.ts | 163 ++++++++++-----
 .../request.processor.module.ts | 68 +------
 .../request.processor.service.ts | 15 +-
 .../apps/worker/src/worker.module.ts | 20 +-
 .../src/blockchain/blockchain-constants.ts | 2 +-
 .../src/blockchain/blockchain.module.ts | 7 +-
 .../src/blockchain/blockchain.service.ts | 186 +++++++++++-------
 .../libs/common/src/blockchain/extrinsic.ts | 7 +-
 .../libs/common/src/config/config.module.ts | 3 +-
 .../libs/common/src/config/config.service.ts | 6 +-
 .../libs/common/src/config/env.config.ts | 8 +-
 .../libs/common/src/config/index.ts | 2 +
 .../libs/common/src/dtos/activity.dto.ts | 2 +-
 .../libs/common/src/dtos/announcement.dto.ts | 2 +-
 .../libs/common/src/dtos/index.ts | 4 +
 .../libs/common/src/index.ts | 6 -
 .../libs/common/src/interfaces/index.ts | 5 +
 .../src/interfaces/request-job.interface.ts | 3 +-
 .../libs/common/src/queues/index.ts | 2 +
 .../queues.ts => queues/queue.constants.ts} | 2 +-
 .../libs/common/src/queues/queues.module.ts | 96 +++++++++
 .../libs/common/src/utils/dsnp.schema.ts | 2 +-
 .../libs/common/src/utils/ipfs.client.ts | 17 +-
 .../libs/common/src/utils/processing.ts | 2 +-
 services/content-publishing/tsconfig.json | 45 ++++-
 60 files changed, 778 insertions(+), 921 deletions(-)
 rename services/content-publishing/apps/api/src/{ => controllers}/health.controller.ts (100%)
 rename services/content-publishing/apps/api/src/{asset.controller.ts => controllers/v1/asset.controller.v1.ts} (73%)
 rename services/content-publishing/apps/api/src/{content.controller.ts => controllers/v1/content.controller.v1.ts} (79%)
 rename services/content-publishing/apps/api/src/{development.controller.ts =>
controllers/v1/development.controller.v1.ts} (52%) create mode 100644 services/content-publishing/apps/api/src/controllers/v1/index.ts rename services/content-publishing/apps/api/src/{profile.controller.ts => controllers/v1/profile.controller.v1.ts} (64%) create mode 100644 services/content-publishing/apps/worker/src/batching_processor/workers/index.ts create mode 100644 services/content-publishing/apps/worker/src/interfaces/index.ts create mode 100644 services/content-publishing/libs/common/src/config/index.ts create mode 100644 services/content-publishing/libs/common/src/dtos/index.ts delete mode 100644 services/content-publishing/libs/common/src/index.ts create mode 100644 services/content-publishing/libs/common/src/interfaces/index.ts create mode 100644 services/content-publishing/libs/common/src/queues/index.ts rename services/content-publishing/libs/common/src/{utils/queues.ts => queues/queue.constants.ts} (97%) create mode 100644 services/content-publishing/libs/common/src/queues/queues.module.ts diff --git a/services/content-publishing/apps/api/src/api.module.ts b/services/content-publishing/apps/api/src/api.module.ts index 2e878202..3df00f97 100644 --- a/services/content-publishing/apps/api/src/api.module.ts +++ b/services/content-publishing/apps/api/src/api.module.ts @@ -1,22 +1,18 @@ import { Module } from '@nestjs/common'; import { EventEmitterModule } from '@nestjs/event-emitter'; -import { BullModule } from '@nestjs/bullmq'; import { ScheduleModule } from '@nestjs/schedule'; import { RedisModule } from '@songkeys/nestjs-redis'; import { BullBoardModule } from '@bull-board/nestjs'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ExpressAdapter } from '@bull-board/express'; import { MulterModule } from '@nestjs/platform-express'; -import { DevelopmentController } from './development.controller'; -import * as QueueConstants from '../../../libs/common/src'; +import { DevelopmentControllerV1 } from './controllers/v1/development.controller.v1'; +import { ConfigModule, ConfigService } from '#libs/config'; +import { QueuesModule, QueueConstants } from '#libs/queues'; +import { IpfsService } from '#libs/utils/ipfs.client'; import { ApiService } from './api.service'; -import { IpfsService } from '../../../libs/common/src/utils/ipfs.client'; -import { ConfigModule } from '../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../libs/common/src/config/config.service'; -import { AssetController } from './asset.controller'; -import { ContentController } from './content.controller'; -import { HealthController } from './health.controller'; -import { ProfileController } from './profile.controller'; +import { HealthController } from './controllers/health.controller'; +import { AssetControllerV1, ContentControllerV1, ProfileControllerV1 } from './controllers/v1'; @Module({ imports: [ @@ -31,68 +27,7 @@ import { ProfileController } from './profile.controller'; }, true, // isGlobal ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. 
- // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - BullModule.registerQueue( - { - name: QueueConstants.ASSET_QUEUE_NAME, - }, - { - name: QueueConstants.REQUEST_QUEUE_NAME, - }, - { - name: QueueConstants.BROADCAST_QUEUE_NAME, - }, - { - name: QueueConstants.REPLY_QUEUE_NAME, - }, - { - name: QueueConstants.REACTION_QUEUE_NAME, - }, - { - name: QueueConstants.TOMBSTONE_QUEUE_NAME, - }, - { - name: QueueConstants.UPDATE_QUEUE_NAME, - }, - { - name: QueueConstants.PROFILE_QUEUE_NAME, - }, - { - name: QueueConstants.BATCH_QUEUE_NAME, - }, - { - name: QueueConstants.PUBLISH_QUEUE_NAME, - }, - { - name: QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME, - }, - { - name: QueueConstants.STATUS_QUEUE_NAME, - }, - ), - + QueuesModule, // Bullboard UI BullBoardModule.forRoot({ route: '/queues', @@ -166,7 +101,6 @@ import { ProfileController } from './profile.controller'; }), ScheduleModule.forRoot(), MulterModule.registerAsync({ - imports: [ConfigModule], useFactory: async (configService: ConfigService) => ({ limits: { fileSize: configService.fileUploadMaxSizeInBytes, @@ -175,13 +109,13 @@ import { ProfileController } from './profile.controller'; inject: [ConfigService], }), ], - providers: [ConfigService, ApiService, IpfsService], + providers: [ApiService, IpfsService], // Controller order determines the order of display for docs // v[Desc first][ABC Second], Health, and then Dev only last controllers: process.env?.ENVIRONMENT === 'dev' - ? [AssetController, ContentController, ProfileController, HealthController, DevelopmentController] - : [AssetController, ContentController, ProfileController, HealthController], + ? 
[AssetControllerV1, ContentControllerV1, ProfileControllerV1, HealthController, DevelopmentControllerV1] + : [AssetControllerV1, ContentControllerV1, ProfileControllerV1, HealthController], exports: [], }) export class ApiModule {} diff --git a/services/content-publishing/apps/api/src/api.service.ts b/services/content-publishing/apps/api/src/api.service.ts index 3229e918..46868078 100644 --- a/services/content-publishing/apps/api/src/api.service.ts +++ b/services/content-publishing/apps/api/src/api.service.ts @@ -7,19 +7,17 @@ import { InjectRedis } from '@songkeys/nestjs-redis'; import Redis from 'ioredis'; import { HttpErrorByCode } from '@nestjs/common/utils/http-error-by-code.util'; import { - AnnouncementResponseDto, AnnouncementTypeDto, - ASSET_QUEUE_NAME, + RequestTypeDto, + AnnouncementResponseDto, AssetIncludedRequestDto, - IRequestJob, isImage, - REQUEST_QUEUE_NAME, - RequestTypeDto, UploadResponseDto, -} from '../../../libs/common/src'; -import { calculateIpfsCID } from '../../../libs/common/src/utils/ipfs'; -import { IAssetJob, IAssetMetadata } from '../../../libs/common/src/interfaces/asset-job.interface'; -import { getAssetDataKey, getAssetMetadataKey, STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '../../../libs/common/src/utils/redis'; +} from '#libs/dtos'; +import { IRequestJob, IAssetMetadata, IAssetJob } from '#libs/interfaces'; +import { REQUEST_QUEUE_NAME, ASSET_QUEUE_NAME } from '#libs/queues/queue.constants'; +import { calculateIpfsCID } from '#libs/utils/ipfs'; +import { getAssetMetadataKey, getAssetDataKey, STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '#libs/utils/redis'; @Injectable() export class ApiService { @@ -51,7 +49,11 @@ export class ApiService { // not used in id calculation since the order in map might not be deterministic data.assetToMimeType = assetToMimeType; } - const job = await this.requestQueue.add(`Request Job - ${data.id}`, data, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); // TODO: should come from queue configs + const job = await this.requestQueue.add(`Request Job - ${data.id}`, data, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); // TODO: should come from queue configs this.logger.debug('Enqueue Request Job: ', job); return { referenceId: data.id, @@ -61,7 +63,9 @@ export class ApiService { async validateAssetsAndFetchMetadata(content: AssetIncludedRequestDto): Promise | undefined> { const checkingList: { onlyImage: boolean; referenceId: string }[] = []; if (content.profile) { - content.profile.icon?.forEach((reference) => checkingList.push({ onlyImage: true, referenceId: reference.referenceId })); + content.profile.icon?.forEach((reference) => + checkingList.push({ onlyImage: true, referenceId: reference.referenceId }), + ); } else if (content.content) { content.content.assets?.forEach((asset) => asset.references?.forEach((reference) => @@ -73,12 +77,16 @@ export class ApiService { ); } - const redisResults = await Promise.all(checkingList.map((obj) => this.redis.get(getAssetMetadataKey(obj.referenceId)))); + const redisResults = await Promise.all( + checkingList.map((obj) => this.redis.get(getAssetMetadataKey(obj.referenceId))), + ); const errors: string[] = []; const map = new Map(); redisResults.forEach((res, index) => { if (res === null) { - errors.push(`${content.profile ? 'profile.icon' : 'content.assets'}.referenceId ${checkingList[index].referenceId} does not exist!`); + errors.push( + `${content.profile ? 
'profile.icon' : 'content.assets'}.referenceId ${checkingList[index].referenceId} does not exist!`, + ); } else { const metadata: IAssetMetadata = JSON.parse(res); map[checkingList[index].referenceId] = metadata.mimeType; @@ -106,7 +114,11 @@ export class ApiService { const jobs: any[] = []; files.forEach((f, index) => { // adding data and metadata to the transaction - dataTransaction = dataTransaction.setex(getAssetDataKey(references[index]), STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, f.buffer); + dataTransaction = dataTransaction.setex( + getAssetDataKey(references[index]), + STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, + f.buffer, + ); metadataTransaction = metadataTransaction.setex( getAssetMetadataKey(references[index]), STORAGE_EXPIRE_UPPER_LIMIT_SECONDS, diff --git a/services/content-publishing/apps/api/src/health.controller.ts b/services/content-publishing/apps/api/src/controllers/health.controller.ts similarity index 100% rename from services/content-publishing/apps/api/src/health.controller.ts rename to services/content-publishing/apps/api/src/controllers/health.controller.ts diff --git a/services/content-publishing/apps/api/src/asset.controller.ts b/services/content-publishing/apps/api/src/controllers/v1/asset.controller.v1.ts similarity index 73% rename from services/content-publishing/apps/api/src/asset.controller.ts rename to services/content-publishing/apps/api/src/controllers/v1/asset.controller.v1.ts index 3b9c9d4f..79ecc3a3 100644 --- a/services/content-publishing/apps/api/src/asset.controller.ts +++ b/services/content-publishing/apps/api/src/controllers/v1/asset.controller.v1.ts @@ -1,12 +1,22 @@ -import { Controller, HttpCode, HttpStatus, Logger, ParseFilePipeBuilder, Put, UploadedFiles, UseInterceptors } from '@nestjs/common'; +import { FilesUploadDto, UploadResponseDto } from '#libs/dtos/common.dto'; +import { DSNP_VALID_MIME_TYPES } from '#libs/dtos/validation.dto'; +import { + Controller, + HttpCode, + HttpStatus, + Logger, + ParseFilePipeBuilder, + Put, + UploadedFiles, + UseInterceptors, +} from '@nestjs/common'; import { FilesInterceptor } from '@nestjs/platform-express'; import { ApiBody, ApiConsumes, ApiOperation, ApiTags } from '@nestjs/swagger'; -import { ApiService } from './api.service'; -import { DSNP_VALID_MIME_TYPES, FilesUploadDto, UploadResponseDto } from '../../../libs/common/src'; +import { ApiService } from '../../api.service'; @Controller('v1/asset') @ApiTags('v1/asset') -export class AssetController { +export class AssetControllerV1 { private readonly logger: Logger; constructor(private apiService: ApiService) { diff --git a/services/content-publishing/apps/api/src/content.controller.ts b/services/content-publishing/apps/api/src/controllers/v1/content.controller.v1.ts similarity index 79% rename from services/content-publishing/apps/api/src/content.controller.ts rename to services/content-publishing/apps/api/src/controllers/v1/content.controller.v1.ts index a6a952dd..a60c55e3 100644 --- a/services/content-publishing/apps/api/src/content.controller.ts +++ b/services/content-publishing/apps/api/src/controllers/v1/content.controller.v1.ts @@ -1,21 +1,21 @@ -import { Body, Controller, Delete, HttpCode, Logger, Param, Post, Put, UploadedFiles, UseInterceptors } from '@nestjs/common'; +import { Body, Controller, Delete, HttpCode, Logger, Param, Post, Put } from '@nestjs/common'; import { ApiOperation, ApiTags } from '@nestjs/swagger'; -import { ApiService } from './api.service'; +import { ApiService } from '../../api.service'; import { + DsnpUserIdParam, + 
BroadcastDto, AnnouncementResponseDto, - AnnouncementTypeDto, AssetIncludedRequestDto, - BroadcastDto, - DsnpUserIdParam, - ReactionDto, + AnnouncementTypeDto, ReplyDto, - TombstoneDto, + ReactionDto, UpdateDto, -} from '../../../libs/common/src'; + TombstoneDto, +} from '#libs/dtos'; @Controller('v1/content') @ApiTags('v1/content') -export class ContentController { +export class ContentControllerV1 { private readonly logger: Logger; constructor(private apiService: ApiService) { @@ -25,7 +25,10 @@ export class ContentController { @Post(':userDsnpId/broadcast') @ApiOperation({ summary: 'Create DSNP Broadcast for User' }) @HttpCode(202) - async broadcast(@Param() userDsnpId: DsnpUserIdParam, @Body() broadcastDto: BroadcastDto): Promise { + async broadcast( + @Param() userDsnpId: DsnpUserIdParam, + @Body() broadcastDto: BroadcastDto, + ): Promise { const metadata = await this.apiService.validateAssetsAndFetchMetadata(broadcastDto as AssetIncludedRequestDto); return this.apiService.enqueueRequest(AnnouncementTypeDto.BROADCAST, userDsnpId.userDsnpId, broadcastDto, metadata); } @@ -41,7 +44,10 @@ export class ContentController { @Post(':userDsnpId/reaction') @ApiOperation({ summary: 'Create DSNP Reaction for User' }) @HttpCode(202) - async reaction(@Param() userDsnpId: DsnpUserIdParam, @Body() reactionDto: ReactionDto): Promise { + async reaction( + @Param() userDsnpId: DsnpUserIdParam, + @Body() reactionDto: ReactionDto, + ): Promise { return this.apiService.enqueueRequest(AnnouncementTypeDto.REACTION, userDsnpId.userDsnpId, reactionDto); } @@ -56,7 +62,10 @@ export class ContentController { @Delete(':userDsnpId') @ApiOperation({ summary: 'Delete DSNP Content for User' }) @HttpCode(202) - async delete(@Param() userDsnpId: DsnpUserIdParam, @Body() tombstoneDto: TombstoneDto): Promise { + async delete( + @Param() userDsnpId: DsnpUserIdParam, + @Body() tombstoneDto: TombstoneDto, + ): Promise { return this.apiService.enqueueRequest(AnnouncementTypeDto.TOMBSTONE, userDsnpId.userDsnpId, tombstoneDto); } } diff --git a/services/content-publishing/apps/api/src/development.controller.ts b/services/content-publishing/apps/api/src/controllers/v1/development.controller.v1.ts similarity index 52% rename from services/content-publishing/apps/api/src/development.controller.ts rename to services/content-publishing/apps/api/src/controllers/v1/development.controller.v1.ts index eeb96ec8..306e6a1c 100644 --- a/services/content-publishing/apps/api/src/development.controller.ts +++ b/services/content-publishing/apps/api/src/controllers/v1/development.controller.v1.ts @@ -7,37 +7,52 @@ import { Controller, Get, Logger, NotFoundException, Param, Post } from '@nestjs import { InjectQueue } from '@nestjs/bullmq'; import { Queue } from 'bullmq'; import { Job } from 'bullmq/dist/esm/classes/job'; -import * as QueueConstants from '../../../libs/common/src'; -import { IpfsService } from '../../../libs/common/src/utils/ipfs.client'; -import { AnnouncementType, createBroadcast, createProfile, createReaction, createReply, createTombstone, createUpdate } from '../../../libs/common/src/interfaces/dsnp'; -import { calculateDsnpHash } from '../../../libs/common/src/utils/ipfs'; import { ApiOperation, ApiTags } from '@nestjs/swagger'; +import { + createBroadcast, + createProfile, + createUpdate, + AnnouncementType, + createReply, + createReaction, + createTombstone, + BroadcastAnnouncement, + ProfileAnnouncement, + UpdateAnnouncement, + ReplyAnnouncement, + ReactionAnnouncement, + TombstoneAnnouncement, +} from 
'#libs/interfaces'; +import { QueueConstants } from '#libs/queues'; +import { calculateDsnpHash } from '#libs/utils/ipfs'; +import { IpfsService } from '#libs/utils/ipfs.client'; +import { AnnouncementTypeDto } from '#libs/dtos'; @Controller('dev') @ApiTags('dev') -export class DevelopmentController { +export class DevelopmentControllerV1 { private readonly logger: Logger; - private readonly queueMapper: Map; + private readonly queueMapper: Map; constructor( @InjectQueue(QueueConstants.REQUEST_QUEUE_NAME) private requestQueue: Queue, - @InjectQueue(QueueConstants.BROADCAST_QUEUE_NAME) private broadcastQueue: Queue, - @InjectQueue(QueueConstants.REPLY_QUEUE_NAME) private replyQueue: Queue, - @InjectQueue(QueueConstants.REACTION_QUEUE_NAME) private reactionQueue: Queue, - @InjectQueue(QueueConstants.UPDATE_QUEUE_NAME) private updateQueue: Queue, - @InjectQueue(QueueConstants.PROFILE_QUEUE_NAME) private profileQueue: Queue, - @InjectQueue(QueueConstants.TOMBSTONE_QUEUE_NAME) private tombstoneQueue: Queue, + @InjectQueue(QueueConstants.BROADCAST_QUEUE_NAME) broadcastQueue: Queue, + @InjectQueue(QueueConstants.REPLY_QUEUE_NAME) replyQueue: Queue, + @InjectQueue(QueueConstants.REACTION_QUEUE_NAME) reactionQueue: Queue, + @InjectQueue(QueueConstants.UPDATE_QUEUE_NAME) updateQueue: Queue, + @InjectQueue(QueueConstants.PROFILE_QUEUE_NAME) profileQueue: Queue, + @InjectQueue(QueueConstants.TOMBSTONE_QUEUE_NAME) tombstoneQueue: Queue, private ipfsService: IpfsService, ) { this.logger = new Logger(this.constructor.name); this.queueMapper = new Map([ - [QueueConstants.AnnouncementTypeDto.BROADCAST, broadcastQueue], - [QueueConstants.AnnouncementTypeDto.REPLY, replyQueue], - [QueueConstants.AnnouncementTypeDto.REACTION, reactionQueue], - [QueueConstants.AnnouncementTypeDto.UPDATE, updateQueue], - [QueueConstants.AnnouncementTypeDto.PROFILE, profileQueue], - [QueueConstants.AnnouncementTypeDto.TOMBSTONE, tombstoneQueue], + [AnnouncementTypeDto.BROADCAST, broadcastQueue], + [AnnouncementTypeDto.REPLY, replyQueue], + [AnnouncementTypeDto.REACTION, reactionQueue], + [AnnouncementTypeDto.UPDATE, updateQueue], + [AnnouncementTypeDto.PROFILE, profileQueue], + [AnnouncementTypeDto.TOMBSTONE, tombstoneQueue], ]); } @@ -62,25 +77,37 @@ export class DevelopmentController { @Post('/dummy/announcement/:queueType/:count') @ApiOperation({ summary: 'Create dummy announcement data', description: 'ONLY enabled when ENVIRONMENT="dev".' 
}) - async populate(@Param('queueType') queueType: QueueConstants.AnnouncementTypeDto, @Param('count') count: number) { + async populate(@Param('queueType') queueType: AnnouncementTypeDto, @Param('count') count: number) { const promises: Promise[] = []; // eslint-disable-next-line no-plusplus for (let i = 0; i < count; i++) { - let data: any; + let data: + | BroadcastAnnouncement + | ProfileAnnouncement + | UpdateAnnouncement + | ReplyAnnouncement + | ReactionAnnouncement + | TombstoneAnnouncement; // eslint-disable-next-line default-case const fromId = `${Math.floor(Math.random() * 100000000)}`; const hash = `${Math.floor(Math.random() * 100000000)}`; switch (queueType) { - case QueueConstants.AnnouncementTypeDto.BROADCAST: + case AnnouncementTypeDto.BROADCAST: data = createBroadcast(fromId, `https://example.com/${Math.floor(Math.random() * 100000000)}`, hash); break; - case QueueConstants.AnnouncementTypeDto.PROFILE: + case AnnouncementTypeDto.PROFILE: data = createProfile(fromId, `https://example.com/${Math.floor(Math.random() * 100000000)}`, hash); break; - case QueueConstants.AnnouncementTypeDto.UPDATE: - data = createUpdate(fromId, `https://example.com/${Math.floor(Math.random() * 100000000)}`, hash, AnnouncementType.Broadcast, `${Math.floor(Math.random() * 100000000)}`); + case AnnouncementTypeDto.UPDATE: + data = createUpdate( + fromId, + `https://example.com/${Math.floor(Math.random() * 100000000)}`, + hash, + AnnouncementType.Broadcast, + `${Math.floor(Math.random() * 100000000)}`, + ); break; - case QueueConstants.AnnouncementTypeDto.REPLY: + case AnnouncementTypeDto.REPLY: data = createReply( fromId, `https://example.com/${Math.floor(Math.random() * 100000000)}`, @@ -88,10 +115,15 @@ export class DevelopmentController { `dsnp://0x${Math.floor(Math.random() * 100000000)}/0x${Math.floor(Math.random() * 100000000)}`, ); break; - case QueueConstants.AnnouncementTypeDto.REACTION: - data = createReaction(fromId, '🤌🏼', `dsnp://0x${Math.floor(Math.random() * 100000000)}/0x${Math.floor(Math.random() * 100000000)}`, 1); + case AnnouncementTypeDto.REACTION: + data = createReaction( + fromId, + '🤌🏼', + `dsnp://0x${Math.floor(Math.random() * 100000000)}/0x${Math.floor(Math.random() * 100000000)}`, + 1, + ); break; - case QueueConstants.AnnouncementTypeDto.TOMBSTONE: + case AnnouncementTypeDto.TOMBSTONE: data = createTombstone(fromId, AnnouncementType.Reply, hash); break; default: @@ -99,7 +131,12 @@ export class DevelopmentController { } // eslint-disable-next-line no-await-in-loop const jobId = await calculateDsnpHash(Buffer.from(JSON.stringify(data))); - promises.push(this.queueMapper.get(queueType)!.add(`Dummy Job - ${data.id}`, data, { jobId, removeOnFail: false, removeOnComplete: true })); + const queue = this.queueMapper.get(queueType); + if (queue) { + promises.push( + queue.add(`Dummy Job - ${data.id}`, data, { jobId, removeOnFail: false, removeOnComplete: true }), + ); + } } await Promise.all(promises); } diff --git a/services/content-publishing/apps/api/src/controllers/v1/index.ts b/services/content-publishing/apps/api/src/controllers/v1/index.ts new file mode 100644 index 00000000..3d41da51 --- /dev/null +++ b/services/content-publishing/apps/api/src/controllers/v1/index.ts @@ -0,0 +1,4 @@ +export * from './asset.controller.v1'; +export * from './content.controller.v1'; +export * from './development.controller.v1'; +export * from './profile.controller.v1'; diff --git a/services/content-publishing/apps/api/src/profile.controller.ts 
b/services/content-publishing/apps/api/src/controllers/v1/profile.controller.v1.ts similarity index 64% rename from services/content-publishing/apps/api/src/profile.controller.ts rename to services/content-publishing/apps/api/src/controllers/v1/profile.controller.v1.ts index dbdcdb09..b94e20d9 100644 --- a/services/content-publishing/apps/api/src/profile.controller.ts +++ b/services/content-publishing/apps/api/src/controllers/v1/profile.controller.v1.ts @@ -1,11 +1,17 @@ import { Body, Controller, HttpCode, Logger, Param, Put } from '@nestjs/common'; import { ApiOperation, ApiTags } from '@nestjs/swagger'; -import { ApiService } from './api.service'; -import { AnnouncementResponseDto, AnnouncementTypeDto, AssetIncludedRequestDto, DsnpUserIdParam, ProfileDto } from '../../../libs/common/src'; +import { ApiService } from '../../api.service'; +import { + DsnpUserIdParam, + ProfileDto, + AnnouncementResponseDto, + AssetIncludedRequestDto, + AnnouncementTypeDto, +} from '#libs/dtos'; @Controller('v1/profile') @ApiTags('v1/profile') -export class ProfileController { +export class ProfileControllerV1 { private readonly logger: Logger; constructor(private apiService: ApiService) { @@ -15,7 +21,10 @@ export class ProfileController { @Put(':userDsnpId') @ApiOperation({ summary: "Update a user's Profile" }) @HttpCode(202) - async profile(@Param() userDsnpId: DsnpUserIdParam, @Body() profileDto: ProfileDto): Promise { + async profile( + @Param() userDsnpId: DsnpUserIdParam, + @Body() profileDto: ProfileDto, + ): Promise { const metadata = await this.apiService.validateAssetsAndFetchMetadata(profileDto as AssetIncludedRequestDto); return this.apiService.enqueueRequest(AnnouncementTypeDto.PROFILE, userDsnpId.userDsnpId, profileDto, metadata); } diff --git a/services/content-publishing/apps/api/src/main.ts b/services/content-publishing/apps/api/src/main.ts index cc6d44fa..94880524 100644 --- a/services/content-publishing/apps/api/src/main.ts +++ b/services/content-publishing/apps/api/src/main.ts @@ -2,8 +2,8 @@ import { NestFactory } from '@nestjs/core'; import { Logger, ValidationPipe } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { ApiModule } from './api.module'; -import { initSwagger } from '../../../libs/common/src/config/swagger_config'; -import { ConfigService } from '../../../libs/common/src/config/config.service'; +import { ConfigService } from '#libs/config'; +import { initSwagger } from '#libs/config/swagger_config'; const logger = new Logger('main'); diff --git a/services/content-publishing/apps/api/src/metadata.ts b/services/content-publishing/apps/api/src/metadata.ts index 8630307e..bf1f6e63 100644 --- a/services/content-publishing/apps/api/src/metadata.ts +++ b/services/content-publishing/apps/api/src/metadata.ts @@ -1,112 +1,8 @@ /* eslint-disable */ export default async () => { - const t = { - ['../../../libs/common/src/dtos/activity.dto']: await import('../../../libs/common/src/dtos/activity.dto'), - ['../../../libs/common/src/dtos/announcement.dto']: await import('../../../libs/common/src/dtos/announcement.dto'), - ['../../../libs/common/src/dtos/common.dto']: await import('../../../libs/common/src/dtos/common.dto'), - }; - return { - '@nestjs/swagger': { - models: [ - [ - import('../../../libs/common/src/dtos/common.dto'), - { - DsnpUserIdParam: { userDsnpId: { required: true, type: () => String } }, - AnnouncementResponseDto: { referenceId: { required: true, type: () => String } }, - UploadResponseDto: { assetIds: { required: true, type: () => 
[String] } }, - FilesUploadDto: { files: { required: true, type: () => [Object] } }, - }, - ], - [ - import('../../../libs/common/src/dtos/activity.dto'), - { - LocationDto: { - name: { required: true, type: () => String, minLength: 1 }, - accuracy: { required: false, type: () => Number, minimum: 0, maximum: 100 }, - altitude: { required: false, type: () => Number }, - latitude: { required: false, type: () => Number }, - longitude: { required: false, type: () => Number }, - radius: { required: false, type: () => Number, minimum: 0 }, - units: { required: false, enum: t['../../../libs/common/src/dtos/activity.dto'].UnitTypeDto }, - }, - AssetReferenceDto: { - referenceId: { required: true, type: () => String, minLength: 1 }, - height: { required: false, type: () => Number, minimum: 1 }, - width: { required: false, type: () => Number, minimum: 1 }, - duration: { required: false, type: () => String, pattern: 'DURATION_REGEX' }, - }, - TagDto: { - type: { required: true, enum: t['../../../libs/common/src/dtos/activity.dto'].TagTypeDto }, - name: { required: false, type: () => String, minLength: 1 }, - mentionedId: { required: false, type: () => String, minLength: 1, pattern: 'DSNP_USER_URI_REGEX' }, - }, - AssetDto: { - type: { required: true, enum: t['../../../libs/common/src/dtos/activity.dto'].AttachmentTypeDto }, - references: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].AssetReferenceDto] }, - name: { required: false, type: () => String, minLength: 1 }, - href: { required: false, type: () => String, minLength: 1 }, - }, - BaseActivityDto: { - name: { required: false, type: () => String }, - tag: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].TagDto] }, - location: { required: false, type: () => t['../../../libs/common/src/dtos/activity.dto'].LocationDto }, - }, - NoteActivityDto: { - content: { required: true, type: () => String, minLength: 1 }, - published: { required: true, type: () => String, pattern: 'ISO8601_REGEX' }, - assets: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].AssetDto] }, - }, - ProfileActivityDto: { - icon: { required: false, type: () => [t['../../../libs/common/src/dtos/activity.dto'].AssetReferenceDto] }, - summary: { required: false, type: () => String }, - published: { required: false, type: () => String, pattern: 'ISO8601_REGEX' }, - }, - }, - ], - [ - import('../../../libs/common/src/dtos/announcement.dto'), - { - BroadcastDto: { content: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].NoteActivityDto } }, - ReplyDto: { - inReplyTo: { required: true, type: () => String, pattern: 'DSNP_CONTENT_URI_REGEX' }, - content: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].NoteActivityDto }, - }, - TombstoneDto: { - targetContentHash: { required: true, type: () => String, pattern: 'DSNP_CONTENT_HASH_REGEX' }, - targetAnnouncementType: { required: true, enum: t['../../../libs/common/src/dtos/announcement.dto'].ModifiableAnnouncementTypeDto }, - }, - UpdateDto: { - targetContentHash: { required: true, type: () => String, pattern: 'DSNP_CONTENT_HASH_REGEX' }, - targetAnnouncementType: { required: true, enum: t['../../../libs/common/src/dtos/announcement.dto'].ModifiableAnnouncementTypeDto }, - content: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].NoteActivityDto }, - }, - ReactionDto: { - emoji: { required: true, type: () => String, minLength: 1, pattern: 'DSNP_EMOJI_REGEX' }, - 
apply: { required: true, type: () => Number, minimum: 0, maximum: 255 }, - inReplyTo: { required: true, type: () => String, pattern: 'DSNP_CONTENT_URI_REGEX' }, - }, - ProfileDto: { profile: { required: true, type: () => t['../../../libs/common/src/dtos/activity.dto'].ProfileActivityDto } }, - }, - ], - ], - controllers: [ - [import('./development.controller'), { DevelopmentController: { requestJob: {}, getAsset: { type: Object }, populate: {} } }], - [import('./asset.controller'), { AssetController: { assetUpload: { type: t['../../../libs/common/src/dtos/common.dto'].UploadResponseDto } } }], - [ - import('./content.controller'), - { - ContentController: { - broadcast: { type: t['../../../libs/common/src/dtos/common.dto'].AnnouncementResponseDto }, - reply: { type: t['../../../libs/common/src/dtos/common.dto'].AnnouncementResponseDto }, - reaction: { type: t['../../../libs/common/src/dtos/common.dto'].AnnouncementResponseDto }, - update: { type: t['../../../libs/common/src/dtos/common.dto'].AnnouncementResponseDto }, - delete: { type: t['../../../libs/common/src/dtos/common.dto'].AnnouncementResponseDto }, - }, - }, - ], - [import('./health.controller'), { HealthController: { healthz: {}, livez: {}, readyz: {} } }], - [import('./profile.controller'), { ProfileController: { profile: { type: t['../../../libs/common/src/dtos/common.dto'].AnnouncementResponseDto } } }], - ], - }, - }; -}; + const t = { + ["../../../libs/common/src/dtos/activity.dto"]: await import("../../../libs/common/src/dtos/activity.dto"), + ["../../../libs/common/src/dtos/announcement.dto"]: await import("../../../libs/common/src/dtos/announcement.dto") + }; + return { "@nestjs/swagger": { "models": [[import("../../../libs/common/src/dtos/common.dto"), { "DsnpUserIdParam": { userDsnpId: { required: true, type: () => String } }, "AnnouncementResponseDto": { referenceId: { required: true, type: () => String } }, "UploadResponseDto": { assetIds: { required: true, type: () => [String] } }, "FilesUploadDto": { files: { required: true, type: () => [Object] } } }], [import("../../../libs/common/src/dtos/activity.dto"), { "LocationDto": { name: { required: true, type: () => String, minLength: 1 }, accuracy: { required: false, type: () => Number, minimum: 0, maximum: 100 }, altitude: { required: false, type: () => Number }, latitude: { required: false, type: () => Number }, longitude: { required: false, type: () => Number }, radius: { required: false, type: () => Number, minimum: 0 }, units: { required: false, enum: t["../../../libs/common/src/dtos/activity.dto"].UnitTypeDto } }, "AssetReferenceDto": { referenceId: { required: true, type: () => String, minLength: 1 }, height: { required: false, type: () => Number, minimum: 1 }, width: { required: false, type: () => Number, minimum: 1 }, duration: { required: false, type: () => String, pattern: "DURATION_REGEX" } }, "TagDto": { type: { required: true, enum: t["../../../libs/common/src/dtos/activity.dto"].TagTypeDto }, name: { required: false, type: () => String, minLength: 1 }, mentionedId: { required: false, type: () => String } }, "AssetDto": { type: { required: true, enum: t["../../../libs/common/src/dtos/activity.dto"].AttachmentTypeDto }, references: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].AssetReferenceDto] }, name: { required: false, type: () => String, minLength: 1 }, href: { required: false, type: () => String, minLength: 1 } }, "BaseActivityDto": { name: { required: false, type: () => String }, tag: { required: false, type: () => 
[t["../../../libs/common/src/dtos/activity.dto"].TagDto] }, location: { required: false, type: () => t["../../../libs/common/src/dtos/activity.dto"].LocationDto } }, "NoteActivityDto": { content: { required: true, type: () => String, minLength: 1 }, published: { required: true, type: () => String }, assets: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].AssetDto] } }, "ProfileActivityDto": { icon: { required: false, type: () => [t["../../../libs/common/src/dtos/activity.dto"].AssetReferenceDto] }, summary: { required: false, type: () => String }, published: { required: false, type: () => String } } }], [import("../../../libs/common/src/dtos/announcement.dto"), { "BroadcastDto": { content: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].NoteActivityDto } }, "ReplyDto": { inReplyTo: { required: true, type: () => String }, content: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].NoteActivityDto } }, "TombstoneDto": { targetContentHash: { required: true, type: () => String }, targetAnnouncementType: { required: true, enum: t["../../../libs/common/src/dtos/announcement.dto"].ModifiableAnnouncementTypeDto } }, "UpdateDto": { targetContentHash: { required: true, type: () => String }, targetAnnouncementType: { required: true, enum: t["../../../libs/common/src/dtos/announcement.dto"].ModifiableAnnouncementTypeDto }, content: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].NoteActivityDto } }, "ReactionDto": { emoji: { required: true, type: () => String, minLength: 1, pattern: "DSNP_EMOJI_REGEX" }, apply: { required: true, type: () => Number, minimum: 0, maximum: 255 }, inReplyTo: { required: true, type: () => String } }, "ProfileDto": { profile: { required: true, type: () => t["../../../libs/common/src/dtos/activity.dto"].ProfileActivityDto } } }]], "controllers": [[import("./controllers/health.controller"), { "HealthController": { "healthz": {}, "livez": {}, "readyz": {} } }]] } }; +}; \ No newline at end of file diff --git a/services/content-publishing/apps/worker/src/BaseConsumer.ts b/services/content-publishing/apps/worker/src/BaseConsumer.ts index c1852148..a9bcefbc 100644 --- a/services/content-publishing/apps/worker/src/BaseConsumer.ts +++ b/services/content-publishing/apps/worker/src/BaseConsumer.ts @@ -1,7 +1,7 @@ +import { MAX_WAIT_FOR_GRACE_FULL_SHUTDOWN_MS, DELAY_TO_CHECK_FOR_SHUTDOWN_MS } from '#libs/utils/processing'; import { OnWorkerEvent, WorkerHost } from '@nestjs/bullmq'; import { Logger, OnModuleDestroy } from '@nestjs/common'; import { Job, Worker, delay } from 'bullmq'; -import { DELAY_TO_CHECK_FOR_SHUTDOWN_MS, MAX_WAIT_FOR_GRACE_FULL_SHUTDOWN_MS } from '../../../libs/common/src/utils/processing'; export abstract class BaseConsumer extends WorkerHost implements OnModuleDestroy { protected logger: Logger; diff --git a/services/content-publishing/apps/worker/src/asset_processor/asset.processor.module.ts b/services/content-publishing/apps/worker/src/asset_processor/asset.processor.module.ts index dd3aea9f..186cfc45 100644 --- a/services/content-publishing/apps/worker/src/asset_processor/asset.processor.module.ts +++ b/services/content-publishing/apps/worker/src/asset_processor/asset.processor.module.ts @@ -1,22 +1,13 @@ -/* -https://docs.nestjs.com/modules -*/ - -import { BullModule } from '@nestjs/bullmq'; +import { ConfigService } from '#libs/config'; +import { IpfsService } from '#libs/utils/ipfs.client'; import { Module } from '@nestjs/common'; import { 
RedisModule } from '@songkeys/nestjs-redis'; -import { ASSET_QUEUE_NAME } from '../../../../libs/common/src'; import { AssetProcessorService } from './asset.processor.service'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; -import { ConfigModule } from '../../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; @Module({ imports: [ - ConfigModule, RedisModule.forRootAsync( { - imports: [ConfigModule], useFactory: (configService: ConfigService) => ({ config: [{ url: configService.redisUrl.toString() }], }), @@ -24,34 +15,8 @@ import { ConfigService } from '../../../../libs/common/src/config/config.service }, true, // isGlobal ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. - // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - BullModule.registerQueue({ - name: ASSET_QUEUE_NAME, - }), ], providers: [AssetProcessorService, IpfsService], - exports: [BullModule, AssetProcessorService, IpfsService], + exports: [AssetProcessorService, IpfsService], }) export class AssetProcessorModule {} diff --git a/services/content-publishing/apps/worker/src/asset_processor/asset.processor.service.ts b/services/content-publishing/apps/worker/src/asset_processor/asset.processor.service.ts index aba1b2f9..f3c17839 100644 --- a/services/content-publishing/apps/worker/src/asset_processor/asset.processor.service.ts +++ b/services/content-publishing/apps/worker/src/asset_processor/asset.processor.service.ts @@ -1,12 +1,12 @@ import { InjectRedis } from '@songkeys/nestjs-redis'; -import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; -import { Injectable, Logger } from '@nestjs/common'; +import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; +import { Injectable } from '@nestjs/common'; import { Job } from 'bullmq'; import Redis from 'ioredis'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { ASSET_QUEUE_NAME } from '../../../../libs/common/src'; -import { IAssetJob } from '../../../../libs/common/src/interfaces/asset-job.interface'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; +import { ConfigService } from '#libs/config'; +import { IAssetJob } from '#libs/interfaces'; +import { ASSET_QUEUE_NAME } from '#libs/queues/queue.constants'; +import { IpfsService } from '#libs/utils/ipfs.client'; import { BaseConsumer } from '../BaseConsumer'; @Injectable() @@ -20,7 +20,7 @@ export class AssetProcessorService extends BaseConsumer { super(); } - async process(job: Job): Promise { + async process(job: Job): Promise { this.logger.log(`Processing job ${job.id} of type ${job.name}`); 
this.logger.debug(job.asJSON()); const redisResults = await this.redis.getBuffer(job.data.contentLocation); @@ -41,7 +41,11 @@ export class AssetProcessorService extends BaseConsumer { const secondsPassed = Math.round((Date.now() - job.timestamp) / 1000); const expectedSecondsToExpire = this.configService.assetExpirationIntervalSeconds; const secondsToExpire = Math.max(0, expectedSecondsToExpire - secondsPassed); - const result = await this.redis.pipeline().expire(job.data.contentLocation, secondsToExpire, 'LT').expire(job.data.metadataLocation, secondsToExpire, 'LT').exec(); + const result = await this.redis + .pipeline() + .expire(job.data.contentLocation, secondsToExpire, 'LT') + .expire(job.data.metadataLocation, secondsToExpire, 'LT') + .exec(); this.logger.debug(result); // calling in the end for graceful shutdowns super.onCompleted(job); diff --git a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.module.ts b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.module.ts index c318f83b..9c788ba2 100644 --- a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.module.ts +++ b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.module.ts @@ -1,84 +1,14 @@ -/* -https://docs.nestjs.com/modules -*/ - -import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; import { EventEmitterModule } from '@nestjs/event-emitter'; -import { RedisModule } from '@songkeys/nestjs-redis'; import { BatchAnnouncementService } from './batch.announcer.service'; -import { ConfigModule } from '../../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; import { BatchAnnouncer } from './batch.announcer'; -import { BATCH_QUEUE_NAME, PUBLISH_QUEUE_NAME } from '../../../../libs/common/src'; -import { BlockchainModule } from '../../../../libs/common/src/blockchain/blockchain.module'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; +import { BlockchainModule } from '#libs/blockchain/blockchain.module'; +import { IpfsService } from '#libs/utils/ipfs.client'; @Module({ - imports: [ - ConfigModule, - BlockchainModule, - EventEmitterModule, - RedisModule.forRootAsync( - { - imports: [ConfigModule], - useFactory: (configService: ConfigService) => ({ - config: [{ url: configService.redisUrl.toString() }], - }), - inject: [ConfigService], - }, - true, // isGlobal - ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. - // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? 
Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - BullModule.registerQueue( - { - name: PUBLISH_QUEUE_NAME, - defaultJobOptions: { - attempts: 1, - backoff: { - type: 'exponential', - }, - removeOnComplete: true, - removeOnFail: false, - }, - }, - { - name: BATCH_QUEUE_NAME, - defaultJobOptions: { - attempts: 1, - backoff: { - type: 'exponential', - }, - removeOnComplete: true, - removeOnFail: false, - }, - }, - ), - ], + imports: [BlockchainModule, EventEmitterModule], controllers: [], providers: [BatchAnnouncementService, BatchAnnouncer, IpfsService], - exports: [BullModule, BatchAnnouncementService, BatchAnnouncer, IpfsService], + exports: [BatchAnnouncementService, BatchAnnouncer, IpfsService], }) export class BatchAnnouncerModule {} diff --git a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.service.ts b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.service.ts index 1bd5c7e8..04dc4024 100644 --- a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.service.ts +++ b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.service.ts @@ -1,16 +1,12 @@ -import { InjectRedis } from '@songkeys/nestjs-redis'; -import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq'; -import { Injectable, Logger, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; +import { Processor, InjectQueue } from '@nestjs/bullmq'; +import { Injectable, OnModuleDestroy } from '@nestjs/common'; import { Job, Queue } from 'bullmq'; -import Redis from 'ioredis'; import { SchedulerRegistry } from '@nestjs/schedule'; -import { EventEmitter2 } from '@nestjs/event-emitter'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; import { BatchAnnouncer } from './batch.announcer'; -import { CAPACITY_EPOCH_TIMEOUT_NAME } from '../../../../libs/common/src/constants'; -import { IBatchAnnouncerJobData } from '../interfaces/batch-announcer.job.interface'; -import { BATCH_QUEUE_NAME, PUBLISH_QUEUE_NAME } from '../../../../libs/common/src'; import { BaseConsumer } from '../BaseConsumer'; +import { BATCH_QUEUE_NAME, PUBLISH_QUEUE_NAME } from '#libs/queues/queue.constants'; +import { IBatchAnnouncerJobData } from '../interfaces'; +import { CAPACITY_EPOCH_TIMEOUT_NAME } from '#libs/constants'; @Injectable() @Processor(BATCH_QUEUE_NAME, { @@ -18,12 +14,9 @@ import { BaseConsumer } from '../BaseConsumer'; }) export class BatchAnnouncementService extends BaseConsumer implements OnModuleDestroy { constructor( - @InjectRedis() private cacheManager: Redis, @InjectQueue(PUBLISH_QUEUE_NAME) private publishQueue: Queue, - private configService: ConfigService, private ipfsPublisher: BatchAnnouncer, private schedulerRegistry: SchedulerRegistry, - private eventEmitter: EventEmitter2, ) { super(); } @@ -43,7 +36,11 @@ export class BatchAnnouncementService extends BaseConsumer implements OnModuleDe try { const publisherJob = await this.ipfsPublisher.announce(job.data); // eslint-disable-next-line no-promise-executor-return - await this.publishQueue.add(publisherJob.id, publisherJob, { jobId: publisherJob.id, removeOnComplete: 1000, attempts: 3 }); + await this.publishQueue.add(publisherJob.id, publisherJob, { + jobId: publisherJob.id, + removeOnComplete: 1000, + attempts: 3, + }); this.logger.log(`Completed job ${job.id} of type ${job.name}`); return job.data; } catch (e) { diff --git 
a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.spec.ts b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.spec.ts index 94488b46..c52e6618 100644 --- a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.spec.ts +++ b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.spec.ts @@ -3,7 +3,6 @@ import assert from 'assert'; import { FrequencyParquetSchema } from '@dsnp/frequency-schemas/types/frequency'; import Redis from 'ioredis-mock'; import { stringToHex } from '@polkadot/util'; -import { Bytes } from '@polkadot/types'; import { BatchAnnouncer } from './batch.announcer'; // Create a mock for the dependencies @@ -62,7 +61,12 @@ describe('BatchAnnouncer', () => { const mockClient = new Redis(); beforeEach(async () => { - ipfsAnnouncer = new BatchAnnouncer(mockClient, mockConfigService as any, mockBlockchainService as any, mockIpfsService as any); + ipfsAnnouncer = new BatchAnnouncer( + mockClient, + mockConfigService as any, + mockBlockchainService as any, + mockIpfsService as any, + ); }); it('should be defined', () => { expect(ipfsAnnouncer).toBeDefined(); diff --git a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.ts b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.ts index fec9a5bc..b04e054a 100644 --- a/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.ts +++ b/services/content-publishing/apps/worker/src/batch_announcer/batch.announcer.ts @@ -5,12 +5,11 @@ import { fromFrequencySchema } from '@dsnp/frequency-schemas/parquet'; import { InjectRedis } from '@songkeys/nestjs-redis'; import Redis from 'ioredis'; import { hexToString } from '@polkadot/util'; -import { STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '../../../../libs/common/src/utils/redis'; -import { BlockchainService } from '../../../../libs/common/src/blockchain/blockchain.service'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { IBatchAnnouncerJobData } from '../interfaces/batch-announcer.job.interface'; -import { IPublisherJob } from '../interfaces/publisher-job.interface'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; +import { ConfigService } from '#libs/config'; +import { BlockchainService } from '#libs/blockchain/blockchain.service'; +import { IpfsService } from '#libs/utils/ipfs.client'; +import { STORAGE_EXPIRE_UPPER_LIMIT_SECONDS } from '#libs/utils/redis'; +import { IBatchAnnouncerJobData, IPublisherJob } from '../interfaces'; @Injectable() export class BatchAnnouncer { diff --git a/services/content-publishing/apps/worker/src/batching_processor/batching.processor.module.ts b/services/content-publishing/apps/worker/src/batching_processor/batching.processor.module.ts index 8b71ec57..8951577f 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/batching.processor.module.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/batching.processor.module.ts @@ -1,82 +1,19 @@ -/* -https://docs.nestjs.com/modules -*/ - -import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; -import { RedisModule } from '@songkeys/nestjs-redis'; import { ScheduleModule } from '@nestjs/schedule'; -import { ConfigModule } from '../../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import * as QueueConstants from 
'../../../../libs/common/src'; import { BatchingProcessorService } from './batching.processor.service'; -import { BroadcastWorker } from './workers/broadcast.worker'; -import { ReplyWorker } from './workers/reply.worker'; -import { ReactionWorker } from './workers/reaction.worker'; -import { TombstoneWorker } from './workers/tombstone.worker'; -import { UpdateWorker } from './workers/update.worker'; -import { ProfileWorker } from './workers/profile.worker'; +import { BroadcastWorker, ReplyWorker, ReactionWorker, TombstoneWorker, UpdateWorker, ProfileWorker } from './workers'; @Module({ - imports: [ - ConfigModule, - RedisModule.forRootAsync( - { - imports: [ConfigModule], - useFactory: (configService: ConfigService) => ({ - config: [{ url: configService.redisUrl.toString() }], - }), - inject: [ConfigService], - }, - true, // isGlobal - ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. - // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - ScheduleModule.forRoot(), - BullModule.registerQueue({ - name: QueueConstants.BATCH_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.BROADCAST_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.REPLY_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.REACTION_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.TOMBSTONE_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.UPDATE_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.PROFILE_QUEUE_NAME, - }), + imports: [ScheduleModule.forRoot()], + providers: [ + BatchingProcessorService, + BroadcastWorker, + ReplyWorker, + ReactionWorker, + TombstoneWorker, + UpdateWorker, + ProfileWorker, ], - providers: [BatchingProcessorService, BroadcastWorker, ReplyWorker, ReactionWorker, TombstoneWorker, UpdateWorker, ProfileWorker], - exports: [BullModule, BatchingProcessorService], + exports: [BatchingProcessorService], }) export class BatchingProcessorModule {} diff --git a/services/content-publishing/apps/worker/src/batching_processor/batching.processor.service.ts b/services/content-publishing/apps/worker/src/batching_processor/batching.processor.service.ts index 460979b7..cc2d28f6 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/batching.processor.service.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/batching.processor.service.ts @@ -7,13 +7,17 @@ import { SchedulerRegistry } from '@nestjs/schedule'; import { randomUUID } from 'crypto'; import * as fs from 'fs'; import { MILLISECONDS_PER_SECOND } from 'time-constants'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { Announcement } from 
'../../../../libs/common/src/interfaces/dsnp'; -import { getBatchMetadataKey, getBatchDataKey, getLockKey as getBatchLockKey, BATCH_LOCK_EXPIRE_SECONDS } from '../../../../libs/common/src/utils/redis'; -import { IBatchMetadata } from '../../../../libs/common/src/interfaces/batch.interface'; -import { IBatchAnnouncerJobData } from '../interfaces/batch-announcer.job.interface'; -import { getSchemaId } from '../../../../libs/common/src/utils/dsnp.schema'; -import { BATCH_QUEUE_NAME, QUEUE_NAME_TO_ANNOUNCEMENT_MAP } from '../../../../libs/common/src'; +import { + getBatchMetadataKey, + getBatchDataKey, + getLockKey as getBatchLockKey, + BATCH_LOCK_EXPIRE_SECONDS, +} from '#libs/utils/redis'; +import { BATCH_QUEUE_NAME, QUEUE_NAME_TO_ANNOUNCEMENT_MAP } from '#libs/queues/queue.constants'; +import { ConfigService } from '#libs/config'; +import { Announcement, IBatchMetadata } from '#libs/interfaces'; +import { IBatchAnnouncerJobData } from '../interfaces'; +import { getSchemaId } from '#libs/utils/dsnp.schema'; @Injectable() export class BatchingProcessorService { @@ -62,7 +66,13 @@ export class BatchingProcessorService { const newData = JSON.stringify(job.data); // @ts-expect-error addToBatch is a custom command - const rowCount = await this.redis.addToBatch(getBatchMetadataKey(queueName), getBatchDataKey(queueName), newMetadata, job.id!, newData); + const rowCount = await this.redis.addToBatch( + getBatchMetadataKey(queueName), + getBatchDataKey(queueName), + newMetadata, + job.id!, + newData, + ); this.logger.log(rowCount); if (rowCount === 1) { this.logger.log(`Processing job ${job.id} with a new batch`); @@ -71,7 +81,9 @@ export class BatchingProcessorService { } else if (rowCount >= this.configService.batchMaxCount) { await this.closeBatch(queueName, batchId, false); } else if (rowCount === -1) { - throw new Error(`invalid result from addingToBatch for job ${job.id} and queue ${queueName} ${this.configService.batchMaxCount}`); + throw new Error( + `invalid result from addingToBatch for job ${job.id} and queue ${queueName} ${this.configService.batchMaxCount}`, + ); } } @@ -87,7 +99,14 @@ export class BatchingProcessorService { const lockedBatchMetaDataKey = getBatchLockKey(batchMetaDataKey); const lockedBatchDataKey = getBatchLockKey(batchDataKey); // @ts-expect-error lockBatch is a custom command - const response = await this.redis.lockBatch(batchMetaDataKey, batchDataKey, lockedBatchMetaDataKey, lockedBatchDataKey, Date.now(), BATCH_LOCK_EXPIRE_SECONDS * 1000); + const response = await this.redis.lockBatch( + batchMetaDataKey, + batchDataKey, + lockedBatchMetaDataKey, + lockedBatchDataKey, + Date.now(), + BATCH_LOCK_EXPIRE_SECONDS * 1000, + ); this.logger.debug(JSON.stringify(response)); const status = response[0]; @@ -115,7 +134,11 @@ export class BatchingProcessorService { schemaId: getSchemaId(this.configService.environment, QUEUE_NAME_TO_ANNOUNCEMENT_MAP.get(queueName)!), announcements, } as IBatchAnnouncerJobData; - await this.outputQueue.add(`Batch Job - ${metaData.batchId}`, job, { jobId: metaData.batchId, removeOnFail: false, removeOnComplete: 1000 }); + await this.outputQueue.add(`Batch Job - ${metaData.batchId}`, job, { + jobId: metaData.batchId, + removeOnFail: false, + removeOnComplete: 1000, + }); } try { const result = await this.redis.multi().del(lockedBatchMetaDataKey).del(lockedBatchDataKey).exec(); diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/broadcast.worker.ts 
b/services/content-publishing/apps/worker/src/batching_processor/workers/broadcast.worker.ts index 4b05ca45..1e95e766 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/workers/broadcast.worker.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/broadcast.worker.ts @@ -1,10 +1,10 @@ -import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { Announcement } from '#libs/interfaces'; +import { BROADCAST_QUEUE_NAME } from '#libs/queues/queue.constants'; +import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; +import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Job } from 'bullmq'; -import { BROADCAST_QUEUE_NAME } from '../../../../../libs/common/src'; -import { BatchingProcessorService } from '../batching.processor.service'; -import { Announcement } from '../../../../../libs/common/src/interfaces/dsnp'; import { BaseConsumer } from '../../BaseConsumer'; +import { BatchingProcessorService } from '../batching.processor.service'; @Injectable() @Processor(BROADCAST_QUEUE_NAME, { concurrency: 2 }) diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/index.ts b/services/content-publishing/apps/worker/src/batching_processor/workers/index.ts new file mode 100644 index 00000000..87931b63 --- /dev/null +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/index.ts @@ -0,0 +1,6 @@ +export * from './broadcast.worker'; +export * from './profile.worker'; +export * from './reaction.worker'; +export * from './reply.worker'; +export * from './tombstone.worker'; +export * from './update.worker'; diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/profile.worker.ts b/services/content-publishing/apps/worker/src/batching_processor/workers/profile.worker.ts index c3305348..5033f7f7 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/workers/profile.worker.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/profile.worker.ts @@ -1,10 +1,10 @@ +import { Announcement } from '#libs/interfaces'; +import { PROFILE_QUEUE_NAME } from '#libs/queues/queue.constants'; import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Job } from 'bullmq'; -import { PROFILE_QUEUE_NAME } from '../../../../../libs/common/src'; -import { BatchingProcessorService } from '../batching.processor.service'; -import { Announcement } from '../../../../../libs/common/src/interfaces/dsnp'; import { BaseConsumer } from '../../BaseConsumer'; +import { BatchingProcessorService } from '../batching.processor.service'; @Injectable() @Processor(PROFILE_QUEUE_NAME, { concurrency: 2 }) diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/reaction.worker.ts b/services/content-publishing/apps/worker/src/batching_processor/workers/reaction.worker.ts index b698c555..5c71fba7 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/workers/reaction.worker.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/reaction.worker.ts @@ -1,10 +1,10 @@ -import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { Announcement } from '#libs/interfaces'; +import { REACTION_QUEUE_NAME } from 
'#libs/queues/queue.constants'; +import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; +import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Job } from 'bullmq'; -import { REACTION_QUEUE_NAME } from '../../../../../libs/common/src'; -import { BatchingProcessorService } from '../batching.processor.service'; -import { Announcement } from '../../../../../libs/common/src/interfaces/dsnp'; import { BaseConsumer } from '../../BaseConsumer'; +import { BatchingProcessorService } from '../batching.processor.service'; @Injectable() @Processor(REACTION_QUEUE_NAME, { concurrency: 2 }) diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/reply.worker.ts b/services/content-publishing/apps/worker/src/batching_processor/workers/reply.worker.ts index 9a1a7f3c..21ea8929 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/workers/reply.worker.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/reply.worker.ts @@ -1,10 +1,10 @@ +import { Announcement } from '#libs/interfaces'; +import { REPLY_QUEUE_NAME } from '#libs/queues/queue.constants'; import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Job } from 'bullmq'; -import { REPLY_QUEUE_NAME } from '../../../../../libs/common/src'; -import { BatchingProcessorService } from '../batching.processor.service'; -import { Announcement } from '../../../../../libs/common/src/interfaces/dsnp'; import { BaseConsumer } from '../../BaseConsumer'; +import { BatchingProcessorService } from '../batching.processor.service'; @Injectable() @Processor(REPLY_QUEUE_NAME, { concurrency: 2 }) diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/tombstone.worker.ts b/services/content-publishing/apps/worker/src/batching_processor/workers/tombstone.worker.ts index 67df518a..3058b281 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/workers/tombstone.worker.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/tombstone.worker.ts @@ -1,10 +1,10 @@ +import { Announcement } from '#libs/interfaces'; +import { TOMBSTONE_QUEUE_NAME } from '#libs/queues/queue.constants'; import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Job } from 'bullmq'; -import { TOMBSTONE_QUEUE_NAME } from '../../../../../libs/common/src'; -import { BatchingProcessorService } from '../batching.processor.service'; -import { Announcement } from '../../../../../libs/common/src/interfaces/dsnp'; import { BaseConsumer } from '../../BaseConsumer'; +import { BatchingProcessorService } from '../batching.processor.service'; @Injectable() @Processor(TOMBSTONE_QUEUE_NAME, { concurrency: 2 }) diff --git a/services/content-publishing/apps/worker/src/batching_processor/workers/update.worker.ts b/services/content-publishing/apps/worker/src/batching_processor/workers/update.worker.ts index 9c3f0050..e323a246 100644 --- a/services/content-publishing/apps/worker/src/batching_processor/workers/update.worker.ts +++ b/services/content-publishing/apps/worker/src/batching_processor/workers/update.worker.ts @@ -1,10 +1,10 @@ -import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; -import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common'; +import { Announcement } from '#libs/interfaces'; +import { UPDATE_QUEUE_NAME } from 
'#libs/queues/queue.constants'; +import { Processor, OnWorkerEvent } from '@nestjs/bullmq'; +import { Injectable, OnApplicationBootstrap } from '@nestjs/common'; import { Job } from 'bullmq'; -import { UPDATE_QUEUE_NAME } from '../../../../../libs/common/src'; -import { BatchingProcessorService } from '../batching.processor.service'; -import { Announcement } from '../../../../../libs/common/src/interfaces/dsnp'; import { BaseConsumer } from '../../BaseConsumer'; +import { BatchingProcessorService } from '../batching.processor.service'; @Injectable() @Processor(UPDATE_QUEUE_NAME, { concurrency: 2 }) diff --git a/services/content-publishing/apps/worker/src/interfaces/batch-announcer.job.interface.ts b/services/content-publishing/apps/worker/src/interfaces/batch-announcer.job.interface.ts index 10cb3b07..4c17d668 100644 --- a/services/content-publishing/apps/worker/src/interfaces/batch-announcer.job.interface.ts +++ b/services/content-publishing/apps/worker/src/interfaces/batch-announcer.job.interface.ts @@ -1,4 +1,4 @@ -import { Announcement } from '../../../../libs/common/src/interfaces/dsnp'; +import { Announcement } from '#libs/interfaces/dsnp'; export interface IBatchAnnouncerJobData { batchId: string; diff --git a/services/content-publishing/apps/worker/src/interfaces/index.ts b/services/content-publishing/apps/worker/src/interfaces/index.ts new file mode 100644 index 00000000..ee538107 --- /dev/null +++ b/services/content-publishing/apps/worker/src/interfaces/index.ts @@ -0,0 +1,3 @@ +export * from './batch-announcer.job.interface'; +export * from './publisher-job.interface'; +export * from './status-monitor.interface'; diff --git a/services/content-publishing/apps/worker/src/monitor/status.monitor.module.ts b/services/content-publishing/apps/worker/src/monitor/status.monitor.module.ts index 81cb910e..96d6fff4 100644 --- a/services/content-publishing/apps/worker/src/monitor/status.monitor.module.ts +++ b/services/content-publishing/apps/worker/src/monitor/status.monitor.module.ts @@ -1,70 +1,12 @@ -/* -https://docs.nestjs.com/modules -*/ - -import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; import { EventEmitterModule } from '@nestjs/event-emitter'; -import { RedisModule } from '@songkeys/nestjs-redis'; import { TxStatusMonitoringService } from './tx.status.monitor.service'; -import { ConfigModule } from '../../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { BlockchainModule } from '../../../../libs/common/src/blockchain/blockchain.module'; -import { PUBLISH_QUEUE_NAME, TRANSACTION_RECEIPT_QUEUE_NAME } from '../../../../libs/common/src'; +import { BlockchainModule } from '#libs/blockchain/blockchain.module'; @Module({ - imports: [ - ConfigModule, - BlockchainModule, - EventEmitterModule, - RedisModule.forRootAsync( - { - imports: [ConfigModule], - useFactory: (configService: ConfigService) => ({ - config: [{ url: configService.redisUrl.toString() }], - }), - inject: [ConfigService], - }, - true, // isGlobal - ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. 
- // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - BullModule.registerQueue( - { - name: TRANSACTION_RECEIPT_QUEUE_NAME, - defaultJobOptions: { - removeOnComplete: true, - removeOnFail: false, - }, - }, - { - name: PUBLISH_QUEUE_NAME, - }, - ), - ], + imports: [BlockchainModule, EventEmitterModule], controllers: [], providers: [TxStatusMonitoringService], - exports: [BullModule, TxStatusMonitoringService], + exports: [TxStatusMonitoringService], }) export class StatusMonitorModule {} diff --git a/services/content-publishing/apps/worker/src/monitor/tx.status.monitor.service.ts b/services/content-publishing/apps/worker/src/monitor/tx.status.monitor.service.ts index b1e31e8b..b27ae167 100644 --- a/services/content-publishing/apps/worker/src/monitor/tx.status.monitor.service.ts +++ b/services/content-publishing/apps/worker/src/monitor/tx.status.monitor.service.ts @@ -5,13 +5,12 @@ import { Job, Queue, UnrecoverableError } from 'bullmq'; import Redis from 'ioredis'; import { MILLISECONDS_PER_SECOND } from 'time-constants'; import { RegistryError } from '@polkadot/types/types'; -import { BlockchainService } from '../../../../libs/common/src/blockchain/blockchain.service'; -import { ITxMonitorJob } from '../interfaces/status-monitor.interface'; -import { PUBLISH_QUEUE_NAME, TRANSACTION_RECEIPT_QUEUE_NAME } from '../../../../libs/common/src'; -import { SECONDS_PER_BLOCK } from '../../../../libs/common/src/constants'; -import { NUMBER_BLOCKS_TO_CRAWL } from '../../../../libs/common/src/blockchain/blockchain-constants'; +import { NUMBER_BLOCKS_TO_CRAWL } from '#libs/blockchain/blockchain-constants'; +import { BlockchainService } from '#libs/blockchain/blockchain.service'; +import { TRANSACTION_RECEIPT_QUEUE_NAME, PUBLISH_QUEUE_NAME } from '#libs/queues/queue.constants'; import { BaseConsumer } from '../BaseConsumer'; -import { IPublisherJob } from '../interfaces/publisher-job.interface'; +import { ITxMonitorJob, IPublisherJob } from '../interfaces'; +import { SECONDS_PER_BLOCK } from '#libs/constants'; @Injectable() @Processor(TRANSACTION_RECEIPT_QUEUE_NAME, { @@ -30,14 +29,22 @@ export class TxStatusMonitoringService extends BaseConsumer { this.logger.log(`Monitoring job ${job.id} of type ${job.name}`); try { const numberBlocksToParse = NUMBER_BLOCKS_TO_CRAWL; - const previousKnownBlockNumber = (await this.blockchainService.getBlock(job.data.lastFinalizedBlockHash)).block.header.number.toBigInt(); + const previousKnownBlockNumber = ( + await this.blockchainService.getBlock(job.data.lastFinalizedBlockHash) + ).block.header.number.toNumber(); const currentFinalizedBlockNumber = await this.blockchainService.getLatestFinalizedBlockNumber(); - const blockList: bigint[] = []; + const blockList: number[] = []; - for (let i = previousKnownBlockNumber; i <= currentFinalizedBlockNumber && i < previousKnownBlockNumber + numberBlocksToParse; i += 1n) { + for ( + let i = previousKnownBlockNumber; + i <= currentFinalizedBlockNumber && i < previousKnownBlockNumber + numberBlocksToParse; + i += 
1 + ) { blockList.push(i); } - const txResult = await this.blockchainService.crawlBlockListForTx(job.data.txHash, blockList, [{ pallet: 'system', event: 'ExtrinsicSuccess' }]); + const txResult = await this.blockchainService.crawlBlockListForTx(job.data.txHash, blockList, [ + { pallet: 'system', event: 'ExtrinsicSuccess' }, + ]); if (!txResult.found) { if (job.attemptsMade < (job.opts.attempts ?? 3)) { @@ -79,7 +86,10 @@ export class TxStatusMonitoringService extends BaseConsumer { } } - private async handleMessagesFailure(jobId: string, moduleError: RegistryError): Promise<{ pause: boolean; retry: boolean }> { + private async handleMessagesFailure( + jobId: string, + moduleError: RegistryError, + ): Promise<{ pause: boolean; retry: boolean }> { try { switch (moduleError.method) { case 'TooManyMessagesInBlock': diff --git a/services/content-publishing/apps/worker/src/publisher/ipfs.publisher.ts b/services/content-publishing/apps/worker/src/publisher/ipfs.publisher.ts index 3e53df6a..a950d3a0 100644 --- a/services/content-publishing/apps/worker/src/publisher/ipfs.publisher.ts +++ b/services/content-publishing/apps/worker/src/publisher/ipfs.publisher.ts @@ -3,11 +3,11 @@ import { KeyringPair } from '@polkadot/keyring/types'; import { ISubmittableResult } from '@polkadot/types/types'; import { SubmittableExtrinsic } from '@polkadot/api-base/types'; import { Hash } from '@polkadot/types/interfaces'; -import { BlockchainService } from '../../../../libs/common/src/blockchain/blockchain.service'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { IPublisherJob } from '../interfaces/publisher-job.interface'; -import { createKeys } from '../../../../libs/common/src/blockchain/create-keys'; import { NonceService } from './nonce.service'; +import { ConfigService } from '#libs/config'; +import { BlockchainService } from '#libs/blockchain/blockchain.service'; +import { createKeys } from '#libs/blockchain/create-keys'; +import { IPublisherJob } from '../interfaces'; @Injectable() export class IPFSPublisher { @@ -24,11 +24,19 @@ export class IPFSPublisher { public async publish(message: IPublisherJob): Promise { this.logger.debug(JSON.stringify(message)); const providerKeys = createKeys(this.configService.providerAccountSeedPhrase); - const tx = this.blockchainService.createExtrinsicCall({ pallet: 'messages', extrinsic: 'addIpfsMessage' }, message.schemaId, message.data.cid, message.data.payloadLength); + const tx = this.blockchainService.createExtrinsicCall( + { pallet: 'messages', extrinsic: 'addIpfsMessage' }, + message.schemaId, + message.data.cid, + message.data.payloadLength, + ); return this.processSingleBatch(providerKeys, tx); } - async processSingleBatch(providerKeys: KeyringPair, tx: SubmittableExtrinsic<'rxjs', ISubmittableResult>): Promise { + async processSingleBatch( + providerKeys: KeyringPair, + tx: SubmittableExtrinsic<'rxjs', ISubmittableResult>, + ): Promise { this.logger.debug(`Submitting tx of size ${tx.length}`); try { const ext = await this.blockchainService.createExtrinsic( diff --git a/services/content-publishing/apps/worker/src/publisher/nonce.service.ts b/services/content-publishing/apps/worker/src/publisher/nonce.service.ts index 1552c23f..f23f35c7 100644 --- a/services/content-publishing/apps/worker/src/publisher/nonce.service.ts +++ b/services/content-publishing/apps/worker/src/publisher/nonce.service.ts @@ -2,10 +2,10 @@ import { InjectRedis } from '@songkeys/nestjs-redis'; import { Injectable, Logger, OnApplicationBootstrap } 
from '@nestjs/common'; import Redis from 'ioredis'; import fs from 'fs'; -import { createKeys } from '../../../../libs/common/src/blockchain/create-keys'; -import { NUMBER_OF_NONCE_KEYS_TO_CHECK, NONCE_KEY_EXPIRE_SECONDS, getNonceKey } from '../../../../libs/common/src/utils/redis'; -import { BlockchainService } from '../../../../libs/common/src/blockchain/blockchain.service'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; +import { ConfigService } from '#libs/config'; +import { BlockchainService } from '#libs/blockchain/blockchain.service'; +import { createKeys } from '#libs/blockchain/create-keys'; +import { NUMBER_OF_NONCE_KEYS_TO_CHECK, getNonceKey } from '#libs/utils/redis'; @Injectable() export class NonceService implements OnApplicationBootstrap { diff --git a/services/content-publishing/apps/worker/src/publisher/publisher.module.ts b/services/content-publishing/apps/worker/src/publisher/publisher.module.ts index b8b8a291..d49d61fb 100644 --- a/services/content-publishing/apps/worker/src/publisher/publisher.module.ts +++ b/services/content-publishing/apps/worker/src/publisher/publisher.module.ts @@ -1,84 +1,14 @@ -/* -https://docs.nestjs.com/modules -*/ - -import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; import { EventEmitterModule } from '@nestjs/event-emitter'; -import { RedisModule } from '@songkeys/nestjs-redis'; import { PublishingService } from './publishing.service'; -import { ConfigModule } from '../../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { BlockchainModule } from '../../../../libs/common/src/blockchain/blockchain.module'; import { IPFSPublisher } from './ipfs.publisher'; -import { PUBLISH_QUEUE_NAME, TRANSACTION_RECEIPT_QUEUE_NAME } from '../../../../libs/common/src'; import { NonceService } from './nonce.service'; +import { BlockchainModule } from '#libs/blockchain/blockchain.module'; @Module({ - imports: [ - BlockchainModule, - ConfigModule, - EventEmitterModule, - RedisModule.forRootAsync( - { - imports: [ConfigModule], - useFactory: (configService: ConfigService) => ({ - config: [{ url: configService.redisUrl.toString() }], - }), - inject: [ConfigService], - }, - true, // isGlobal - ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. - // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? 
Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - BullModule.registerQueue( - { - name: PUBLISH_QUEUE_NAME, - defaultJobOptions: { - attempts: 1, - backoff: { - type: 'exponential', - }, - removeOnComplete: false, - removeOnFail: false, - }, - }, - { - name: TRANSACTION_RECEIPT_QUEUE_NAME, - defaultJobOptions: { - attempts: 3, - backoff: { - type: 'exponential', - }, - removeOnComplete: false, - removeOnFail: false, - }, - }, - ), - ], + imports: [BlockchainModule, EventEmitterModule], controllers: [], providers: [PublishingService, IPFSPublisher, NonceService], - exports: [BullModule, PublishingService, IPFSPublisher], + exports: [PublishingService, IPFSPublisher], }) export class PublisherModule {} diff --git a/services/content-publishing/apps/worker/src/publisher/publishing.service.ts b/services/content-publishing/apps/worker/src/publisher/publishing.service.ts index 84972fae..ad1e32e5 100644 --- a/services/content-publishing/apps/worker/src/publisher/publishing.service.ts +++ b/services/content-publishing/apps/worker/src/publisher/publishing.service.ts @@ -1,20 +1,19 @@ import { InjectRedis } from '@songkeys/nestjs-redis'; -import { Processor, WorkerHost, OnWorkerEvent, InjectQueue } from '@nestjs/bullmq'; -import { Injectable, Logger, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; +import { Processor, InjectQueue } from '@nestjs/bullmq'; +import { Injectable, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common'; import { Job, Queue } from 'bullmq'; import Redis from 'ioredis'; import { SchedulerRegistry } from '@nestjs/schedule'; import { EventEmitter2, OnEvent } from '@nestjs/event-emitter'; import { MILLISECONDS_PER_SECOND } from 'time-constants'; import { BlockHash, Hash } from '@polkadot/types/interfaces'; -import { BlockchainService } from '../../../../libs/common/src/blockchain/blockchain.service'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { IPublisherJob } from '../interfaces/publisher-job.interface'; -import { IPFSPublisher } from './ipfs.publisher'; -import { CAPACITY_EPOCH_TIMEOUT_NAME, SECONDS_PER_BLOCK } from '../../../../libs/common/src/constants'; -import { PUBLISH_QUEUE_NAME, TRANSACTION_RECEIPT_QUEUE_NAME } from '../../../../libs/common/src'; -import { ITxMonitorJob } from '../interfaces/status-monitor.interface'; +import { ConfigService } from '#libs/config'; +import { BlockchainService } from '#libs/blockchain/blockchain.service'; +import { CAPACITY_EPOCH_TIMEOUT_NAME, SECONDS_PER_BLOCK } from '#libs/constants'; +import { PUBLISH_QUEUE_NAME, TRANSACTION_RECEIPT_QUEUE_NAME } from '#libs/queues/queue.constants'; import { BaseConsumer } from '../BaseConsumer'; +import { IPublisherJob, ITxMonitorJob } from '../interfaces'; +import { IPFSPublisher } from './ipfs.publisher'; @Injectable() @Processor(PUBLISH_QUEUE_NAME, { @@ -40,7 +39,7 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot await this.checkCapacity(); } - async onModuleDestroy(): Promise { + async onModuleDestroy(): Promise { try { this.schedulerRegistry.deleteTimeout(CAPACITY_EPOCH_TIMEOUT_NAME); } catch (e) { @@ -69,7 +68,11 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot } } - async sendJobToTxReceiptQueue(jobData: IPublisherJob, txHash: Hash, lastFinalizedBlockHash: BlockHash): Promise { + async sendJobToTxReceiptQueue( + jobData: IPublisherJob, + txHash: Hash, + lastFinalizedBlockHash: BlockHash, + ): Promise { const job: 
ITxMonitorJob = { id: txHash.toString(), lastFinalizedBlockHash, @@ -79,7 +82,12 @@ export class PublishingService extends BaseConsumer implements OnApplicationBoot // add a delay of 1 block to allow the tx receipt to go through before checking const initialDelay = 1 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND; const retryDelay = 3 * SECONDS_PER_BLOCK * MILLISECONDS_PER_SECOND; - await this.txReceiptQueue.add(`Receipt Job - ${job.id}`, job, { jobId: job.id, delay: initialDelay, attempts: 4, backoff: { type: 'exponential', delay: retryDelay } }); + await this.txReceiptQueue.add(`Receipt Job - ${job.id}`, job, { + jobId: job.id, + delay: initialDelay, + attempts: 4, + backoff: { type: 'exponential', delay: retryDelay }, + }); } private async checkCapacity(): Promise { diff --git a/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.spec.ts b/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.spec.ts index 5e6285ae..a3e156aa 100644 --- a/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.spec.ts +++ b/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.spec.ts @@ -2,9 +2,10 @@ import { Test, TestingModule } from '@nestjs/testing'; import { Queue } from 'bullmq'; import { expect, describe, it, beforeEach, jest } from '@jest/globals'; import { DsnpAnnouncementProcessor } from './dsnp.announcement.processor'; -import { AnnouncementTypeDto, IRequestJob, ModifiableAnnouncementTypeDto, TagTypeDto } from '../../../../libs/common/src'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; +import { ConfigService } from '#libs/config'; +import { AnnouncementTypeDto, ModifiableAnnouncementTypeDto, TagTypeDto } from '#libs/dtos'; +import { IRequestJob } from '#libs/interfaces'; +import { IpfsService } from '#libs/utils/ipfs.client'; const mockQueue = { add: jest.fn(), diff --git a/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.ts b/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.ts index 5c6ae0d7..296fcd0f 100644 --- a/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.ts +++ b/services/content-publishing/apps/worker/src/request_processor/dsnp.announcement.processor.ts @@ -13,44 +13,46 @@ import { import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bullmq'; import { Queue } from 'bullmq'; +import { ConfigService } from '#libs/config'; import { - TagTypeDto, - AssetDto, - AttachmentTypeDto, - IRequestJob, + AnnouncementTypeDto, BroadcastDto, - ProfileDto, - ReactionDto, ReplyDto, + ReactionDto, UpdateDto, - AnnouncementTypeDto, + ProfileDto, TombstoneDto, ModifiableAnnouncementTypeDto, - BROADCAST_QUEUE_NAME, - PROFILE_QUEUE_NAME, - REACTION_QUEUE_NAME, - REPLY_QUEUE_NAME, - TOMBSTONE_QUEUE_NAME, - UPDATE_QUEUE_NAME, -} from '../../../../libs/common/src'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { calculateDsnpHash } from '../../../../libs/common/src/utils/ipfs'; + TagTypeDto, + AttachmentTypeDto, + AssetDto, +} from '#libs/dtos'; import { + IRequestJob, AnnouncementType, + createTombstone, + createNote, BroadcastAnnouncement, - ProfileAnnouncement, - 
ReactionAnnouncement, - ReplyAnnouncement, - UpdateAnnouncement, createBroadcast, - createNote, - createProfile, - createReaction, + ReplyAnnouncement, createReply, - createTombstone, + ReactionAnnouncement, + createReaction, + UpdateAnnouncement, createUpdate, -} from '../../../../libs/common/src/interfaces/dsnp'; + ProfileAnnouncement, + createProfile, +} from '#libs/interfaces'; +import { + BROADCAST_QUEUE_NAME, + REPLY_QUEUE_NAME, + REACTION_QUEUE_NAME, + UPDATE_QUEUE_NAME, + PROFILE_QUEUE_NAME, + TOMBSTONE_QUEUE_NAME, +} from '#libs/queues/queue.constants'; +import { calculateDsnpHash } from '#libs/utils/ipfs'; +import { IpfsService } from '#libs/utils/ipfs.client'; @Injectable() export class DsnpAnnouncementProcessor { @@ -103,39 +105,75 @@ export class DsnpAnnouncementProcessor { private async queueBroadcast(data: IRequestJob) { const broadcast = await this.processBroadcast(data.content as BroadcastDto, data.dsnpUserId, data.assetToMimeType); - await this.broadcastQueue.add(`Broadcast Job - ${data.id}`, broadcast, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); + await this.broadcastQueue.add(`Broadcast Job - ${data.id}`, broadcast, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); } private async queueReply(data: IRequestJob) { const reply = await this.processReply(data.content as ReplyDto, data.dsnpUserId, data.assetToMimeType); - await this.replyQueue.add(`Reply Job - ${data.id}`, reply, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); + await this.replyQueue.add(`Reply Job - ${data.id}`, reply, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); } private async queueReaction(data: IRequestJob) { const reaction = await this.processReaction(data.content as ReactionDto, data.dsnpUserId); - await this.reactionQueue.add(`Reaction Job - ${data.id}`, reaction, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); + await this.reactionQueue.add(`Reaction Job - ${data.id}`, reaction, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); } private async queueUpdate(data: IRequestJob) { const updateDto = data.content as UpdateDto; - const updateAnnouncementType: AnnouncementType = await this.getAnnouncementTypeFromModifiableAnnouncementType(updateDto.targetAnnouncementType); - const update = await this.processUpdate(updateDto, updateAnnouncementType, updateDto.targetContentHash ?? '', data.dsnpUserId, data.assetToMimeType); - await this.updateQueue.add(`Update Job - ${data.id}`, update, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); + const updateAnnouncementType: AnnouncementType = await this.getAnnouncementTypeFromModifiableAnnouncementType( + updateDto.targetAnnouncementType, + ); + const update = await this.processUpdate( + updateDto, + updateAnnouncementType, + updateDto.targetContentHash ?? 
'', + data.dsnpUserId, + data.assetToMimeType, + ); + await this.updateQueue.add(`Update Job - ${data.id}`, update, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); } private async queueProfile(data: IRequestJob) { const profile = await this.processProfile(data.content as ProfileDto, data.dsnpUserId, data.assetToMimeType); - await this.profileQueue.add(`Profile Job - ${data.id}`, profile, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); + await this.profileQueue.add(`Profile Job - ${data.id}`, profile, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); } private async queueTombstone(data: IRequestJob) { const tombStoneDto = data.content as TombstoneDto; - const announcementType: AnnouncementType = await this.getAnnouncementTypeFromModifiableAnnouncementType(tombStoneDto.targetAnnouncementType); + const announcementType: AnnouncementType = await this.getAnnouncementTypeFromModifiableAnnouncementType( + tombStoneDto.targetAnnouncementType, + ); const tombstone = createTombstone(data.dsnpUserId, announcementType, tombStoneDto.targetContentHash ?? ''); - await this.tombstoneQueue.add(`Tombstone Job - ${data.id}`, tombstone, { jobId: data.id, removeOnFail: false, removeOnComplete: 2000 }); + await this.tombstoneQueue.add(`Tombstone Job - ${data.id}`, tombstone, { + jobId: data.id, + removeOnFail: false, + removeOnComplete: 2000, + }); } - private async getAnnouncementTypeFromModifiableAnnouncementType(modifiableAnnouncementType: ModifiableAnnouncementTypeDto): Promise { + private async getAnnouncementTypeFromModifiableAnnouncementType( + modifiableAnnouncementType: ModifiableAnnouncementTypeDto, + ): Promise { this.logger.debug(`Getting announcement type from modifiable announcement type`); switch (modifiableAnnouncementType) { case ModifiableAnnouncementTypeDto.BROADCAST: @@ -150,7 +188,10 @@ export class DsnpAnnouncementProcessor { public async prepareNote(noteContent: any, assetToMimeType?: Map): Promise<[string, string, string]> { this.logger.debug(`Preparing note`); const tags: ActivityContentTag[] = this.prepareTags(noteContent?.content.tag); - const attachments: ActivityContentAttachment[] = await this.prepareAttachments(noteContent.content.assets, assetToMimeType); + const attachments: ActivityContentAttachment[] = await this.prepareAttachments( + noteContent.content.assets, + assetToMimeType, + ); const note = createNote(noteContent.content.content ?? '', new Date(noteContent.content.published ?? 
''), { name: noteContent.content.name, @@ -188,7 +229,10 @@ export class DsnpAnnouncementProcessor { return tags; } - private async prepareAttachments(assetData?: any[], assetToMimeType?: Map): Promise { + private async prepareAttachments( + assetData?: any[], + assetToMimeType?: Map, + ): Promise { const attachments: ActivityContentAttachment[] = []; if (assetData) { const promises = assetData.map(async (asset) => { @@ -224,7 +268,10 @@ export class DsnpAnnouncementProcessor { }; } - private async prepareImageAttachment(asset: AssetDto, assetToMimeType?: Map): Promise { + private async prepareImageAttachment( + asset: AssetDto, + assetToMimeType?: Map, + ): Promise { const imageLinks: ActivityContentImageLink[] = []; if (asset.references) { const promises = asset.references.map(async (reference) => { @@ -254,7 +301,10 @@ export class DsnpAnnouncementProcessor { }; } - private async prepareVideoAttachment(asset: AssetDto, assetToMimeType?: Map): Promise { + private async prepareVideoAttachment( + asset: AssetDto, + assetToMimeType?: Map, + ): Promise { const videoLinks: ActivityContentVideoLink[] = []; let duration: string | undefined = ''; @@ -288,7 +338,10 @@ export class DsnpAnnouncementProcessor { }; } - private async prepareAudioAttachment(asset: AssetDto, assetToMimeType?: Map): Promise { + private async prepareAudioAttachment( + asset: AssetDto, + assetToMimeType?: Map, + ): Promise { const audioLinks: ActivityContentAudioLink[] = []; let duration = ''; if (asset.references) { @@ -320,13 +373,21 @@ export class DsnpAnnouncementProcessor { }; } - private async processBroadcast(content: BroadcastDto, dsnpUserId: string, assetToMimeType?: Map): Promise { + private async processBroadcast( + content: BroadcastDto, + dsnpUserId: string, + assetToMimeType?: Map, + ): Promise { this.logger.debug(`Processing broadcast`); const [cid, ipfsUrl, hash] = await this.prepareNote(content, assetToMimeType); return createBroadcast(dsnpUserId, ipfsUrl, hash); } - private async processReply(content: ReplyDto, dsnpUserId: string, assetToMimeType?: Map): Promise { + private async processReply( + content: ReplyDto, + dsnpUserId: string, + assetToMimeType?: Map, + ): Promise { this.logger.debug(`Processing reply for ${content.inReplyTo}`); const [cid, ipfsUrl, hash] = await this.prepareNote(content, assetToMimeType); return createReply(dsnpUserId, ipfsUrl, hash, content.inReplyTo); @@ -349,9 +410,16 @@ export class DsnpAnnouncementProcessor { return createUpdate(dsnpUserId, ipfsUrl, hash, targetAnnouncementType, targetContentHash); } - private async processProfile(content: ProfileDto, dsnpUserId: string, assetToMimeType?: Map): Promise { + private async processProfile( + content: ProfileDto, + dsnpUserId: string, + assetToMimeType?: Map, + ): Promise { this.logger.debug(`Processing profile`); - const attachments: ActivityContentImageLink[] = await this.prepareProfileIconAttachments(content.profile.icon ?? [], assetToMimeType); + const attachments: ActivityContentImageLink[] = await this.prepareProfileIconAttachments( + content.profile.icon ?? 
[], + assetToMimeType, + ); const profileActivity: ActivityContentProfile = { '@context': 'https://www.w3.org/ns/activitystreams', @@ -368,7 +436,10 @@ export class DsnpAnnouncementProcessor { return createProfile(dsnpUserId, this.formIpfsUrl(cid), hash); } - private async prepareProfileIconAttachments(icons: any[], assetToMimeType?: Map): Promise { + private async prepareProfileIconAttachments( + icons: any[], + assetToMimeType?: Map, + ): Promise { const attachments: ActivityContentImageLink[] = []; const promises = icons.map(async (icon) => { diff --git a/services/content-publishing/apps/worker/src/request_processor/request.processor.module.ts b/services/content-publishing/apps/worker/src/request_processor/request.processor.module.ts index 50ab35a7..27d3bc89 100644 --- a/services/content-publishing/apps/worker/src/request_processor/request.processor.module.ts +++ b/services/content-publishing/apps/worker/src/request_processor/request.processor.module.ts @@ -2,78 +2,14 @@ https://docs.nestjs.com/modules */ -import { BullModule } from '@nestjs/bullmq'; import { Module } from '@nestjs/common'; -import { RedisModule } from '@songkeys/nestjs-redis'; -import { ConfigModule } from '../../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import * as QueueConstants from '../../../../libs/common/src'; import { RequestProcessorService } from './request.processor.service'; import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; import { DsnpAnnouncementProcessor } from './dsnp.announcement.processor'; @Module({ - imports: [ - ConfigModule, - RedisModule.forRootAsync( - { - imports: [ConfigModule], - useFactory: (configService: ConfigService) => ({ - config: [{ url: configService.redisUrl.toString() }], - }), - inject: [ConfigService], - }, - true, // isGlobal - ), - BullModule.forRootAsync({ - imports: [ConfigModule], - useFactory: (configService: ConfigService) => { - // Note: BullMQ doesn't honor a URL for the Redis connection, and - // JS URL doesn't parse 'redis://' as a valid protocol, so we fool - // it by changing the URL to use 'http://' in order to parse out - // the host, port, username, password, etc. - // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but - // trying to keep the # of environment variables from proliferating - const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http')); - const { hostname, port, username, password, pathname } = url; - return { - connection: { - host: hostname || undefined, - port: port ? Number(port) : undefined, - username: username || undefined, - password: password || undefined, - db: pathname?.length > 1 ? 
Number(pathname.slice(1)) : undefined, - }, - }; - }, - inject: [ConfigService], - }), - BullModule.registerQueue({ - name: QueueConstants.ASSET_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.REQUEST_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.BROADCAST_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.REPLY_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.REACTION_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.TOMBSTONE_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.UPDATE_QUEUE_NAME, - }), - BullModule.registerQueue({ - name: QueueConstants.PROFILE_QUEUE_NAME, - }), - ], + imports: [], providers: [RequestProcessorService, IpfsService, DsnpAnnouncementProcessor], - exports: [BullModule, RequestProcessorService, IpfsService, DsnpAnnouncementProcessor], + exports: [RequestProcessorService, IpfsService, DsnpAnnouncementProcessor], }) export class RequestProcessorModule {} diff --git a/services/content-publishing/apps/worker/src/request_processor/request.processor.service.ts b/services/content-publishing/apps/worker/src/request_processor/request.processor.service.ts index 89e8c278..ac3ef54d 100644 --- a/services/content-publishing/apps/worker/src/request_processor/request.processor.service.ts +++ b/services/content-publishing/apps/worker/src/request_processor/request.processor.service.ts @@ -1,20 +1,18 @@ -import { InjectRedis } from '@songkeys/nestjs-redis'; import { Processor } from '@nestjs/bullmq'; import { Injectable } from '@nestjs/common'; import { DelayedError, Job } from 'bullmq'; -import Redis from 'ioredis'; import { MILLISECONDS_PER_SECOND } from 'time-constants'; -import { ConfigService } from '../../../../libs/common/src/config/config.service'; -import { IRequestJob, REQUEST_QUEUE_NAME } from '../../../../libs/common/src'; -import { IpfsService } from '../../../../libs/common/src/utils/ipfs.client'; -import { DsnpAnnouncementProcessor } from './dsnp.announcement.processor'; +import { ConfigService } from '#libs/config'; +import { IRequestJob } from '#libs/interfaces'; +import { REQUEST_QUEUE_NAME } from '#libs/queues/queue.constants'; +import { IpfsService } from '#libs/utils/ipfs.client'; import { BaseConsumer } from '../BaseConsumer'; +import { DsnpAnnouncementProcessor } from './dsnp.announcement.processor'; @Injectable() @Processor(REQUEST_QUEUE_NAME) export class RequestProcessorService extends BaseConsumer { constructor( - @InjectRedis() private cacheManager: Redis, private dsnpAnnouncementProcessor: DsnpAnnouncementProcessor, private configService: ConfigService, private ipfsService: IpfsService, @@ -47,7 +45,8 @@ export class RequestProcessorService extends BaseConsumer { data.dependencyAttempt += 1; if (data.dependencyAttempt <= 3) { // exponential backoff - const delayedTime = 2 ** data.dependencyAttempt * this.configService.assetUploadVerificationDelaySeconds * MILLISECONDS_PER_SECOND; + const delayedTime = + 2 ** data.dependencyAttempt * this.configService.assetUploadVerificationDelaySeconds * MILLISECONDS_PER_SECOND; await job.moveToDelayed(Date.now() + delayedTime, job.token); await job.updateData(data); throw new DelayedError(); diff --git a/services/content-publishing/apps/worker/src/worker.module.ts b/services/content-publishing/apps/worker/src/worker.module.ts index f950eb7a..f7fd8da7 100644 --- a/services/content-publishing/apps/worker/src/worker.module.ts +++ b/services/content-publishing/apps/worker/src/worker.module.ts @@ 
-1,26 +1,19 @@ import { Module } from '@nestjs/common'; -import { BullModule } from '@nestjs/bullmq'; import { ScheduleModule } from '@nestjs/schedule'; import { EventEmitterModule } from '@nestjs/event-emitter'; import { RedisModule } from '@songkeys/nestjs-redis'; -import { PublishingService } from './publisher/publishing.service'; import { PublisherModule } from './publisher/publisher.module'; -import { BlockchainModule } from '../../../libs/common/src/blockchain/blockchain.module'; -import { BatchAnnouncementService } from './batch_announcer/batch.announcer.service'; +import { QueuesModule } from '#libs/queues'; +import { ConfigModule, ConfigService } from '#libs/config'; +import { BlockchainModule } from '#libs/blockchain/blockchain.module'; +import { AssetProcessorModule } from './asset_processor/asset.processor.module'; import { BatchAnnouncerModule } from './batch_announcer/batch.announcer.module'; +import { BatchingProcessorModule } from './batching_processor/batching.processor.module'; import { StatusMonitorModule } from './monitor/status.monitor.module'; -import { TxStatusMonitoringService } from './monitor/tx.status.monitor.service'; -import { AssetProcessorModule } from './asset_processor/asset.processor.module'; -import { AssetProcessorService } from './asset_processor/asset.processor.service'; import { RequestProcessorModule } from './request_processor/request.processor.module'; -import { RequestProcessorService } from './request_processor/request.processor.service'; -import { BatchingProcessorModule } from './batching_processor/batching.processor.module'; -import { ConfigModule } from '../../../libs/common/src/config/config.module'; -import { ConfigService } from '../../../libs/common/src/config/config.service'; @Module({ imports: [ - BullModule, ConfigModule, RedisModule.forRootAsync( { @@ -32,6 +25,7 @@ import { ConfigService } from '../../../libs/common/src/config/config.service'; }, true, // isGlobal ), + QueuesModule, EventEmitterModule.forRoot({ // Use this instance throughout the application global: true, @@ -59,6 +53,6 @@ import { ConfigService } from '../../../libs/common/src/config/config.service'; RequestProcessorModule, BatchingProcessorModule, ], - providers: [BatchAnnouncementService, ConfigService, PublishingService, TxStatusMonitoringService, AssetProcessorService, RequestProcessorService], + providers: [], }) export class WorkerModule {} diff --git a/services/content-publishing/libs/common/src/blockchain/blockchain-constants.ts b/services/content-publishing/libs/common/src/blockchain/blockchain-constants.ts index d01a7e04..49fcd62e 100644 --- a/services/content-publishing/libs/common/src/blockchain/blockchain-constants.ts +++ b/services/content-publishing/libs/common/src/blockchain/blockchain-constants.ts @@ -9,4 +9,4 @@ * @description * The number of blocks to crawl for a given job */ -export const NUMBER_BLOCKS_TO_CRAWL = 32n; // TODO: take from tx, keeping it constant to default tx mortality +export const NUMBER_BLOCKS_TO_CRAWL = 32; // TODO: take from tx, keeping it constant to default tx mortality diff --git a/services/content-publishing/libs/common/src/blockchain/blockchain.module.ts b/services/content-publishing/libs/common/src/blockchain/blockchain.module.ts index facacf9c..e198e958 100644 --- a/services/content-publishing/libs/common/src/blockchain/blockchain.module.ts +++ b/services/content-publishing/libs/common/src/blockchain/blockchain.module.ts @@ -1,13 +1,8 @@ -/* -https://docs.nestjs.com/modules -*/ - import { Module } from '@nestjs/common'; 
import { BlockchainService } from './blockchain.service'; -import { ConfigModule } from '../config/config.module'; @Module({ - imports: [ConfigModule], + imports: [], controllers: [], providers: [BlockchainService], exports: [BlockchainService], diff --git a/services/content-publishing/libs/common/src/blockchain/blockchain.service.ts b/services/content-publishing/libs/common/src/blockchain/blockchain.service.ts index c406bc0e..5d118125 100644 --- a/services/content-publishing/libs/common/src/blockchain/blockchain.service.ts +++ b/services/content-publishing/libs/common/src/blockchain/blockchain.service.ts @@ -1,16 +1,16 @@ /* eslint-disable no-underscore-dangle */ import { Injectable, Logger, OnApplicationBootstrap, OnApplicationShutdown } from '@nestjs/common'; import { ApiPromise, ApiRx, HttpProvider, WsProvider } from '@polkadot/api'; -import { firstValueFrom, from } from 'rxjs'; +import { firstValueFrom } from 'rxjs'; import { options } from '@frequency-chain/api-augment'; import { KeyringPair } from '@polkadot/keyring/types'; -import { BlockHash, BlockNumber, DispatchError, DispatchInfo, Hash, SignedBlock } from '@polkadot/types/interfaces'; +import { BlockHash, BlockNumber, Hash, SignedBlock } from '@polkadot/types/interfaces'; import { SubmittableExtrinsic } from '@polkadot/api/types'; import { AnyNumber, ISubmittableResult, RegistryError } from '@polkadot/types/types'; -import { u32, Option, u128, Bytes } from '@polkadot/types'; +import { u32, Option, Bytes } from '@polkadot/types'; import { PalletCapacityCapacityDetails, PalletCapacityEpochInfo } from '@polkadot/types/lookup'; -import { ConfigService } from '../config/config.service'; import { Extrinsic } from './extrinsic'; +import { ConfigService } from '#libs/config'; @Injectable() export class BlockchainService implements OnApplicationBootstrap, OnApplicationShutdown { @@ -23,8 +23,9 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS private logger: Logger; public async onApplicationBootstrap() { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const providerUrl = this.configService.frequencyUrl!; - let provider: any; + let provider: WsProvider | HttpProvider; if (/^ws/.test(providerUrl.toString())) { provider = new WsProvider(providerUrl.toString()); } else if (/^http/.test(providerUrl.toString())) { @@ -39,8 +40,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS this.logger.log('Blockchain API ready.'); } - public async onApplicationShutdown(signal?: string | undefined) { - const promises: Promise[] = []; + public async onApplicationShutdown(_signal?: string | undefined) { + const promises: Promise[] = []; if (this.api) { promises.push(this.api.disconnect()); } @@ -68,8 +69,8 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS return (await this.apiPromise.rpc.chain.getFinalizedHead()) as BlockHash; } - public async getLatestFinalizedBlockNumber(): Promise<bigint> { - return (await this.apiPromise.rpc.chain.getBlock()).block.header.number.toBigInt(); + public async getLatestFinalizedBlockNumber(): Promise<number> { + return (await this.apiPromise.rpc.chain.getBlock()).block.header.number.toNumber(); } public async getBlockNumberForHash(hash: string): Promise { @@ -86,7 +87,10 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS return this.api.registry.createType(type, ...args); } - public createExtrinsicCall({ pallet, extrinsic }: { pallet: string; extrinsic: string }, ...args: (any | 
undefined)[]): SubmittableExtrinsic<'rxjs', ISubmittableResult> { + public createExtrinsicCall( + { pallet, extrinsic }: { pallet: string; extrinsic: string }, + ...args: (any | undefined)[] + ): SubmittableExtrinsic<'rxjs', ISubmittableResult> { return this.api.tx[pallet][extrinsic](...args); } @@ -108,7 +112,12 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS return args ? this.apiPromise.query[pallet][extrinsic](...args) : this.apiPromise.query[pallet][extrinsic](); } - public async queryAt(blockHash: BlockHash, pallet: string, extrinsic: string, ...args: (any | undefined)[]): Promise { + public async queryAt( + blockHash: BlockHash, + pallet: string, + extrinsic: string, + ...args: (any | undefined)[] + ): Promise { const newApi = await this.apiPromise.at(blockHash); return newApi.query[pallet][extrinsic](...args); } @@ -124,8 +133,15 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS const providerU64 = this.api.createType('u64', providerId); const { epochStart }: PalletCapacityEpochInfo = await this.query('capacity', 'currentEpochInfo'); const epochBlockLength: u32 = await this.query('capacity', 'epochLength'); - const capacityDetailsOption: Option<PalletCapacityCapacityDetails> = await this.query('capacity', 'capacityLedger', providerU64); - const { remainingCapacity, totalCapacityIssued } = capacityDetailsOption.unwrapOr({ remainingCapacity: 0, totalCapacityIssued: 0 }); + const capacityDetailsOption: Option<PalletCapacityCapacityDetails> = await this.query( + 'capacity', + 'capacityLedger', + providerU64, + ); + const { remainingCapacity, totalCapacityIssued } = capacityDetailsOption.unwrapOr({ + remainingCapacity: 0, + totalCapacityIssued: 0, + }); const currentBlock: u32 = await this.query('system', 'number'); const currentEpoch = await this.getCurrentCapacityEpoch(); return { @@ -133,8 +149,10 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS providerId, currentBlockNumber: currentBlock.toNumber(), nextEpochStart: epochStart.add(epochBlockLength).toNumber(), - remainingCapacity: typeof remainingCapacity === 'number' ? BigInt(remainingCapacity) : remainingCapacity.toBigInt(), - totalCapacityIssued: typeof totalCapacityIssued === 'number' ? BigInt(totalCapacityIssued) : totalCapacityIssued.toBigInt(), + remainingCapacity: + typeof remainingCapacity === 'number' ? BigInt(remainingCapacity) : remainingCapacity.toBigInt(), + totalCapacityIssued: + typeof totalCapacityIssued === 'number' ? 
BigInt(totalCapacityIssued) : totalCapacityIssued.toBigInt(), }; } @@ -163,65 +181,87 @@ export class BlockchainService implements OnApplicationBootstrap, OnApplicationS public async crawlBlockListForTx( txHash: Hash, - blockList: bigint[], + blockList: number[], successEvents: [{ pallet: string; event: string }], - ): Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityEpoch?: number; capacityWithdrawn?: bigint; error?: RegistryError }> { - const txReceiptPromises: Promise<{ found: boolean; success: boolean; blockHash?: BlockHash; capacityWithdrawn?: bigint; error?: RegistryError }>[] = blockList.map( - async (blockNumber) => { - const blockHash = await this.getBlockHash(blockNumber); - const block = await this.getBlock(blockHash); - const txIndex = block.block.extrinsics.findIndex((extrinsic) => extrinsic.hash.toString() === txHash.toString()); - - if (txIndex === -1) { - return { found: false, success: false }; - } - - this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`); - const at = await this.apiPromise.at(blockHash.toHex()); - const capacityEpoch = (await at.query.capacity.currentEpoch()).toNumber(); - const eventsPromise = at.query.system.events(); - - let isTxSuccess = false; - let totalBlockCapacity = 0n; - let txError: RegistryError | undefined; - - try { - const events = (await eventsPromise).filter(({ phase }) => phase.isApplyExtrinsic && phase.asApplyExtrinsic.eq(txIndex)); - - events.forEach((record) => { - const { event } = record; - const eventName = event.section; - const { method } = event; - const { data } = event; - this.logger.debug(`Received event: ${eventName} ${method} ${data}`); - - // find capacity withdrawn event - if (at.events.capacity.CapacityWithdrawn.is(event)) { - totalBlockCapacity += event.data.amount.toBigInt(); - } - - // check custom success events - if (successEvents.find((successEvent) => successEvent.pallet === eventName && successEvent.event === method)) { - this.logger.debug(`Found success event ${eventName} ${method}`); - isTxSuccess = true; - } - - // check for system extrinsic failure - if (at.events.system.ExtrinsicFailed.is(event)) { - const { dispatchError } = event.data; - const moduleThatErrored = dispatchError.asModule; - const moduleError = dispatchError.registry.findMetaError(moduleThatErrored); - txError = moduleError; - this.logger.error(`Extrinsic failed with error: ${JSON.stringify(moduleError)}`); - } - }); - } catch (error) { - this.logger.error(error); - } - this.logger.debug(`Total capacity withdrawn in block: ${totalBlockCapacity.toString()}`); - return { found: true, success: isTxSuccess, blockHash, capacityEpoch, capacityWithDrawn: totalBlockCapacity, error: txError }; - }, - ); + ): Promise<{ + found: boolean; + success: boolean; + blockHash?: BlockHash; + capacityEpoch?: number; + capacityWithdrawn?: bigint; + error?: RegistryError; + }> { + const txReceiptPromises: Promise<{ + found: boolean; + success: boolean; + blockHash?: BlockHash; + capacityWithdrawn?: bigint; + error?: RegistryError; + }>[] = blockList.map(async (blockNumber) => { + const blockHash = await this.getBlockHash(blockNumber); + const block = await this.getBlock(blockHash); + const txIndex = block.block.extrinsics.findIndex((extrinsic) => extrinsic.hash.toString() === txHash.toString()); + + if (txIndex === -1) { + return { found: false, success: false }; + } + + this.logger.verbose(`Found tx ${txHash} in block ${blockNumber}`); + const at = await this.apiPromise.at(blockHash.toHex()); + const capacityEpoch = (await 
diff --git a/services/content-publishing/libs/common/src/blockchain/extrinsic.ts b/services/content-publishing/libs/common/src/blockchain/extrinsic.ts
index cf4a77d1..ebe18b80 100644
--- a/services/content-publishing/libs/common/src/blockchain/extrinsic.ts
+++ b/services/content-publishing/libs/common/src/blockchain/extrinsic.ts
@@ -25,13 +25,12 @@
  */
 import { ApiRx } from '@polkadot/api';
-import { SubmittableExtrinsic, ApiTypes, AugmentedEvent } from '@polkadot/api/types';
+import { SubmittableExtrinsic } from '@polkadot/api/types';
 import { Call, Event, EventRecord, Hash } from '@polkadot/types/interfaces';
 import { IsEvent } from '@polkadot/types/metadata/decorate/types';
-import { Codec, ISubmittableResult, AnyTuple } from '@polkadot/types/types';
-import { filter, firstValueFrom, map, pipe, tap } from 'rxjs';
+import { Codec, ISubmittableResult } from '@polkadot/types/types';
+import { firstValueFrom } from 'rxjs';
 import { KeyringPair } from '@polkadot/keyring/types';
-import { EventError } from './event-error';

 export type EventMap = Record;
diff --git a/services/content-publishing/libs/common/src/config/config.module.ts b/services/content-publishing/libs/common/src/config/config.module.ts
index 0b28661f..90bb1047 100644
--- a/services/content-publishing/libs/common/src/config/config.module.ts
+++ b/services/content-publishing/libs/common/src/config/config.module.ts
@@ -1,8 +1,9 @@
 import { ConfigModule as NestConfigModule } from '@nestjs/config';
-import { Module } from '@nestjs/common';
+import { Global, Module } from '@nestjs/common';
 import { ConfigService } from './config.service';
 import { configModuleOptions } from './env.config';

+@Global()
 @Module({
   imports: [NestConfigModule.forRoot(configModuleOptions)],
   controllers: [],
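
Because ConfigModule is now decorated with @Global() (while still being registered once at the application root), feature modules can inject ConfigService without importing ConfigModule themselves. A minimal sketch; ExampleService and ExampleModule are illustrative names, not part of this patch:

import { Injectable, Module } from '@nestjs/common';
import { ConfigService } from '#libs/config';

@Injectable()
export class ExampleService {
  // ConfigService resolves from the global ConfigModule; no ConfigModule import is needed here.
  constructor(private readonly configService: ConfigService) {}

  redis(): string {
    return this.configService.redisUrl.toString();
  }
}

@Module({
  providers: [ExampleService], // note: ConfigModule is absent from this module's imports
})
export class ExampleModule {}
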
diff --git a/services/content-publishing/libs/common/src/config/config.service.ts b/services/content-publishing/libs/common/src/config/config.service.ts
index 9a1548bd..bb626254 100644
--- a/services/content-publishing/libs/common/src/config/config.service.ts
+++ b/services/content-publishing/libs/common/src/config/config.service.ts
@@ -1,11 +1,7 @@
-/*
-https://docs.nestjs.com/providers#services
-*/
-
 import { Injectable } from '@nestjs/common';
 import { ConfigService as NestConfigService } from '@nestjs/config';
-import { ChainEnvironment } from '..';
 import { ICapacityLimit } from '../interfaces/capacity-limit.interface';
+import { ChainEnvironment } from '#libs/dtos';

 export interface ConfigEnvironmentVariables {
   CHAIN_ENVIRONMENT: ChainEnvironment;
diff --git a/services/content-publishing/libs/common/src/config/env.config.ts b/services/content-publishing/libs/common/src/config/env.config.ts
index 4f559bb0..b22d3f4c 100644
--- a/services/content-publishing/libs/common/src/config/env.config.ts
+++ b/services/content-publishing/libs/common/src/config/env.config.ts
@@ -1,6 +1,6 @@
 import Joi from 'joi';
 import { ConfigModuleOptions } from '@nestjs/config';
-import { ChainEnvironment } from '..';
+import { ChainEnvironment } from '#libs/dtos';

 export const configModuleOptions: ConfigModuleOptions = {
   isGlobal: true,
@@ -41,7 +41,11 @@ export const configModuleOptions: ConfigModuleOptions = {
           .required()
           .pattern(/^(percentage|amount)$/),
         value: Joi.alternatives()
-          .conditional('type', { is: 'percentage', then: Joi.number().min(0).max(100), otherwise: Joi.number().min(0) })
+          .conditional('type', {
+            is: 'percentage',
+            then: Joi.number().min(0).max(100),
+            otherwise: Joi.number().min(0),
+          })
           .required(),
       });
       const result = schema.validate(obj);
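
For reference, the Joi alternative above accepts two shapes for the capacity-limit value. The literals below are illustrative examples that would satisfy that validation; they are not values taken from this patch:

// 'percentage' limits must fall in the 0-100 range; 'amount' limits only need to be non-negative.
const percentageLimit = { type: 'percentage', value: 80 };
const amountLimit = { type: 'amount', value: 1000000 };
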
diff --git a/services/content-publishing/libs/common/src/config/index.ts b/services/content-publishing/libs/common/src/config/index.ts
new file mode 100644
index 00000000..4bb549e2
--- /dev/null
+++ b/services/content-publishing/libs/common/src/config/index.ts
@@ -0,0 +1,2 @@
+export * from './config.module';
+export * from './config.service';
diff --git a/services/content-publishing/libs/common/src/dtos/activity.dto.ts b/services/content-publishing/libs/common/src/dtos/activity.dto.ts
index d3835b15..053cb24c 100644
--- a/services/content-publishing/libs/common/src/dtos/activity.dto.ts
+++ b/services/content-publishing/libs/common/src/dtos/activity.dto.ts
@@ -24,7 +24,7 @@ import {
 } from 'class-validator';
 import { Type } from 'class-transformer';
 import { DURATION_REGEX } from './validation.dto';
-import { IsDsnpUserURI } from '../utils/dsnp-validation.decorator';
+import { IsDsnpUserURI } from '#libs/utils/dsnp-validation.decorator';

 // eslint-disable-next-line no-shadow
 export enum UnitTypeDto {
diff --git a/services/content-publishing/libs/common/src/dtos/announcement.dto.ts b/services/content-publishing/libs/common/src/dtos/announcement.dto.ts
index 302c19b2..cfbf3b80 100644
--- a/services/content-publishing/libs/common/src/dtos/announcement.dto.ts
+++ b/services/content-publishing/libs/common/src/dtos/announcement.dto.ts
@@ -6,7 +6,7 @@ import { IsEnum, IsInt, IsNotEmpty, IsString, Matches, Max, Min, MinLength, Vali
 import { Type } from 'class-transformer';
 import { NoteActivityDto, ProfileActivityDto } from './activity.dto';
 import { DSNP_EMOJI_REGEX } from './validation.dto';
-import { IsDsnpContentHash, IsDsnpContentURI } from '../utils/dsnp-validation.decorator';
+import { IsDsnpContentURI, IsDsnpContentHash } from '#libs/utils/dsnp-validation.decorator';

 // eslint-disable-next-line no-shadow
 export enum ModifiableAnnouncementTypeDto {
diff --git a/services/content-publishing/libs/common/src/dtos/index.ts b/services/content-publishing/libs/common/src/dtos/index.ts
new file mode 100644
index 00000000..c790d133
--- /dev/null
+++ b/services/content-publishing/libs/common/src/dtos/index.ts
@@ -0,0 +1,4 @@
+export * from './activity.dto';
+export * from './announcement.dto';
+export * from './common.dto';
+export * from './validation.dto';
diff --git a/services/content-publishing/libs/common/src/index.ts b/services/content-publishing/libs/common/src/index.ts
deleted file mode 100644
index d12f6422..00000000
--- a/services/content-publishing/libs/common/src/index.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-export * from './dtos/announcement.dto';
-export * from './dtos/activity.dto';
-export * from './dtos/common.dto';
-export * from './dtos/validation.dto';
-export * from './interfaces/request-job.interface';
-export * from './utils/queues';
diff --git a/services/content-publishing/libs/common/src/interfaces/index.ts b/services/content-publishing/libs/common/src/interfaces/index.ts
new file mode 100644
index 00000000..68e5b9f7
--- /dev/null
+++ b/services/content-publishing/libs/common/src/interfaces/index.ts
@@ -0,0 +1,5 @@
+export * from './asset-job.interface';
+export * from './batch.interface';
+export * from './capacity-limit.interface';
+export * from './dsnp';
+export * from './request-job.interface';
diff --git a/services/content-publishing/libs/common/src/interfaces/request-job.interface.ts b/services/content-publishing/libs/common/src/interfaces/request-job.interface.ts
index b75b2b7d..c2822a92 100644
--- a/services/content-publishing/libs/common/src/interfaces/request-job.interface.ts
+++ b/services/content-publishing/libs/common/src/interfaces/request-job.interface.ts
@@ -1,5 +1,4 @@
-import { RequestTypeDto } from '../dtos/announcement.dto';
-import { AnnouncementTypeDto } from '../dtos/common.dto';
+import { AnnouncementTypeDto, RequestTypeDto } from '#libs/dtos';

 export interface IRequestJob {
   id: string;
diff --git a/services/content-publishing/libs/common/src/queues/index.ts b/services/content-publishing/libs/common/src/queues/index.ts
new file mode 100644
index 00000000..e217efc3
--- /dev/null
+++ b/services/content-publishing/libs/common/src/queues/index.ts
@@ -0,0 +1,2 @@
+export * as QueueConstants from './queue.constants';
+export * from './queues.module';
diff --git a/services/content-publishing/libs/common/src/utils/queues.ts b/services/content-publishing/libs/common/src/queues/queue.constants.ts
similarity index 97%
rename from services/content-publishing/libs/common/src/utils/queues.ts
rename to services/content-publishing/libs/common/src/queues/queue.constants.ts
index cfa4aeff..8075ec1f 100644
--- a/services/content-publishing/libs/common/src/utils/queues.ts
+++ b/services/content-publishing/libs/common/src/queues/queue.constants.ts
@@ -1,4 +1,4 @@
-import { AnnouncementTypeDto } from '../dtos/common.dto';
+import { AnnouncementTypeDto } from '#libs/dtos';

 /**
  * Name of the queue that has all incoming asset uploads
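
The export * as QueueConstants barrel above keeps the existing namespace-style access to queue names after the move out of utils/queues.ts. A small consumer sketch; the variable name is illustrative, not part of this patch:

import { QueueConstants } from '#libs/queues';

// Queue names keep their old QueueConstants.* spelling at call sites.
const requestQueueName = QueueConstants.REQUEST_QUEUE_NAME;
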
diff --git a/services/content-publishing/libs/common/src/queues/queues.module.ts b/services/content-publishing/libs/common/src/queues/queues.module.ts
new file mode 100644
index 00000000..e34ec268
--- /dev/null
+++ b/services/content-publishing/libs/common/src/queues/queues.module.ts
@@ -0,0 +1,96 @@
+import { ConfigService } from '#libs/config/config.service';
+import { BullModule } from '@nestjs/bullmq';
+import { Global, Module } from '@nestjs/common';
+import * as QueueConstants from './queue.constants';
+
+@Global()
+@Module({
+  imports: [
+    BullModule.forRootAsync({
+      useFactory: (configService: ConfigService) => {
+        // Note: BullMQ doesn't honor a URL for the Redis connection, and
+        // JS URL doesn't parse 'redis://' as a valid protocol, so we fool
+        // it by changing the URL to use 'http://' in order to parse out
+        // the host, port, username, password, etc.
+        // We could pass REDIS_HOST, REDIS_PORT, etc, in the environment, but
+        // trying to keep the # of environment variables from proliferating
+        const url = new URL(configService.redisUrl.toString().replace(/^redis[s]*/, 'http'));
+        const { hostname, port, username, password, pathname } = url;
+        return {
+          connection: {
+            host: hostname || undefined,
+            port: port ? Number(port) : undefined,
+            username: username || undefined,
+            password: password || undefined,
+            db: pathname?.length > 1 ? Number(pathname.slice(1)) : undefined,
+          },
+        };
+      },
+      inject: [ConfigService],
+    }),
+    BullModule.registerQueue(
+      {
+        name: QueueConstants.ASSET_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.REQUEST_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.BROADCAST_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.REPLY_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.REACTION_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.TOMBSTONE_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.UPDATE_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.PROFILE_QUEUE_NAME,
+      },
+      {
+        name: QueueConstants.BATCH_QUEUE_NAME,
+        defaultJobOptions: {
+          attempts: 1,
+          backoff: {
+            type: 'exponential',
+          },
+          removeOnComplete: true,
+          removeOnFail: false,
+        },
+      },
+      {
+        name: QueueConstants.PUBLISH_QUEUE_NAME,
+        defaultJobOptions: {
+          attempts: 1,
+          backoff: {
+            type: 'exponential',
+          },
+          removeOnComplete: true,
+          removeOnFail: false,
+        },
+      },
+      {
+        name: QueueConstants.TRANSACTION_RECEIPT_QUEUE_NAME,
+        defaultJobOptions: {
+          attempts: 3,
+          backoff: {
+            type: 'exponential',
+          },
+          removeOnComplete: false,
+          removeOnFail: false,
+        },
+      },
+      {
+        name: QueueConstants.STATUS_QUEUE_NAME,
+      },
+    ),
+  ],
+  exports: [BullModule],
+})
+export class QueuesModule {}
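
Since QueuesModule is @Global() and exports BullModule, services elsewhere in the app can inject the queues registered above instead of re-registering them per feature module. A minimal consumer sketch; ExamplePublisher and the job payload are illustrative, not part of this patch:

import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { QueueConstants } from '#libs/queues';

@Injectable()
export class ExamplePublisher {
  // The queue token is provided by the global QueuesModule registration above.
  constructor(@InjectQueue(QueueConstants.PUBLISH_QUEUE_NAME) private readonly publishQueue: Queue) {}

  async enqueue(jobName: string, payload: unknown): Promise<void> {
    await this.publishQueue.add(jobName, payload);
  }
}
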
diff --git a/services/content-publishing/libs/common/src/utils/dsnp.schema.ts b/services/content-publishing/libs/common/src/utils/dsnp.schema.ts
index 7a274614..d201330b 100644
--- a/services/content-publishing/libs/common/src/utils/dsnp.schema.ts
+++ b/services/content-publishing/libs/common/src/utils/dsnp.schema.ts
@@ -1,4 +1,4 @@
-import { AnnouncementTypeDto, ChainEnvironment } from '../dtos/common.dto';
+import { AnnouncementTypeDto, ChainEnvironment } from '#libs/dtos';

 /**
  * Map between announcement type and it's DSNP schema id for DEV environment
diff --git a/services/content-publishing/libs/common/src/utils/ipfs.client.ts b/services/content-publishing/libs/common/src/utils/ipfs.client.ts
index c7547990..667da524 100644
--- a/services/content-publishing/libs/common/src/utils/ipfs.client.ts
+++ b/services/content-publishing/libs/common/src/utils/ipfs.client.ts
@@ -9,7 +9,7 @@ import { blake2b256 as hasher } from '@multiformats/blake2/blake2b';
 import { create } from 'multiformats/hashes/digest';
 import { randomUUID } from 'crypto';
 import { base58btc } from 'multiformats/bases/base58';
-import { ConfigService } from '../config/config.service';
+import { ConfigService } from '#libs/config';

 export interface FilePin {
   cid: string;
@@ -37,7 +37,10 @@ export class IpfsService {
     const ipfsAuthUser = this.configService.ipfsBasicAuthUser;
     const ipfsAuthSecret = this.configService.ipfsBasicAuthSecret;
-    const ipfsAuth = ipfsAuthUser && ipfsAuthSecret ? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}` : '';
+    const ipfsAuth =
+      ipfsAuthUser && ipfsAuthSecret
+        ? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}`
+        : '';

     const headers = {
       'Content-Type': `multipart/form-data; boundary=${form.getBoundary()}`,
@@ -90,7 +93,10 @@ export class IpfsService {
     const ipfsGet = `${this.configService.ipfsEndpoint}/api/v0/cat?arg=${cid}`;
     const ipfsAuthUser = this.configService.ipfsBasicAuthUser;
     const ipfsAuthSecret = this.configService.ipfsBasicAuthSecret;
-    const ipfsAuth = ipfsAuthUser && ipfsAuthSecret ? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}` : '';
+    const ipfsAuth =
+      ipfsAuthUser && ipfsAuthSecret
+        ? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}`
+        : '';

     const headers = {
       Accept: '*/*',
@@ -110,7 +116,10 @@ export class IpfsService {
     const ipfsGet = `${this.configService.ipfsEndpoint}/api/v0/pin/ls?type=all&quiet=true&arg=${v0Cid}`;
     const ipfsAuthUser = this.configService.ipfsBasicAuthUser;
     const ipfsAuthSecret = this.configService.ipfsBasicAuthSecret;
-    const ipfsAuth = ipfsAuthUser && ipfsAuthSecret ? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}` : '';
+    const ipfsAuth =
+      ipfsAuthUser && ipfsAuthSecret
+        ? `Basic ${Buffer.from(`${ipfsAuthUser}:${ipfsAuthSecret}`).toString('base64')}`
+        : '';

     const headers = {
       Accept: '*/*',
diff --git a/services/content-publishing/libs/common/src/utils/processing.ts b/services/content-publishing/libs/common/src/utils/processing.ts
index b29ad2d1..28bb031e 100644
--- a/services/content-publishing/libs/common/src/utils/processing.ts
+++ b/services/content-publishing/libs/common/src/utils/processing.ts
@@ -1,6 +1,6 @@
 export const MAX_WAIT_FOR_GRACE_FULL_SHUTDOWN_MS = 6 * 1000;
 export const DELAY_TO_CHECK_FOR_SHUTDOWN_MS = 300;
-export async function delay(ms): Promise {
+export async function delay(ms): Promise {
   // eslint-disable-next-line no-promise-executor-return
   return new Promise((resolve) => setTimeout(resolve, ms));
 }
diff --git a/services/content-publishing/tsconfig.json b/services/content-publishing/tsconfig.json
index f8ebc6d9..718e40e2 100644
--- a/services/content-publishing/tsconfig.json
+++ b/services/content-publishing/tsconfig.json
@@ -4,15 +4,48 @@
     "baseUrl": ".",
     "outDir": "dist",
     "paths": {
-      "#app/*": [
-        "*"
-      ],
-      "@app/common": [
+      "#libs": [
         "libs/common/src"
       ],
-      "@app/common/*": [
+      "#libs/*": [
         "libs/common/src/*"
-      ]
+      ],
+      "#libs/blockchain": [
+        "libs/common/src/blockchain"
+      ],
+      "#libs/blockchain/*": [
+        "libs/common/src/blockchain/*"
+      ],
+      "#libs/config": [
+        "libs/common/src/config"
+      ],
+      "#libs/config/*": [
+        "libs/common/src/config/*"
+      ],
+      "#libs/dtos": [
+        "libs/common/src/dtos"
+      ],
+      "#libs/dtos/*": [
+        "libs/common/src/dtos/*"
+      ],
+      "#libs/interfaces": [
+        "libs/common/src/interfaces"
+      ],
+      "#libs/interfaces/*": [
+        "libs/common/src/interfaces/*"
+      ],
+      "#libs/queues": [
+        "libs/common/src/queues"
+      ],
+      "#libs/queues/*": [
+        "libs/common/src/queues/*"
+      ],
+      "#libs/utils": [
+        "libs/common/src/utils"
+      ],
+      "#libs/utils/*": [
+        "libs/common/src/utils/*"
+      ],
     },
     "typeRoots": [
       "node_modules/@types"