fix: propagate aborted status correctly on GKE's OOMKilled (#5762)
* fix: propagate aborted status correctly on GKE's OOMKilled
* fix: don't trust step statuses in Test Workflow after it's aborted
* fix: show abort information about pod/job timeout
* fix: isolate abort error to single step
rangoo94 authored Aug 14, 2024
1 parent c7a1224 commit 3dd3301
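
For context, here is a minimal sketch (not Testkube's actual code) of how an OOM kill surfaces through the Kubernetes API: the kubelet reports the container as terminated with reason `OOMKilled`, typically with exit code 137, and it is this terminated state that the controller must translate into an aborted step rather than a plain failure. The `isAborted` helper below is hypothetical.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// isAborted reports whether a container status looks like an OOM kill.
// This is an illustrative helper, not part of the Testkube codebase.
func isAborted(status corev1.ContainerStatus) (bool, string) {
	term := status.State.Terminated
	if term == nil {
		return false, ""
	}
	if term.Reason == "OOMKilled" || term.ExitCode == 137 {
		return true, fmt.Sprintf("container %q aborted: %s (exit code %d)",
			status.Name, term.Reason, term.ExitCode)
	}
	return false, ""
}

func main() {
	st := corev1.ContainerStatus{
		Name: "1",
		State: corev1.ContainerState{
			Terminated: &corev1.ContainerStateTerminated{Reason: "OOMKilled", ExitCode: 137},
		},
	}
	aborted, details := isAborted(st)
	fmt.Println(aborted, details)
}
```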
Showing 4 changed files with 96 additions and 62 deletions.
14 changes: 7 additions & 7 deletions pkg/api/v1/testkube/model_test_workflow_result_extended.go
@@ -540,10 +540,10 @@ func predictTestWorkflowStepStatus(v TestWorkflowStepResult, sig TestWorkflowSig
}
}

if getTestWorkflowStepStatus(v) == FAILED_TestWorkflowStepStatus {
return FAILED_TestWorkflowStepStatus, finished
} else if aborted {
if aborted {
return ABORTED_TestWorkflowStepStatus, finished
} else if getTestWorkflowStepStatus(v) == FAILED_TestWorkflowStepStatus {
return FAILED_TestWorkflowStepStatus, finished
} else if (failed && !sig.Negative) || (!failed && sig.Negative) {
return FAILED_TestWorkflowStepStatus, finished
} else if skipped {
@@ -593,15 +593,15 @@ func recomputeTestWorkflowStepResult(v TestWorkflowStepResult, sig TestWorkflowS
v.StartedAt = r.Steps[children[0].Ref].StartedAt
v.FinishedAt = r.Steps[children[len(children)-1].Ref].StartedAt

// It has been already marked as failed internally from some step below
if getTestWorkflowStepStatus(v) == FAILED_TestWorkflowStepStatus {
// It has been already marked internally from some step below
predicted, finished := predictTestWorkflowStepStatus(v, sig, r)
if finished && getTestWorkflowStepStatus(v) == predicted {
return v
}

// It is finished already
if !v.FinishedAt.IsZero() {
predicted, finished := predictTestWorkflowStepStatus(v, sig, r)
if finished && (v.Status == nil || !(*v.Status).Finished()) {
if finished && (v.Status == nil || predicted == ABORTED_TestWorkflowStepStatus || !(*v.Status).Finished()) {
v.Status = common.Ptr(predicted)
}
return v
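
The reordering above makes the aborted verdict take precedence over an already-recorded failure, so an OOM-killed step no longer reports "failed" while the execution as a whole is aborted. A simplified, self-contained sketch of that precedence follows; the `predictStatus` helper and status constants are illustrative, not the real `predictTestWorkflowStepStatus` signature.

```go
package main

import "fmt"

type StepStatus string

const (
	StatusPassed  StepStatus = "passed"
	StatusFailed  StepStatus = "failed"
	StatusAborted StepStatus = "aborted"
)

// predictStatus resolves a step's final status: once the execution is known
// to be aborted, that verdict wins even if the step was already marked failed.
func predictStatus(current StepStatus, aborted, failed bool) StepStatus {
	switch {
	case aborted:
		return StatusAborted
	case current == StatusFailed, failed:
		return StatusFailed
	default:
		return StatusPassed
	}
}

func main() {
	// A step that already looks "failed" inside an aborted execution
	// now resolves to "aborted", matching the reordered branches above.
	fmt.Println(predictStatus(StatusFailed, true, true))  // aborted
	fmt.Println(predictStatus(StatusPassed, false, true)) // failed
}
```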
3 changes: 3 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/logs.go
@@ -88,6 +88,9 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface,
if !strings.Contains(err.Error(), "is waiting to start") {
return nil, err
}
if !follow {
return bytes.NewReader(nil), io.EOF
}
p := <-pod.Peek(ctx)
if p == nil {
return bytes.NewReader(nil), io.EOF
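
The guard added here returns an empty stream when the caller is not following logs and the container never started, as happens once the job is aborted on GKE, so the reader no longer blocks waiting for a pod that will never appear. A rough standalone sketch of the same control flow, with a hypothetical `getLogsOnce` wrapper standing in for `getContainerLogsStream`:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// getLogsOnce fetches logs once; when the API says the container
// "is waiting to start" and follow is false, it returns an empty
// stream immediately instead of waiting for the pod.
func getLogsOnce(fetch func() (io.Reader, error), follow bool) (io.Reader, error) {
	r, err := fetch()
	if err == nil {
		return r, nil
	}
	if !strings.Contains(err.Error(), "is waiting to start") {
		return nil, err
	}
	if !follow {
		return bytes.NewReader(nil), io.EOF
	}
	// In follow mode the real controller would wait for the pod here.
	return nil, err
}

func main() {
	fetch := func() (io.Reader, error) {
		return nil, errors.New(`container "1" in pod "wf-abc" is waiting to start: PodInitializing`)
	}
	_, err := getLogsOnce(fetch, false)
	fmt.Println(err) // EOF
}
```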
35 changes: 35 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/podstate.go
@@ -31,6 +31,7 @@ var (

type podState struct {
pod *corev1.Pod
job *batchv1.Job
queued map[string]time.Time
started map[string]time.Time
finished map[string]time.Time
@@ -279,6 +280,12 @@ func (p *podState) RegisterPod(pod *corev1.Pod) {
}

func (p *podState) RegisterJob(job *batchv1.Job) {
if job == nil {
return
}
p.mu.Lock()
p.job = job
p.mu.Unlock()
p.setQueuedAt("", job.CreationTimestamp.Time)
if job.Status.CompletionTime != nil {
p.setFinishedAt("", job.Status.CompletionTime.Time)
@@ -349,6 +356,18 @@ func (p *podState) FinishedAt(name string) time.Time {
func (p *podState) containerResult(name string) (ContainerResult, error) {
status := p.containerStatus(name)
if status == nil || status.State.Terminated == nil {
if p.job != nil && IsJobDone(p.job) {
for _, c := range p.job.Status.Conditions {
if c.Type == batchv1.JobFailed {
if c.Status == corev1.ConditionTrue && c.Reason == "DeadlineExceeded" {
result := UnknownContainerResult
result.Details = fmt.Sprintf("Job timed out after %d seconds.", *p.job.Spec.ActiveDeadlineSeconds)
return result, nil
}
break
}
}
}
if p.pod != nil && IsPodDone(p.pod) {
result := UnknownContainerResult
for _, c := range p.pod.Status.Conditions {
@@ -377,6 +396,22 @@ func (p *podState) containerResult(name string) (ContainerResult, error) {
result.Details = status.State.Terminated.Reason
}

// Handle the pod timeout
if result.Details == "Error" && p.pod.Status.Reason == "DeadlineExceeded" && p.pod.Spec.ActiveDeadlineSeconds != nil {
result.Details = fmt.Sprintf("Pod timed out after %d seconds.", *p.pod.Spec.ActiveDeadlineSeconds)
}

if p.job != nil && p.job.Spec.ActiveDeadlineSeconds != nil {
for _, c := range p.job.Status.Conditions {
if c.Type == batchv1.JobFailed {
if c.Status == corev1.ConditionTrue && c.Reason == "DeadlineExceeded" {
result.Details = fmt.Sprintf("Job timed out after %d seconds.", *p.job.Spec.ActiveDeadlineSeconds)
}
break
}
}
}

re := regexp.MustCompile(`^([^,]),(0|[1-9]\d*)$`)
for _, message := range strings.Split(status.State.Terminated.Message, "/") {
match := re.FindStringSubmatch(message)
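
Both new checks rely on standard Kubernetes behaviour: a Job that exceeds its `activeDeadlineSeconds` gets a `JobFailed` condition with reason `DeadlineExceeded`, which the controller now surfaces as a human-readable abort message instead of a bare "Error". A compact sketch of that lookup, using the real `batchv1`/`corev1` types but a hypothetical `jobTimeoutDetails` helper:

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

// jobTimeoutDetails returns a readable message when the Job failed
// because its active deadline was exceeded. Illustrative only.
func jobTimeoutDetails(job *batchv1.Job) (string, bool) {
	if job == nil || job.Spec.ActiveDeadlineSeconds == nil {
		return "", false
	}
	for _, c := range job.Status.Conditions {
		if c.Type == batchv1.JobFailed && c.Status == corev1.ConditionTrue && c.Reason == "DeadlineExceeded" {
			return fmt.Sprintf("Job timed out after %d seconds.", *job.Spec.ActiveDeadlineSeconds), true
		}
	}
	return "", false
}

func main() {
	deadline := int64(300)
	job := &batchv1.Job{
		Spec: batchv1.JobSpec{ActiveDeadlineSeconds: &deadline},
		Status: batchv1.JobStatus{Conditions: []batchv1.JobCondition{
			{Type: batchv1.JobFailed, Status: corev1.ConditionTrue, Reason: "DeadlineExceeded"},
		}},
	}
	if msg, ok := jobTimeoutDetails(job); ok {
		fmt.Println(msg) // Job timed out after 300 seconds.
	}
}
```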
106 changes: 51 additions & 55 deletions pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go
@@ -116,6 +116,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// For each container:
lastTs := s.result.Initialization.FinishedAt
aborted := false
for _, container := range append(podObj.Spec.InitContainers, podObj.Spec.Containers...) {
// Ignore non-standard TestWorkflow containers
number, err := strconv.Atoi(container.Name)
@@ -171,8 +172,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
}

// Watch the container logs
follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(containerName) && !state.IsFinished("")
aborted := false
follow := !aborted && common.ResolvePtr(opts.Follow, true) && !state.IsFinished(containerName) && !state.IsFinished("")
lastStarted := initialRef
executionStatuses := map[string]constants.ExecutionResult{}
for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, containerName, follow, 10, pod).Channel() {
@@ -192,23 +192,25 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
switch v.Value.Hint.Name {
case constants.InstructionStart:
lastStarted = v.Value.Hint.Ref
s.Start(v.Value.Hint.Ref, v.Value.Time)
if !aborted {
s.Start(v.Value.Hint.Ref, v.Value.Time)
}
case constants.InstructionEnd:
status := testkube.TestWorkflowStepStatus(v.Value.Hint.Value.(string))
if status == "" {
status = testkube.PASSED_TestWorkflowStepStatus
}
s.FinishStep(v.Value.Hint.Ref, ContainerResultStep{
Status: status,
Details: executionStatuses[v.Value.Hint.Ref].Details,
ExitCode: int(executionStatuses[v.Value.Hint.Ref].ExitCode),
FinishedAt: v.Value.Time,
})

// Escape when the job was aborted
if !aborted {
s.FinishStep(v.Value.Hint.Ref, ContainerResultStep{
Status: status,
Details: executionStatuses[v.Value.Hint.Ref].Details,
ExitCode: int(executionStatuses[v.Value.Hint.Ref].ExitCode),
FinishedAt: v.Value.Time,
})
}
if status == testkube.ABORTED_TestWorkflowStepStatus {
aborted = true
break
continue
}
case constants.InstructionExecution:
serialized, _ := json.Marshal(v.Value.Hint.Value)
@@ -235,15 +237,13 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
}
}

if aborted {
// Don't wait for any other statuses if we already know that some task has been aborted
} else if follow {
if follow {
select {
case <-state.Finished(container.Name):
case <-state.Finished(""):
// Finish fast when the whole execution has been finished
}
} else {
} else if !aborted {
select {
case <-state.Finished(container.Name):
case <-state.Finished(""):
@@ -254,49 +254,46 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
}

// Fall back results to the termination log
if !aborted {
result, err := state.ContainerResult(container.Name)
if err != nil {
s.Error(err)
break
result, _ := state.ContainerResult(container.Name)
for i, ref := range endRefs[index] {
// Ignore tree root hints
if ref == "root" {
continue
}
status := ContainerResultStep{
Status: testkube.ABORTED_TestWorkflowStepStatus,
ExitCode: -1,
Details: result.Details,
FinishedAt: s.GetStepResult(ref).FinishedAt,
}
if status.FinishedAt.IsZero() {
status.FinishedAt = result.FinishedAt
}
if status.FinishedAt.IsZero() {
status.FinishedAt = state.FinishedAt("")
}
if status.FinishedAt.IsZero() {
status.FinishedAt = s.GetLastTimestamp(lastStarted)
}

for i, ref := range endRefs[index] {
// Ignore tree root hints
if ref == "root" {
continue
if len(result.Steps) > i {
status = result.Steps[i]
if status.Details == "" {
status.Details = result.Details
}
status := ContainerResultStep{
Status: testkube.ABORTED_TestWorkflowStepStatus,
ExitCode: -1,
Details: result.Details,
FinishedAt: s.GetStepResult(ref).FinishedAt,
}
if status.FinishedAt.IsZero() {
status.FinishedAt = result.FinishedAt
}
if status.FinishedAt.IsZero() {
status.FinishedAt = state.FinishedAt("")
}
if status.FinishedAt.IsZero() {
status.FinishedAt = s.GetLastTimestamp(lastStarted)
}

if len(result.Steps) > i {
status = result.Steps[i]
if status.Details == "" {
status.Details = result.Details
}
finishedAt := s.GetStepResult(ref).FinishedAt
if !finishedAt.IsZero() {
status.FinishedAt = finishedAt
}
finishedAt := s.GetStepResult(ref).FinishedAt
if !finishedAt.IsZero() {
status.FinishedAt = finishedAt
}
}
// Ignore if it's already finished
currentStatus := s.GetStepResult(ref).Status
if currentStatus == nil || !currentStatus.Finished() || *currentStatus == testkube.ABORTED_TestWorkflowStepStatus {
s.FinishStep(ref, status)
if status.Status == testkube.ABORTED_TestWorkflowStepStatus {
lastStarted = ref
break
}
}
if status.Status == testkube.ABORTED_TestWorkflowStepStatus {
lastStarted = ref
break
}
}

Expand All @@ -311,7 +308,6 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
// because due to GKE bug, the Job is still pending,
// so it will get stuck there.
if s.IsAnyAborted() {
result, _ := state.ContainerResult(container.Name)
reason := s.result.Steps[lastStarted].ErrorMessage
if reason == "" {
reason = result.Details
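
Taken together, the changes in this file stop trusting per-step statuses once an abort has been observed: the `aborted` flag is now declared outside the container loop, later `start`/`end` hints are ignored, and the remaining step refs fall back to an aborted result from the termination log. A much-reduced sketch of that idea; the `finishRemaining` helper and `StepStatus` values are illustrative only.

```go
package main

import "fmt"

type StepStatus string

const (
	StatusPassed  StepStatus = "passed"
	StatusAborted StepStatus = "aborted"
)

// finishRemaining closes out step refs in order: after the first abort is
// observed, the remaining refs are finished as aborted instead of trusting
// whatever statuses the (possibly OOM-killed) process wrote afterwards.
func finishRemaining(refs []string, reported map[string]StepStatus) map[string]StepStatus {
	final := map[string]StepStatus{}
	aborted := false
	for _, ref := range refs {
		if aborted {
			final[ref] = StatusAborted
			continue
		}
		status, ok := reported[ref]
		if !ok {
			// No reported status at all also counts as aborted.
			status = StatusAborted
		}
		final[ref] = status
		if status == StatusAborted {
			aborted = true
		}
	}
	return final
}

func main() {
	refs := []string{"step-1", "step-2", "step-3"}
	reported := map[string]StepStatus{"step-1": StatusPassed, "step-2": StatusAborted, "step-3": StatusPassed}
	fmt.Println(finishRemaining(refs, reported))
	// map[step-1:passed step-2:aborted step-3:aborted]
}
```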
