Skip to content

Commit

Permalink
Provide method for stopping Batch 5 Jobs upon user request
Browse files Browse the repository at this point in the history
In the previous release of SCDF we used the JsrJobOperator to stop job executions.
The 2 stages of stopping jobs is as follows:
1) Sets the Batch Status of the job execution to STOPPING.  This signals to Spring Batch to stop execution at the next step.
2) If the Job is a StepLocator it will go through each of the StoppableTasklets and stop them.

So when using Batch 4.x it was just a quick check of the JobRegistry to retrieve the job, which was always empty since SCDF never deals with Jobs directly.
But with Batch 5.x they loaded the Job Registry and attempted to retrieve the Job in a different way using the SimpleJobOperator.
In the updated solution, SCDF doesn't use the SimpleJobOperator since SCDF doesn't have access to the StepLocator for the Job, nor does it use the JobRegistry and even if it did, it would always be empty.

w

* Modify stopAll so that it calls stop() instead of using its own logic
* Add Test for stopAll

Updated based on code review

Extract job stop code from stop(long) and place into stopJobExecution method.
The stopJobExecution  method will be used by stop(long) and stopAll.

Updated Based on Code Review

Moved Job Stop Code from assert methods to the tests.
Updated the JobExecution Create methods so that they verify that the job is not Stopping
  • Loading branch information
cppwfs committed Oct 17, 2024
1 parent df26691 commit 13a46ab
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ public int stopAll() {
Collection<JobExecution> result = jobExecutionDao.getRunningJobExecutions();
for (JobExecution jobExecution : result) {
try {
jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
jobExecution.setStatus( BatchStatus.STOPPING);
jobRepository.update(jobExecution);
stopJobExecution(jobExecution);
} catch (Exception e) {
throw new IllegalArgumentException("The following JobExecutionId was not found: " + jobExecution.getId(), e);
}
Expand All @@ -238,14 +236,19 @@ public int stopAll() {

@Override
public JobExecution stop(Long jobExecutionId) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
JobExecution jobExecution = getJobExecution(jobExecutionId);
return stopJobExecution(getJobExecution(jobExecutionId));
}

private JobExecution stopJobExecution(JobExecution jobExecution) throws JobExecutionNotRunningException{
if (!jobExecution.isRunning()) {
throw new JobExecutionNotRunningException("JobExecution is not running and therefore cannot be stopped");
}

logger.info("Stopping job execution: " + jobExecution);

jobExecution.setStatus(BatchStatus.STOPPED);
// Indicate the execution should be stopped by setting it's status to
// 'STOPPING'. It is assumed that
// the step implementation will check this status at chunk boundaries.
logger.info("Stopping job execution: {}", jobExecution);
jobExecution.getStepExecutions().forEach(StepExecution::setTerminateOnly);
jobExecution.setStatus(BatchStatus.STOPPING);
jobRepository.update(jobExecution);
return jobExecution;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import javax.sql.DataSource;

import org.junit.jupiter.api.Test;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;
import org.testcontainers.containers.JdbcDatabaseContainer;

import org.springframework.batch.core.BatchStatus;
Expand Down Expand Up @@ -191,13 +193,25 @@ void exceptionsShouldBeThrownIfRequestForNonExistingJobInstance() {

@Test
void stoppingJobExecutionShouldLeaveJobExecutionWithStatusOfStopping() throws Exception {
JobExecution jobExecution = createJobExecution(BASE_JOB_INST_NAME,true);
jobExecution = jobService.getJobExecution(jobExecution.getId());
assertThat(jobExecution.isRunning()).isTrue();
assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING);
JobExecution jobExecution = createRunningJobExecution(BASE_JOB_INST_NAME);
jobService.stop(jobExecution.getId());
assertJobHasStopped(jobExecution);
}

@Test
void stoppingAllJobExecutionsShouldLeaveJobExecutionsWithStatusOfStopping() throws Exception {
JobExecution jobExecutionOne = createRunningJobExecution(BASE_JOB_INST_NAME);
JobExecution jobExecutionTwo = createRunningJobExecution(BASE_JOB_INST_NAME+"_TWO");
jobService.stop(jobExecutionOne.getId());
assertJobHasStopped(jobExecutionOne);
jobService.stop(jobExecutionTwo.getId());
assertJobHasStopped(jobExecutionTwo);
}

private void assertJobHasStopped(JobExecution jobExecution) throws NoSuchJobExecutionException, JobExecutionNotRunningException {
jobExecution = jobService.getJobExecution(jobExecution.getId());
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPED);
assertThat(jobExecution.getStatus()).isEqualTo(BatchStatus.STOPPING);
assertThat(jobExecution.isRunning()).isTrue();
}

private void verifyJobInstance(long id, String name) throws Exception {
Expand All @@ -221,9 +235,13 @@ private JobExecution createJobExecution(String name) throws Exception {
return createJobExecution(name, BatchStatus.STARTING, false);
}

private JobExecution createJobExecution(String name, boolean isRunning)
private JobExecution createRunningJobExecution(String name)
throws Exception {
return createJobExecution(name, BatchStatus.STARTING, isRunning);
JobExecution jobExecution = createJobExecution(name, BatchStatus.STARTING, true);
jobExecution = jobService.getJobExecution(jobExecution.getId());
assertThat(jobExecution.isRunning()).isTrue();
assertThat(jobExecution.getStatus()).isNotEqualTo(BatchStatus.STOPPING);
return jobExecution;
}

private JobExecution createJobExecution(String name, BatchStatus batchStatus, boolean isRunning) throws Exception {
Expand Down

0 comments on commit 13a46ab

Please sign in to comment.