From f62d60b797bdab09ec82d8031668ba986b658b85 Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Mon, 27 May 2024 16:17:47 +0200 Subject: [PATCH] fix(testworkflows): fetch logs properly for running services (#5502) * fix(testworkflows): fetch logs properly for running services * chore: extract 100ms as IdleTimeout constant * chore: avoid getting stack on container logs when the pod is already stopped --- .../testworkflow-toolkit/commands/services.go | 1 + cmd/tcl/testworkflow-toolkit/spawn/utils.go | 3 ++- .../testworkflowcontroller/controller.go | 15 ++++++++++++--- .../testworkflowcontroller/logs.go | 9 +++++++-- .../testworkflowcontroller/podstate.go | 18 +++++++++++++++++- .../watchinstrumentedpod.go | 16 ++++++++++++++-- pkg/triggers/scraper.go | 1 + 7 files changed, 54 insertions(+), 9 deletions(-) diff --git a/cmd/tcl/testworkflow-toolkit/commands/services.go b/cmd/tcl/testworkflow-toolkit/commands/services.go index 1b2e929ac3c..a414adeea6b 100644 --- a/cmd/tcl/testworkflow-toolkit/commands/services.go +++ b/cmd/tcl/testworkflow-toolkit/commands/services.go @@ -336,6 +336,7 @@ func NewServicesCmd() *cobra.Command { break } } + ctrl.StopController() // Fail if the container has not started if !started { diff --git a/cmd/tcl/testworkflow-toolkit/spawn/utils.go b/cmd/tcl/testworkflow-toolkit/spawn/utils.go index 1aa35e41af6..65fd806cbe0 100644 --- a/cmd/tcl/testworkflow-toolkit/spawn/utils.go +++ b/cmd/tcl/testworkflow-toolkit/spawn/utils.go @@ -234,7 +234,8 @@ func SaveLogs(ctx context.Context, clientSet kubernetes.Interface, storage artif Timeout: ControllerTimeout, }) if err == nil { - err = storage.SaveStream(filePath, ctrl.Logs(ctx)) + err = storage.SaveStream(filePath, ctrl.Logs(ctx, false)) + ctrl.StopController() } return filePath, err } diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go index 6f323077f55..20462b59f2e 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go @@ -20,6 +20,7 @@ import ( initconstants "github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/constants" "github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/data" + "github.com/kubeshop/testkube/internal/common" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor" "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants" @@ -56,7 +57,7 @@ type Controller interface { Cleanup(ctx context.Context) error Watch(ctx context.Context) <-chan ChannelMessage[Notification] WatchLightweight(ctx context.Context) <-chan LightweightNotification - Logs(ctx context.Context) io.Reader + Logs(ctx context.Context, follow bool) io.Reader NodeName(ctx context.Context) (string, error) PodIP(ctx context.Context) (string, error) StopController() @@ -271,12 +272,20 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei return ch } -func (c *controller) Logs(parentCtx context.Context) io.Reader { +func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader { reader, writer := io.Pipe() go func() { defer writer.Close() ref := "" - for v := range c.Watch(parentCtx) { + w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{ + JobEvents: c.jobEvents, + Job: c.job, + Follow: common.Ptr(follow), + }) + if err != nil { + return + } + for v := range w.Channel() { if v.Error == nil && v.Value.Log != "" { if ref != v.Value.Ref { ref = v.Value.Ref diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/logs.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/logs.go index 44fa53edb8e..e4057901597 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/logs.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/logs.go @@ -11,6 +11,7 @@ package testworkflowcontroller import ( "bufio" "context" + "errors" "io" "strings" "time" @@ -36,7 +37,7 @@ type ContainerLog struct { Output *data.Instruction } -func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int) Channel[ContainerLog] { +func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, follow bool, pod Channel[*corev1.Pod]) Channel[ContainerLog] { w := newChannel[ContainerLog](ctx, bufferSize) go func() { @@ -45,7 +46,7 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam // Create logs stream request req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ - Follow: true, + Follow: follow, Timestamps: true, Container: containerName, }) @@ -58,6 +59,10 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam w.Error(err) return } + p := <-pod.Peek(ctx) + if p != nil && IsPodDone(p) { + w.Error(errors.New("pod is finished and there are no logs for this container")) + } continue } break diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/podstate.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/podstate.go index eb6c1672cce..cb41223686a 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/podstate.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/podstate.go @@ -62,9 +62,16 @@ func newPodState(parentCtx context.Context) *podState { <-ctx.Done() state.mu.Lock() defer state.mu.Unlock() - for _, c := range state.finishedCh { + for name, c := range state.finishedCh { if c != nil { + state.finished[name] = time.Time{} close(c) + delete(state.finishedCh, name) + } + } + for _, c := range state.prestart { + if c != nil { + c.Close() } } }() @@ -218,6 +225,8 @@ func (p *podState) RegisterJob(job *batchv1.Job) { p.setQueuedAt("", job.CreationTimestamp.Time) if job.Status.CompletionTime != nil { p.setFinishedAt("", job.Status.CompletionTime.Time) + } else if job.DeletionTimestamp != nil { + p.setFinishedAt("", job.DeletionTimestamp.Time) } } @@ -231,6 +240,13 @@ func (p *podState) PreStart(name string) <-chan ChannelMessage[podStateUpdate] { return p.preStartWatcher(name).Channel() } +func (p *podState) IsFinished(name string) bool { + p.mu.Lock() + defer p.mu.Unlock() + _, ok := p.finished[name] + return ok && p.ctx.Err() == nil +} + func (p *podState) Finished(name string) chan struct{} { p.mu.Lock() defer p.mu.Unlock() diff --git a/pkg/tcl/testworkflowstcl/testworkflowcontroller/watchinstrumentedpod.go b/pkg/tcl/testworkflowstcl/testworkflowcontroller/watchinstrumentedpod.go index c2032692d53..286d2e18181 100644 --- a/pkg/tcl/testworkflowstcl/testworkflowcontroller/watchinstrumentedpod.go +++ b/pkg/tcl/testworkflowstcl/testworkflowcontroller/watchinstrumentedpod.go @@ -19,17 +19,20 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/constants" + "github.com/kubeshop/testkube/internal/common" "github.com/kubeshop/testkube/pkg/api/v1/testkube" "github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor" ) const ( InitContainerName = "tktw-init" + IdleTimeout = 100 * time.Millisecond ) type WatchInstrumentedPodOptions struct { JobEvents Channel[*corev1.Event] Job Channel[*batchv1.Job] + Follow *bool } func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []testworkflowprocessor.Signature, scheduledAt time.Time, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], opts WatchInstrumentedPodOptions) (Channel[Notification], error) { @@ -101,7 +104,8 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Watch the container logs - for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10).Channel() { + follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(ref) + for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, follow, pod).Channel() { if v.Error != nil { s.Error(v.Error) } else if v.Value.Output != nil { @@ -139,7 +143,15 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf } // Get the final result - <-state.Finished(ref) + if follow { + <-state.Finished(ref) + } else { + select { + case <-state.Finished(ref): + case <-time.After(IdleTimeout): + return + } + } status, err := state.ContainerResult(ref) if err != nil { s.Error(err) diff --git a/pkg/triggers/scraper.go b/pkg/triggers/scraper.go index 27a58b063cb..f7a07187095 100644 --- a/pkg/triggers/scraper.go +++ b/pkg/triggers/scraper.go @@ -188,6 +188,7 @@ func (s *Service) abortRunningTestWorkflowExecutions(ctx context.Context, status // Pro edition only (tcl protected code) // Abort the execution err = ctrl.Abort(context.Background()) + ctrl.StopController() if err != nil { s.logger.Errorf("trigger service: execution scraper component: error aborting test workflow execution: %v", err) continue