Skip to content

Commit

Permalink
Merge pull request #2480 from atmire/feature-process_polling
Browse files Browse the repository at this point in the history
Feature process polling
  • Loading branch information
tdonohue authored Feb 22, 2024
2 parents 1c7f098 + 049fbb8 commit 45650c1
Show file tree
Hide file tree
Showing 28 changed files with 652 additions and 417 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ describe('CollectionSourceControlsComponent', () => {
invoke: createSuccessfulRemoteDataObject$(process),
});
processDataService = jasmine.createSpyObj('processDataService', {
findById: createSuccessfulRemoteDataObject$(process),
autoRefreshUntilCompletion: createSuccessfulRemoteDataObject$(process),
});
bitstreamService = jasmine.createSpyObj('bitstreamService', {
findByHref: createSuccessfulRemoteDataObject$(bitstream),
Expand Down Expand Up @@ -137,7 +137,7 @@ describe('CollectionSourceControlsComponent', () => {
{name: '-i', value: new ContentSourceSetSerializer().Serialize(contentSource.oaiSetId)},
], []);

expect(processDataService.findById).toHaveBeenCalledWith(process.processId, false);
expect(processDataService.autoRefreshUntilCompletion).toHaveBeenCalledWith(process.processId);
expect(bitstreamService.findByHref).toHaveBeenCalledWith(process._links.output.href);
expect(notificationsService.info).toHaveBeenCalledWith(jasmine.anything() as any, 'Script text');
});
Expand All @@ -151,7 +151,7 @@ describe('CollectionSourceControlsComponent', () => {
{name: '-r', value: null},
{name: '-c', value: collection.uuid},
], []);
expect(processDataService.findById).toHaveBeenCalledWith(process.processId, false);
expect(processDataService.autoRefreshUntilCompletion).toHaveBeenCalledWith(process.processId);
expect(notificationsService.success).toHaveBeenCalled();
});
});
Expand All @@ -164,7 +164,7 @@ describe('CollectionSourceControlsComponent', () => {
{name: '-o', value: null},
{name: '-c', value: collection.uuid},
], []);
expect(processDataService.findById).toHaveBeenCalledWith(process.processId, false);
expect(processDataService.autoRefreshUntilCompletion).toHaveBeenCalledWith(process.processId);
expect(notificationsService.success).toHaveBeenCalled();
});
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ import { ScriptDataService } from '../../../../core/data/processes/script-data.s
import { ContentSource } from '../../../../core/shared/content-source.model';
import { ProcessDataService } from '../../../../core/data/processes/process-data.service';
import {
getAllCompletedRemoteData,
getAllSucceededRemoteDataPayload,
getFirstCompletedRemoteData,
getFirstSucceededRemoteDataPayload
} from '../../../../core/shared/operators';
import { filter, map, switchMap, tap } from 'rxjs/operators';
import { hasValue, hasValueOperator } from '../../../../shared/empty.util';
import { hasValue } from '../../../../shared/empty.util';
import { ProcessStatus } from '../../../../process-page/processes/process-status.model';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
import { RequestService } from '../../../../core/data/request.service';
Expand Down Expand Up @@ -95,36 +94,25 @@ export class CollectionSourceControlsComponent implements OnDestroy {
}),
// filter out responses that aren't successful since the pinging of the process only needs to happen when the invocation was successful.
filter((rd) => rd.hasSucceeded && hasValue(rd.payload)),
switchMap((rd) => this.processDataService.findById(rd.payload.processId, false)),
getAllCompletedRemoteData(),
filter((rd) => !rd.isStale && (rd.hasSucceeded || rd.hasFailed)),
map((rd) => rd.payload),
hasValueOperator(),
switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)),
map((rd) => rd.payload)
).subscribe((process: Process) => {
if (process.processStatus.toString() !== ProcessStatus[ProcessStatus.COMPLETED].toString() &&
process.processStatus.toString() !== ProcessStatus[ProcessStatus.FAILED].toString()) {
// Ping the current process state every 5s
setTimeout(() => {
this.requestService.setStaleByHrefSubstring(process._links.self.href);
}, 5000);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
this.notificationsService.error(this.translateService.get('collection.source.controls.test.failed'));
this.testConfigRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.COMPLETED].toString()) {
this.bitstreamService.findByHref(process._links.output.href).pipe(getFirstSucceededRemoteDataPayload()).subscribe((bitstream) => {
this.httpClient.get(bitstream._links.content.href, {responseType: 'text'}).subscribe((data: any) => {
const output = data.replaceAll(new RegExp('.*\\@(.*)', 'g'), '$1')
.replaceAll('The script has started', '')
.replaceAll('The script has completed', '');
this.notificationsService.info(this.translateService.get('collection.source.controls.test.completed'), output);
});
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
this.notificationsService.error(this.translateService.get('collection.source.controls.test.failed'));
this.testConfigRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.COMPLETED].toString()) {
this.bitstreamService.findByHref(process._links.output.href).pipe(getFirstSucceededRemoteDataPayload()).subscribe((bitstream) => {
this.httpClient.get(bitstream._links.content.href, {responseType: 'text'}).subscribe((data: any) => {
const output = data.replaceAll(new RegExp('.*\\@(.*)', 'g'), '$1')
.replaceAll('The script has started', '')
.replaceAll('The script has completed', '');
this.notificationsService.info(this.translateService.get('collection.source.controls.test.completed'), output);
});
this.testConfigRunning$.next(false);
}
});
this.testConfigRunning$.next(false);
}
));
}));
}

