Skip to content

Commit

Permalink
add support for promises as callbackHandler result
Browse files Browse the repository at this point in the history
  • Loading branch information
vzickner committed Jan 16, 2024
1 parent e8a8a6e commit 21ac1b9
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
88 changes: 88 additions & 0 deletions src/external-worker-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,50 @@ describe('ExternalWorkerClient', () => {
expect(job).toBeDefined();
});


it('and complete job with promise', async () => {
const scope = nockInstance
.post(`/external-job-api/acquire/jobs`, {
topic: "myTopic",
lockDuration: "PT1M",
numberOfTasks: 1,
numberOfRetries: 5,
workerId: "test-worker",
scopeType: null
})
.reply(200, acquireJobResponse)
.post(`/external-job-api/acquire/jobs`, {
topic: "myTopic",
lockDuration: "PT1M",
numberOfTasks: 1,
numberOfRetries: 5,
workerId: "test-worker",
scopeType: null
})
.reply(200, [])
.post(`/external-job-api/acquire/jobs/${acquireJobResponse[0].id}/complete`, {
variables: null,
workerId: "test-worker"
})
.reply(204);

let job: ExternalWorkerAcquireJobResponse = undefined;
const subscription = externalWorkerClient.subscribe({
topic: 'myTopic',
callbackHandler(acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder: WorkerResultBuilder) {
job = acquiredJob;
return Promise.resolve(workResultBuilder.success());
},
waitPeriodSeconds: 0.2
});
const done = await waitForRequestsToComplete(scope);
expect(done).toBeTruthy();

subscription.unsubscribe();
scope.done();
expect(job).toBeDefined();
});

it('and complete job with variables', async () => {
const scope = nockInstance
.post(`/external-job-api/acquire/jobs`, {
Expand Down Expand Up @@ -161,6 +205,50 @@ describe('ExternalWorkerClient', () => {
expect(job).toBeDefined();
});

it('and fail job with promise', async () => {
const scope = nockInstance
.post(`/external-job-api/acquire/jobs`, {
topic: "myTopic",
lockDuration: "PT1M",
numberOfTasks: 1,
numberOfRetries: 5,
workerId: "test-worker",
scopeType: null
})
.reply(200, acquireJobResponse)
.post(`/external-job-api/acquire/jobs`, {
topic: "myTopic",
lockDuration: "PT1M",
numberOfTasks: 1,
numberOfRetries: 5,
workerId: "test-worker",
scopeType: null
})
.reply(200, [])
.post(`/external-job-api/acquire/jobs/${acquireJobResponse[0].id}/fail`, {
workerId: "test-worker",
errorMessage: "Some error message"
})
.reply(204);

let job: ExternalWorkerAcquireJobResponse = undefined;
const subscription = externalWorkerClient.subscribe({
topic: 'myTopic',
callbackHandler(acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder: WorkerResultBuilder) {
job = acquiredJob;
return Promise.reject("Some error message");
},
waitPeriodSeconds: 0.2
});

const done = await waitForRequestsToComplete(scope);
expect(done).toBeTruthy();

subscription.unsubscribe();
scope.done();
expect(job).toBeDefined();
});

it('and bpmn error job', async () => {
const scope = nockInstance
.post(`/external-job-api/acquire/jobs`, {
Expand Down
8 changes: 4 additions & 4 deletions src/external-worker-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ export class ExternalWorkerClient {
for (const job of jobs) {
const jobId = job.id;
try {
const workResult = params.callbackHandler(job, new WorkerResultBuilder(job));
if (!workResult) {
const workResult = await Promise.resolve(params.callbackHandler(job, new WorkerResultBuilder(job)));
if (!workResult || !workResult.execute) {
await this._restClient.completeJob({jobId});
} else {
await workResult.execute(this._restClient);
}
} catch (e) {
console.error('Failed to execute job with exception', e);
await this._restClient.failJob({jobId, errorDetails: JSON.stringify(e)});
await this._restClient.failJob({jobId, errorMessage: typeof e === 'string' ? e : JSON.stringify(e)});
}
}
if (timeoutInformation.unsubscribed) {
Expand Down Expand Up @@ -211,7 +211,7 @@ export type ExternalWorkerClientParams = {

export type ExternalWorkerSubscriptionParams = {
topic: string;
callbackHandler: (acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder?: WorkerResultBuilder) => WorkResult | void,
callbackHandler: (acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder: WorkerResultBuilder) => WorkResult | Promise<WorkResult> | void,
lockDuration?: string;
numberOfRetries?: number;
numberOfTasks?: number;
Expand Down

0 comments on commit 21ac1b9

Please sign in to comment.