Skip to content

Commit

Permalink
Fix: restart job when policy FromSavepointOnFailure and no savepoint (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas committed Apr 28, 2023
1 parent e0a045d commit 25d3e4a
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 9 deletions.
11 changes: 3 additions & 8 deletions apis/flinkcluster/v1beta1/flinkcluster_types_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,15 @@ func (j *JobStatus) IsSavepointUpToDate(spec *JobSpec, compareTime time.Time) bo
}

// ShouldRestart returns true if the controller should restart failed job.
// The controller can restart the job only if there is a savepoint that is close to the end time of the job.
// The controller can restart the job if policy is set to FromSavepointOnFailure.
// Job will restart from savepoint if the savepoint was taken successfully.
func (j *JobStatus) ShouldRestart(spec *JobSpec) bool {
if j == nil || !j.IsFailed() || spec == nil {
return false
}

restartEnabled := spec.RestartPolicy != nil && *spec.RestartPolicy == JobRestartPolicyFromSavepointOnFailure

var jobCompletionTime time.Time
if j.CompletionTime != nil {
jobCompletionTime = j.CompletionTime.Time
}

return restartEnabled && j.IsSavepointUpToDate(spec, jobCompletionTime)
return restartEnabled
}

// UpdateReady returns true if job is ready to proceed update.
Expand Down
2 changes: 1 addition & 1 deletion apis/flinkcluster/v1beta1/flinkcluster_types_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestShouldRestartJob(t *testing.T) {
CompletionTime: &metav1.Time{Time: jobCompletionTime},
}
restart = jobStatus.ShouldRestart(&jobSpec)
assert.Equal(t, restart, false)
assert.Equal(t, restart, true)

// Not restart with restartPolicy Never
jobSpec = JobSpec{
Expand Down

0 comments on commit 25d3e4a

Please sign in to comment.