
Commit

refactoring watcher
thepalbi committed Nov 17, 2023
1 parent cd8d143 commit 9b2ab46
Showing 4 changed files with 106 additions and 99 deletions.
12 changes: 8 additions & 4 deletions component/common/loki/client/manager.go
@@ -35,7 +35,7 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {}

func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {}

type Drainable interface {
type Stoppable interface {
Stop(drain bool)
}

@@ -53,7 +53,7 @@ type StoppableClient interface {
type Manager struct {
name string
clients []Client
walWatchers []Drainable
walWatchers []Stoppable

// stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface
stoppableClients []StoppableClient
@@ -78,7 +78,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(clientCfgs))
watchers := make([]Drainable, 0, len(clientCfgs))
watchers := make([]Stoppable, 0, len(clientCfgs))
stoppableClients := make([]StoppableClient, 0, len(clientCfgs))
for _, cfg := range clientCfgs {
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
@@ -187,15 +187,19 @@ func (m *Manager) Chan() chan<- loki.Entry {
return m.entries
}

// Stop the manager without draining the Write-Ahead Log, even if that mode is enabled.
func (m *Manager) Stop() {
m.StopWithDrain(false)
}

// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
// the Watchers will attempt to drain the WAL completely.
func (m *Manager) StopWithDrain(drain bool) {
// first stop the receiving channel
m.once.Do(func() { close(m.entries) })
m.wg.Wait()
// close wal watchers

// stop wal watchers
for _, walWatcher := range m.walWatchers {
walWatcher.Stop(drain)
}
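As a quick illustration of the surface refactored above, here is a minimal caller-side sketch. Only Manager, Stop, and StopWithDrain come from this file; the shutdown helper, its wiring, and the import path (inferred from the file location) are assumptions.

package example

import (
    "github.com/grafana/agent/component/common/loki/client"
)

// shutdown is a hypothetical helper showing the two stop paths exposed by Manager.
func shutdown(m *client.Manager, drainWAL bool) {
    if drainWAL {
        // Ask the WAL watchers to consume the remaining segments before
        // stopping, bounded by the configured DrainTimeout.
        m.StopWithDrain(true)
        return
    }
    // Equivalent to StopWithDrain(false): stop watchers and clients without draining.
    m.Stop()
}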
1 change: 1 addition & 0 deletions component/common/loki/wal/config.go
@@ -52,6 +52,7 @@ type WatchConfig struct {
MaxReadFrequency time.Duration

// DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL.
// After that time, the Watcher is stopped immediately, dropping all work still in progress.
DrainTimeout time.Duration
}

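For reference, a short sketch of what a WatchConfig carrying the new drain timeout might look like. The field names come from the diff above; the package name and the concrete durations are illustrative assumptions, not defaults introduced by this commit.

package example

import (
    "time"

    "github.com/grafana/agent/component/common/loki/wal"
)

func exampleWatchConfig() wal.WatchConfig {
    return wal.WatchConfig{
        // Cap on how frequently the Watcher reads from the WAL.
        MaxReadFrequency: 250 * time.Millisecond,
        // Upper bound on how long a draining Watcher may keep consuming
        // segments before it is stopped and in-flight work is dropped.
        DrainTimeout: 15 * time.Second,
    }
}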
88 changes: 88 additions & 0 deletions component/common/loki/wal/internal/watcher_state.go
@@ -0,0 +1,88 @@
package internal

import (
"sync"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
)

const (
// StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed
// ones, and checking for new ones.
StateRunning = iota

// StateDraining is an intermediary state between running and stopping. The watcher will attempt to consume all the data
// found in the WAL, omitting errors and assuming all segments found are "closed", that is, no longer being written.
StateDraining

// StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly.
StateStopping
)

// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking whether it's stopping, getting
// the current state, or blocking until it has stopped.
type WatcherState struct {
current int
mut sync.RWMutex
stoppingSignal chan struct{}
logger log.Logger
}

func NewWatcherState(l log.Logger) *WatcherState {
return &WatcherState{
current: StateRunning,
stoppingSignal: make(chan struct{}),
logger: l,
}
}

// Transition changes the state of WatcherState to next, reacting accordingly.
func (s *WatcherState) Transition(next int) {
s.mut.Lock()
defer s.mut.Unlock()

level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next))

// only perform the channel close if the state is not already stopping;
// expect s.current to be either draining or running to perform a close
if next == StateStopping && s.current != next {
close(s.stoppingSignal)
}

// update state
s.current = next
}

// IsDraining evaluates to true if the current state is StateDraining.
func (s *WatcherState) IsDraining() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == StateDraining
}

// IsStopping evaluates to true if the current state is StateStopping.
func (s *WatcherState) IsStopping() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == StateStopping
}

// WaitForStopping returns a channel from which the caller can read, effectively waiting until the state changes to stopping.
func (s *WatcherState) WaitForStopping() <-chan struct{} {
return s.stoppingSignal
}

// printState returns a user-friendly name for each possible Watcher state.
func printState(state int) string {
switch state {
case StateRunning:
return "running"
case StateDraining:
return "draining"
case StateStopping:
return "stopping"
default:
return "unknown"
}
}
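To make the intended lifecycle concrete, a hedged sketch of how the new WatcherState could be driven by a consumer loop and a drain-aware shutdown path, mirroring how the Watcher uses it further down. The consume callback and the runWithDrain helper are assumptions for illustration; the WatcherState API calls match the file above.

package example

import (
    "time"

    "github.com/go-kit/log"
    "github.com/grafana/agent/component/common/loki/wal/internal"
)

// runWithDrain shows the expected interplay: a consumer keeps reading until the
// state says stopping, while the shutdown path requests a drain, waits for the
// stopping signal or a timeout, and finally forces the stopping state.
func runWithDrain(logger log.Logger, consume func(s *internal.WatcherState), drainTimeout time.Duration) {
    state := internal.NewWatcherState(logger)

    done := make(chan struct{})
    go func() {
        defer close(done)
        // A real consumer transitions to StateStopping itself once, while
        // draining, it finds nothing left to read in the WAL.
        for !state.IsStopping() {
            consume(state)
        }
    }()

    state.Transition(internal.StateDraining)
    select {
    case <-state.WaitForStopping():
    case <-time.After(drainTimeout):
    }
    // Safe even if the consumer already transitioned: the stopping signal is
    // only closed once.
    state.Transition(internal.StateStopping)
    <-done
}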
104 changes: 9 additions & 95 deletions component/common/loki/wal/watcher.go
@@ -3,11 +3,11 @@ package wal
import (
"errors"
"fmt"
"github.com/grafana/agent/component/common/loki/wal/internal"
"io"
"math"
"os"
"strconv"
"sync"
"time"

"github.com/go-kit/log"
@@ -73,82 +73,6 @@ type Marker interface {
LastMarkedSegment() int
}

// wState represents the possible states the Watcher can be in.
type wState int64

const (
// stateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed
// ones, and checking for new ones.
stateRunning wState = iota

stateDraining

// stateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly.
stateStopping
)

// watcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting
// the current state, or blocking until it has stopped.
type watcherState struct {
current wState
mut sync.RWMutex
stoppingSignal chan struct{}
logger log.Logger
}

func newWatcherState(l log.Logger) *watcherState {
return &watcherState{
current: stateRunning,
stoppingSignal: make(chan struct{}),
logger: l,
}
}

func printState(s wState) string {
switch s {
case stateRunning:
return "running"
case stateDraining:
return "draining"
case stateStopping:
return "stopping"
default:
return "unknown"
}
}

func (s *watcherState) Transition(next wState) {
s.mut.Lock()
defer s.mut.Unlock()

level.Debug(s.logger).Log("msg", "Watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next))

// only perform channel close if the state is not already stopping
// expect s.s to be either draining ro running to perform a close
if next == stateStopping && s.current != next {
close(s.stoppingSignal)
}

// update state
s.current = next
}

func (s *watcherState) Get() wState {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current
}

func (s *watcherState) IsStopping() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == stateStopping
}

func (s *watcherState) WaitForStopping() <-chan struct{} {
return s.stoppingSignal
}

type Watcher struct {
// id identifies the Watcher. Used when one Watcher is instantiated per remote write client, to be able to track to whom
// the metric/log line corresponds.
@@ -157,7 +81,7 @@ type Watcher struct {
actions WriteTo
readNotify chan struct{}
done chan struct{}
state *watcherState
state *internal.WatcherState
walDir string
logger log.Logger
MaxSegment int
@@ -177,7 +101,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log
id: id,
actions: writeTo,
readNotify: make(chan struct{}),
state: newWatcherState(logger),
state: internal.NewWatcherState(logger),
done: make(chan struct{}),
MaxSegment: -1,
marker: marker,
@@ -211,11 +135,11 @@ func (w *Watcher) mainLoop() {
level.Error(w.logger).Log("msg", "error tailing WAL", "err", err)
}

if w.state.Get() == stateDraining && errors.Is(err, os.ErrNotExist) {
if w.state.IsDraining() && errors.Is(err, os.ErrNotExist) {
level.Info(w.logger).Log("msg", "Reached non existing segment while draining, assuming end of WAL")
// since we've reached the end of the WAL, and the Watcher is draining, promptly transition to stopping state
// so the watcher can stop early
w.state.Transition(stateStopping)
w.state.Transition(internal.StateStopping)
}

select {
@@ -315,11 +239,11 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
// Check if new segments exists, or we are draining the WAL, which means that either:
// - This is the last segment, and we can consume it fully
// - There's some other segment, and we can consume this segment fully as well
if last <= segmentNum && w.state.Get() != stateDraining {
if last <= segmentNum && !w.state.IsDraining() {
continue
}

if w.state.Get() == stateDraining {
if w.state.IsDraining() {
level.Debug(w.logger).Log("msg", "Draining segment completely", "segment", segmentNum, "lastSegment", last)
}

@@ -432,7 +356,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) {
func (w *Watcher) Stop(drain bool) {
if drain {
level.Info(w.logger).Log("msg", "Draining Watcher")
w.state.Transition(stateDraining)
w.state.Transition(internal.StateDraining)
// wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly
select {
case <-time.NewTimer(w.drainTimeout).C:
@@ -441,7 +365,7 @@ func (w *Watcher) Stop(drain bool) {
}
}

w.state.Transition(stateStopping)
w.state.Transition(internal.StateStopping)

// upon calling stop, wait for the mainLoop execution to stop
<-w.done
@@ -506,16 +430,6 @@ func (w *Watcher) findNextSegmentFor(index int) (int, error) {
return -1, errors.New("failed to find segment for index")
}

// isClosed checks in a non-blocking manner if a channel is closed or not.
func isClosed(c chan struct{}) bool {
select {
case <-c:
return true
default:
return false
}
}

// readSegmentNumbers reads the given directory and returns all segment identifiers, that is, the index of each segment
// file.
func readSegmentNumbers(dir string) ([]int, error) {
