Skip to content

Commit

Permalink
Add graceful shutdown to loki.write draining the WAL (grafana#5804)
Browse files Browse the repository at this point in the history
* drain test and routing

* refactoring watcher to use state

* drain working

* fine-tuned test case

* added short timeout test

* prompt exit test

* prompt exit passing

* refactoring watcher

* some comments

* map river configs

* add docs

* splitting apart Stop and Drain

* minimize manager stop time
  • Loading branch information
thepalbi authored and BarunKGP committed Feb 20, 2024
1 parent 866bc45 commit c067af3
Show file tree
Hide file tree
Showing 11 changed files with 463 additions and 57 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Main (unreleased)

- The `remote.http` component can optionally define a request body. (@tpaschalis)

- Added support for `loki.write` to flush WAL on agent shutdown. (@thepalbi)

### Bugfixes

- Update `pyroscope.ebpf` to fix a logical bug causing to profile to many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev)
Expand Down
88 changes: 64 additions & 24 deletions component/common/loki/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,28 +35,48 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {}

func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {}

type Stoppable interface {
type StoppableWatcher interface {
Stop()
Drain()
}

type StoppableClient interface {
Stop()
StopNow()
}

// watcherClientPair represents a pair of watcher and client, which are coupled together, or just a single client.
type watcherClientPair struct {
watcher StoppableWatcher
client StoppableClient
}

// Stop will proceed to stop, in order, the possibly-nil watcher and the client.
func (p watcherClientPair) Stop(drain bool) {
// if the config has WAL disabled, there will be no watcher per client config
if p.watcher != nil {
// if drain enabled, drain the WAL
if drain {
p.watcher.Drain()
}
p.watcher.Stop()
}

// subsequently stop the client
p.client.Stop()
}

// Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry
// from the scrape targets, to the remote write clients themselves.
//
// Right now it just supports instantiating the WAL writer side of the future-to-be WAL enabled client. In follow-up
// work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client
// types: Logger, Multi and WAL.
type Manager struct {
name string
clients []Client
walWatchers []Stoppable
name string

// stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface
stoppableClients []StoppableClient
clients []Client
pairs []watcherClientPair

entries chan loki.Entry
once sync.Once
Expand All @@ -78,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(clientCfgs))
watchers := make([]Stoppable, 0, len(clientCfgs))
stoppableClients := make([]StoppableClient, 0, len(clientCfgs))
pairs := make([]watcherClientPair, 0, len(clientCfgs))
for _, cfg := range clientCfgs {
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
clientName := GetClientName(cfg)
Expand All @@ -103,7 +122,6 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
if err != nil {
return nil, fmt.Errorf("error starting queue client: %w", err)
}
stoppableClients = append(stoppableClients, queue)

// subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo
// series cache whenever a segment is deleted.
Expand All @@ -116,22 +134,27 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName)
watcher.Start()

watchers = append(watchers, watcher)
pairs = append(pairs, watcherClientPair{
watcher: watcher,
client: queue,
})
} else {
client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
if err != nil {
return nil, fmt.Errorf("error starting client: %w", err)
}

clients = append(clients, client)
stoppableClients = append(stoppableClients, client)

pairs = append(pairs, watcherClientPair{
client: client,
})
}
}
manager := &Manager{
clients: clients,
stoppableClients: stoppableClients,
walWatchers: watchers,
entries: make(chan loki.Entry),
clients: clients,
pairs: pairs,
entries: make(chan loki.Entry),
}
if walCfg.Enabled {
manager.name = buildManagerName("wal", clientCfgs...)
Expand Down Expand Up @@ -174,8 +197,8 @@ func (m *Manager) startWithForward() {
}

func (m *Manager) StopNow() {
for _, c := range m.stoppableClients {
c.StopNow()
for _, pair := range m.pairs {
pair.client.StopNow()
}
}

Expand All @@ -187,18 +210,35 @@ func (m *Manager) Chan() chan<- loki.Entry {
return m.entries
}

// Stop the manager, not draining the Write-Ahead Log, if that mode is enabled.
func (m *Manager) Stop() {
m.StopWithDrain(false)
}

// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
// the Watchers will attempt to drain the WAL completely.
// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then
// the clients are shut down accordingly.
func (m *Manager) StopWithDrain(drain bool) {
// first stop the receiving channel
m.once.Do(func() { close(m.entries) })
m.wg.Wait()
// close wal watchers
for _, walWatcher := range m.walWatchers {
walWatcher.Stop()
}
// close clients
for _, c := range m.stoppableClients {
c.Stop()

var stopWG sync.WaitGroup

// Depending on whether drain is enabled, the maximum time stopping a watcher and it's client can take is
// the drain time of the watcher + drain time client. To minimize this, and since we keep a separate WAL for each
// client config, each (watcher, client) pair is stopped concurrently.
for _, pair := range m.pairs {
stopWG.Add(1)
go func(pair watcherClientPair) {
defer stopWG.Done()
pair.Stop(drain)
}(pair)
}

// wait for all pairs to be stopped
stopWG.Wait()
}

// GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config,
Expand Down
4 changes: 2 additions & 2 deletions component/common/loki/client/queue_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// StoppableWriteTo is a mixing of the WAL's WriteTo interface, that is Stoppable as well.
type StoppableWriteTo interface {
agentWal.WriteTo
Stoppable
Stop()
StopNow()
}

Expand All @@ -38,7 +38,7 @@ type StoppableWriteTo interface {
type MarkerHandler interface {
UpdateReceivedData(segmentId, dataCount int) // Data queued for sending
UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending
Stoppable
Stop()
}

// queuedBatch is a batch specific to a tenant, that is considered ready to be sent.
Expand Down
7 changes: 6 additions & 1 deletion component/common/loki/wal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ const (

// DefaultWatchConfig is the opinionated defaults for operating the Watcher.
var DefaultWatchConfig = WatchConfig{
MinReadFrequency: time.Millisecond * 250,
MinReadFrequency: 250 * time.Millisecond,
MaxReadFrequency: time.Second,
DrainTimeout: 15 * time.Second,
}

// Config contains all WAL-related settings.
Expand Down Expand Up @@ -49,6 +50,10 @@ type WatchConfig struct {
// MaxReadFrequency controls the maximum read frequency the Watcher polls the WAL for new records. As mentioned above
// it caps the polling frequency to a maximum, to prevent to exponential backoff from making it too high.
MaxReadFrequency time.Duration

// DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL.
// After that time, the Watcher is stopped immediately, dropping all the work in process.
DrainTimeout time.Duration
}

// UnmarshalYAML implement YAML Unmarshaler
Expand Down
88 changes: 88 additions & 0 deletions component/common/loki/wal/internal/watcher_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package internal

import (
"sync"

"github.com/go-kit/log"
"github.com/grafana/agent/pkg/flow/logging/level"
)

const (
// StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed
// ones, and checking for new ones.
StateRunning = iota

// StateDraining is an intermediary state between running and stopping. The watcher will attempt to consume all the data
// found in the WAL, omitting errors and assuming all segments found are "closed", that is, no longer being written.
StateDraining

// StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly.
StateStopping
)

// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting
// the current state, or blocking until it has stopped.
type WatcherState struct {
current int
mut sync.RWMutex
stoppingSignal chan struct{}
logger log.Logger
}

func NewWatcherState(l log.Logger) *WatcherState {
return &WatcherState{
current: StateRunning,
stoppingSignal: make(chan struct{}),
logger: l,
}
}

// Transition changes the state of WatcherState to next, reacting accordingly.
func (s *WatcherState) Transition(next int) {
s.mut.Lock()
defer s.mut.Unlock()

level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next))

// only perform channel close if the state is not already stopping
// expect s.s to be either draining ro running to perform a close
if next == StateStopping && s.current != next {
close(s.stoppingSignal)
}

// update state
s.current = next
}

// IsDraining evaluates to true if the current state is StateDraining.
func (s *WatcherState) IsDraining() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == StateDraining
}

// IsStopping evaluates to true if the current state is StateStopping.
func (s *WatcherState) IsStopping() bool {
s.mut.RLock()
defer s.mut.RUnlock()
return s.current == StateStopping
}

// WaitForStopping returns a channel in which the called can read, effectively waiting until the state changes to stopping.
func (s *WatcherState) WaitForStopping() <-chan struct{} {
return s.stoppingSignal
}

// printState prints a user-friendly name of the possible Watcher states.
func printState(state int) string {
switch state {
case StateRunning:
return "running"
case StateDraining:
return "draining"
case StateStopping:
return "stopping"
default:
return "unknown"
}
}
Loading

0 comments on commit c067af3

Please sign in to comment.