Skip to content

Commit

Permalink
content-watcher: move the processors into the app (#593)
Browse files Browse the repository at this point in the history
# Problem

closes #590 

# changes
- Moved worker processors into the app
- Renamed watcher queues to be different from publishing
  • Loading branch information
aramikm authored Oct 8, 2024
1 parent 69aa9b9 commit 3ba7db8
Show file tree
Hide file tree
Showing 25 changed files with 2,531 additions and 2,515 deletions.
8 changes: 4 additions & 4 deletions apps/content-watcher/src/api.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import { ScheduleModule } from '@nestjs/schedule';
import { ApiService } from './api.service';
import { HealthController, ScanControllerV1, SearchControllerV1, WebhookControllerV1 } from './controllers';
import { BlockchainModule } from '#blockchain/blockchain.module';
import { CrawlerModule } from '#content-watcher-lib/crawler/crawler.module';
import { IPFSProcessorModule } from '#content-watcher-lib/ipfs/ipfs.processor.module';
import { PubSubModule } from '#content-watcher-lib/pubsub/pubsub.module';
import { ScannerModule } from '#content-watcher-lib/scanner/scanner.module';
import { ContentWatcherQueues as QueueConstants } from '#types/constants/queue.constants';
import { CacheModule } from '#cache/cache.module';
Expand All @@ -18,9 +15,12 @@ import queueConfig from '#queue';
import { QueueModule } from '#queue/queue.module';
import ipfsConfig from '#storage/ipfs/ipfs.config';
import scannerConfig from '#content-watcher-lib/scanner/scanner.config';
import pubsubConfig from '#content-watcher-lib/pubsub/pubsub.config';
import { APP_FILTER } from '@nestjs/core';
import { AllExceptionsFilter } from '#utils/filters/exceptions.filter';
import pubsubConfig from '#content-watcher/pubsub/pubsub.config';
import { PubSubModule } from '#content-watcher/pubsub/pubsub.module';
import { CrawlerModule } from '#content-watcher/crawler/crawler.module';
import { IPFSProcessorModule } from '#content-watcher/ipfs/ipfs.processor.module';

@Module({
imports: [
Expand Down
2 changes: 1 addition & 1 deletion apps/content-watcher/src/api.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class ApiService {

constructor(
@InjectRedis() private redis: Redis,
@InjectQueue(QueueConstants.REQUEST_QUEUE_NAME) private requestQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_REQUEST_QUEUE_NAME) private requestQueue: Queue,
private readonly scannerService: ScannerService,
) {
this.logger = new Logger(this.constructor.name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import { ContentWatcherQueues as QueueConstants } from '#types/constants/queue.c
import { ChainWatchOptionsDto } from '#types/dtos/content-watcher/chain.watch.dto';
import { BaseConsumer } from '#consumer';
import { ContentSearchRequestDto } from '#types/dtos/content-watcher/content-search-request.dto';
import { ChainEventProcessorService } from '../utils/chain-event-processor.service';
import { BlockchainRpcQueryService } from '#blockchain/blockchain-rpc-query.service';
import { ChainEventProcessorService } from '#content-watcher-lib/utils/chain-event-processor.service';

const CRAWLER_BLOCK_CHUNK_SIZE = 500;

@Injectable()
@Processor(QueueConstants.REQUEST_QUEUE_NAME, {
@Processor(QueueConstants.WATCHER_REQUEST_QUEUE_NAME, {
concurrency: 2,
})
export class CrawlerService extends BaseConsumer {
constructor(
@InjectRedis() private readonly cache: Redis,
@InjectQueue(QueueConstants.IPFS_QUEUE) private readonly ipfsQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_IPFS_QUEUE) private readonly ipfsQueue: Queue,
private readonly chainEventService: ChainEventProcessorService,
private readonly blockchainService: BlockchainRpcQueryService,
) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Module } from '@nestjs/common';
import { IPFSStorageModule } from '#storage/ipfs/ipfs.module';
import { IPFSContentProcessor } from '#content-watcher-lib/ipfs/ipfs.processor';
import { IPFSContentProcessor } from './ipfs.processor';

@Module({
imports: [IPFSStorageModule],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,31 @@ import { ContentWatcherQueues as QueueConstants } from '#types/constants/queue.c
import { IIPFSJob } from '#types/interfaces/content-watcher/ipfs.job.interface';
import { BaseConsumer } from '#consumer';
import { AnnouncementResponse } from '#types/content-announcement';
import { isBroadcast, isProfile, isReaction, isReply, isTombstone, isUpdate } from '../utils/type-guards';
import {
isBroadcast,
isProfile,
isReaction,
isReply,
isTombstone,
isUpdate,
} from '#content-watcher-lib/utils/type-guards';
import scannerConfig, { IScannerConfig } from '#content-watcher-lib/scanner/scanner.config';
import { IpfsService } from '#storage/ipfs/ipfs.service';

@Injectable()
@Processor(QueueConstants.IPFS_QUEUE, {
@Processor(QueueConstants.WATCHER_IPFS_QUEUE, {
concurrency: 2,
})
export class IPFSContentProcessor extends BaseConsumer {
public logger: Logger;

constructor(
@InjectQueue(QueueConstants.BROADCAST_QUEUE_NAME) private broadcastQueue: Queue,
@InjectQueue(QueueConstants.TOMBSTONE_QUEUE_NAME) private tombstoneQueue: Queue,
@InjectQueue(QueueConstants.REACTION_QUEUE_NAME) private reactionQueue: Queue,
@InjectQueue(QueueConstants.REPLY_QUEUE_NAME) private replyQueue: Queue,
@InjectQueue(QueueConstants.PROFILE_QUEUE_NAME) private profileQueue: Queue,
@InjectQueue(QueueConstants.UPDATE_QUEUE_NAME) private updateQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_BROADCAST_QUEUE_NAME) private broadcastQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_TOMBSTONE_QUEUE_NAME) private tombstoneQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_REACTION_QUEUE_NAME) private reactionQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_REPLY_QUEUE_NAME) private replyQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_PROFILE_QUEUE_NAME) private profileQueue: Queue,
@InjectQueue(QueueConstants.WATCHER_UPDATE_QUEUE_NAME) private updateQueue: Queue,
@Inject(scannerConfig.KEY) private config: IScannerConfig,
private ipfsService: IpfsService,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PubSubService } from '../pubsub.service';
import { AnnouncementResponse } from '#types/content-announcement';

@Injectable()
@Processor(QueueConstants.BROADCAST_QUEUE_NAME, { concurrency: 2 })
@Processor(QueueConstants.WATCHER_BROADCAST_QUEUE_NAME, { concurrency: 2 })
export class BroadcastSubscriber extends BaseConsumer {
constructor(private readonly pubsubService: PubSubService) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PubSubService } from '../pubsub.service';
import { AnnouncementResponse } from '#types/content-announcement';

@Injectable()
@Processor(QueueConstants.PROFILE_QUEUE_NAME, { concurrency: 2 })
@Processor(QueueConstants.WATCHER_PROFILE_QUEUE_NAME, { concurrency: 2 })
export class ProfileSubscriber extends BaseConsumer {
constructor(private readonly pubsubService: PubSubService) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PubSubService } from '../pubsub.service';
import { AnnouncementResponse } from '#types/content-announcement';

@Injectable()
@Processor(QueueConstants.REACTION_QUEUE_NAME, { concurrency: 2 })
@Processor(QueueConstants.WATCHER_REACTION_QUEUE_NAME, { concurrency: 2 })
export class ReactionSubscriber extends BaseConsumer {
constructor(private readonly pubsubService: PubSubService) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PubSubService } from '../pubsub.service';
import { AnnouncementResponse } from '#types/content-announcement';

@Injectable()
@Processor(QueueConstants.REPLY_QUEUE_NAME, { concurrency: 2 })
@Processor(QueueConstants.WATCHER_REPLY_QUEUE_NAME, { concurrency: 2 })
export class ReplySubscriber extends BaseConsumer {
constructor(private readonly pubsubService: PubSubService) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PubSubService } from '../pubsub.service';
import { AnnouncementResponse } from '#types/content-announcement';

@Injectable()
@Processor(QueueConstants.TOMBSTONE_QUEUE_NAME, { concurrency: 2 })
@Processor(QueueConstants.WATCHER_TOMBSTONE_QUEUE_NAME, { concurrency: 2 })
export class TomstoneSubscriber extends BaseConsumer {
constructor(private readonly pubsubService: PubSubService) {
super();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { PubSubService } from '../pubsub.service';
import { AnnouncementResponse } from '#types/content-announcement';

@Injectable()
@Processor(QueueConstants.UPDATE_QUEUE_NAME, { concurrency: 2 })
@Processor(QueueConstants.WATCHER_UPDATE_QUEUE_NAME, { concurrency: 2 })
export class UpdateSubscriber extends BaseConsumer {
constructor(private readonly pubsubService: PubSubService) {
super();
Expand Down
Loading

0 comments on commit 3ba7db8

Please sign in to comment.