Skip to content

Commit

Permalink
fix: manual abort (improve deletion detection) (#5227)
Browse files Browse the repository at this point in the history
  • Loading branch information
rangoo94 authored Mar 20, 2024
1 parent d8ebb29 commit 86ad572
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 4 deletions.
3 changes: 2 additions & 1 deletion pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func cleanupPods(ctx context.Context, clientSet kubernetes.Interface, namespace,

func cleanupJobs(ctx context.Context, clientSet kubernetes.Interface, namespace, id string) error {
return clientSet.BatchV1().Jobs(namespace).DeleteCollection(ctx, metav1.DeleteOptions{
PropagationPolicy: common.Ptr(metav1.DeletePropagationBackground),
GracePeriodSeconds: common.Ptr(int64(0)),
PropagationPolicy: common.Ptr(metav1.DeletePropagationBackground),
}, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", testworkflowprocessor.ExecutionIdLabelName, id),
})
Expand Down
7 changes: 7 additions & 0 deletions pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,13 @@ func (c *controller) Watch(parentCtx context.Context) Watcher[Notification] {
result.FinishedAt = v.Value.Status.CompletionTime.Time
}
}
if result.FinishedAt.IsZero() {
for v := range c.pod.Stream(ctx).Channel() {
if v.Value != nil && v.Value.ObjectMeta.DeletionTimestamp != nil {
result.FinishedAt = v.Value.ObjectMeta.DeletionTimestamp.Time
}
}
}

// Compute the TestWorkflow status and dates
result.Recompute(sig, c.scheduledAt)
Expand Down
9 changes: 6 additions & 3 deletions pkg/tcl/testworkflowstcl/testworkflowcontroller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ const (
)

func IsPodDone(pod *corev1.Pod) bool {
return pod.Status.Phase != corev1.PodPending && pod.Status.Phase != corev1.PodRunning
return (pod.Status.Phase != corev1.PodPending && pod.Status.Phase != corev1.PodRunning) || pod.ObjectMeta.DeletionTimestamp != nil
}

func IsJobDone(job *batchv1.Job) bool {
return job.Status.Active == 0 && (job.Status.Succeeded > 0 || job.Status.Failed > 0)
return (job.Status.Active == 0 && (job.Status.Succeeded > 0 || job.Status.Failed > 0)) || job.ObjectMeta.DeletionTimestamp != nil
}

func WatchJob(ctx context.Context, clientSet kubernetes.Interface, namespace, name string, cacheSize int) Watcher[*batchv1.Job] {
Expand Down Expand Up @@ -242,7 +242,10 @@ func WatchContainerStatus(ctx context.Context, pod Watcher[*corev1.Pod], contain
select {
case <-w.Done():
return
case p := <-stream.Channel():
case p, ok := <-stream.Channel():
if !ok {
return
}
if p.Error != nil {
w.SendError(p.Error)
continue
Expand Down

0 comments on commit 86ad572

Please sign in to comment.