Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(performance): improve Log Processing performance #5647

Merged
5 changes: 5 additions & 0 deletions pkg/testworkflows/testworkflowcontroller/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Channel[T any] interface {
Channel() <-chan ChannelMessage[T]
Close()
Done() <-chan struct{}
CtxErr() error
}

type channel[T any] struct {
Expand Down Expand Up @@ -168,3 +169,7 @@ func (c *channel[T]) Close() {
func (c *channel[T]) Done() <-chan struct{} {
return c.ctx.Done()
}

func (c *channel[T]) CtxErr() error {
return c.ctx.Err()
}
8 changes: 4 additions & 4 deletions pkg/testworkflows/testworkflowcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (c *controller) StopController() {
}

func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Notification] {
w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
})
Expand All @@ -222,7 +222,7 @@ func (c *controller) Watch(parentCtx context.Context) <-chan ChannelMessage[Noti
v.Close()
return v.Channel()
}
return w.Channel()
return ch
}

// TODO: Make it actually light
Expand Down Expand Up @@ -281,15 +281,15 @@ func (c *controller) Logs(parentCtx context.Context, follow bool) io.Reader {
case <-c.podEvents.Peek(parentCtx):
case <-alignTimeoutCh:
}
w, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
ch, err := WatchInstrumentedPod(parentCtx, c.clientSet, c.signature, c.scheduledAt, c.pod, c.podEvents, WatchInstrumentedPodOptions{
JobEvents: c.jobEvents,
Job: c.job,
Follow: common.Ptr(follow),
})
if err != nil {
return
}
for v := range w.Channel() {
for v := range ch {
if v.Error == nil && v.Value.Log != "" && !v.Value.Temporary {
if ref != v.Value.Ref && v.Value.Ref != "" {
ref = v.Value.Ref
Expand Down
Loading
Loading