From 3e4f766b0acc3e9bbc0d4262490972b99452023c Mon Sep 17 00:00:00 2001 From: Dawid Rusnak Date: Tue, 9 Jul 2024 11:03:01 +0200 Subject: [PATCH] feat(performance): buffer the logs sent from the container, to avoid sending message for each line --- .../testworkflowcontroller/logs.go | 113 +++++++++++++++++- 1 file changed, 109 insertions(+), 4 deletions(-) diff --git a/pkg/testworkflows/testworkflowcontroller/logs.go b/pkg/testworkflows/testworkflowcontroller/logs.go index 15d9072d191..e1f5016875d 100644 --- a/pkg/testworkflows/testworkflowcontroller/logs.go +++ b/pkg/testworkflows/testworkflowcontroller/logs.go @@ -2,10 +2,12 @@ package testworkflowcontroller import ( "bufio" + "bytes" "context" "errors" "io" "strings" + "sync" "time" errors2 "github.com/pkg/errors" @@ -13,9 +15,16 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kubeshop/testkube/cmd/testworkflow-init/data" + "github.com/kubeshop/testkube/pkg/log" "github.com/kubeshop/testkube/pkg/utils" ) +const ( + FlushLogMaxSize = 65_536 + FlushLogTime = 50 * time.Millisecond + FlushLogMaxTime = 100 * time.Millisecond +) + type Comment struct { Time time.Time Hint *data.Instruction @@ -65,6 +74,96 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam _ = stream.Close() }() + // Build a buffer for logs to avoid scheduling Log notification for each write + var logBufferLog bytes.Buffer + var logBufferTs time.Time + var logBufferMu sync.Mutex + var logBufferCh = make(chan struct{}, 1) + defer close(logBufferCh) + unsafeFlushLogBuffer := func() { + if logBufferLog.Len() == 0 { + return + } + message := make([]byte, logBufferLog.Len()) + _, err := logBufferLog.Read(message) + if err != nil { + log.DefaultLogger.Errorf("failed to read log buffer: %s/%s", podName, containerName) + return + } + w.Send(ContainerLog{Time: logBufferTs, Log: message}) + + } + flushLogBuffer := func() { + logBufferMu.Lock() + defer logBufferMu.Unlock() + unsafeFlushLogBuffer() + } + appendLog := func(ts time.Time, log []byte) { + if len(log) == 0 { + return + } + logBufferMu.Lock() + logBufferMu.Unlock() + if logBufferLog.Len() == 0 { + logBufferTs = ts + } + logBufferLog.Write(log) + + // Inform the flushing worker about a new log to flush + select { + case logBufferCh <- struct{}{}: + default: + } + } + defer flushLogBuffer() + + // Flush the log automatically when expected + bufferCtx, bufferCtxCancel := context.WithCancel(ctx) + defer bufferCtxCancel() + go func() { + flushLogTimer := time.NewTimer(FlushLogMaxTime) + flushLogTimerEnabled := false + + for { + if bufferCtx.Err() != nil { + return + } + + logLen := logBufferLog.Len() + + if logLen > FlushLogMaxSize { + flushLogBuffer() + continue + } + + if logLen == 0 { + flushLogTimerEnabled = false + select { + case <-bufferCtx.Done(): + return + case <-logBufferCh: + continue + } + } + + if !flushLogTimerEnabled { + flushLogTimerEnabled = true + flushLogTimer.Reset(FlushLogMaxTime) + } + + select { + case <-bufferCtx.Done(): + return + case <-flushLogTimer.C: + flushLogBuffer() + case <-time.After(FlushLogTime): + flushLogBuffer() + case <-logBufferCh: + continue + } + } + }() + // Parse and return the logs reader := bufio.NewReader(stream) var tsPrefix, tmpTsPrefix []byte @@ -86,6 +185,7 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam if len(tmpTsPrefix) > 0 { prepend = tmpTsPrefix } + flushLogBuffer() w.Error(err) } @@ -110,27 +210,32 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam } else { log.Output = instruction } + flushLogBuffer() w.Send(log) } // Append as regular log if expected if !hadComment { if !isStarted { - line = append(tsPrefix, line...) + appendLog(ts, tsPrefix) + appendLog(ts, line) isStarted = true } else if isNewLine { - line = append(append([]byte("\n"), tsPrefix...), line...) + appendLog(ts, []byte("\n")) + appendLog(ts, tsPrefix) + appendLog(ts, line) } - w.Send(ContainerLog{Time: ts, Log: line}) isNewLine = true } } else if isStarted { - w.Send(ContainerLog{Time: ts, Log: append([]byte("\n"), tsPrefix...)}) + appendLog(ts, []byte("\n")) + appendLog(ts, tsPrefix) } // Handle the error if err != nil { if err != io.EOF { + flushLogBuffer() w.Error(err) } return