From 86ad5724cb279f112536468e11262c2f1afdb042 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Wed, 20 Mar 2024 16:57:12 +0100 Subject: [PATCH] fix: manual abort (improve deletion detection) (#5227) --- .../testworkflowstcl/testworkflowcontroller/cleanup.go | 3 ++- .../testworkflowcontroller/controller.go | 7 +++++++ pkg/tcl/testworkflowstcl/testworkflowcontroller/utils.go | 9 ++++++--- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go index 57d41ebe085..2ea4369d25d 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/cleanup.go @@ -40,7 +40,8 @@ func cleanupPods(ctx context.Context, clientSet kubernetes.Interface, namespace, func cleanupJobs(ctx context.Context, clientSet kubernetes.Interface, namespace, id string) error { return clientSet.BatchV1().Jobs(namespace).DeleteCollection(ctx, metav1.DeleteOptions{ - PropagationPolicy: common.Ptr(metav1.DeletePropagationBackground), + GracePeriodSeconds: common.Ptr(int64(0)), + PropagationPolicy: common.Ptr(metav1.DeletePropagationBackground), }, metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", testworkflowprocessor.ExecutionIdLabelName, id), }) diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go index 84e0b3ed92d..00a9a4db471 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go @@ -400,6 +400,13 @@ func (c *controller) Watch(parentCtx context.Context) Watcher[Notification] { result.FinishedAt = v.Value.Status.CompletionTime.Time } } + if result.FinishedAt.IsZero() { + for v := range c.pod.Stream(ctx).Channel() { + if v.Value != nil && v.Value.ObjectMeta.DeletionTimestamp != nil { + result.FinishedAt = v.Value.ObjectMeta.DeletionTimestamp.Time + } + } + } // Compute the TestWorkflow status and dates result.Recompute(sig, c.scheduledAt) diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/utils.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/utils.go index 80ea622d231..52cc5fb404f 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/utils.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/utils.go @@ -29,11 +29,11 @@ const ( ) func IsPodDone(pod *corev1.Pod) bool { - return pod.Status.Phase != corev1.PodPending && pod.Status.Phase != corev1.PodRunning + return (pod.Status.Phase != corev1.PodPending && pod.Status.Phase != corev1.PodRunning) || pod.ObjectMeta.DeletionTimestamp != nil } func IsJobDone(job *batchv1.Job) bool { - return job.Status.Active == 0 && (job.Status.Succeeded > 0 || job.Status.Failed > 0) + return (job.Status.Active == 0 && (job.Status.Succeeded > 0 || job.Status.Failed > 0)) || job.ObjectMeta.DeletionTimestamp != nil } func WatchJob(ctx context.Context, clientSet kubernetes.Interface, namespace, name string, cacheSize int) Watcher[*batchv1.Job] { @@ -242,7 +242,10 @@ func WatchContainerStatus(ctx context.Context, pod Watcher[*corev1.Pod], contain select { case <-w.Done(): return - case p := <-stream.Channel(): + case p, ok := <-stream.Channel(): + if !ok { + return + } if p.Error != nil { w.SendError(p.Error) continue