Skip to content

Commit

Permalink
fix(testworkflows): fetch logs properly for running services (#5502)
Browse files Browse the repository at this point in the history
* fix(testworkflows): fetch logs properly for running services
* chore: extract 100ms as IdleTimeout constant
* chore: avoid getting stack on container logs when the pod is already stopped
  • Loading branch information
rangoo94 authored May 27, 2024
1 parent 948f881 commit f62d60b
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 9 deletions.
1 change: 1 addition & 0 deletions cmd/tcl/testworkflow-toolkit/commands/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func NewServicesCmd() *cobra.Command {
break
}
}
ctrl.StopController()

// Fail if the container has not started
if !started {
Expand Down
3 changes: 2 additions & 1 deletion cmd/tcl/testworkflow-toolkit/spawn/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ func SaveLogs(ctx context.Context, clientSet kubernetes.Interface, storage artif
Timeout: ControllerTimeout,
})
if err == nil {
err = storage.SaveStream(filePath, ctrl.Logs(ctx))
err = storage.SaveStream(filePath, ctrl.Logs(ctx, false))
ctrl.StopController()
}
return filePath, err
}
Expand Down
15 changes: 12 additions & 3 deletions pkg/tcl/testworkflowstcl/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

initconstants "github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/constants"
"github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/data"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor"
"github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor/constants"
Expand Down Expand Up @@ -56,7 +57,7 @@ type Controller interface {
Cleanup(ctx context.Context) error
Watch(ctx context.Context) <-chan ChannelMessage[Notification]
WatchLightweight(ctx context.Context) <-chan LightweightNotification
Logs(ctx context.Context) io.Reader
Logs(ctx context.Context, follow bool) io.Reader
NodeName(ctx context.Context) (string, error)
PodIP(ctx context.Context) (string, error)
StopController()
Expand Down Expand Up @@ -271,12 +272,20 @@ func (c *controller) WatchLightweight(parentCtx context.Context) <-chan Lightwei
return ch
}

func (c *controller) Logs(parentCtx context.Context) io.Reader {
func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
reader, writer := io.Pipe()
go func() {
defer writer.Close()
ref := ""
for v := range c.Watch(parentCtx) {
w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
Follow: common.Ptr(follow),
})
if err != nil {
return
}
for v := range w.Channel() {
if v.Error == nil && v.Value.Log != "" {
if ref != v.Value.Ref {
ref = v.Value.Ref
Expand Down
9 changes: 7 additions & 2 deletions pkg/tcl/testworkflowstcl/testworkflowcontroller/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package testworkflowcontroller
import (
"bufio"
"context"
"errors"
"io"
"strings"
"time"
Expand All @@ -36,7 +37,7 @@ type ContainerLog struct {
Output *data.Instruction
}

func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int) Channel[ContainerLog] {
func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, namespace, podName, containerName string, bufferSize int, follow bool, pod Channel[*corev1.Pod]) Channel[ContainerLog] {
w := newChannel[ContainerLog](ctx, bufferSize)

go func() {
Expand All @@ -45,7 +46,7 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam

// Create logs stream request
req := clientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Follow: true,
Follow: follow,
Timestamps: true,
Container: containerName,
})
Expand All @@ -58,6 +59,10 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
w.Error(err)
return
}
p := <-pod.Peek(ctx)
if p != nil && IsPodDone(p) {
w.Error(errors.New("pod is finished and there are no logs for this container"))
}
continue
}
break
Expand Down
18 changes: 17 additions & 1 deletion pkg/tcl/testworkflowstcl/testworkflowcontroller/podstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,16 @@ func newPodState(parentCtx context.Context) *podState {
<-ctx.Done()
state.mu.Lock()
defer state.mu.Unlock()
for _, c := range state.finishedCh {
for name, c := range state.finishedCh {
if c != nil {
state.finished[name] = time.Time{}
close(c)
delete(state.finishedCh, name)
}
}
for _, c := range state.prestart {
if c != nil {
c.Close()
}
}
}()
Expand Down Expand Up @@ -218,6 +225,8 @@ func (p *podState) RegisterJob(job *batchv1.Job) {
p.setQueuedAt("", job.CreationTimestamp.Time)
if job.Status.CompletionTime != nil {
p.setFinishedAt("", job.Status.CompletionTime.Time)
} else if job.DeletionTimestamp != nil {
p.setFinishedAt("", job.DeletionTimestamp.Time)
}
}

Expand All @@ -231,6 +240,13 @@ func (p *podState) PreStart(name string) <-chan ChannelMessage[podStateUpdate] {
return p.preStartWatcher(name).Channel()
}

func (p *podState) IsFinished(name string) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, ok := p.finished[name]
return ok && p.ctx.Err() == nil
}

func (p *podState) Finished(name string) chan struct{} {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/kubeshop/testkube/cmd/tcl/testworkflow-init/constants"
"github.com/kubeshop/testkube/internal/common"
"github.com/kubeshop/testkube/pkg/api/v1/testkube"
"github.com/kubeshop/testkube/pkg/tcl/testworkflowstcl/testworkflowprocessor"
)

const (
InitContainerName = "tktw-init"
IdleTimeout = 100 * time.Millisecond
)

type WatchInstrumentedPodOptions struct {
JobEvents Channel[*corev1.Event]
Job Channel[*batchv1.Job]
Follow *bool
}

func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface, signature []testworkflowprocessor.Signature, scheduledAt time.Time, pod Channel[*corev1.Pod], podEvents Channel[*corev1.Event], opts WatchInstrumentedPodOptions) (Channel[Notification], error) {
Expand Down Expand Up @@ -101,7 +104,8 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
}

// Watch the container logs
for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10).Channel() {
follow := common.ResolvePtr(opts.Follow, true) && !state.IsFinished(ref)
for v := range WatchContainerLogs(ctx, clientSet, podObj.Namespace, podObj.Name, ref, 10, follow, pod).Channel() {
if v.Error != nil {
s.Error(v.Error)
} else if v.Value.Output != nil {
Expand Down Expand Up @@ -139,7 +143,15 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf
}

// Get the final result
<-state.Finished(ref)
if follow {
<-state.Finished(ref)
} else {
select {
case <-state.Finished(ref):
case <-time.After(IdleTimeout):
return
}
}
status, err := state.ContainerResult(ref)
if err != nil {
s.Error(err)
Expand Down
1 change: 1 addition & 0 deletions pkg/triggers/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (s *Service) abortRunningTestWorkflowExecutions(ctx context.Context, status
// Pro edition only (tcl protected code)
// Abort the execution
err = ctrl.Abort(context.Background())
ctrl.StopController()
if err != nil {
s.logger.Errorf("trigger service: execution scraper component: error aborting test workflow execution: %v", err)
continue
Expand Down

0 comments on commit f62d60b

Please sign in to comment.