diff --git a/CHANGELOG.md b/CHANGELOG.md index eb9024692b74..ea83a9966c7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,8 @@ Main (unreleased) - The `remote.http` component can optionally define a request body. (@tpaschalis) +- Added support for `loki.write` to flush WAL on agent shutdown. (@thepalbi) + ### Bugfixes - Update `pyroscope.ebpf` to fix a logical bug causing to profile to many kthreads instead of regular processes https://github.com/grafana/pyroscope/pull/2778 (@korniltsev) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 6683e9e5772b..244aa587a81f 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,8 +35,9 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Stoppable interface { +type StoppableWatcher interface { Stop() + Drain() } type StoppableClient interface { @@ -44,6 +45,27 @@ type StoppableClient interface { StopNow() } +// watcherClientPair represents a pair of watcher and client, which are coupled together, or just a single client. +type watcherClientPair struct { + watcher StoppableWatcher + client StoppableClient +} + +// Stop will proceed to stop, in order, the possibly-nil watcher and the client. +func (p watcherClientPair) Stop(drain bool) { + // if the config has WAL disabled, there will be no watcher per client config + if p.watcher != nil { + // if drain enabled, drain the WAL + if drain { + p.watcher.Drain() + } + p.watcher.Stop() + } + + // subsequently stop the client + p.client.Stop() +} + // Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry // from the scrape targets, to the remote write clients themselves. // @@ -51,12 +73,10 @@ type StoppableClient interface { // work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client // types: Logger, Multi and WAL. type Manager struct { - name string - clients []Client - walWatchers []Stoppable + name string - // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface - stoppableClients []StoppableClient + clients []Client + pairs []watcherClientPair entries chan loki.Entry once sync.Once @@ -78,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]Stoppable, 0, len(clientCfgs)) - stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) + pairs := make([]watcherClientPair, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). clientName := GetClientName(cfg) @@ -103,7 +122,6 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr if err != nil { return nil, fmt.Errorf("error starting queue client: %w", err) } - stoppableClients = append(stoppableClients, queue) // subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo // series cache whenever a segment is deleted. @@ -116,7 +134,10 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName) watcher.Start() - watchers = append(watchers, watcher) + pairs = append(pairs, watcherClientPair{ + watcher: watcher, + client: queue, + }) } else { client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger) if err != nil { @@ -124,14 +145,16 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr } clients = append(clients, client) - stoppableClients = append(stoppableClients, client) + + pairs = append(pairs, watcherClientPair{ + client: client, + }) } } manager := &Manager{ - clients: clients, - stoppableClients: stoppableClients, - walWatchers: watchers, - entries: make(chan loki.Entry), + clients: clients, + pairs: pairs, + entries: make(chan loki.Entry), } if walCfg.Enabled { manager.name = buildManagerName("wal", clientCfgs...) @@ -174,8 +197,8 @@ func (m *Manager) startWithForward() { } func (m *Manager) StopNow() { - for _, c := range m.stoppableClients { - c.StopNow() + for _, pair := range m.pairs { + pair.client.StopNow() } } @@ -187,18 +210,35 @@ func (m *Manager) Chan() chan<- loki.Entry { return m.entries } +// Stop the manager, not draining the Write-Ahead Log, if that mode is enabled. func (m *Manager) Stop() { + m.StopWithDrain(false) +} + +// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled, +// the Watchers will attempt to drain the WAL completely. +// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then +// the clients are shut down accordingly. +func (m *Manager) StopWithDrain(drain bool) { // first stop the receiving channel m.once.Do(func() { close(m.entries) }) m.wg.Wait() - // close wal watchers - for _, walWatcher := range m.walWatchers { - walWatcher.Stop() - } - // close clients - for _, c := range m.stoppableClients { - c.Stop() + + var stopWG sync.WaitGroup + + // Depending on whether drain is enabled, the maximum time stopping a watcher and it's client can take is + // the drain time of the watcher + drain time client. To minimize this, and since we keep a separate WAL for each + // client config, each (watcher, client) pair is stopped concurrently. + for _, pair := range m.pairs { + stopWG.Add(1) + go func(pair watcherClientPair) { + defer stopWG.Done() + pair.Stop(drain) + }(pair) } + + // wait for all pairs to be stopped + stopWG.Wait() } // GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config, diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go index 871880cad17b..edd9c25bbd37 100644 --- a/component/common/loki/client/queue_client.go +++ b/component/common/loki/client/queue_client.go @@ -29,7 +29,7 @@ import ( // StoppableWriteTo is a mixing of the WAL's WriteTo interface, that is Stoppable as well. type StoppableWriteTo interface { agentWal.WriteTo - Stoppable + Stop() StopNow() } @@ -38,7 +38,7 @@ type StoppableWriteTo interface { type MarkerHandler interface { UpdateReceivedData(segmentId, dataCount int) // Data queued for sending UpdateSentData(segmentId, dataCount int) // Data which was sent or given up on sending - Stoppable + Stop() } // queuedBatch is a batch specific to a tenant, that is considered ready to be sent. diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go index c0d6c7ae2752..7c22d747c13d 100644 --- a/component/common/loki/wal/config.go +++ b/component/common/loki/wal/config.go @@ -10,8 +10,9 @@ const ( // DefaultWatchConfig is the opinionated defaults for operating the Watcher. var DefaultWatchConfig = WatchConfig{ - MinReadFrequency: time.Millisecond * 250, + MinReadFrequency: 250 * time.Millisecond, MaxReadFrequency: time.Second, + DrainTimeout: 15 * time.Second, } // Config contains all WAL-related settings. @@ -49,6 +50,10 @@ type WatchConfig struct { // MaxReadFrequency controls the maximum read frequency the Watcher polls the WAL for new records. As mentioned above // it caps the polling frequency to a maximum, to prevent to exponential backoff from making it too high. MaxReadFrequency time.Duration + + // DrainTimeout is the maximum amount of time that the Watcher can spend draining the remaining segments in the WAL. + // After that time, the Watcher is stopped immediately, dropping all the work in process. + DrainTimeout time.Duration } // UnmarshalYAML implement YAML Unmarshaler diff --git a/component/common/loki/wal/internal/watcher_state.go b/component/common/loki/wal/internal/watcher_state.go new file mode 100644 index 000000000000..c81413dfd230 --- /dev/null +++ b/component/common/loki/wal/internal/watcher_state.go @@ -0,0 +1,88 @@ +package internal + +import ( + "sync" + + "github.com/go-kit/log" + "github.com/grafana/agent/pkg/flow/logging/level" +) + +const ( + // StateRunning is the main functioning state of the watcher. It will keep tailing head segments, consuming closed + // ones, and checking for new ones. + StateRunning = iota + + // StateDraining is an intermediary state between running and stopping. The watcher will attempt to consume all the data + // found in the WAL, omitting errors and assuming all segments found are "closed", that is, no longer being written. + StateDraining + + // StateStopping means the Watcher is being stopped. It should drop all segment read activity, and exit promptly. + StateStopping +) + +// WatcherState is a holder for the state the Watcher is in. It provides handy methods for checking it it's stopping, getting +// the current state, or blocking until it has stopped. +type WatcherState struct { + current int + mut sync.RWMutex + stoppingSignal chan struct{} + logger log.Logger +} + +func NewWatcherState(l log.Logger) *WatcherState { + return &WatcherState{ + current: StateRunning, + stoppingSignal: make(chan struct{}), + logger: l, + } +} + +// Transition changes the state of WatcherState to next, reacting accordingly. +func (s *WatcherState) Transition(next int) { + s.mut.Lock() + defer s.mut.Unlock() + + level.Debug(s.logger).Log("msg", "watcher transitioning state", "currentState", printState(s.current), "nextState", printState(next)) + + // only perform channel close if the state is not already stopping + // expect s.s to be either draining ro running to perform a close + if next == StateStopping && s.current != next { + close(s.stoppingSignal) + } + + // update state + s.current = next +} + +// IsDraining evaluates to true if the current state is StateDraining. +func (s *WatcherState) IsDraining() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.current == StateDraining +} + +// IsStopping evaluates to true if the current state is StateStopping. +func (s *WatcherState) IsStopping() bool { + s.mut.RLock() + defer s.mut.RUnlock() + return s.current == StateStopping +} + +// WaitForStopping returns a channel in which the called can read, effectively waiting until the state changes to stopping. +func (s *WatcherState) WaitForStopping() <-chan struct{} { + return s.stoppingSignal +} + +// printState prints a user-friendly name of the possible Watcher states. +func printState(state int) string { + switch state { + case StateRunning: + return "running" + case StateDraining: + return "draining" + case StateStopping: + return "stopping" + default: + return "unknown" + } +} diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index 0972f32f8f8a..f91e71b856dc 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -10,6 +10,7 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/agent/component/common/loki/wal/internal" "github.com/grafana/agent/pkg/flow/logging/level" "github.com/prometheus/prometheus/tsdb/record" "github.com/prometheus/prometheus/tsdb/wlog" @@ -80,7 +81,7 @@ type Watcher struct { actions WriteTo readNotify chan struct{} done chan struct{} - quit chan struct{} + state *internal.WatcherState walDir string logger log.Logger MaxSegment int @@ -88,6 +89,7 @@ type Watcher struct { metrics *WatcherMetrics minReadFreq time.Duration maxReadFreq time.Duration + drainTimeout time.Duration marker Marker savedSegment int } @@ -99,7 +101,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log id: id, actions: writeTo, readNotify: make(chan struct{}), - quit: make(chan struct{}), + state: internal.NewWatcherState(logger), done: make(chan struct{}), MaxSegment: -1, marker: marker, @@ -108,6 +110,7 @@ func NewWatcher(walDir, id string, metrics *WatcherMetrics, writeTo WriteTo, log metrics: metrics, minReadFreq: config.MinReadFrequency, maxReadFreq: config.MaxReadFrequency, + drainTimeout: config.DrainTimeout, } } @@ -121,18 +124,26 @@ func (w *Watcher) Start() { // retries. func (w *Watcher) mainLoop() { defer close(w.done) - for !isClosed(w.quit) { + for !w.state.IsStopping() { if w.marker != nil { w.savedSegment = w.marker.LastMarkedSegment() level.Debug(w.logger).Log("msg", "last saved segment", "segment", w.savedSegment) } - if err := w.run(); err != nil { + err := w.run() + if err != nil { level.Error(w.logger).Log("msg", "error tailing WAL", "err", err) } + if w.state.IsDraining() && errors.Is(err, os.ErrNotExist) { + level.Info(w.logger).Log("msg", "Reached non existing segment while draining, assuming end of WAL") + // since we've reached the end of the WAL, and the Watcher is draining, promptly transition to stopping state + // so the watcher can stoppingSignal early + w.state.Transition(internal.StateStopping) + } + select { - case <-w.quit: + case <-w.state.WaitForStopping(): return case <-time.After(5 * time.Second): } @@ -160,9 +171,8 @@ func (w *Watcher) run() error { } level.Debug(w.logger).Log("msg", "Tailing WAL", "currentSegment", currentSegment, "lastSegment", lastSegment) - for !isClosed(w.quit) { + for !w.state.IsStopping() { w.metrics.currentSegment.WithLabelValues(w.id).Set(float64(currentSegment)) - level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", currentSegment) // On start, we have a pointer to what is the latest segment. On subsequent calls to this function, // currentSegment will have been incremented, and we should open that segment. @@ -187,6 +197,8 @@ func (w *Watcher) run() error { // If tail is false, we know the segment we are "watching" over is closed (no further write will occur to it). Then, the // segment is read fully, any errors are logged as Warnings, and no error is returned. func (w *Watcher) watch(segmentNum int, tail bool) error { + level.Debug(w.logger).Log("msg", "Watching WAL segment", "currentSegment", segmentNum, "tail", tail) + segment, err := wlog.OpenReadSegment(wlog.SegmentName(w.walDir, segmentNum)) if err != nil { return err @@ -215,7 +227,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { for { select { - case <-w.quit: + case <-w.state.WaitForStopping(): return nil case <-segmentTicker.C: @@ -224,24 +236,30 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { return fmt.Errorf("segments: %w", err) } - // Check if new segments exists. - if last <= segmentNum { + // Check if new segments exists, or we are draining the WAL, which means that either: + // - This is the last segment, and we can consume it fully because we are draining the WAL + // - There's a segment after the current one, and we can consume this segment fully as well + if last <= segmentNum && !w.state.IsDraining() { continue } - // Since we know last > segmentNum, there must be a new segment. Read the remaining from the segmentNum segment - // and return from `watch` to read the next one + if w.state.IsDraining() { + level.Debug(w.logger).Log("msg", "Draining segment completely", "segment", segmentNum, "lastSegment", last) + } + + // We now that there's either a new segment (last > segmentNum), or we are draining the WAL. Either case, read + // the remaining data from the segmentNum and return from `watch` to read the next one. _, err = w.readSegment(reader, segmentNum) if debug { level.Warn(w.logger).Log("msg", "Error reading segment inside segmentTicker", "segment", segmentNum, "read", reader.Offset(), "err", err) } - // io.EOF error are non-fatal since we are tailing the wal + // io.EOF error are non-fatal since we are consuming the segment till the end if errors.Unwrap(err) != io.EOF { return err } - // return after reading the whole segment for creating a new LiveReader from the newly created segment + // return after reading the whole segment return nil // the cases below will unlock the select block, and execute the block below @@ -293,7 +311,7 @@ func (w *Watcher) watch(segmentNum int, tail bool) error { func (w *Watcher) readSegment(r *wlog.LiveReader, segmentNum int) (bool, error) { var readData bool - for r.Next() && !isClosed(w.quit) { + for r.Next() && !w.state.IsStopping() { rec := r.Record() w.metrics.recordsRead.WithLabelValues(w.id).Inc() read, err := w.decodeAndDispatch(rec, segmentNum) @@ -331,9 +349,24 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { return readData, firstErr } +// Drain moves the Watcher to a draining state, which will assume no more data is being written to the WAL, and it will +// attempt to read until the end of the last written segment. The calling routine of Drain will block until all data is +// read, or a timeout occurs. +func (w *Watcher) Drain() { + level.Info(w.logger).Log("msg", "Draining Watcher") + w.state.Transition(internal.StateDraining) + // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly + select { + case <-time.NewTimer(w.drainTimeout).C: + level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping") + case <-w.state.WaitForStopping(): + } +} + +// Stop stops the Watcher, shutting down the main routine. func (w *Watcher) Stop() { - // first close the quit channel to order main mainLoop routine to stop - close(w.quit) + w.state.Transition(internal.StateStopping) + // upon calling stop, wait for main mainLoop execution to stop <-w.done @@ -397,16 +430,6 @@ func (w *Watcher) findNextSegmentFor(index int) (int, error) { return -1, errors.New("failed to find segment for index") } -// isClosed checks in a non-blocking manner if a channel is closed or not. -func isClosed(c chan struct{}) bool { - select { - case <-c: - return true - default: - return false - } -} - // readSegmentNumbers reads the given directory and returns all segment identifiers, that is, the index of each segment // file. func readSegmentNumbers(dir string) ([]int, error) { diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index 97de19748dee..15644d740a28 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -3,6 +3,7 @@ package wal import ( "fmt" "os" + "strings" "testing" "time" @@ -12,6 +13,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/tsdb/record" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "github.com/grafana/agent/component/common/loki" "github.com/grafana/agent/component/common/loki/utils" @@ -568,3 +570,228 @@ func TestWatcher_Replay(t *testing.T) { writeTo.AssertContainsLines(t, segment2Lines...) }) } + +// slowWriteTo mimics the combination of a WriteTo and a slow remote write client. This will allow us to have a writer +// that moves faster than the WAL watcher, and therefore, test the draining procedure. +type slowWriteTo struct { + t *testing.T + entriesReceived atomic.Uint64 + sleepAfterAppendEntries time.Duration +} + +func (s *slowWriteTo) SeriesReset(segmentNum int) { +} + +func (s *slowWriteTo) StoreSeries(series []record.RefSeries, segmentNum int) { +} + +func (s *slowWriteTo) AppendEntries(entries wal.RefEntries, segmentNum int) error { + // only log on development debug flag + if debug { + var allLines strings.Builder + for _, e := range entries.Entries { + allLines.WriteString(e.Line) + allLines.WriteString("/") + } + s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String()) + } + + s.entriesReceived.Add(uint64(len(entries.Entries))) + time.Sleep(s.sleepAfterAppendEntries) + return nil +} + +func TestWatcher_StopAndDrainWAL(t *testing.T) { + labels := model.LabelSet{ + "app": "test", + } + logger := level.NewFilter(log.NewLogfmtLogger(os.Stdout), level.AllowDebug()) + + // newTestingResources is a helper for bootstrapping all required testing resources + newTestingResources := func(t *testing.T, cfg WatchConfig) (*slowWriteTo, *Watcher, WAL) { + reg := prometheus.NewRegistry() + dir := t.TempDir() + metrics := NewWatcherMetrics(reg) + + // the slow write to will take one second on each AppendEntries operation + writeTo := &slowWriteTo{ + t: t, + sleepAfterAppendEntries: time.Second, + } + + watcher := NewWatcher(dir, "test", metrics, writeTo, logger, cfg, mockMarker{ + LastMarkedSegmentFunc: func() int { + // Ignore marker to read from last segment, which is none + return -1 + }, + }) + + // start watcher, and burn through WAL as we write to it + watcher.Start() + + wl, err := New(Config{ + Enabled: true, + Dir: dir, + }, logger, reg) + require.NoError(t, err) + return writeTo, watcher, wl + } + + t.Run("watcher drains WAL just in time", func(t *testing.T) { + cfg := DefaultWatchConfig + // considering the slow write to has a 1 second delay when Appending an entry, and before the draining begins, + // the watcher would have consumed only 5 entries, this timeout will give the Watcher just enough time to fully + // drain the WAL. + cfg.DrainTimeout = time.Second * 16 + writeTo, watcher, wl := newTestingResources(t, cfg) + defer wl.Close() + + ew := newEntryWriter() + + // helper to add context to each written line + var lineCounter atomic.Int64 + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err := ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("test line %d", lineCounter.Load()), + }, + }, wl, logger) + lineCounter.Add(1) + require.NoError(t, err) + } + } + + // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. + + writeNLines(t, 10) + + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + + _, err := wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) + + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() + watcher.Drain() + watcher.Stop() + + // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout + // has one extra second. + require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") + require.Equal(t, int(writeTo.entriesReceived.Load()), 20, "expected the watcher to fully drain the WAL") + }) + + t.Run("watcher should exit promptly after draining completely", func(t *testing.T) { + cfg := DefaultWatchConfig + // the drain timeout will be too long, for the amount of data remaining in the WAL (~15 entries more) + cfg.DrainTimeout = time.Second * 30 + writeTo, watcher, wl := newTestingResources(t, cfg) + defer wl.Close() + + ew := newEntryWriter() + + // helper to add context to each written line + var lineCounter atomic.Int64 + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err := ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("test line %d", lineCounter.Load()), + }, + }, wl, logger) + lineCounter.Add(1) + require.NoError(t, err) + } + } + + // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. + + writeNLines(t, 10) + + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + + _, err := wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) + + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() + watcher.Drain() + watcher.Stop() + + // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout + // has one extra second. + require.InDelta(t, time.Second*15, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") + require.Equal(t, int(writeTo.entriesReceived.Load()), 20, "expected the watcher to fully drain the WAL") + }) + + t.Run("watcher drain timeout too short, should exit promptly", func(t *testing.T) { + cfg := DefaultWatchConfig + // having a 10 seconds timeout should give the watcher enough time to only consume ~10 entries, and be missing ~5 + // from the last segment + cfg.DrainTimeout = time.Second * 10 + writeTo, watcher, wl := newTestingResources(t, cfg) + defer wl.Close() + + ew := newEntryWriter() + + // helper to add context to each written line + var lineCounter atomic.Int64 + writeNLines := func(t *testing.T, n int) { + for i := 0; i < n; i++ { + // First, write to segment 0. This will be the last "marked" segment + err := ew.WriteEntry(loki.Entry{ + Labels: labels, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("test line %d", lineCounter.Load()), + }, + }, wl, logger) + lineCounter.Add(1) + require.NoError(t, err) + } + } + + // The test will write the WAL while the Watcher is running. First, 10 lines will be written to a segment, and the test + // will wait for the Watcher to have read 5 lines. After, a new segment will be cut, 10 other lines written, and the + // Watcher stopped with drain. The test will expect all 20 lines in total to have been received. + + writeNLines(t, 10) + + require.Eventually(t, func() bool { + return writeTo.entriesReceived.Load() >= 5 + }, time.Second*11, time.Millisecond*500, "expected the write to catch up to half of the first segment") + + _, err := wl.NextSegment() + require.NoError(t, err) + writeNLines(t, 10) + require.NoError(t, wl.Sync()) + + // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 + now := time.Now() + watcher.Drain() + watcher.Stop() + + require.InDelta(t, time.Second*10, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") + require.Less(t, int(writeTo.entriesReceived.Load()), 20, "expected watcher to have not consumed WAL fully") + require.InDelta(t, 15, int(writeTo.entriesReceived.Load()), 1.0, "expected Watcher to consume at most +/- 1 entry from the WAL") + }) +} diff --git a/component/loki/write/types.go b/component/loki/write/types.go index 2959f1b681a1..dc240c675e98 100644 --- a/component/loki/write/types.go +++ b/component/loki/write/types.go @@ -82,7 +82,7 @@ type QueueConfig struct { func (q *QueueConfig) SetToDefault() { *q = QueueConfig{ Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10 - DrainTimeout: time.Minute, + DrainTimeout: 15 * time.Second, } } diff --git a/component/loki/write/write.go b/component/loki/write/write.go index efda7b597c6f..a31cb0745976 100644 --- a/component/loki/write/write.go +++ b/component/loki/write/write.go @@ -41,6 +41,7 @@ type WalArguments struct { MaxSegmentAge time.Duration `river:"max_segment_age,attr,optional"` MinReadFrequency time.Duration `river:"min_read_frequency,attr,optional"` MaxReadFrequency time.Duration `river:"max_read_frequency,attr,optional"` + DrainTimeout time.Duration `river:"drain_timeout,attr,optional"` } func (wa *WalArguments) Validate() error { @@ -58,6 +59,7 @@ func (wa *WalArguments) SetToDefault() { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, } } @@ -81,7 +83,7 @@ type Component struct { receiver loki.LogsReceiver // remote write components - clientManger client.Client + clientManger *client.Manager walWriter *wal.Writer // sink is the place where log entries received by this component should be written to. If WAL @@ -111,6 +113,18 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { + defer func() { + // when exiting Run, proceed to shut down first the writer component, and then + // the client manager, with the WAL and remote-write client inside + if c.walWriter != nil { + c.walWriter.Stop() + } + if c.clientManger != nil { + // drain, since the component is shutting down. That means the agent is shutting down as well + c.clientManger.StopWithDrain(true) + } + }() + for { select { case <-ctx.Done(): @@ -140,6 +154,7 @@ func (c *Component) Update(args component.Arguments) error { c.walWriter.Stop() } if c.clientManger != nil { + // only drain on component shutdown c.clientManger.Stop() } @@ -150,6 +165,7 @@ func (c *Component) Update(args component.Arguments) error { WatchConfig: wal.WatchConfig{ MinReadFrequency: newArgs.WAL.MinReadFrequency, MaxReadFrequency: newArgs.WAL.MaxReadFrequency, + DrainTimeout: newArgs.WAL.DrainTimeout, }, } diff --git a/component/loki/write/write_test.go b/component/loki/write/write_test.go index 642b53703e0c..d77bebe21c0f 100644 --- a/component/loki/write/write_test.go +++ b/component/loki/write/write_test.go @@ -79,6 +79,7 @@ func TestUnmarshallWalAttrributes(t *testing.T) { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, }, }, "wal enabled with defaults": { @@ -90,6 +91,7 @@ func TestUnmarshallWalAttrributes(t *testing.T) { MaxSegmentAge: wal.DefaultMaxSegmentAge, MinReadFrequency: wal.DefaultWatchConfig.MinReadFrequency, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: wal.DefaultWatchConfig.DrainTimeout, }, }, "wal enabled with some overrides": { @@ -97,12 +99,14 @@ func TestUnmarshallWalAttrributes(t *testing.T) { enabled = true max_segment_age = "10m" min_read_frequency = "11ms" + drain_timeout = "5m" `, expected: WalArguments{ Enabled: true, MaxSegmentAge: time.Minute * 10, MinReadFrequency: time.Millisecond * 11, MaxReadFrequency: wal.DefaultWatchConfig.MaxReadFrequency, + DrainTimeout: time.Minute * 5, }, }, } { diff --git a/docs/sources/flow/reference/components/loki.write.md b/docs/sources/flow/reference/components/loki.write.md index efbcdf34eabc..75aad04f3f2a 100644 --- a/docs/sources/flow/reference/components/loki.write.md +++ b/docs/sources/flow/reference/components/loki.write.md @@ -162,10 +162,11 @@ The following arguments are supported: Name | Type | Description | Default | Required --------------------- |------------|--------------------------------------------------------------------------------------------------------------------|-----------| -------- -`enabled` | `bool` | Whether to enable the WAL. | false | no +`enabled` | `bool` | Whether to enable the WAL. | false | no `max_segment_age` | `duration` | Maximum time a WAL segment should be allowed to live. Segments older than this setting will be eventually deleted. | `"1h"` | no `min_read_frequency` | `duration` | Minimum backoff time in the backup read mechanism. | `"250ms"` | no `max_read_frequency` | `duration` | Maximum backoff time in the backup read mechanism. | `"1s"` | no +`drain_timeout` | `duration` | Maximum time the WAL drain procedure can take, before being forcefully stopped. | `"30s"` | no [run]: {{< relref "../cli/run.md" >}}