Skip to content

Commit

Permalink
splitting apart Stop and Drain
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi committed Dec 6, 2023
1 parent d45d5bf commit ed3190b
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 28 deletions.
15 changes: 10 additions & 5 deletions component/common/loki/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ func (n nilNotifier) SubscribeCleanup(_ wal.CleanupEventSubscriber) {}

func (n nilNotifier) SubscribeWrite(_ wal.WriteEventSubscriber) {}

type Stoppable interface {
Stop(drain bool)
// StoppableWatcher is the shutdown contract the Manager relies on for the WAL
// watchers it keeps in walWatchers. Draining and stopping are separate steps so
// the caller decides whether a drain happens before the watcher is stopped.
type StoppableWatcher interface {
// Stop shuts the watcher down.
Stop()
// Drain is invoked by Manager.StopWithDrain before Stop when draining is
// requested; presumably it blocks until the drain finishes or times out, as
// the WAL Watcher implementation does — confirm for other implementations.
Drain()
}

type StoppableClient interface {
Expand All @@ -53,7 +54,7 @@ type StoppableClient interface {
type Manager struct {
name string
clients []Client
walWatchers []Stoppable
walWatchers []StoppableWatcher

// stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface
stoppableClients []StoppableClient
Expand All @@ -78,7 +79,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(clientCfgs))
watchers := make([]Stoppable, 0, len(clientCfgs))
watchers := make([]StoppableWatcher, 0, len(clientCfgs))
stoppableClients := make([]StoppableClient, 0, len(clientCfgs))
for _, cfg := range clientCfgs {
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
Expand Down Expand Up @@ -203,7 +204,11 @@ func (m *Manager) StopWithDrain(drain bool) {

// stop wal watchers
for _, walWatcher := range m.walWatchers {
walWatcher.Stop(drain)
// if drain enabled, drain the WAL
if drain {
walWatcher.Drain()
}
walWatcher.Stop()
}
// close clients
for _, c := range m.stoppableClients {
Expand Down
4 changes: 2 additions & 2 deletions component/common/loki/wal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ const (

// DefaultWatchConfig is the opinionated defaults for operating the Watcher.
var DefaultWatchConfig = WatchConfig{
MinReadFrequency: time.Millisecond * 250,
MinReadFrequency: 250 * time.Millisecond,
MaxReadFrequency: time.Second,
DrainTimeout: time.Second * 30,
DrainTimeout: 15 * time.Second,
}

// Config contains all WAL-related settings.
Expand Down
28 changes: 14 additions & 14 deletions component/common/loki/wal/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,22 +349,22 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) {
return readData, firstErr
}

// Stop stops the Watcher, draining the WAL until the end of the last segment if drain is true. Since the writer of the WAL
// is expected to have stopped before the Watcher, no further writes are expected, so segments can be safely consumed.
//
// Note if drain is enabled, the caller routine of Stop will block executing the drain procedure.
func (w *Watcher) Stop(drain bool) {
if drain {
level.Info(w.logger).Log("msg", "Draining Watcher")
w.state.Transition(internal.StateDraining)
// wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly
select {
case <-time.NewTimer(w.drainTimeout).C:
level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping")
case <-w.state.WaitForStopping():
}
// Drain moves the Watcher to a draining state, which will assume no more data is being written to the WAL, and it will
// attempt to read until the end of the last written segment. The calling routine of Drain will block until all data is
// read, or a timeout occurs.
//
// Drain blocks until one of two things happens: the Watcher reaches the stopping state (signalled through
// w.state.WaitForStopping), or w.drainTimeout elapses. Drain does not perform the transition to the stopping state
// itself; callers are expected to invoke Stop afterwards to shut the Watcher down.
func (w *Watcher) Drain() {
	level.Info(w.logger).Log("msg", "Draining Watcher")
	w.state.Transition(internal.StateDraining)

	// Keep a handle on the timer so it can be released as soon as Drain returns. Without the explicit Stop, a drain
	// that finishes early would leave the timer pending until drainTimeout elapses.
	drainTimer := time.NewTimer(w.drainTimeout)
	defer drainTimer.Stop()

	// wait for drain timeout, or stopping state, in case the Watcher does the transition itself promptly
	select {
	case <-drainTimer.C:
		level.Warn(w.logger).Log("msg", "Watcher drain timeout occurred, transitioning to Stopping")
	case <-w.state.WaitForStopping():
	}
}

// Stop stops the Watcher, shutting down the main routine.
func (w *Watcher) Stop() {
w.state.Transition(internal.StateStopping)

// upon calling stop, wait for main mainLoop execution to stop
Expand Down
15 changes: 9 additions & 6 deletions component/common/loki/wal/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ func TestWatcher(t *testing.T) {
}
// create new watcher, and defer stop
watcher := NewWatcher(dir, "test", metrics, writeTo, logger, DefaultWatchConfig, noMarker{})
defer watcher.Stop(false)
defer watcher.Stop()
wl, err := New(Config{
Enabled: true,
Dir: dir,
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestWatcher_Replay(t *testing.T) {
return 0
},
})
defer watcher.Stop(false)
defer watcher.Stop()
wl, err := New(Config{
Enabled: true,
Dir: dir,
Expand Down Expand Up @@ -503,7 +503,7 @@ func TestWatcher_Replay(t *testing.T) {
return -1
},
})
defer watcher.Stop(false)
defer watcher.Stop()
wl, err := New(Config{
Enabled: true,
Dir: dir,
Expand Down Expand Up @@ -682,7 +682,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) {

// Upon calling Drain, the Watcher should finish burning through segment 0, and also consume segment 1
now := time.Now()
watcher.Stop(true)
watcher.Drain()
watcher.Stop()

// expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout
// has one extra second).
Expand Down Expand Up @@ -733,7 +734,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) {

// Upon calling Drain, the Watcher should finish burning through segment 0, and also consume segment 1
now := time.Now()
watcher.Stop(true)
watcher.Drain()
watcher.Stop()

// expecting 15s (missing 15 entries * 1 sec delay in AppendEntries) +/- 1.1s (taking into account the drain timeout
// has one extra second).
Expand Down Expand Up @@ -785,7 +787,8 @@ func TestWatcher_StopAndDrainWAL(t *testing.T) {

// Upon calling Stop drain, the Watcher should finish burning through segment 0, and also consume segment 1
now := time.Now()
watcher.Stop(true)
watcher.Drain()
watcher.Stop()

require.InDelta(t, time.Second*10, time.Since(now), float64(time.Millisecond*1100), "expected the drain procedure to take around 15s")
require.Less(t, int(writeTo.entriesReceived.Load()), 20, "expected watcher to have not consumed WAL fully")
Expand Down
2 changes: 1 addition & 1 deletion component/loki/write/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type QueueConfig struct {
func (q *QueueConfig) SetToDefault() {
*q = QueueConfig{
Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10
DrainTimeout: time.Minute,
DrainTimeout: 15 * time.Second,
}
}

Expand Down

0 comments on commit ed3190b

Please sign in to comment.