Skip to content

Commit

Permalink
minimize manager stop time
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi committed Dec 6, 2023
1 parent ed3190b commit 5cde5fc
Showing 1 changed file with 52 additions and 27 deletions.
79 changes: 52 additions & 27 deletions component/common/loki/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,38 @@ type StoppableClient interface {
StopNow()
}

// watcherClientPair represents a pair of watcher and client, which are coupled together, or just a single client.
type watcherClientPair struct {
watcher StoppableWatcher
client StoppableClient
}

// Stop will proceed to stop, in order, the possibly-nil watcher and the client.
func (p watcherClientPair) Stop(drain bool) {
// if the config has WAL disabled, there will be no watcher per client config
if p.watcher != nil {
// if drain enabled, drain the WAL
if drain {
p.watcher.Drain()
}
p.watcher.Stop()
}

// subsequently stop the client
p.client.Stop()
}

// Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry
// from the scrape targets, to the remote write clients themselves.
//
// Right now it just supports instantiating the WAL writer side of the future-to-be WAL enabled client. In follow-up
// work, tracked in https://github.com/grafana/loki/issues/8197, this Manager will be responsible for instantiating all client
// types: Logger, Multi and WAL.
type Manager struct {
name string
clients []Client
walWatchers []StoppableWatcher
name string

// stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface
stoppableClients []StoppableClient
clients []Client
pairs []watcherClientPair

entries chan loki.Entry
once sync.Once
Expand All @@ -79,8 +98,7 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr

clientsCheck := make(map[string]struct{})
clients := make([]Client, 0, len(clientCfgs))
watchers := make([]StoppableWatcher, 0, len(clientCfgs))
stoppableClients := make([]StoppableClient, 0, len(clientCfgs))
pairs := make([]watcherClientPair, 0, len(clientCfgs))
for _, cfg := range clientCfgs {
// Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name).
clientName := GetClientName(cfg)
Expand All @@ -104,7 +122,6 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
if err != nil {
return nil, fmt.Errorf("error starting queue client: %w", err)
}
stoppableClients = append(stoppableClients, queue)

// subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo
// series cache whenever a segment is deleted.
Expand All @@ -117,22 +134,27 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr
level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName)
watcher.Start()

watchers = append(watchers, watcher)
pairs = append(pairs, watcherClientPair{
watcher: watcher,
client: queue,
})
} else {
client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger)
if err != nil {
return nil, fmt.Errorf("error starting client: %w", err)
}

clients = append(clients, client)
stoppableClients = append(stoppableClients, client)

pairs = append(pairs, watcherClientPair{
client: client,
})
}
}
manager := &Manager{
clients: clients,
stoppableClients: stoppableClients,
walWatchers: watchers,
entries: make(chan loki.Entry),
clients: clients,
pairs: pairs,
entries: make(chan loki.Entry),
}
if walCfg.Enabled {
manager.name = buildManagerName("wal", clientCfgs...)
Expand Down Expand Up @@ -175,8 +197,8 @@ func (m *Manager) startWithForward() {
}

func (m *Manager) StopNow() {
for _, c := range m.stoppableClients {
c.StopNow()
for _, pair := range m.pairs {
pair.client.StopNow()
}
}

Expand All @@ -202,18 +224,21 @@ func (m *Manager) StopWithDrain(drain bool) {
m.once.Do(func() { close(m.entries) })
m.wg.Wait()

// stop wal watchers
for _, walWatcher := range m.walWatchers {
// if drain enabled, drain the WAL
if drain {
walWatcher.Drain()
}
walWatcher.Stop()
}
// close clients
for _, c := range m.stoppableClients {
c.Stop()
var stopWG sync.WaitGroup

// Depending on whether drain is enabled, the maximum time stopping a watcher and it's client can take is
// the drain time of the watcher + drain time client. To minimize this, and since we keep a separate WAL for each
// client config, each (watcher, client) pair is stopped concurrently.
for _, pair := range m.pairs {
stopWG.Add(1)
go func(pair watcherClientPair) {
defer stopWG.Done()
pair.Stop(drain)
}(pair)
}

// wait for all pairs to be stopped
stopWG.Wait()
}

// GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config,
Expand Down

0 comments on commit 5cde5fc

Please sign in to comment.