diff --git a/pkg/api/v1/testkube/model_test_workflow_result_extended.go b/pkg/api/v1/testkube/model_test_workflow_result_extended.go index 4909d47a1c..6dfe857c06 100644 --- a/pkg/api/v1/testkube/model_test_workflow_result_extended.go +++ b/pkg/api/v1/testkube/model_test_workflow_result_extended.go @@ -540,10 +540,10 @@ func predictTestWorkflowStepStatus(v TestWorkflowStepResult, sig TestWorkflowSig } } - if getTestWorkflowStepStatus(v) == FAILED_TestWorkflowStepStatus { - return FAILED_TestWorkflowStepStatus, finished - } else if aborted { + if aborted { return ABORTED_TestWorkflowStepStatus, finished + } else if getTestWorkflowStepStatus(v) == FAILED_TestWorkflowStepStatus { + return FAILED_TestWorkflowStepStatus, finished } else if (failed && !sig.Negative) || (!failed && sig.Negative) { return FAILED_TestWorkflowStepStatus, finished } else if skipped { @@ -593,15 +593,15 @@ func recomputeTestWorkflowStepResult(v TestWorkflowStepResult, sig TestWorkflowS v.StartedAt = r.Steps[children[0].Ref].StartedAt v.FinishedAt = r.Steps[children[len(children)-1].Ref].StartedAt - // It has been already marked as failed internally from some step below - if getTestWorkflowStepStatus(v) == FAILED_TestWorkflowStepStatus { + // It has been already marked internally from some step below + predicted, finished := predictTestWorkflowStepStatus(v, sig, r) + if finished && getTestWorkflowStepStatus(v) == predicted { return v } // It is finished already if !v.FinishedAt.IsZero() { - predicted, finished := predictTestWorkflowStepStatus(v, sig, r) - if finished && (v.Status == nil || !(*v.Status).Finished()) { + if finished && (v.Status == nil || predicted == ABORTED_TestWorkflowStepStatus || !(*v.Status).Finished()) { v.Status = common.Ptr(predicted) } return v diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go index bcd303924a..3512836df1 100644 --- a/pkg/testworkflows/testworkflowcontroller/logs.go +++ b/pkg/testworkflows/testworkflowcontroller/logs.go @@ -88,6 +88,9 @@ func getContainerLogsStream(ctx context.Context, clientSet kubernetes.Interface, if !strings.Contains(err.Error(), "is waiting to start") { return nil, err } + if !follow { + return bytes.NewReader(nil), io.EOF + } p := <-pod.Peek(ctx) if p == nil { return bytes.NewReader(nil), io.EOF diff --git a/pkg/testworkflows/testworkflowcontroller/podstate.go b/pkg/testworkflows/testworkflowcontroller/podstate.go index 21e159c306..40099baf2a 100644 --- a/pkg/testworkflows/testworkflowcontroller/podstate.go +++ b/pkg/testworkflows/testworkflowcontroller/podstate.go @@ -31,6 +31,7 @@ var ( type podState struct { pod *corev1.Pod + job *batchv1.Job queued map[string]time.Time started map[string]time.Time finished map[string]time.Time @@ -279,6 +280,12 @@ func (p *podState) RegisterPod(pod *corev1.Pod) { } func (p *podState) RegisterJob(job *batchv1.Job) { + if job == nil { + return + } + p.mu.Lock() + p.job = job + p.mu.Unlock() p.setQueuedAt("", job.CreationTimestamp.Time) if job.Status.CompletionTime != nil { p.setFinishedAt("", job.Status.CompletionTime.Time) @@ -349,6 +356,18 @@ func (p *podState) FinishedAt(name string) time.Time { func (p *podState) containerResult(name string) (ContainerResult, error) { status := p.containerStatus(name) if status == nil || status.State.Terminated == nil { + if p.job != nil && IsJobDone(p.job) { + for _, c := range p.job.Status.Conditions { + if c.Type == batchv1.JobFailed { + if c.Status == corev1.ConditionTrue && c.Reason == "DeadlineExceeded" { + result := UnknownContainerResult + result.Details = fmt.Sprintf("Job timed out after %d seconds.", *p.job.Spec.ActiveDeadlineSeconds) + return result, nil + } + break + } + } + } if p.pod != nil && IsPodDone(p.pod) { result := UnknownContainerResult for _, c := range p.pod.Status.Conditions { @@ -377,6 +396,22 @@ func (p *podState) containerResult(name string) (ContainerResult, error) { result.Details = status.State.Terminated.Reason } + // Handle the pod timeout + if result.Details == "Error" && p.pod.Status.Reason == "DeadlineExceeded" && p.pod.Spec.ActiveDeadlineSeconds != nil { + result.Details = fmt.Sprintf("Pod timed out after %d seconds.", *p.pod.Spec.ActiveDeadlineSeconds) + } + + if p.job != nil && p.job.Spec.ActiveDeadlineSeconds != nil { + for _, c := range p.job.Status.Conditions { + if c.Type == batchv1.JobFailed { + if c.Status == corev1.ConditionTrue && c.Reason == "DeadlineExceeded" { + result.Details = fmt.Sprintf("Job timed out after %d seconds.", *p.job.Spec.ActiveDeadlineSeconds) + } + break + } + } + } + re := regexp.MustCompile(`^([^,]),(0|[1-9]\d*)$`) for _, message := range strings.Split(status.State.Terminated.Message, "/") { match := re.FindStringSubmatch(message) diff --git a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go index f0a2ec88e9..35dfac38ef 100644 --- a/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/testworkflows/testworkflowcontroller/watchinstrumentedpod.go @@ -116,6 +116,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // For each container: lastTs := s.result.Initialization.FinishedAt + aborted := false for _, container := range append(podObj.Spec.InitContainers, podObj.Spec.Containers...) { // Ignore non-standard TestWorkflow containers number, err := strconv.Atoi(container.Name) @@ -171,8 +172,7 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Watch the container logs - follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(containerName) && !state.IsFinished("") - aborted := false + follow := !aborted && common.ResolvePtr(opts.Follow, true) && !state.IsFinished(containerName) && !state.IsFinished("") lastStarted := initialRef executionStatuses := map[string]constants.ExecutionResult{} for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, containerName, follow, 10, pod).Channel() { @@ -192,23 +192,25 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf switch v.Value.Hint.Name { case constants.InstructionStart: lastStarted = v.Value.Hint.Ref - s.Start(v.Value.Hint.Ref, v.Value.Time) + if !aborted { + s.Start(v.Value.Hint.Ref, v.Value.Time) + } case constants.InstructionEnd: status := testkube.TestWorkflowStepStatus(v.Value.Hint.Value.(string)) if status == "" { status = testkube.PASSED_TestWorkflowStepStatus } - s.FinishStep(v.Value.Hint.Ref, ContainerResultStep{ - Status: status, - Details: executionStatuses[v.Value.Hint.Ref].Details, - ExitCode: int(executionStatuses[v.Value.Hint.Ref].ExitCode), - FinishedAt: v.Value.Time, - }) - - // Escape when the job was aborted + if !aborted { + s.FinishStep(v.Value.Hint.Ref, ContainerResultStep{ + Status: status, + Details: executionStatuses[v.Value.Hint.Ref].Details, + ExitCode: int(executionStatuses[v.Value.Hint.Ref].ExitCode), + FinishedAt: v.Value.Time, + }) + } if status == testkube.ABORTED_TestWorkflowStepStatus { aborted = true - break + continue } case constants.InstructionExecution: serialized, _ := json.Marshal(v.Value.Hint.Value) @@ -235,15 +237,13 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } } - if aborted { - // Don't wait for any other statuses if we already know that some task has been aborted - } else if follow { + if follow { select { case <-state.Finished(container.Name): case <-state.Finished(""): // Finish fast when the whole execution has been finished } - } else { + } else if !aborted { select { case <-state.Finished(container.Name): case <-state.Finished(""): @@ -254,49 +254,46 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Fall back results to the termination log - if !aborted { - result, err := state.ContainerResult(container.Name) - if err != nil { - s.Error(err) - break + result, _ := state.ContainerResult(container.Name) + for i, ref := range endRefs[index] { + // Ignore tree root hints + if ref == "root" { + continue + } + status := ContainerResultStep{ + Status: testkube.ABORTED_TestWorkflowStepStatus, + ExitCode: -1, + Details: result.Details, + FinishedAt: s.GetStepResult(ref).FinishedAt, + } + if status.FinishedAt.IsZero() { + status.FinishedAt = result.FinishedAt + } + if status.FinishedAt.IsZero() { + status.FinishedAt = state.FinishedAt("") + } + if status.FinishedAt.IsZero() { + status.FinishedAt = s.GetLastTimestamp(lastStarted) } - for i, ref := range endRefs[index] { - // Ignore tree root hints - if ref == "root" { - continue + if len(result.Steps) > i { + status = result.Steps[i] + if status.Details == "" { + status.Details = result.Details } - status := ContainerResultStep{ - Status: testkube.ABORTED_TestWorkflowStepStatus, - ExitCode: -1, - Details: result.Details, - FinishedAt: s.GetStepResult(ref).FinishedAt, - } - if status.FinishedAt.IsZero() { - status.FinishedAt = result.FinishedAt - } - if status.FinishedAt.IsZero() { - status.FinishedAt = state.FinishedAt("") - } - if status.FinishedAt.IsZero() { - status.FinishedAt = s.GetLastTimestamp(lastStarted) - } - - if len(result.Steps) > i { - status = result.Steps[i] - if status.Details == "" { - status.Details = result.Details - } - finishedAt := s.GetStepResult(ref).FinishedAt - if !finishedAt.IsZero() { - status.FinishedAt = finishedAt - } + finishedAt := s.GetStepResult(ref).FinishedAt + if !finishedAt.IsZero() { + status.FinishedAt = finishedAt } + } + // Ignore if it's already finished + currentStatus := s.GetStepResult(ref).Status + if currentStatus == nil || !currentStatus.Finished() || *currentStatus == testkube.ABORTED_TestWorkflowStepStatus { s.FinishStep(ref, status) - if status.Status == testkube.ABORTED_TestWorkflowStepStatus { - lastStarted = ref - break - } + } + if status.Status == testkube.ABORTED_TestWorkflowStepStatus { + lastStarted = ref + break } } @@ -311,7 +308,6 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf // because due to GKE bug, the Job is still pending, // so it will get stuck there. if s.IsAnyAborted() { - result, _ := state.ContainerResult(container.Name) reason := s.result.Steps[lastStarted].ErrorMessage if reason == "" { reason = result.Details