diff --git a/services/content-watcher/libs/common/src/ipfs/ipfs.dsnp.ts b/services/content-watcher/libs/common/src/ipfs/ipfs.dsnp.ts index 47316c89..4de616e3 100644 --- a/services/content-watcher/libs/common/src/ipfs/ipfs.dsnp.ts +++ b/services/content-watcher/libs/common/src/ipfs/ipfs.dsnp.ts @@ -1,26 +1,16 @@ import { Injectable, Logger } from '@nestjs/common'; -import { Job, JobsOptions, Queue } from 'bullmq'; +import { Job, Queue } from 'bullmq'; import { InjectQueue, Processor } from '@nestjs/bullmq'; import { hexToString } from '@polkadot/util'; import parquet from '@dsnp/parquetjs'; -import { bases } from 'multiformats/basics'; import { AppConfigService } from '../config/config.service'; import { calculateJobId } from '..'; import * as QueueConstants from '../queues/queue-constants'; import { IIPFSJob } from '../interfaces/ipfs.job.interface'; import { BaseConsumer } from '../utils/base-consumer'; import { IpfsService } from '../utils/ipfs.client'; -import { - AnnouncementResponse, - AnnouncementType, - BroadcastAnnouncement, - ProfileAnnouncement, - ReactionAnnouncement, - ReplyAnnouncement, - TombstoneAnnouncement, - UpdateAnnouncement, -} from '../types/content-announcement'; -import { isBroadcast, isProfile, isReaction, isReply, isTombstone, isTypedAnnouncement, isUpdate } from '../utils/type-guards'; +import { AnnouncementResponse } from '../types/content-announcement'; +import { isBroadcast, isProfile, isReaction, isReply, isTombstone, isUpdate } from '../utils/type-guards'; @Injectable() @Processor(QueueConstants.IPFS_QUEUE, { @@ -76,7 +66,11 @@ export class IPFSContentProcessor extends BaseConsumer { } } - private async enqueueAnnouncementResponse(announcementResponse: AnnouncementResponse, name: string, queue: Queue): Promise { + private async enqueueAnnouncementResponse( + announcementResponse: AnnouncementResponse, + name: string, + queue: Queue, + ): Promise { if (!(await this.isQueueFull(queue))) { const jobId = calculateJobId(announcementResponse); await queue.add(name, announcementResponse, { jobId }); @@ -98,7 +92,9 @@ export class IPFSContentProcessor extends BaseConsumer { if (isBroadcast(mapRecord)) { announcementResponse.announcement = { fromId: mapRecord.fromId, - contentHash: mapRecord.contentHash, + contentHash: Buffer.isBuffer(mapRecord.contentHash) + ? mapRecord.contentHash.toString() + : mapRecord.contentHash, url: mapRecord.url, announcementType: mapRecord.announcementType, }; @@ -108,7 +104,9 @@ export class IPFSContentProcessor extends BaseConsumer { announcementResponse.announcement = { fromId: mapRecord.fromId, targetAnnouncementType: mapRecord.targetAnnouncementType, - targetContentHash: mapRecord.targetContentHash, + targetContentHash: Buffer.isBuffer(mapRecord.targetContentHash) + ? mapRecord.targetContentHash.toString() + : mapRecord.targetContentHash, announcementType: mapRecord.announcementType, }; queue = this.tombstoneQueue; @@ -129,7 +127,9 @@ export class IPFSContentProcessor extends BaseConsumer { announcementType: mapRecord.announcementType, url: mapRecord.url, inReplyTo: mapRecord.inReplyTo, - contentHash: mapRecord.contentHash, + contentHash: Buffer.isBuffer(mapRecord.contentHash) + ? mapRecord.contentHash.toString() + : mapRecord.contentHash, }; queue = this.replyQueue; typeName = 'Reply'; @@ -138,7 +138,9 @@ export class IPFSContentProcessor extends BaseConsumer { fromId: mapRecord.fromId, announcementType: mapRecord.announcementType, url: mapRecord.url, - contentHash: mapRecord.contentHash, + contentHash: Buffer.isBuffer(mapRecord.contentHash) + ? mapRecord.contentHash.toString() + : mapRecord.contentHash, }; queue = this.profileQueue; typeName = 'Profile'; @@ -147,9 +149,13 @@ export class IPFSContentProcessor extends BaseConsumer { fromId: mapRecord.fromId, announcementType: mapRecord.announcementType, url: mapRecord.url, - contentHash: mapRecord.contentHash, + contentHash: Buffer.isBuffer(mapRecord.contentHash) + ? mapRecord.contentHash.toString() + : mapRecord.contentHash, targetAnnouncementType: mapRecord.targetAnnouncementType, - targetContentHash: mapRecord.targetContentHash, + targetContentHash: Buffer.isBuffer(mapRecord.targetContentHash) + ? mapRecord.targetContentHash.toString() + : mapRecord.targetContentHash, }; queue = this.updateQueue; typeName = 'Update';