Skip to content

Commit

Permalink
fix: controlling issues (#5756)
Browse files Browse the repository at this point in the history
* fix: continue paused container, when the abort is requested

* fix: ensure the lightweight container watcher will get `finishedAt` timestamp

* chore: add minor todos

* fix: configure no preemption policy by default for Test Workflows

* fix: allow Test Workflow status notifier to update "Aborted" status with details

* fix: ensure the parallel workers will not end without result

* fix: properly build timestamps and detect finished resul in the TestWorkflowResult model

* fix: use Pod/Job StatusConditions for detecting the status, make watching more resilient to external problems, expose more Kubernetes error details

* chore: do not require job/pod events when fetching logs of parallel workers and services

* fixup unit tests

* fix: delete preemption policy setup

* fixup unit tests

* fix: adjust resume time to avoid negative duration

* fix: calibrate clocks

* chore: use consts

* fixup unit tests
  • Loading branch information
rangoo94 committed Aug 14, 2024
1 parent 9af4cfe commit 1106f74
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 99 deletions.
8 changes: 6 additions & 2 deletions cmd/tcl/testworkflow-toolkit/commands/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func NewParallelCmd() *cobra.Command {

prevStatus := testkube.QUEUED_TestWorkflowStatus
prevStep := ""
prevIsFinished := false
scheduled := false
for v := range ctrl.WatchLightweight(ctx) {
// Handle error
Expand All @@ -283,14 +284,17 @@ func NewParallelCmd() *cobra.Command {
}

// Handle result change
if v.Status != prevStatus || v.Current != prevStep {
// TODO: the final status should always have the finishedAt too,
// there should be no need for checking isFinished diff
if v.Status != prevStatus || lastResult.IsFinished() != prevIsFinished || v.Current != prevStep {
if v.Status != prevStatus {
log(string(v.Status))
}
updates <- Update{index: index, result: v.Result}
prevStep = v.Current
prevStatus = v.Status
if v.Result.IsFinished() {
prevIsFinished = lastResult.IsFinished()
if lastResult.IsFinished() {
instructions.PrintOutput(env.Ref(), "parallel", ParallelStatus{Index: int(index), Status: v.Status, Result: v.Result})
ctxCancel()
return v.Result.IsPassed()
Expand Down
2 changes: 2 additions & 0 deletions cmd/testworkflow-init/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func main() {
orchestration.Pause(step, *step.StartedAt)
for _, parentRef := range step.Parents {
parent := state.GetStep(parentRef)
// TODO: What about parents of the parents?
orchestration.Pause(parent, *step.StartedAt)
}
return err
Expand All @@ -125,6 +126,7 @@ func main() {
orchestration.Resume(step, ts)
for _, parentRef := range step.Parents {
parent := state.GetStep(parentRef)
// TODO: What about parents of the parents?
orchestration.Resume(parent, ts)
}
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/testworkflow-init/orchestration/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (e *executionGroup) Kill() (err error) {
func (e *executionGroup) Abort() {
e.aborted.Store(true)
_ = e.Kill()
_ = e.Resume()
}

func (e *executionGroup) IsAborted() bool {
Expand Down
53 changes: 42 additions & 11 deletions pkg/api/v1/testkube/model_test_workflow_result_extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func (r *TestWorkflowResult) IsFinished() bool {
return !r.IsStatus(QUEUED_TestWorkflowStatus) && !r.IsStatus(RUNNING_TestWorkflowStatus) && !r.IsStatus(PAUSED_TestWorkflowStatus)
return !r.FinishedAt.IsZero() && !r.IsStatus(QUEUED_TestWorkflowStatus) && !r.IsStatus(RUNNING_TestWorkflowStatus) && !r.IsStatus(PAUSED_TestWorkflowStatus)
}

func (r *TestWorkflowResult) IsStatus(s TestWorkflowStatus) bool {
Expand Down Expand Up @@ -90,7 +90,7 @@ func (r *TestWorkflowResult) Fatal(err error, aborted bool, ts time.Time) {
if r.FinishedAt.IsZero() {
r.FinishedAt = ts.UTC()
}
if r.Initialization.Status == nil || (*r.Initialization.Status == QUEUED_TestWorkflowStepStatus) || (*r.Initialization.Status == RUNNING_TestWorkflowStepStatus) {
if r.Initialization.Status == nil || !(*r.Initialization.Status).Finished() {
r.Initialization.Status = common.Ptr(FAILED_TestWorkflowStepStatus)
if aborted {
r.Initialization.Status = common.Ptr(ABORTED_TestWorkflowStepStatus)
Expand Down Expand Up @@ -158,16 +158,26 @@ func (r *TestWorkflowResult) RecomputeDuration() {
if !r.FinishedAt.IsZero() {
r.PausedMs = 0

// Finalize pauses
for i := range r.Pauses {
step := r.Steps[r.Pauses[i].Ref]
if !step.FinishedAt.IsZero() {
if r.Pauses[i].ResumedAt.IsZero() {
r.Pauses[i].ResumedAt = step.FinishedAt
}
if r.Pauses[i].PausedAt.Before(step.StartedAt) {
r.Pauses[i].PausedAt = step.StartedAt
}
if r.Pauses[i].ResumedAt.Before(r.Pauses[i].PausedAt) {
r.Pauses[i].PausedAt = r.Pauses[i].ResumedAt
}
}
}

// Get unique pause periods
pauses := make([]TestWorkflowPause, 0)
loop:
for _, p := range r.Pauses {
// Finalize the pause if it's not
step := r.Steps[p.Ref]
if !step.FinishedAt.IsZero() && p.ResumedAt.IsZero() {
p.ResumedAt = step.FinishedAt
}

for i := range pauses {
// They don't overlap
if p.PausedAt.After(pauses[i].ResumedAt) || p.ResumedAt.Before(pauses[i].PausedAt) {
Expand Down Expand Up @@ -326,6 +336,14 @@ func (r *TestWorkflowResult) Recompute(sig []TestWorkflowSignature, scheduledAt
r.Steps[s.ref] = s.result
}

// Ensure initialization timestamps
if !r.Initialization.FinishedAt.IsZero() && r.Initialization.StartedAt.IsZero() {
r.Initialization.StartedAt = r.Initialization.FinishedAt
}
if !r.Initialization.StartedAt.IsZero() && r.Initialization.QueuedAt.IsZero() {
r.Initialization.QueuedAt = r.Initialization.StartedAt
}

// Calibrate the clock for group steps
walkSteps(sig, func(s TestWorkflowSignature) {
if len(s.Children) == 0 {
Expand Down Expand Up @@ -386,8 +404,12 @@ func (r *TestWorkflowResult) Recompute(sig []TestWorkflowSignature, scheduledAt
r.Status = common.Ptr(RUNNING_TestWorkflowStatus)
}

if r.FinishedAt.IsZero() && r.Status != nil && *r.Status == ABORTED_TestWorkflowStatus {
r.FinishedAt = r.LatestTimestamp()
// Ensure the finish time is after all other timestamps
if !r.FinishedAt.IsZero() || (r.Status != nil && *r.Status == ABORTED_TestWorkflowStatus) {
lastTs := r.LatestTimestamp()
if r.FinishedAt.Before(lastTs) {
r.FinishedAt = lastTs
}
}

// Compute the duration
Expand Down Expand Up @@ -543,7 +565,16 @@ func recomputeTestWorkflowStepResult(v TestWorkflowStepResult, sig TestWorkflowS

// Ensure there is a start time if the step is already finished
if v.StartedAt.IsZero() && !v.FinishedAt.IsZero() {
v.StartedAt = v.QueuedAt
if v.QueuedAt.IsZero() {
v.StartedAt = v.FinishedAt
} else {
v.StartedAt = v.QueuedAt
}
}

// Ensure there is a queued time if the step is already finished
if v.QueuedAt.IsZero() && !v.StartedAt.IsZero() {
v.QueuedAt = v.StartedAt
}

// Compute children
Expand Down
18 changes: 7 additions & 11 deletions pkg/testworkflows/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
prevNodeName := ""
prevPodIP := ""
prevStatus := testkube.QUEUED_TestWorkflowStatus
prevIsFinished := false
sig := stage.MapSignatureListToInternal(c.signature)
ch := make(chan LightweightNotification)
go func() {
Expand All @@ -245,20 +246,25 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
podIP, _ := c.PodIP(parentCtx)
current := prevCurrent
status := prevStatus
isFinished := prevIsFinished
if v.Value.Result != nil {
if v.Value.Result.Status != nil {
status = *v.Value.Result.Status
} else {
status = testkube.QUEUED_TestWorkflowStatus
}
current = v.Value.Result.Current(sig)
isFinished = v.Value.Result.IsFinished()
}

if nodeName != prevNodeName || podIP != prevPodIP || prevStatus != status || prevCurrent != current {
// TODO: the final status should always have the finishedAt too,
// there should be no need for checking isFinished diff
if nodeName != prevNodeName || isFinished != prevIsFinished || podIP != prevPodIP || prevStatus != status || prevCurrent != current {
prevNodeName = nodeName
prevPodIP = podIP
prevStatus = status
prevCurrent = current
prevIsFinished = isFinished
ch <- LightweightNotification{NodeName: nodeName, PodIP: podIP, Status: status, Current: current, Result: v.Value.Result}
}
}
Expand All @@ -271,16 +277,6 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
go func() {
defer writer.Close()
ref := ""
// Wait until there will be events fetched first
alignTimeoutCh := time.After(alignmentTimeout)
select {
case <-c.jobEvents.Peek(parentCtx):
case <-alignTimeoutCh:
}
select {
case <-c.podEvents.Peek(parentCtx):
case <-alignTimeoutCh:
}
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
Expand Down
17 changes: 17 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,23 @@ type ContainerLog struct {
Output *instructions.Instruction
}

type ContainerLogType string

const (
ContainerLogTypeHint ContainerLogType = "hint"
ContainerLogTypeOutput ContainerLogType = "output"
ContainerLogTypeLog ContainerLogType = ""
)

func (c *ContainerLog) Type() ContainerLogType {
if c.Hint != nil {
return ContainerLogTypeHint
} else if c.Output != nil {
return ContainerLogTypeOutput
}
return ContainerLogTypeLog
}

// getContainerLogsStream is getting logs stream, and tries to reinitialize the stream on EOF.
// EOF may happen not only on the actual container end, but also in case of the log rotation.
// @see {@link https://stackoverflow.com/a/68673451}
Expand Down
10 changes: 5 additions & 5 deletions pkg/testworkflows/testworkflowcontroller/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ func (n *notifier) Output(ref string, ts time.Time, output *instructions.Instruc
}

func (n *notifier) Finish(ts time.Time) {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if !n.result.FinishedAt.Before(ts) {
if ts.IsZero() {
return
}
n.resultMu.Lock()
defer n.resultMu.Unlock()
n.result.FinishedAt = ts
n.emit()
}
Expand All @@ -312,7 +312,7 @@ func (n *notifier) UpdateStepStatus(ref string, status testkube.TestWorkflowStep
}

func (n *notifier) finishInit(status ContainerResultStep) {
if n.result.Initialization.FinishedAt.Equal(status.FinishedAt) && n.result.Initialization.Status != nil && *n.result.Initialization.Status == status.Status {
if n.result.Initialization.FinishedAt.Equal(status.FinishedAt) && n.result.Initialization.Status != nil && *n.result.Initialization.Status == status.Status && (status.Status != testkube.ABORTED_TestWorkflowStepStatus || n.result.Initialization.ErrorMessage == status.Details) {
return
}
n.result.Initialization.FinishedAt = status.FinishedAt.UTC()
Expand Down Expand Up @@ -352,7 +352,7 @@ func (n *notifier) FinishStep(ref string, status ContainerResultStep) {
n.finishInit(status)
return
}
if n.result.Steps[ref].FinishedAt.Equal(status.FinishedAt) && n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == status.Status {
if n.result.Steps[ref].FinishedAt.Equal(status.FinishedAt) && n.result.Steps[ref].Status != nil && *n.result.Steps[ref].Status == status.Status && (status.Status != testkube.ABORTED_TestWorkflowStepStatus || n.result.Steps[ref].ErrorMessage == status.Details) {
return
}
s := n.result.Steps[ref]
Expand Down
Loading

0 comments on commit 1106f74

Please sign in to comment.