feat(performance): batch Test Workflow's result updates
rangoo94 committed Jul 9, 2024
1 parent b131a55 commit fdc56a0
Showing 3 changed files with 79 additions and 2 deletions.
5 changes: 5 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/channel.go
@@ -20,6 +20,7 @@ type Channel[T any] interface {
 	Channel() <-chan ChannelMessage[T]
 	Close()
 	Done() <-chan struct{}
+	CtxErr() error
 }
 
 type channel[T any] struct {
@@ -168,3 +169,7 @@ func (c *channel[T]) Close() {
 func (c *channel[T]) Done() <-chan struct{} {
 	return c.ctx.Done()
 }
+
+func (c *channel[T]) CtxErr() error {
+	return c.ctx.Err()
+}
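The new CtxErr accessor exposes the channel's context error without blocking: it returns nil while the channel is live and a non-nil error once the context is canceled, which is what the notifier's flush goroutine below relies on. A minimal sketch of the same check in a consumer loop; consume and handle are illustrative names, not part of the commit:

// Sketch only: consume and handle are hypothetical; Channel,
// ChannelMessage, and CtxErr are from the interface in the diff above.
func consume[T any](ch Channel[T], handle func(ChannelMessage[T])) {
	for msg := range ch.Channel() {
		// Non-blocking liveness check: bail out as soon as the
		// channel's context is canceled instead of draining the rest.
		if ch.CtxErr() != nil {
			return
		}
		handle(msg)
	}
}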
71 changes: 70 additions & 1 deletion pkg/testworkflows/testworkflowcontroller/notifier.go
@@ -3,6 +3,7 @@ package testworkflowcontroller
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/kubeshop/testkube/cmd/testworkflow-init/data"
@@ -12,12 +13,21 @@ import (
 	"github.com/kubeshop/testkube/pkg/ui"
 )
 
+const (
+	FlushResultTime    = 50 * time.Millisecond
+	FlushResultMaxTime = 100 * time.Millisecond
+)
+
 type notifier struct {
 	watcher     *channel[Notification]
 	result      testkube.TestWorkflowResult
 	sig         []testkube.TestWorkflowSignature
 	scheduledAt time.Time
 	lastTs      map[string]time.Time
+
+	resultMu        sync.Mutex
+	resultCh        chan struct{}
+	resultScheduled bool
 }
 
 func (n *notifier) GetLastTimestamp(ref string) time.Time {
@@ -40,12 +50,68 @@ func (n *notifier) RegisterTimestamp(ref string, t time.Time) {
 	}
 }
 
+func (n *notifier) Flush() {
+	n.resultMu.Lock()
+	defer n.resultMu.Unlock()
+	if !n.resultScheduled {
+		return
+	}
+	n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
+	n.resultScheduled = false
+}
+
+func (n *notifier) scheduleFlush() {
+	n.resultMu.Lock()
+	defer n.resultMu.Unlock()
+
+	// Inform existing scheduler about the next result
+	if n.resultScheduled {
+		select {
+		case n.resultCh <- struct{}{}:
+		default:
+		}
+		return
+	}
+
+	// Run the scheduler
+	n.resultScheduled = true
+	go func() {
+		flushTimer := time.NewTimer(FlushResultMaxTime)
+		flushTimerEnabled := false
+
+		for {
+			if n.watcher.CtxErr() != nil {
+				return
+			}
+
+			select {
+			case <-n.watcher.Done():
+				n.Flush()
+				return
+			case <-flushTimer.C:
+				n.Flush()
+				flushTimerEnabled = false
+			case <-time.After(FlushResultTime):
+				n.Flush()
+				flushTimerEnabled = false
+			case <-n.resultCh:
+				if !flushTimerEnabled {
+					flushTimerEnabled = true
+					flushTimer.Reset(FlushResultMaxTime)
+				}
+				continue
+			}
+		}
+	}()
+}
+
 func (n *notifier) Raw(ref string, ts time.Time, message string, temporary bool) {
 	if message != "" {
 		if ref == InitContainerName {
 			ref = ""
 		}
 		// TODO: use timestamp from the message too for lastTs?
+		n.Flush()
 		n.watcher.Send(Notification{
 			Timestamp: ts.UTC(),
 			Log:       message,
@@ -92,7 +158,7 @@ func (n *notifier) recompute() {
 
 func (n *notifier) emit() {
 	n.recompute()
-	n.watcher.Send(Notification{Timestamp: n.result.LatestTimestamp(), Result: n.result.Clone()})
+	n.scheduleFlush()
 }
 
 func (n *notifier) queue(ts time.Time) {
@@ -184,6 +250,7 @@ func (n *notifier) Output(ref string, ts time.Time, output *data.Instruction) {
 		return
 	}
 	n.RegisterTimestamp(ref, ts)
+	n.Flush()
 	n.watcher.Send(Notification{Timestamp: ts.UTC(), Ref: ref, Output: output})
 }
 
@@ -276,5 +343,7 @@ func newNotifier(ctx context.Context, signature []testworkflowprocessor.Signature
 		scheduledAt: scheduledAt,
 		result:      result,
 		lastTs:      make(map[string]time.Time),
+
+		resultCh: make(chan struct{}, 1),
 	}
 }
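Taken together, Flush and scheduleFlush implement a debounce with a latency cap: result updates arriving within FlushResultTime (50 ms) of each other collapse into one notification, while FlushResultMaxTime (100 ms) bounds how long a continuous burst of updates can postpone a flush. The self-contained sketch below reproduces that pattern outside the notifier; every name in it (batcher, Update, kick, and so on) is illustrative rather than taken from the commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	debounce = 50 * time.Millisecond  // quiet period that coalesces updates
	maxWait  = 100 * time.Millisecond // cap on how long a burst may delay a flush
)

type batcher struct {
	mu      sync.Mutex
	pending bool
	kick    chan struct{} // capacity 1, like notifier.resultCh
	flush   func()
}

// Update records a change; the first call starts a scheduler goroutine,
// later calls just nudge it, mirroring notifier.scheduleFlush above.
func (b *batcher) Update() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.pending {
		select {
		case b.kick <- struct{}{}:
		default: // a nudge is already queued
		}
		return
	}
	b.pending = true
	go b.run()
}

func (b *batcher) run() {
	deadline := time.NewTimer(maxWait)
	defer deadline.Stop()
	for {
		select {
		case <-deadline.C: // latency cap reached: flush even mid-burst
		case <-time.After(debounce): // quiet period elapsed: flush the batch
		case <-b.kick: // another update arrived; restart the debounce window
			continue
		}
		b.mu.Lock()
		b.flush()
		b.pending = false
		b.mu.Unlock()
		return
	}
}

func main() {
	b := &batcher{kick: make(chan struct{}, 1)}
	b.flush = func() { fmt.Println("flushed one batched notification") }
	for i := 0; i < 10; i++ {
		b.Update() // ten rapid updates collapse into a single flush
		time.Sleep(5 * time.Millisecond)
	}
	time.Sleep(200 * time.Millisecond)
}

Unlike the committed scheduler, whose goroutine stays alive and re-arms for subsequent rounds, this sketch exits after each flush; the coalescing behavior within a single burst is the same.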
5 changes: 4 additions & 1 deletion
@@ -42,7 +42,10 @@ func WatchInstrumentedPod(parentCtx context.Context, clientSet kubernetes.Interface
 
 	// Start watching
 	go func() {
-		defer ctxCancel()
+		defer func() {
+			s.Flush()
+			ctxCancel()
+		}()
 
 		// Watch for the basic initialization warnings
 		for v := range state.PreStart("") {
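The widened defer guarantees one final Flush before ctxCancel tears down the watcher context, so a result update that was batched but not yet flushed when the watch loop exits is not dropped. Schematically, with flush and cancel standing in for s.Flush and ctxCancel:

// Sketch of the shutdown ordering introduced above; flush and cancel
// are stand-ins for s.Flush and ctxCancel in the real code.
func watchLoop(flush, cancel func()) {
	defer func() {
		flush()  // drain any result still pending in the batch
		cancel() // only afterwards invalidate the watcher context
	}()
	// ... consume pod events until the watch ends ...
}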
