feat(performance): improve Log Processing performance #5647

Merged
5 changes: 5 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/channel.go
@@ -20,6 +20,7 @@ type Channel[T any] interface {
Channel() <-chan ChannelMessage[T]
Close()
Done() <-chan struct{}
CtxErr() error
}

type channel[T any] struct {
@@ -168,3 +169,7 @@ func (c *channel[T]) Close() {
func (c *channel[T]) Done() <-chan struct{} {
return c.ctx.Done()
}

func (c *channel[T]) CtxErr() error {
return c.ctx.Err()
}
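
The new `CtxErr()` accessor exposes the channel context's error, so background workers can check whether the channel is already finished without blocking on `Done()`. A minimal sketch of the intended consumption pattern, assuming the `Channel` interface above; `runFlusher` and its `flush` callback are illustrative, not part of this PR:

```go
package testworkflowcontroller

import "time"

// Sketch: a periodic flusher that stops once the channel's context ends.
func runFlusher[T any](ch Channel[T], flush func()) {
	for {
		// CtxErr() is non-nil once the context is cancelled or expired,
		// so the worker can bail out before scheduling more work.
		if ch.CtxErr() != nil {
			return
		}
		select {
		case <-ch.Done():
			flush() // emit anything still pending before exiting
			return
		case <-time.After(50 * time.Millisecond):
			flush()
		}
	}
}
```

This is the same pattern the notifier's scheduler below uses: it polls `n.watcher.CtxErr()` at the top of its loop so a cancelled watcher cannot keep the flush goroutine alive.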
113 changes: 109 additions & 4 deletions pkg/testworkflows/testworkflowcontroller/logs.go
@@ -2,20 +2,29 @@ package testworkflowcontroller

import (
"bufio"
"bytes"
"context"
"errors"
"io"
"strings"
"sync"
"time"

errors2 "github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"

"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
"github.com/kubeshop/testkube/pkg/log"
"github.com/kubeshop/testkube/pkg/utils"
)

const (
FlushLogMaxSize = 65_536
FlushLogTime = 50 * time.Millisecond
FlushLogMaxTime = 100 * time.Millisecond
)

type Comment struct {
Time time.Time
Hint *data.Instruction
@@ -65,6 +74,96 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
_ = stream.Close()
}()

// Build a buffer for logs to avoid scheduling Log notification for each write
var logBufferLog bytes.Buffer
var logBufferTs time.Time
var logBufferMu sync.Mutex
var logBufferCh = make(chan struct{}, 1)
defer close(logBufferCh)
unsafeFlushLogBuffer := func() {
if logBufferLog.Len() == 0 {
return
}
message := make([]byte, logBufferLog.Len())
_, err := logBufferLog.Read(message)
if err != nil {
log.DefaultLogger.Errorf("failed to read log buffer: %s/%s", podName, containerName)
return
}
w.Send(ContainerLog{Time: logBufferTs, Log: message})

}
flushLogBuffer := func() {
logBufferMu.Lock()
defer logBufferMu.Unlock()
unsafeFlushLogBuffer()
}
appendLog := func(ts time.Time, log []byte) {
if len(log) == 0 {
return
}
logBufferMu.Lock()
defer logBufferMu.Unlock()
if logBufferLog.Len() == 0 {
logBufferTs = ts
}
logBufferLog.Write(log)

// Inform the flushing worker about a new log to flush
select {
case logBufferCh <- struct{}{}:
default:
}
}
defer flushLogBuffer()

// Flush the log automatically when expected
bufferCtx, bufferCtxCancel := context.WithCancel(ctx)
defer bufferCtxCancel()
go func() {
flushLogTimer := time.NewTimer(FlushLogMaxTime)
flushLogTimerEnabled := false

for {
if bufferCtx.Err() != nil {
return
}

logLen := logBufferLog.Len()

if logLen > FlushLogMaxSize {
flushLogBuffer()
continue
}

if logLen == 0 {
flushLogTimerEnabled = false
select {
case <-bufferCtx.Done():
return
case <-logBufferCh:
continue
}
}

if !flushLogTimerEnabled {
flushLogTimerEnabled = true
flushLogTimer.Reset(FlushLogMaxTime)
}

select {
case <-bufferCtx.Done():
return
case <-flushLogTimer.C:
flushLogBuffer()
case <-time.After(FlushLogTime):
flushLogBuffer()
case <-logBufferCh:
continue
}
}
}()

// Parse and return the logs
reader := bufio.NewReader(stream)
var tsPrefix, tmpTsPrefix []byte
@@ -86,6 +185,7 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
if len(tmpTsPrefix) > 0 {
prepend = tmpTsPrefix
}
flushLogBuffer()
w.Error(err)
}

@@ -110,27 +210,32 @@ func WatchContainerLogs(ctx context.Context, clientSet kubernetes.Interface, nam
} else {
log.Output = instruction
}
flushLogBuffer()
w.Send(log)
}

// Append as regular log if expected
if !hadComment {
if !isStarted {
line = append(tsPrefix, line...)
appendLog(ts, tsPrefix)
appendLog(ts, line)
isStarted = true
} else if isNewLine {
line = append(append([]byte("\n"), tsPrefix...), line...)
appendLog(ts, []byte("\n"))
appendLog(ts, tsPrefix)
appendLog(ts, line)
}
w.Send(ContainerLog{Time: ts, Log: line})
isNewLine = true
}
} else if isStarted {
w.Send(ContainerLog{Time: ts, Log: append([]byte("\n"), tsPrefix...)})
appendLog(ts, []byte("\n"))
appendLog(ts, tsPrefix)
}

// Handle the error
if err != nil {
if err != io.EOF {
flushLogBuffer()
w.Error(err)
}
return
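
This is the heart of the change: instead of calling `w.Send` once per parsed line, lines are appended to a mutex-guarded `bytes.Buffer`, and a single worker goroutine drains it when it grows past `FlushLogMaxSize` (64 KiB), when writes go quiet for `FlushLogTime` (50 ms), or at the latest `FlushLogMaxTime` (100 ms) after the first unflushed byte. Below is a self-contained sketch of that batch-and-debounce pattern under the same constants; `LogBatcher` and its method names are illustrative, and unlike the loop above the sketch also takes the lock for the length check:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

const (
	flushMaxSize = 65_536                 // drain immediately past this size
	flushQuiet   = 50 * time.Millisecond  // drain after this much write silence
	flushMaxWait = 100 * time.Millisecond // cap on how long bytes may sit buffered
)

// LogBatcher coalesces many small writes into few emits.
// The emit callback must not call back into Append.
type LogBatcher struct {
	mu   sync.Mutex
	buf  bytes.Buffer
	kick chan struct{} // 1-slot wakeup signal for the worker
	emit func([]byte)
}

func NewLogBatcher(emit func([]byte)) *LogBatcher {
	b := &LogBatcher{kick: make(chan struct{}, 1), emit: emit}
	go b.run() // worker shutdown is omitted for brevity
	return b
}

func (b *LogBatcher) Append(p []byte) {
	b.mu.Lock()
	b.buf.Write(p)
	b.mu.Unlock()
	select {
	case b.kick <- struct{}{}: // wake the worker; skip if already signalled
	default:
	}
}

func (b *LogBatcher) Flush() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.buf.Len() == 0 {
		return
	}
	out := make([]byte, b.buf.Len())
	b.buf.Read(out) // an exact-length read from memory cannot fail
	b.emit(out)
}

func (b *LogBatcher) run() {
	maxWait := time.NewTimer(flushMaxWait)
	armed := false // true while some unflushed bytes are aging
	for {
		b.mu.Lock()
		pending := b.buf.Len()
		b.mu.Unlock()

		if pending > flushMaxSize { // oversized: drain right away
			b.Flush()
			continue
		}
		if pending == 0 { // idle: sleep until the next Append
			armed = false
			<-b.kick
			continue
		}
		if !armed { // first unflushed byte: start the staleness cap
			armed = true
			maxWait.Reset(flushMaxWait)
		}
		select {
		case <-maxWait.C: // buffered data is now flushMaxWait old
			b.Flush()
		case <-time.After(flushQuiet): // no Append for flushQuiet
			b.Flush()
		case <-b.kick: // another write landed; re-evaluate the size
		}
	}
}

func main() {
	b := NewLogBatcher(func(p []byte) { fmt.Printf("emit: %d bytes\n", len(p)) })
	for i := 0; i < 1_000; i++ {
		b.Append([]byte("some log line\n")) // 1,000 appends...
	}
	time.Sleep(200 * time.Millisecond) // ...collapse into roughly one emit
	b.Flush()
}
```

The two timers are not redundant: `time.After(flushQuiet)` alone would let a steady trickle of writes postpone the flush indefinitely, while `flushMaxWait` guarantees an upper bound on how stale a delivered log batch can be.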
71 changes: 70 additions & 1 deletion pkg/testworkflows/testworkflowcontroller/notifier.go
@@ -3,6 +3,7 @@ package testworkflowcontroller
import (
"context"
"fmt"
"sync"
"time"

"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
@@ -12,12 +13,21 @@ import (
"github.com/kubeshop/testkube/pkg/ui"
)

const (
FlushResultTime = 50 * time.Millisecond
FlushResultMaxTime = 100 * time.Millisecond
)

type notifier struct {
watcher *channel[Notification]
result testkube.TestWorkflowResult
sig []testkube.TestWorkflowSignature
scheduledAt time.Time
lastTs map[string]time.Time

resultMu sync.Mutex
resultCh chan struct{}
resultScheduled bool
}

func (n *notifier) GetLastTimestamp(ref string) time.Time {
@@ -40,12 +50,68 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) {
}
}

func (n *notifier) Flush() {
n.resultMu.Lock()
defer n.resultMu.Unlock()
if !n.resultScheduled {
return
}
n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
n.resultScheduled = false
}

func (n *notifier) scheduleFlush() {
n.resultMu.Lock()
defer n.resultMu.Unlock()

// Inform existing scheduler about the next result
if n.resultScheduled {
select {
case n.resultCh <- struct{}{}:
default:
}
return
}

// Run the scheduler
n.resultScheduled = true
go func() {
flushTimer := time.NewTimer(FlushResultMaxTime)
flushTimerEnabled := false

for {
if n.watcher.CtxErr() != nil {
return
}

select {
case <-n.watcher.Done():
n.Flush()
return
case <-flushTimer.C:
n.Flush()
flushTimerEnabled = false
case <-time.After(FlushResultTime):
n.Flush()
flushTimerEnabled = false
case <-n.resultCh:
if !flushTimerEnabled {
flushTimerEnabled = true
flushTimer.Reset(FlushResultMaxTime)
}
continue
}
}
}()
}

func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) {
if message != "" {
if ref == InitContainerName {
ref = ""
}
// TODO: use timestamp from the message too for lastTs?
n.Flush()
n.watcher.Send(Notification{
Timestamp: ts.UTC(),
Log: message,
@@ -92,7 +158,7 @@ func (n *notifier) recompute() {

func (n *notifier) emit() {
n.recompute()
n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
n.scheduleFlush()
}

func (n *notifier) queue(ts time.Time) {
@@ -184,6 +250,7 @@ func (n *notifier) Output(ref string, ts time.Time, output *data.Instruction) {
return
}
n.RegisterTimestamp(ref, ts)
n.Flush()
n.watcher.Send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output})
}

@@ -276,5 +343,7 @@ func newNotifier(ctx context.Context, signature []testworkflowprocessor.Signatur
scheduledAt: scheduledAt,
result: result,
lastTs: make(map[string]time.Time),

resultCh: make(chan struct{}, 1),
}
}
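
The notifier applies the same idea to result notifications. `emit()` no longer sends a cloned `TestWorkflowResult` for every recompute; it calls `scheduleFlush()`, which marks the result as pending and lets a scheduler goroutine deliver at most one snapshot per `FlushResultTime` quiet window, with `FlushResultMaxTime` bounding staleness. Raw log and output events call `Flush()` first, so they are never delivered ahead of the result state they depend on. A condensed sketch of that coalescing pattern, using a one-shot worker variant and illustrative names (`coalescer`, `markDirty`); the `send` callback stands in for cloning and sending the current result:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	flushQuiet   = 50 * time.Millisecond  // deliver after this much calm
	flushMaxWait = 100 * time.Millisecond // hard cap on snapshot staleness
)

// coalescer turns bursts of markDirty calls into few send calls.
type coalescer struct {
	mu        sync.Mutex
	scheduled bool          // a worker is pending and owns delivery
	kick      chan struct{} // 1-slot signal: "newer change arrived"
	send      func()        // snapshots and emits the current state
}

func newCoalescer(send func()) *coalescer {
	return &coalescer{kick: make(chan struct{}, 1), send: send}
}

func (c *coalescer) markDirty() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.scheduled {
		select {
		case c.kick <- struct{}{}: // nudge the running worker
		default:
		}
		return
	}
	c.scheduled = true
	go c.worker()
}

// flush delivers the pending snapshot, if any.
func (c *coalescer) flush() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.scheduled {
		return
	}
	c.send()
	c.scheduled = false
}

func (c *coalescer) worker() {
	deadline := time.After(flushMaxWait) // staleness cap for this batch
	for {
		select {
		case <-deadline:
			c.flush()
			return
		case <-time.After(flushQuiet): // no new changes for flushQuiet
			c.flush()
			return
		case <-c.kick:
			// Newer change: restart the quiet window. A kick left over
			// from a previous batch is harmless; the deadline still caps it.
		}
	}
}

func main() {
	sent := 0
	c := newCoalescer(func() { sent++; fmt.Println("notification", sent) })
	for i := 0; i < 1_000; i++ {
		c.markDirty() // 1,000 state changes...
	}
	time.Sleep(200 * time.Millisecond) // ...produce a single notification
}
```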
@@ -42,7 +42,10 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interf

// Start watching
go func() {
defer ctxCancel()
defer func() {
s.Flush()
ctxCancel()
}()

// Watch for the basic initialization warnings
for v := range state.PreStart("") {
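
The ordering inside the new deferred function is the point of this last hunk: `ctxCancel()` tears down the watcher channel, so `s.Flush()` must run first. Without it, a result that is still only scheduled via `scheduleFlush()` above would be dropped on cancellation instead of arriving as the terminal notification.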