Skip to content

Commit

Permalink
fix: contentHash encoding as string for webhook notification (#361)
Browse files Browse the repository at this point in the history
Make sure we encode the `contentHash` field of content announcements as
a string instead of a UTF-16 buffer.
  • Loading branch information
JoeCap08055 authored Aug 9, 2024
1 parent dcb8211 commit c60f83a
Showing 1 changed file with 26 additions and 20 deletions.
46 changes: 26 additions & 20 deletions services/content-watcher/libs/common/src/ipfs/ipfs.dsnp.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
import { Injectable, Logger } from '@nestjs/common';
import { Job, JobsOptions, Queue } from 'bullmq';
import { Job, Queue } from 'bullmq';
import { InjectQueue, Processor } from '@nestjs/bullmq';
import { hexToString } from '@polkadot/util';
import parquet from '@dsnp/parquetjs';
import { bases } from 'multiformats/basics';
import { AppConfigService } from '../config/config.service';
import { calculateJobId } from '..';
import * as QueueConstants from '../queues/queue-constants';
import { IIPFSJob } from '../interfaces/ipfs.job.interface';
import { BaseConsumer } from '../utils/base-consumer';
import { IpfsService } from '../utils/ipfs.client';
import {
AnnouncementResponse,
AnnouncementType,
BroadcastAnnouncement,
ProfileAnnouncement,
ReactionAnnouncement,
ReplyAnnouncement,
TombstoneAnnouncement,
UpdateAnnouncement,
} from '../types/content-announcement';
import { isBroadcast, isProfile, isReaction, isReply, isTombstone, isTypedAnnouncement, isUpdate } from '../utils/type-guards';
import { AnnouncementResponse } from '../types/content-announcement';
import { isBroadcast, isProfile, isReaction, isReply, isTombstone, isUpdate } from '../utils/type-guards';

@Injectable()
@Processor(QueueConstants.IPFS_QUEUE, {
Expand Down Expand Up @@ -76,7 +66,11 @@ export class IPFSContentProcessor extends BaseConsumer {
}
}

private async enqueueAnnouncementResponse(announcementResponse: AnnouncementResponse, name: string, queue: Queue): Promise<void> {
private async enqueueAnnouncementResponse(
announcementResponse: AnnouncementResponse,
name: string,
queue: Queue,
): Promise<void> {
if (!(await this.isQueueFull(queue))) {
const jobId = calculateJobId(announcementResponse);
await queue.add(name, announcementResponse, { jobId });
Expand All @@ -98,7 +92,9 @@ export class IPFSContentProcessor extends BaseConsumer {
if (isBroadcast(mapRecord)) {
announcementResponse.announcement = {
fromId: mapRecord.fromId,
contentHash: mapRecord.contentHash,
contentHash: Buffer.isBuffer(mapRecord.contentHash)
? mapRecord.contentHash.toString()
: mapRecord.contentHash,
url: mapRecord.url,
announcementType: mapRecord.announcementType,
};
Expand All @@ -108,7 +104,9 @@ export class IPFSContentProcessor extends BaseConsumer {
announcementResponse.announcement = {
fromId: mapRecord.fromId,
targetAnnouncementType: mapRecord.targetAnnouncementType,
targetContentHash: mapRecord.targetContentHash,
targetContentHash: Buffer.isBuffer(mapRecord.targetContentHash)
? mapRecord.targetContentHash.toString()
: mapRecord.targetContentHash,
announcementType: mapRecord.announcementType,
};
queue = this.tombstoneQueue;
Expand All @@ -129,7 +127,9 @@ export class IPFSContentProcessor extends BaseConsumer {
announcementType: mapRecord.announcementType,
url: mapRecord.url,
inReplyTo: mapRecord.inReplyTo,
contentHash: mapRecord.contentHash,
contentHash: Buffer.isBuffer(mapRecord.contentHash)
? mapRecord.contentHash.toString()
: mapRecord.contentHash,
};
queue = this.replyQueue;
typeName = 'Reply';
Expand All @@ -138,7 +138,9 @@ export class IPFSContentProcessor extends BaseConsumer {
fromId: mapRecord.fromId,
announcementType: mapRecord.announcementType,
url: mapRecord.url,
contentHash: mapRecord.contentHash,
contentHash: Buffer.isBuffer(mapRecord.contentHash)
? mapRecord.contentHash.toString()
: mapRecord.contentHash,
};
queue = this.profileQueue;
typeName = 'Profile';
Expand All @@ -147,9 +149,13 @@ export class IPFSContentProcessor extends BaseConsumer {
fromId: mapRecord.fromId,
announcementType: mapRecord.announcementType,
url: mapRecord.url,
contentHash: mapRecord.contentHash,
contentHash: Buffer.isBuffer(mapRecord.contentHash)
? mapRecord.contentHash.toString()
: mapRecord.contentHash,
targetAnnouncementType: mapRecord.targetAnnouncementType,
targetContentHash: mapRecord.targetContentHash,
targetContentHash: Buffer.isBuffer(mapRecord.targetContentHash)
? mapRecord.targetContentHash.toString()
: mapRecord.targetContentHash,
};
queue = this.updateQueue;
typeName = 'Update';
Expand Down

0 comments on commit c60f83a

Please sign in to comment.