Skip to content

Commit

Permalink
fix(loki/src/k8s_events): revert interface change + add background actor to set health based on applied task result
Browse files Browse the repository at this point in the history

Signed-off-by: hainenber <[email protected]>
  • Loading branch information
hainenber committed Feb 29, 2024
1 parent 6152c7b commit 61693df
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 40 deletions.
6 changes: 3 additions & 3 deletions component/loki/source/docker/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func newTailer(l log.Logger, task *tailerTask) *tailer {
}
}

func (t *tailer) Run(ctx context.Context) error {
func (t *tailer) Run(ctx context.Context) {
ch, chErr := t.opts.client.ContainerWait(ctx, t.target.Name(), container.WaitConditionNextExit)

t.target.StartIfNotRunning()
Expand All @@ -108,10 +108,10 @@ func (t *tailer) Run(ctx context.Context) error {
// refresh.
level.Error(t.log).Log("msg", "could not set up a wait request to the Docker client", "error", err)
t.target.Stop()
return err
return
case <-ch:
t.target.Stop()
return nil
return
}
}

Expand Down
8 changes: 2 additions & 6 deletions component/loki/source/kubernetes/kubetail/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ var retailBackoff = backoff.Config{
MaxBackoff: time.Minute,
}

func (t *tailer) Run(ctx context.Context) error {
func (t *tailer) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -111,22 +111,18 @@ func (t *tailer) Run(ctx context.Context) error {
terminated, err := t.containerTerminated(ctx)
if terminated {
// The container shut down and won't come back; we can stop tailing it.
return nil
return
} else if err != nil {
level.Warn(t.log).Log("msg", "could not determine if container terminated; will retry tailing", "err", err)
return err
}
}

if err != nil {
t.target.Report(time.Now().UTC(), err)
level.Warn(t.log).Log("msg", "tailer stopped; will retry", "err", err)
return err
}
bo.Wait()
}

return nil
}

func (t *tailer) tail(ctx context.Context, handler loki.EntryHandler) error {
Expand Down
20 changes: 16 additions & 4 deletions component/loki/source/kubernetes_events/event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/cespare/xxhash/v2"
Expand Down Expand Up @@ -58,6 +59,9 @@ type eventController struct {

positionsKey string
initTimestamp time.Time

taskErr error
taskErrMut sync.RWMutex
}

func newEventController(task eventControllerTask) *eventController {
Expand All @@ -79,18 +83,20 @@ func newEventController(task eventControllerTask) *eventController {
}
}

func (ctrl *eventController) Run(ctx context.Context) error {
func (ctrl *eventController) Run(ctx context.Context) {
defer ctrl.handler.Stop()

level.Info(ctrl.log).Log("msg", "watching events for namespace", "namespace", ctrl.task.Namespace)
defer level.Info(ctrl.log).Log("msg", "stopping watcher for events", "namespace", ctrl.task.Namespace)

if err := ctrl.runError(ctx); err != nil {
err := ctrl.runError(ctx)
if err != nil {
level.Error(ctrl.log).Log("msg", "event watcher exited with error", "err", err)
return err
}

return nil
ctrl.taskErrMut.Lock()
ctrl.taskErr = err
ctrl.taskErrMut.Unlock()
}

func (ctrl *eventController) runError(ctx context.Context) error {
Expand Down Expand Up @@ -346,6 +352,12 @@ func (ctrl *eventController) DebugInfo() controllerInfo {
}
}

// GetTaskError reports the error currently stored in ctrl.taskErr, or nil
// when none has been recorded. The field is read under a read lock on
// taskErrMut, the same mutex Run holds while storing its result.
func (ctrl *eventController) GetTaskError() error {
	ctrl.taskErrMut.RLock()
	recorded := ctrl.taskErr
	ctrl.taskErrMut.RUnlock()

	return recorded
}

type controllerInfo struct {
Namespace string `river:"namespace,attr"`
LastTimestamp time.Time `river:"last_event_timestamp,attr"`
Expand Down
23 changes: 19 additions & 4 deletions component/loki/source/kubernetes_events/kubernetes_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,30 @@ func (c *Component) Run(ctx context.Context) error {
c.setHealth(err)
level.Error(c.log).Log("msg", "failed to apply event watchers", "err", err)
}
}
}
}, func(_ error) {
cancel()
})

// Check on bubbled up errors encountered by the workers when running applied tasks
// and set component health accordingly
// Actor to set component health through errors from applied tasks.
ticker := time.NewTicker(500 * time.Millisecond)
rg.Add(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
appliedTaskErrorString := ""
for _, err := range c.runner.GetWorkerErrors() {
appliedTaskErrorString += err.Error() + "\n"
for _, worker := range c.runner.Workers() {
if taskError := worker.(*eventController).GetTaskError(); taskError != nil {
appliedTaskErrorString += taskError.Error() + "\n"
}
}
if appliedTaskErrorString != "" {
c.setHealth(fmt.Errorf(appliedTaskErrorString))
} else {
c.setHealth(nil)
}
}
}
Expand Down
28 changes: 8 additions & 20 deletions pkg/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Worker interface {
// Run starts a Worker, blocking until the provided ctx is canceled or a
// fatal error occurs. Run is guaranteed to be called exactly once for any
// given Worker.
Run(ctx context.Context) error
Run(ctx context.Context)
}

// The Runner manages a set of running Workers based on an active set of tasks.
Expand All @@ -42,10 +42,8 @@ type Runner[TaskType Task] struct {
ctx context.Context
cancel context.CancelFunc

running sync.WaitGroup
workers *hashMap
workerErrs []error
workerErrMut sync.RWMutex
running sync.WaitGroup
workers *hashMap
}

// Internal types used to implement the Runner.
Expand All @@ -65,8 +63,9 @@ type (
// workerTask implements Task for it to be used in a hashMap; two workerTasks
// are equal if their underlying Tasks are equal.
workerTask struct {
Worker *scheduledWorker
Task Task
Worker *scheduledWorker
Task Task
TaskErr error
}
)

Expand Down Expand Up @@ -165,12 +164,8 @@ func (s *Runner[TaskType]) ApplyTasks(ctx context.Context, tt []TaskType) error
go func() {
defer s.running.Done()
defer close(newWorker.Exited)
// Gather error encountered by worker when running
if err := newWorker.Worker.Run(workerCtx); err != nil {
s.workerErrMut.Lock()
s.workerErrs = append(s.workerErrs, err)
s.workerErrMut.Unlock()
}
// Run the worker for its task; Run no longer returns an error, so nothing is gathered here.
newWorker.Worker.Run(workerCtx)
}()

_ = s.workers.Add(newTask)
Expand Down Expand Up @@ -210,10 +205,3 @@ func (s *Runner[TaskType]) Stop() {
s.cancel()
s.running.Wait()
}

// GetWorkerErrors returns errors encountered by workers when they run assigned tasks
func (s *Runner[TaskType]) GetWorkerErrors() []error {
s.workerErrMut.RLock()
defer s.workerErrMut.RUnlock()
return s.workerErrs
}
4 changes: 1 addition & 3 deletions pkg/runner/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,9 @@ type genericWorker struct {

var _ runner.Worker = (*genericWorker)(nil)

func (w *genericWorker) Run(ctx context.Context) error {
func (w *genericWorker) Run(ctx context.Context) {
w.workerCount.Inc()
defer w.workerCount.Dec()

<-ctx.Done()

return nil
}

0 comments on commit 61693df

Please sign in to comment.