diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index a239e9a90091..3e3d8ff78fd1 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,7 +35,7 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Drainable interface { +type Stoppable interface { Stop(drain bool) } @@ -53,7 +53,7 @@ type StoppableClient interface { type Manager struct { name string clients []Client - walWatchers []Drainable + walWatchers []Stoppable // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface stoppableClients []StoppableClient @@ -78,7 +78,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]Drainable, 0, len(clientCfgs)) + watchers := make([]Stoppable, 0, len(clientCfgs)) stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). @@ -187,15 +187,19 @@ func (m *Manager) Chan() chan<- loki.Entry { return m.entries } +// Stop the manager, not draining the Write-Ahead Log, if that mode is enabled. func (m *Manager) Stop() { m.StopWithDrain(false) } +// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled, +// the Watchers will attempt to drain the WAL completely. 
func (m *Manager) StopWithDrain(drain bool) { // first stop the receiving channel m.once.Do(func() { close(m.entries) }) m.wg.Wait() - // close wal watchers + + // stop wal watchers for _, walWatcher := range m.walWatchers { walWatcher.Stop(drain) } diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go index d25e2426923f..160bca2b90a9 100644 --- a/component/common/loki/wal/config.go +++ b/component/common/loki/wal/config.go @@ -52,6 +52,7 @@ type WatchConfig struct { MaxReadFrequency time.Duration // DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL. + // After that time, the Watcher is stopped immediately, dropping all the work in process. DrainTimeout time.Duration } diff --git a/component/common/loki/wal/internal/watcher_state.go b/component/common/loki/wal/internal/watcher_state.go new file mode 100644 index 000000000000..c81413dfd230 --- /dev/null +++ b/component/common/loki/wal/internal/watcher_state.go @@ -0,0 +1,88 @@ +package internal + +import ( + "sync" + + "github.com/go-kit/log" + "github.com/grafana/agent/pkg/flow/logging/level" +) + +const ( + // StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed + // ones, and checking for new ones. + StateRunning = iota + + // StateDraining is an intermediary state between running and stopping. The watcher will attempt to consume all the data + // found in the WAL, omitting errors and assuming all segments found are "closed", that is, no longer being written. + StateDraining + + // StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly. + StateStopping +) + +// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking if it's draining or stopping, +// and for blocking until the state changes to stopping. 
+type WatcherState struct { + current int + mut sync.RWMutex + stoppingSignal chan struct{} + logger log.Logger +} + +func NewWatcherState(l log.Logger) *WatcherState { + return &WatcherState{ + current: StateRunning, + stoppingSignal: make(chan struct{}), + logger: l, + } +} + +// Transition changes the state of WatcherState to next. Moving into StateStopping from any other state closes the channel returned by WaitForStopping. +func (s *WatcherState) Transition(next int) { + s.mut.Lock() + defer s.mut.Unlock() + + level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next)) + + // only perform channel close if the state is not already stopping + // expect s.current to be either draining or running to perform a close; NOTE(review): leaving stopping and re-entering it would close the channel twice - confirm callers never do that + if next == StateStopping && s.current != next { + close(s.stoppingSignal) + } + + // update state + s.current = next +} + +// IsDraining evaluates to true if the current state is StateDraining. +func (s *WatcherState) IsDraining() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.current == StateDraining +} + +// IsStopping evaluates to true if the current state is StateStopping. +func (s *WatcherState) IsStopping() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.current == StateStopping +} + +// WaitForStopping returns a channel from which the caller can read, effectively waiting until the state changes to stopping. +func (s *WatcherState) WaitForStopping() <-chan struct{} { + return s.stoppingSignal +} + +// printState returns a user-friendly name for each of the possible Watcher states. 
+func printState(state int) string { + switch state { + case StateRunning: + return "running" + case StateDraining: + return "draining" + case StateStopping: + return "stopping" + default: + return "unknown" + } +} diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index c4e4e6135e48..856d185d2327 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -3,11 +3,11 @@ package wal import ( "errors" "fmt" + "github.com/grafana/agent/component/common/loki/wal/internal" "io" "math" "os" "strconv" - "sync" "time" "github.com/go-kit/log" @@ -73,82 +73,6 @@ type Marker interface { LastMarkedSegment() int } -// wState represents the possible states the Watcher can be in. -type wState int64 - -const ( - // stateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed - // ones, and checking for new ones. - stateRunning wState = iota - - stateDraining - - // stateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly. - stateStopping -) - -// watcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting -// the current state, or blocking until it has stopped. 
-type watcherState struct { - current wState - mut sync.RWMutex - stoppingSignal chan struct{} - logger log.Logger -} - -func newWatcherState(l log.Logger) *watcherState { - return &watcherState{ - current: stateRunning, - stoppingSignal: make(chan struct{}), - logger: l, - } -} - -func printState(s wState) string { - switch s { - case stateRunning: - return "running" - case stateDraining: - return "draining" - case stateStopping: - return "stopping" - default: - return "unknown" - } -} - -func (s *watcherState) Transition(next wState) { - s.mut.Lock() - defer s.mut.Unlock() - - level.Debug(s.logger).Log("msg", "Watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next)) - - // only perform channel close if the state is not already stopping - // expect s.s to be either draining ro running to perform a close - if next == stateStopping && s.current != next { - close(s.stoppingSignal) - } - - // update state - s.current = next -} - -func (s *watcherState) Get() wState { - s.mut.RLock() - defer s.mut.RUnlock() - return s.current -} - -func (s *watcherState) IsStopping() bool { - s.mut.RLock() - defer s.mut.RUnlock() - return s.current == stateStopping -} - -func (s *watcherState) WaitForStopping() <-chan struct{} { - return s.stoppingSignal -} - type Watcher struct { // id identifies the Watcher. Used when one Watcher is instantiated per remote write client, to be able to track to whom // the metric/log line corresponds. 
@@ -157,7 +81,7 @@ type Watcher struct { actions WriteTo readNotify chan struct{} done chan struct{} - state *watcherState + state *internal.WatcherState walDir string logger log.Logger MaxSegment int @@ -177,7 +101,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log id: id, actions: writeTo, readNotify: make(chan struct{}), - state: newWatcherState(logger), + state: internal.NewWatcherState(logger), done: make(chan struct{}), MaxSegment: -1, marker: marker, @@ -211,11 +135,11 @@ func (w *Watcher) mainLoop() { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } - if w.state.Get() == stateDraining && errors.Is(err, os.ErrNotExist) { + if w.state.IsDraining() && errors.Is(err, os.ErrNotExist) { level.Info(w.logger).Log("msg", "Reached non existing segment while draining, assuming end of WAL") // since we've reached the end of the WAL, and the Watcher is draining, promptly transition to stopping state // so the watcher can stoppingSignal early - w.state.Transition(stateStopping) + w.state.Transition(internal.StateStopping) } select { @@ -315,11 +239,11 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { // Check if new segments exists, or we are draining the WAL, which means that either: // - This is the last segment, and we can consume it fully // - There's some other segment, and we can consume this segment fully as well - if last <= segmentNum && w.state.Get() != stateDraining { + if last <= segmentNum && !w.state.IsDraining() { continue } - if w.state.Get() == stateDraining { + if w.state.IsDraining() { level.Debug(w.logger).Log("msg", "Draining segment completely", "segment", segmentNum, "lastSegment", last) } @@ -432,7 +356,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { func (w *Watcher) Stop(drain bool) { if drain { level.Info(w.logger).Log("msg", "Draining Watcher") - w.state.Transition(stateDraining) + w.state.Transition(internal.StateDraining) // wait for drain 
timeout, or stopping state, in case the Watcher does the transition itself promptly select { case <-time.NewTimer(w.drainTimeout).C: @@ -441,7 +365,7 @@ func (w *Watcher) Stop(drain bool) { } } - w.state.Transition(stateStopping) + w.state.Transition(internal.StateStopping) // upon calling stop, wait for main mainLoop execution to stop <-w.done @@ -506,16 +430,6 @@ func (w *Watcher) findNextSegmentFor(index int) (int, error) { return -1, errors.New("failed to find segment for index") } -// isClosed checks in a non-blocking manner if a channel is closed or not. -func isClosed(c chan struct{}) bool { - select { - case <-c: - return true - default: - return false - } -} - // readSegmentNumbers reads the given directory and returns all segment identifiers, that is, the index of each segment // file. func readSegmentNumbers(dir string) ([]int, error) {