Skip to content

Commit

Permalink
Exit Job early when the Pollable Task is already finished
Browse files Browse the repository at this point in the history
Pollable tasks are marked as finished before the Quartz job actually terminates. There is no global transaction that binds the finishing of the PollableTask with the Quartz job. As such it is possible that a Pollable Task is marked as finished but the Quartz job is not and I’m suspecting that if the scheduler goes down after the pollable task being marked as finished and before the Quartz job is marked as finished then the Quartz job might be retriggered. That would lead to running again a finished PollableTask.

This exit early when the Pollable Task is already finished
  • Loading branch information
aurambaj committed Jul 20, 2023
1 parent d7db1a4 commit 601672f
Showing 1 changed file with 33 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,33 +46,40 @@ public void execute(JobExecutionContext context) throws JobExecutionException {
Long pollableTaskId = context.getMergedJobDataMap().getLong(POLLABLE_TASK_ID);
currentPollableTask = pollableTaskService.getPollableTask(pollableTaskId);

ExceptionHolder exceptionHolder = new ExceptionHolder(currentPollableTask);

try {
I callInput;

String inputStringFromJob = context.getMergedJobDataMap().getString(INPUT);

if (inputStringFromJob != null) {
logger.debug("Inlined data, read from job data");
callInput =
(I) objectMapper.readValueUnchecked(inputStringFromJob, typeTokenInput.getRawType());
} else {
logger.debug("No inlined data, read from blob storage");
callInput =
(I) pollableTaskBlobStorage.getInput(pollableTaskId, typeTokenInput.getRawType());
}

O callOutput = call(callInput);

if (!typeTokenOutput.getRawType().equals(Void.class)) {
pollableTaskBlobStorage.saveOutput(pollableTaskId, callOutput);
if (currentPollableTask.getFinishedDate() != null) {
logger.error(
"PollableTask is already finished. Suspected invalid retry (potential scheduler shutdown "
+ "after pollable was marked finished but befre Quartz job was recorded as finished)");
} else {
ExceptionHolder exceptionHolder = new ExceptionHolder(currentPollableTask);

try {
I callInput;

String inputStringFromJob = context.getMergedJobDataMap().getString(INPUT);

if (inputStringFromJob != null) {
logger.debug("Inlined data, read from job data");
callInput =
(I) objectMapper.readValueUnchecked(inputStringFromJob, typeTokenInput.getRawType());
} else {
logger.debug("No inlined data, read from blob storage");
callInput =
(I) pollableTaskBlobStorage.getInput(pollableTaskId, typeTokenInput.getRawType());
}

O callOutput = call(callInput);

if (!typeTokenOutput.getRawType().equals(Void.class)) {
pollableTaskBlobStorage.saveOutput(pollableTaskId, callOutput);
}
} catch (Throwable t) {
pollableTaskExceptionUtils.processException(t, exceptionHolder);
} finally {
currentPollableTask =
pollableTaskService.finishTask(
currentPollableTask.getId(), null, exceptionHolder, null);
}
} catch (Throwable t) {
pollableTaskExceptionUtils.processException(t, exceptionHolder);
} finally {
currentPollableTask =
pollableTaskService.finishTask(currentPollableTask.getId(), null, exceptionHolder, null);
}
}

Expand Down

0 comments on commit 601672f

Please sign in to comment.