From 21ac1b916297d91d34d156c0d1a9f4f4dd3e127b Mon Sep 17 00:00:00 2001 From: Valentin Zickner Date: Tue, 16 Jan 2024 21:25:48 +0100 Subject: [PATCH] add support for promises as callbackHandler result --- src/external-worker-client.spec.ts | 88 ++++++++++++++++++++++++++++++ src/external-worker-client.ts | 8 +-- 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/src/external-worker-client.spec.ts b/src/external-worker-client.spec.ts index 53fed33..1e8307d 100644 --- a/src/external-worker-client.spec.ts +++ b/src/external-worker-client.spec.ts @@ -75,6 +75,50 @@ describe('ExternalWorkerClient', () => { expect(job).toBeDefined(); }); + + it('and complete job with promise', async () => { + const scope = nockInstance + .post(`/external-job-api/acquire/jobs`, { + topic: "myTopic", + lockDuration: "PT1M", + numberOfTasks: 1, + numberOfRetries: 5, + workerId: "test-worker", + scopeType: null + }) + .reply(200, acquireJobResponse) + .post(`/external-job-api/acquire/jobs`, { + topic: "myTopic", + lockDuration: "PT1M", + numberOfTasks: 1, + numberOfRetries: 5, + workerId: "test-worker", + scopeType: null + }) + .reply(200, []) + .post(`/external-job-api/acquire/jobs/${acquireJobResponse[0].id}/complete`, { + variables: null, + workerId: "test-worker" + }) + .reply(204); + + let job: ExternalWorkerAcquireJobResponse = undefined; + const subscription = externalWorkerClient.subscribe({ + topic: 'myTopic', + callbackHandler(acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder: WorkerResultBuilder) { + job = acquiredJob; + return Promise.resolve(workResultBuilder.success()); + }, + waitPeriodSeconds: 0.2 + }); + const done = await waitForRequestsToComplete(scope); + expect(done).toBeTruthy(); + + subscription.unsubscribe(); + scope.done(); + expect(job).toBeDefined(); + }); + it('and complete job with variables', async () => { const scope = nockInstance .post(`/external-job-api/acquire/jobs`, { @@ -161,6 +205,50 @@ describe('ExternalWorkerClient', () => { expect(job).toBeDefined(); }); + it('and fail job with promise', async () => { + const scope = nockInstance + .post(`/external-job-api/acquire/jobs`, { + topic: "myTopic", + lockDuration: "PT1M", + numberOfTasks: 1, + numberOfRetries: 5, + workerId: "test-worker", + scopeType: null + }) + .reply(200, acquireJobResponse) + .post(`/external-job-api/acquire/jobs`, { + topic: "myTopic", + lockDuration: "PT1M", + numberOfTasks: 1, + numberOfRetries: 5, + workerId: "test-worker", + scopeType: null + }) + .reply(200, []) + .post(`/external-job-api/acquire/jobs/${acquireJobResponse[0].id}/fail`, { + workerId: "test-worker", + errorMessage: "Some error message" + }) + .reply(204); + + let job: ExternalWorkerAcquireJobResponse = undefined; + const subscription = externalWorkerClient.subscribe({ + topic: 'myTopic', + callbackHandler(acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder: WorkerResultBuilder) { + job = acquiredJob; + return Promise.reject("Some error message"); + }, + waitPeriodSeconds: 0.2 + }); + + const done = await waitForRequestsToComplete(scope); + expect(done).toBeTruthy(); + + subscription.unsubscribe(); + scope.done(); + expect(job).toBeDefined(); + }); + it('and bpmn error job', async () => { const scope = nockInstance .post(`/external-job-api/acquire/jobs`, { diff --git a/src/external-worker-client.ts b/src/external-worker-client.ts index 12b1a43..3d29d1a 100644 --- a/src/external-worker-client.ts +++ b/src/external-worker-client.ts @@ -43,15 +43,15 @@ export class ExternalWorkerClient { for (const job of jobs) { const jobId = job.id; try { - const workResult = params.callbackHandler(job, new WorkerResultBuilder(job)); - if (!workResult) { + const workResult = await Promise.resolve(params.callbackHandler(job, new WorkerResultBuilder(job))); + if (!workResult || !workResult.execute) { await this._restClient.completeJob({jobId}); } else { await workResult.execute(this._restClient); } } catch (e) { console.error('Failed to execute job with exception', e); - await this._restClient.failJob({jobId, errorDetails: JSON.stringify(e)}); + await this._restClient.failJob({jobId, errorMessage: typeof e === 'string' ? e : JSON.stringify(e)}); } } if (timeoutInformation.unsubscribed) { @@ -211,7 +211,7 @@ export type ExternalWorkerClientParams = { export type ExternalWorkerSubscriptionParams = { topic: string; - callbackHandler: (acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder?: WorkerResultBuilder) => WorkResult | void, + callbackHandler: (acquiredJob: ExternalWorkerAcquireJobResponse, workResultBuilder: WorkerResultBuilder) => WorkResult | Promise | void, lockDuration?: string; numberOfRetries?: number; numberOfTasks?: number;