Skip to content

Commit

Permalink
separate PageBulkExportJobStreamManager to a different file
Browse files Browse the repository at this point in the history
  • Loading branch information
arafubeatbox committed Aug 16, 2024
1 parent 07ae16f commit 59634fb
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import type Crowi from '~/server/crowi';
import type { ApiV3Response } from '~/server/routes/apiv3/interfaces/apiv3-response';
import loggerFactory from '~/utils/logger';

import { DuplicateBulkExportJobError, pageBulkExportService } from '../../service/page-bulk-export';
import { pageBulkExportService } from '../../service/page-bulk-export';
import { DuplicateBulkExportJobError } from '../../service/page-bulk-export/errors';

const logger = loggerFactory('growi:routes:apiv3:page-bulk-export');

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export class DuplicateBulkExportJobError extends Error {

constructor() {
super('Duplicate bulk export job is in progress');
}

}

export class BulkExportJobExpiredError extends Error {

constructor() {
super('Bulk export job has expired');
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import fs from 'fs';
import path from 'path';
import type { Readable } from 'stream';
import { Writable } from 'stream';
import { pipeline as pipelinePromise } from 'stream/promises';

Expand All @@ -17,7 +16,6 @@ import mongoose from 'mongoose';
import type { SupportedActionType } from '~/interfaces/activity';
import { SupportedAction, SupportedTargetModel } from '~/interfaces/activity';
import { AttachmentType, FilePathOnStoragePrefix } from '~/server/interfaces/attachment';
import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';
import type { IAttachmentDocument } from '~/server/models';
import { Attachment } from '~/server/models';
import type { ActivityDocument } from '~/server/models/activity';
Expand All @@ -29,11 +27,14 @@ import { preNotifyService } from '~/server/service/pre-notify';
import { getBufferToFixedSizeTransform } from '~/server/util/stream';
import loggerFactory from '~/utils/logger';

import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../interfaces/page-bulk-export';
import type { PageBulkExportJobDocument } from '../models/page-bulk-export-job';
import PageBulkExportJob from '../models/page-bulk-export-job';
import type { PageBulkExportPageSnapshotDocument } from '../models/page-bulk-export-page-snapshot';
import PageBulkExportPageSnapshot from '../models/page-bulk-export-page-snapshot';
import { PageBulkExportFormat, PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '../../../interfaces/page-bulk-export';
import type { PageBulkExportJobDocument } from '../../models/page-bulk-export-job';
import PageBulkExportJob from '../../models/page-bulk-export-job';
import type { PageBulkExportPageSnapshotDocument } from '../../models/page-bulk-export-page-snapshot';
import PageBulkExportPageSnapshot from '../../models/page-bulk-export-page-snapshot';

import { BulkExportJobExpiredError, DuplicateBulkExportJobError } from './errors';
import { PageBulkExportJobStreamManager } from './page-bulk-export-job-stream-manager';


const logger = loggerFactory('growi:services:PageBulkExportService');
Expand All @@ -43,47 +44,6 @@ type ActivityParameters ={
endpoint: string;
}

export class DuplicateBulkExportJobError extends Error {

constructor() {
super('Duplicate bulk export job is in progress');
}

}

class BulkExportJobExpiredError extends Error {

constructor() {
super('Bulk export job has expired');
}

}

/**
* Used to keep track of streams currently being executed, and enable destroying them
*/
class PageBulkExportJobStreamManager {

private jobStreams: Record<string, Readable> = {};

addJobStream(jobId: ObjectIdLike, stream: Readable) {
this.jobStreams[jobId.toString()] = stream;
}

removeJobStream(jobId: ObjectIdLike) {
delete this.jobStreams[jobId.toString()];
}

destroyJobStream(jobId: ObjectIdLike) {
const stream = this.jobStreams[jobId.toString()];
if (stream != null) {
stream.destroy(new BulkExportJobExpiredError());
}
this.removeJobStream(jobId);
}

}

class PageBulkExportService {

crowi: any;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import type { Readable } from 'stream';

import type { ObjectIdLike } from '~/server/interfaces/mongoose-utils';

import { BulkExportJobExpiredError } from './errors';

/**
* Used to keep track of streams currently being executed, and enable destroying them
*/
export class PageBulkExportJobStreamManager {

private jobStreams: Record<string, Readable> = {};

addJobStream(jobId: ObjectIdLike, stream: Readable): void {
this.jobStreams[jobId.toString()] = stream;
}

removeJobStream(jobId: ObjectIdLike): void {
delete this.jobStreams[jobId.toString()];
}

destroyJobStream(jobId: ObjectIdLike): void {
const stream = this.jobStreams[jobId.toString()];
if (stream != null) {
stream.destroy(new BulkExportJobExpiredError());
}
this.removeJobStream(jobId);
}

}
2 changes: 1 addition & 1 deletion apps/app/src/server/crowi/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import pkg from '^/package.json';

import { KeycloakUserGroupSyncService } from '~/features/external-user-group/server/service/keycloak-user-group-sync';
import { LdapUserGroupSyncService } from '~/features/external-user-group/server/service/ldap-user-group-sync';
import { PageBulkExportJobInProgressStatus, PageBulkExportJobStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
import { PageBulkExportJobInProgressStatus } from '~/features/page-bulk-export/interfaces/page-bulk-export';
import PageBulkExportJob from '~/features/page-bulk-export/server/models/page-bulk-export-job';
import instanciatePageBulkExportService, { pageBulkExportService } from '~/features/page-bulk-export/server/service/page-bulk-export';
import instanciatePageBulkExportJobCronService, { pageBulkExportJobCronService } from '~/features/page-bulk-export/server/service/page-bulk-export-job-cron';
Expand Down

0 comments on commit 59634fb

Please sign in to comment.