/**
Expand All @@ -147,31 +135,19 @@ export class CollectionSourceControlsComponent implements OnDestroy {
}
}),
filter((rd) => rd.hasSucceeded && hasValue(rd.payload)),
switchMap((rd) => this.processDataService.findById(rd.payload.processId, false)),
getAllCompletedRemoteData(),
filter((rd) => !rd.isStale && (rd.hasSucceeded || rd.hasFailed)),
map((rd) => rd.payload),
hasValueOperator(),
switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)),
map((rd) => rd.payload)
).subscribe((process) => {
if (process.processStatus.toString() !== ProcessStatus[ProcessStatus.COMPLETED].toString() &&
process.processStatus.toString() !== ProcessStatus[ProcessStatus.FAILED].toString()) {
// Ping the current process state every 5s
setTimeout(() => {
this.requestService.setStaleByHrefSubstring(process._links.self.href);
this.requestService.setStaleByHrefSubstring(this.collection._links.self.href);
}, 5000);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
this.notificationsService.error(this.translateService.get('collection.source.controls.import.failed'));
this.importRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.COMPLETED].toString()) {
this.notificationsService.success(this.translateService.get('collection.source.controls.import.completed'));
this.requestService.setStaleByHrefSubstring(this.collection._links.self.href);
this.importRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
this.notificationsService.error(this.translateService.get('collection.source.controls.import.failed'));
this.importRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.COMPLETED].toString()) {
this.notificationsService.success(this.translateService.get('collection.source.controls.import.completed'));
this.requestService.setStaleByHrefSubstring(this.collection._links.self.href);
this.importRunning$.next(false);
}
));
}));
}

/**
Expand All @@ -194,31 +170,19 @@ export class CollectionSourceControlsComponent implements OnDestroy {
}
}),
filter((rd) => rd.hasSucceeded && hasValue(rd.payload)),
switchMap((rd) => this.processDataService.findById(rd.payload.processId, false)),
getAllCompletedRemoteData(),
filter((rd) => !rd.isStale && (rd.hasSucceeded || rd.hasFailed)),
map((rd) => rd.payload),
hasValueOperator(),
switchMap((rd) => this.processDataService.autoRefreshUntilCompletion(rd.payload.processId)),
map((rd) => rd.payload)
).subscribe((process) => {
if (process.processStatus.toString() !== ProcessStatus[ProcessStatus.COMPLETED].toString() &&
process.processStatus.toString() !== ProcessStatus[ProcessStatus.FAILED].toString()) {
// Ping the current process state every 5s
setTimeout(() => {
this.requestService.setStaleByHrefSubstring(process._links.self.href);
this.requestService.setStaleByHrefSubstring(this.collection._links.self.href);
}, 5000);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
this.notificationsService.error(this.translateService.get('collection.source.controls.reset.failed'));
this.reImportRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.COMPLETED].toString()) {
this.notificationsService.success(this.translateService.get('collection.source.controls.reset.completed'));
this.requestService.setStaleByHrefSubstring(this.collection._links.self.href);
this.reImportRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.FAILED].toString()) {
this.notificationsService.error(this.translateService.get('collection.source.controls.reset.failed'));
this.reImportRunning$.next(false);
}
if (process.processStatus.toString() === ProcessStatus[ProcessStatus.COMPLETED].toString()) {
this.notificationsService.success(this.translateService.get('collection.source.controls.reset.completed'));
this.requestService.setStaleByHrefSubstring(this.collection._links.self.href);
this.reImportRunning$.next(false);
}
));
}));
}

ngOnDestroy(): void {
Expand Down
5 changes: 3 additions & 2 deletions src/app/core/cache/builders/remote-data-build.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,12 +273,13 @@ export class RemoteDataBuildService {
return isStale(r2.state) ? r1 : r2;
}
}),
distinctUntilKeyChanged('lastUpdated')
);

const payload$ = this.buildPayload<T>(requestEntry$, href$, ...linksToFollow);

return this.toRemoteDataObservable<T>(requestEntry$, payload$);
return this.toRemoteDataObservable<T>(requestEntry$, payload$).pipe(
distinctUntilKeyChanged('lastUpdated'),
);
}

/**
Expand Down
Loading

0 comments on commit 45650c1

Please sign in to comment.