From ed3190b9e83c6a6a3f95508ca6720c0d8353dfb1 Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Thu, 23 Nov 2023 11:13:00 -0300 Subject: [PATCH] splitting apart Stop and Drain --- component/common/loki/client/manager.go | 15 ++++++++---- component/common/loki/wal/config.go | 4 ++-- component/common/loki/wal/watcher.go | 28 +++++++++++------------ component/common/loki/wal/watcher_test.go | 15 +++++++----- component/loki/write/types.go | 2 +- 5 files changed, 36 insertions(+), 28 deletions(-) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index d8caad75d120..14fbf92be6df 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -35,8 +35,9 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {} func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {} -type Stoppable interface { - Stop(drain bool) +type StoppableWatcher interface { + Stop() + Drain() } type StoppableClient interface { @@ -53,7 +54,7 @@ type StoppableClient interface { type Manager struct { name string clients []Client - walWatchers []Stoppable + walWatchers []StoppableWatcher // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface stoppableClients []StoppableClient @@ -78,7 +79,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]Stoppable, 0, len(clientCfgs)) + watchers := make([]StoppableWatcher, 0, len(clientCfgs)) stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). @@ -203,7 +204,11 @@ func (m *Manager) StopWithDrain(drain bool) { // stop wal watchers for _, walWatcher := range m.walWatchers { - walWatcher.Stop(drain) + // if drain enabled, drain the WAL + if drain { + walWatcher.Drain() + } + walWatcher.Stop() } // close clients for _, c := range m.stoppableClients { diff --git a/component/common/loki/wal/config.go b/component/common/loki/wal/config.go index 160bca2b90a9..7c22d747c13d 100644 --- a/component/common/loki/wal/config.go +++ b/component/common/loki/wal/config.go @@ -10,9 +10,9 @@ const ( // DefaultWatchConfig is the opinionated defaults for operating the Watcher. var DefaultWatchConfig = WatchConfig{ - MinReadFrequency: time.Millisecond * 250, + MinReadFrequency: 250 * time.Millisecond, MaxReadFrequency: time.Second, - DrainTimeout: time.Second * 30, + DrainTimeout: 15 * time.Second, } // Config contains all WAL-related settings. diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index a92a62631f7e..f91e71b856dc 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -349,22 +349,22 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { return readData, firstErr } -// Stop stops the Watcher, draining the WAL until the end of the last segment if drain is true. Since the writer of the WAL -// is expected to have stopped before the Watcher, no further writes are expected, so segments can be safely consumed. -// -// Note if drain is enabled, the caller routine of Stop will block executing the drain procedure. 
-func (w *Watcher) Stop(drain bool) { - if drain { - level.Info(w.logger).Log("msg", "Draining Watcher") - w.state.Transition(internal.StateDraining) - // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly - select { - case <-time.NewTimer(w.drainTimeout).C: - level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping") - case <-w.state.WaitForStopping(): - } +// Drain moves the Watcher to a draining state, which will assume no more data is being written to the WAL, and it will +// attempt to read until the end of the last written segment. The calling routine of Drain will block until all data is +// read, or a timeout occurs. +func (w *Watcher) Drain() { + level.Info(w.logger).Log("msg", "Draining Watcher") + w.state.Transition(internal.StateDraining) + // wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly + select { + case <-time.NewTimer(w.drainTimeout).C: + level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping") + case <-w.state.WaitForStopping(): } +} +// Stop stops the Watcher, shutting down the main routine. +func (w *Watcher) Stop() { w.state.Transition(internal.StateStopping) // upon calling stop, wait for main mainLoop execution to stop diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index dbdef8a5d3bb..15644d740a28 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -343,7 +343,7 @@ func TestWatcher(t *testing.T) { } // create new watcher, and defer stop watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, noMarker{}) - defer watcher.Stop(false) + defer watcher.Stop() wl, err := New(Config{ Enabled: true, Dir: dir, @@ -421,7 +421,7 @@ func TestWatcher_Replay(t *testing.T) { return 0 }, }) - defer watcher.Stop(false) + defer watcher.Stop() wl, err := New(Config{ Enabled: true, Dir: dir, @@ -503,7 +503,7 @@ func TestWatcher_Replay(t *testing.T) { return -1 }, }) - defer watcher.Stop(false) + defer watcher.Stop() wl, err := New(Config{ Enabled: true, Dir: dir, @@ -682,7 +682,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 now := time.Now() - watcher.Stop(true) + watcher.Drain() + watcher.Stop() // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout // has one extra second. @@ -733,7 +734,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 now := time.Now() - watcher.Stop(true) + watcher.Drain() + watcher.Stop() // expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout // has one extra second. 
@@ -785,7 +787,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) { // Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1 now := time.Now() - watcher.Stop(true) + watcher.Drain() + watcher.Stop() require.InDelta(t, time.Second*10, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s") require.Less(t, int(writeTo.entriesReceived.Load()), 20, "expected watcher to have not consumed WAL fully") diff --git a/component/loki/write/types.go b/component/loki/write/types.go index 2959f1b681a1..dc240c675e98 100644 --- a/component/loki/write/types.go +++ b/component/loki/write/types.go @@ -82,7 +82,7 @@ type QueueConfig struct { func (q *QueueConfig) SetToDefault() { *q = QueueConfig{ Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10 - DrainTimeout: time.Minute, + DrainTimeout: 15 * time.Second, } }
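
With this split, callers own the drain decision: Drain() blocks until the Watcher has read to the end of the last written segment or the configured DrainTimeout (now defaulting to 15s) elapses, and Stop() always follows to shut down the main routine. Below is a minimal sketch of the calling pattern, mirroring what Manager.StopWithDrain does above; it is written against the StoppableWatcher interface this patch introduces, and the stopWatcher helper is illustrative rather than part of the change:

// StoppableWatcher is the interface introduced in manager.go by this patch.
type StoppableWatcher interface {
	Stop()
	Drain()
}

// stopWatcher is an illustrative helper (not part of the patch) showing the
// expected shutdown sequence: optionally drain, then always stop.
func stopWatcher(w StoppableWatcher, drain bool) {
	if drain {
		// Drain blocks until the WAL is read through the last written segment,
		// or until the Watcher's drain timeout fires and a warning is logged.
		w.Drain()
	}
	// Stop transitions the Watcher to Stopping and waits for its main loop to exit.
	w.Stop()
}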