From 5cde5fc54fba16f172e61aa7cba3bd9821bbae0f Mon Sep 17 00:00:00 2001 From: Pablo Balbi Date: Wed, 6 Dec 2023 13:57:11 -0300 Subject: [PATCH] minimize manager stop time --- component/common/loki/client/manager.go | 79 ++++++++++++++++--------- 1 file changed, 52 insertions(+), 27 deletions(-) diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 14fbf92be6df..244aa587a81f 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -45,6 +45,27 @@ type StoppableClient interface { StopNow() } +// watcherClientPair represents a pair of watcher and client, which are coupled together, or just a single client. +type watcherClientPair struct { + watcher StoppableWatcher + client StoppableClient +} + +// Stop will proceed to stop, in order, the possibly-nil watcher and the client. +func (p watcherClientPair) Stop(drain bool) { + // if the config has WAL disabled, there will be no watcher per client config + if p.watcher != nil { + // if drain enabled, drain the WAL + if drain { + p.watcher.Drain() + } + p.watcher.Stop() + } + + // subsequently stop the client + p.client.Stop() +} + // Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry // from the scrape targets, to the remote write clients themselves. // @@ -52,12 +73,10 @@ type StoppableClient interface { // work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client // types: Logger, Multi and WAL. type Manager struct { - name string - clients []Client - walWatchers []StoppableWatcher + name string - // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface - stoppableClients []StoppableClient + clients []Client + pairs []watcherClientPair entries chan loki.Entry once sync.Once @@ -79,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) - watchers := make([]StoppableWatcher, 0, len(clientCfgs)) - stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) + pairs := make([]watcherClientPair, 0, len(clientCfgs)) for _, cfg := range clientCfgs { // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). clientName := GetClientName(cfg) @@ -104,7 +122,6 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr if err != nil { return nil, fmt.Errorf("error starting queue client: %w", err) } - stoppableClients = append(stoppableClients, queue) // subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo // series cache whenever a segment is deleted. @@ -117,7 +134,10 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName) watcher.Start() - watchers = append(watchers, watcher) + pairs = append(pairs, watcherClientPair{ + watcher: watcher, + client: queue, + }) } else { client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger) if err != nil { @@ -125,14 +145,16 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr } clients = append(clients, client) - stoppableClients = append(stoppableClients, client) + + pairs = append(pairs, watcherClientPair{ + client: client, + }) } } manager := &Manager{ - clients: clients, - stoppableClients: stoppableClients, - walWatchers: watchers, - entries: make(chan loki.Entry), + clients: clients, + pairs: pairs, + entries: make(chan loki.Entry), } if walCfg.Enabled { manager.name = buildManagerName("wal", clientCfgs...) @@ -175,8 +197,8 @@ func (m *Manager) startWithForward() { } func (m *Manager) StopNow() { - for _, c := range m.stoppableClients { - c.StopNow() + for _, pair := range m.pairs { + pair.client.StopNow() } } @@ -202,18 +224,21 @@ func (m *Manager) StopWithDrain(drain bool) { m.once.Do(func() { close(m.entries) }) m.wg.Wait() - // stop wal watchers - for _, walWatcher := range m.walWatchers { - // if drain enabled, drain the WAL - if drain { - walWatcher.Drain() - } - walWatcher.Stop() - } - // close clients - for _, c := range m.stoppableClients { - c.Stop() + var stopWG sync.WaitGroup + + // Depending on whether drain is enabled, the maximum time stopping a watcher and it's client can take is + // the drain time of the watcher + drain time client. To minimize this, and since we keep a separate WAL for each + // client config, each (watcher, client) pair is stopped concurrently. + for _, pair := range m.pairs { + stopWG.Add(1) + go func(pair watcherClientPair) { + defer stopWG.Done() + pair.Stop(drain) + }(pair) } + + // wait for all pairs to be stopped + stopWG.Wait() } // GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config,