From 61693df00b01ffafa66e83c9b8fd79a9b34bd95f Mon Sep 17 00:00:00 2001 From: hainenber Date: Thu, 29 Feb 2024 23:43:28 +0700 Subject: [PATCH] fix(loki/src/k8s_events): revert interface change + add background actor to set health based on applied task result Signed-off-by: hainenber --- component/loki/source/docker/runner.go | 6 ++-- .../loki/source/kubernetes/kubetail/tailer.go | 8 ++---- .../kubernetes_events/event_controller.go | 20 ++++++++++--- .../kubernetes_events/kubernetes_events.go | 23 ++++++++++++--- pkg/runner/runner.go | 28 ++++++------------- pkg/runner/runner_test.go | 4 +-- 6 files changed, 49 insertions(+), 40 deletions(-) diff --git a/component/loki/source/docker/runner.go b/component/loki/source/docker/runner.go index 069bd2d9f2b5..c010a02feef2 100644 --- a/component/loki/source/docker/runner.go +++ b/component/loki/source/docker/runner.go @@ -94,7 +94,7 @@ func newTailer(l log.Logger, task *tailerTask) *tailer { } } -func (t *tailer) Run(ctx context.Context) error { +func (t *tailer) Run(ctx context.Context) { ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit) t.target.StartIfNotRunning() @@ -108,10 +108,10 @@ func (t *tailer) Run(ctx context.Context) error { // refresh. level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err) t.target.Stop() - return err + return case <-ch: t.target.Stop() - return nil + return } } diff --git a/component/loki/source/kubernetes/kubetail/tailer.go b/component/loki/source/kubernetes/kubetail/tailer.go index ba0e10e79512..6ea2a2d9b3fb 100644 --- a/component/loki/source/kubernetes/kubetail/tailer.go +++ b/component/loki/source/kubernetes/kubetail/tailer.go @@ -89,7 +89,7 @@ var retailBackoff = backoff.Config{ MaxBackoff: time.Minute, } -func (t *tailer) Run(ctx context.Context) error { +func (t *tailer) Run(ctx context.Context) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -111,22 +111,18 @@ func (t *tailer) Run(ctx context.Context) error { terminated, err := t.containerTerminated(ctx) if terminated { // The container shut down and won't come back; we can stop tailing it. - return nil + return } else if err != nil { level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err) - return err } } if err != nil { t.target.Report(time.Now().UTC(), err) level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err) - return err } bo.Wait() } - - return nil } func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error { diff --git a/component/loki/source/kubernetes_events/event_controller.go b/component/loki/source/kubernetes_events/event_controller.go index 15857ff384ec..cb1d12fa0e5a 100644 --- a/component/loki/source/kubernetes_events/event_controller.go +++ b/component/loki/source/kubernetes_events/event_controller.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/cespare/xxhash/v2" @@ -58,6 +59,9 @@ type eventController struct { positionsKey string initTimestamp time.Time + + taskErr error + taskErrMut sync.RWMutex } func newEventController(task eventControllerTask) *eventController { @@ -79,18 +83,20 @@ func newEventController(task eventControllerTask) *eventController { } } -func (ctrl *eventController) Run(ctx context.Context) error { +func (ctrl *eventController) Run(ctx context.Context) { defer ctrl.handler.Stop() level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace) defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace) - if err := ctrl.runError(ctx); err != nil { + err := ctrl.runError(ctx) + if err != nil { level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err) - return err } - return nil + ctrl.taskErrMut.Lock() + ctrl.taskErr = err + ctrl.taskErrMut.Unlock() } func (ctrl *eventController) runError(ctx context.Context) error { @@ -346,6 +352,12 @@ func (ctrl *eventController) DebugInfo() controllerInfo { } } +func (ctrl *eventController) GetTaskError() error { + ctrl.taskErrMut.RLock() + defer ctrl.taskErrMut.RUnlock() + return ctrl.taskErr +} + type controllerInfo struct { Namespace string `river:"namespace,attr"` LastTimestamp time.Time `river:"last_event_timestamp,attr"` diff --git a/component/loki/source/kubernetes_events/kubernetes_events.go b/component/loki/source/kubernetes_events/kubernetes_events.go index ca1f0684808b..1bf803371e6e 100644 --- a/component/loki/source/kubernetes_events/kubernetes_events.go +++ b/component/loki/source/kubernetes_events/kubernetes_events.go @@ -161,15 +161,30 @@ func (c *Component) Run(ctx context.Context) error { c.setHealth(err) level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err) } + } + } + }, func(_ error) { + cancel() + }) - // Check on bubbled up errors encountered by the workers when running applied tasks - // and set component health accordingly + // Actor to set component health through errors from applied tasks. + ticker := time.NewTicker(500 * time.Millisecond) + rg.Add(func() error { + for { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: appliedTaskErrorString := "" - for _, err := range c.runner.GetWorkerErrors() { - appliedTaskErrorString += err.Error() + "\n" + for _, worker := range c.runner.Workers() { + if taskError := worker.(*eventController).GetTaskError(); taskError != nil { + appliedTaskErrorString += taskError.Error() + "\n" + } } if appliedTaskErrorString != "" { c.setHealth(fmt.Errorf(appliedTaskErrorString)) + } else { + c.setHealth(nil) } } } diff --git a/pkg/runner/runner.go b/pkg/runner/runner.go index 4234f7522c5f..7f910c45105e 100644 --- a/pkg/runner/runner.go +++ b/pkg/runner/runner.go @@ -32,7 +32,7 @@ type Worker interface { // Run starts a Worker, blocking until the provided ctx is canceled or a // fatal error occurs. Run is guaranteed to be called exactly once for any // given Worker. - Run(ctx context.Context) error + Run(ctx context.Context) } // The Runner manages a set of running Workers based on an active set of tasks. @@ -42,10 +42,8 @@ type Runner[TaskType Task] struct { ctx context.Context cancel context.CancelFunc - running sync.WaitGroup - workers *hashMap - workerErrs []error - workerErrMut sync.RWMutex + running sync.WaitGroup + workers *hashMap } // Internal types used to implement the Runner. @@ -65,8 +63,9 @@ type ( // workerTask implements Task for it to be used in a hashMap; two workerTasks // are equal if their underlying Tasks are equal. workerTask struct { - Worker *scheduledWorker - Task Task + Worker *scheduledWorker + Task Task + TaskErr error } ) @@ -165,12 +164,8 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error go func() { defer s.running.Done() defer close(newWorker.Exited) - // Gather error encountered by worker when running - if err := newWorker.Worker.Run(workerCtx); err != nil { - s.workerErrMut.Lock() - s.workerErrs = append(s.workerErrs, err) - s.workerErrMut.Unlock() - } + // Gather error encountered by worker when running the defined task. + newWorker.Worker.Run(workerCtx) }() _ = s.workers.Add(newTask) @@ -210,10 +205,3 @@ func (s *Runner[TaskType]) Stop() { s.cancel() s.running.Wait() } - -// GetWorkerErrors returns errors encountered by workers when they run assigned tasks -func (s *Runner[TaskType]) GetWorkerErrors() []error { - s.workerErrMut.RLock() - defer s.workerErrMut.RUnlock() - return s.workerErrs -} diff --git a/pkg/runner/runner_test.go b/pkg/runner/runner_test.go index 726093cbf47c..ea06dcff80ca 100644 --- a/pkg/runner/runner_test.go +++ b/pkg/runner/runner_test.go @@ -101,11 +101,9 @@ type genericWorker struct { var _ runner.Worker = (*genericWorker)(nil) -func (w *genericWorker) Run(ctx context.Context) error { +func (w *genericWorker) Run(ctx context.Context) { w.workerCount.Inc() defer w.workerCount.Dec() <-ctx.Done() - - return nil }