Skip to content

Commit

Permalink
feat(performance): buffer the logs sent from the container, to avoid …
Browse files Browse the repository at this point in the history
…sending message for each line
  • Loading branch information
rangoo94 committed Jul 9, 2024
1 parent 56b2224 commit b131a55
Showing 1 changed file with 109 additions and 4 deletions.
113 changes: 109 additions & 4 deletions pkg/testworkflows/testworkflowcontroller/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,29 @@ package testworkflowcontroller

import (
"bufio"
"bytes"
"context"
"errors"
"io"
"strings"
"sync"
"time"

errors2 "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/utils"
)

const (
FlushLogMaxSize = 65_536
FlushLogTime = 50 * time.Millisecond
FlushLogMaxTime = 100 * time.Millisecond
)

type Comment struct {
Time time.Time
Hint *data.Instruction
Expand Down Expand Up @@ -65,6 +74,96 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
_ = stream.Close()
}()

// Build a buffer for logs to avoid scheduling Log notification for each write
var logBufferLog bytes.Buffer
var logBufferTs time.Time
var logBufferMu sync.Mutex
var logBufferCh = make(chan struct{}, 1)
defer close(logBufferCh)
unsafeFlushLogBuffer := func() {
if logBufferLog.Len() == 0 {
return
}
message := make([]byte, logBufferLog.Len())
_, err := logBufferLog.Read(message)
if err != nil {
log.DefaultLogger.Errorf("failed to read log buffer: %s/%s", podName, containerName)
return
}
w.Send(ContainerLog{Time: logBufferTs, Log: message})

}
flushLogBuffer := func() {
logBufferMu.Lock()
defer logBufferMu.Unlock()
unsafeFlushLogBuffer()
}
appendLog := func(ts time.Time, log []byte) {
if len(log) == 0 {
return
}
logBufferMu.Lock()
defer logBufferMu.Unlock()
if logBufferLog.Len() == 0 {
logBufferTs = ts
}
logBufferLog.Write(log)

// Inform the flushing worker about a new log to flush
select {
case logBufferCh <- struct{}{}:
default:
}
}
defer flushLogBuffer()

// Flush the log automatically when expected
bufferCtx, bufferCtxCancel := context.WithCancel(ctx)
defer bufferCtxCancel()
go func() {
flushLogTimer := time.NewTimer(FlushLogMaxTime)
flushLogTimerEnabled := false

for {
if bufferCtx.Err() != nil {
return
}

logLen := logBufferLog.Len()

if logLen > FlushLogMaxSize {
flushLogBuffer()
continue
}

if logLen == 0 {
flushLogTimerEnabled = false
select {
case <-bufferCtx.Done():
return
case <-logBufferCh:
continue
}
}

if !flushLogTimerEnabled {
flushLogTimerEnabled = true
flushLogTimer.Reset(FlushLogMaxTime)
}

select {
case <-bufferCtx.Done():
return
case <-flushLogTimer.C:
flushLogBuffer()
case <-time.After(FlushLogTime):
flushLogBuffer()
case <-logBufferCh:
continue
}
}
}()

// Parse and return the logs
reader := bufio.NewReader(stream)
var tsPrefix, tmpTsPrefix []byte
Expand All @@ -86,6 +185,7 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
if len(tmpTsPrefix) > 0 {
prepend = tmpTsPrefix
}
flushLogBuffer()
w.Error(err)
}

Expand All @@ -110,27 +210,32 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
} else {
log.Output = instruction
}
flushLogBuffer()
w.Send(log)
}

// Append as regular log if expected
if !hadComment {
if !isStarted {
line = append(tsPrefix, line...)
appendLog(ts, tsPrefix)
appendLog(ts, line)
isStarted = true
} else if isNewLine {
line = append(append([]byte("\n"), tsPrefix...), line...)
appendLog(ts, []byte("\n"))
appendLog(ts, tsPrefix)
appendLog(ts, line)
}
w.Send(ContainerLog{Time: ts, Log: line})
isNewLine = true
}
} else if isStarted {
w.Send(ContainerLog{Time: ts, Log: append([]byte("\n"), tsPrefix...)})
appendLog(ts, []byte("\n"))
appendLog(ts, tsPrefix)
}

// Handle the error
if err != nil {
if err != io.EOF {
flushLogBuffer()
w.Error(err)
}
return
Expand Down

0 comments on commit b131a55

Please sign in to comment.