From b74f174bd54cc4097517cb5abda373da222c989c Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 29 Aug 2024 18:33:13 +0100 Subject: [PATCH] Fix memory leak in loki.process on config update (#7004) * Cleanup loki.process on update * Fix goroutine leaks in other unit tests * Stop handleOut only after the pipeline shut down --- CHANGELOG.md | 4 + internal/component/loki/process/process.go | 23 +- .../component/loki/process/process_test.go | 275 ++++++++++++++---- 3 files changed, 236 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ca2cbecf20d..ed031a135650 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,10 @@ internal API changes are not present. Main (unreleased) ----------------- +### Bugfixes + +- Fix a memory leak which would occur any time `loki.process` had its configuration reloaded. (@ptodev) + ### Other changes - Change the Docker base image for Linux containers to `ubuntu:noble`. (@amontalban) diff --git a/internal/component/loki/process/process.go b/internal/component/loki/process/process.go index b7d0e8eb1b47..e7429f618e57 100644 --- a/internal/component/loki/process/process.go +++ b/internal/component/loki/process/process.go @@ -85,18 +85,24 @@ func New(o component.Options, args Arguments) (*Component, error) { // Run implements component.Component. func (c *Component) Run(ctx context.Context) error { + shutdownCh := make(chan struct{}) + wgOut := &sync.WaitGroup{} defer func() { c.mut.RLock() if c.entryHandler != nil { c.entryHandler.Stop() + // Stop handleOut only after the entryHandler has stopped. + // If handleOut stops first, entryHandler might get stuck on a channel send. + close(shutdownCh) + wgOut.Wait() } - close(c.processIn) c.mut.RUnlock() }() wg := &sync.WaitGroup{} - wg.Add(2) + wg.Add(1) go c.handleIn(ctx, wg) - go c.handleOut(ctx, wg) + wgOut.Add(1) + go c.handleOut(shutdownCh, wgOut) wg.Wait() return nil @@ -127,8 +133,9 @@ func (c *Component) Update(args component.Arguments) error { if err != nil { return err } - c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) - c.processIn = pipeline.Wrap(c.entryHandler).Chan() + entryHandler := loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() }) + c.entryHandler = pipeline.Wrap(entryHandler) + c.processIn = c.entryHandler.Chan() c.stages = newArgs.Stages } @@ -158,11 +165,11 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) { } } -func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) { +func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) { defer wg.Done() for { select { - case <-ctx.Done(): + case <-shutdownCh: return case entry := <-c.processOut: c.fanoutMut.RLock() @@ -170,7 +177,7 @@ func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) { c.fanoutMut.RUnlock() for _, f := range fanout { select { - case <-ctx.Done(): + case <-shutdownCh: return case f.Chan() <- entry: // no-op diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 5256bbcbca0f..c2099456950b 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -7,6 +7,7 @@ import ( "fmt" "os" "strings" + "sync" "testing" "time" @@ -316,6 +317,8 @@ stage.labels { } func TestEntrySentToTwoProcessComponents(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + // Set up two different loki.process components. 
stg1 := ` forward_to = [] @@ -337,13 +340,16 @@ stage.static_labels { args1.ForwardTo = []loki.LogsReceiver{ch1} args2.ForwardTo = []loki.LogsReceiver{ch2} + ctx, ctxCancel := context.WithCancel(context.Background()) + defer ctxCancel() + // Start the loki.process components. tc1, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process") require.NoError(t, err) tc2, err := componenttest.NewControllerFromID(util.TestLogger(t), "loki.process") require.NoError(t, err) - go func() { require.NoError(t, tc1.Run(componenttest.TestContext(t), args1)) }() - go func() { require.NoError(t, tc2.Run(componenttest.TestContext(t), args2)) }() + go func() { require.NoError(t, tc1.Run(ctx, args1)) }() + go func() { require.NoError(t, tc2.Run(ctx, args2)) }() require.NoError(t, tc1.WaitExports(time.Second)) require.NoError(t, tc2.WaitExports(time.Second)) @@ -357,7 +363,7 @@ stage.static_labels { require.NoError(t, err) go func() { - err := ctrl.Run(context.Background(), lsf.Arguments{ + err := ctrl.Run(ctx, lsf.Arguments{ Targets: []discovery.Target{{"__path__": f.Name(), "somelbl": "somevalue"}}, ForwardTo: []loki.LogsReceiver{ tc1.Exports().(Exports).Receiver, @@ -395,68 +401,108 @@ stage.static_labels { } } -func TestDeadlockWithFrequentUpdates(t *testing.T) { - stg := `stage.json { - expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } - drop_malformed = true - } - stage.json { - expressions = { "user" = "" } - source = "extra" - } - stage.labels { - values = { - stream = "", - user = "", - ts = "timestamp", - } - }` +type testFrequentUpdate struct { + t *testing.T + c *Component - // Unmarshal the River relabel rules into a custom struct, as we don't have - // an easy way to refer to a loki.LogsReceiver value for the forward_to - // argument. - type cfg struct { - Stages []stages.StageConfig `river:"stage,enum"` + receiver1 loki.LogsReceiver + receiver2 loki.LogsReceiver + + keepSending atomic.Bool + stopReceiving chan struct{} + keepUpdating atomic.Bool + + wgSend sync.WaitGroup + wgReceive sync.WaitGroup + wgRun sync.WaitGroup + wgUpdate sync.WaitGroup + + lastSend atomic.Value + + stop func() +} + +func startTestFrequentUpdate(t *testing.T, cfg string) *testFrequentUpdate { + res := testFrequentUpdate{ + t: t, + receiver1: loki.NewLogsReceiver(), + receiver2: loki.NewLogsReceiver(), + stopReceiving: make(chan struct{}), } - var stagesCfg cfg - err := river.Unmarshal([]byte(stg), &stagesCfg) + + ctx, cancel := context.WithCancel(context.Background()) + + res.keepSending.Store(true) + res.keepUpdating.Store(true) + + res.stop = func() { + res.keepUpdating.Store(false) + res.wgUpdate.Wait() + + res.keepSending.Store(false) + res.wgSend.Wait() + + cancel() + res.wgRun.Wait() + + close(res.stopReceiving) + res.wgReceive.Wait() + + close(res.receiver1.Chan()) + close(res.receiver2.Chan()) + } + + var args Arguments + err := river.Unmarshal([]byte(cfg), &args) require.NoError(t, err) - ch1, ch2 := loki.NewLogsReceiver(), loki.NewLogsReceiver() + args.ForwardTo = []loki.LogsReceiver{res.receiver1, res.receiver2} - // Create and run the component, so that it can process and forwards logs. 
opts := component.Options{ Logger: util.TestFlowLogger(t), Registerer: prometheus.NewRegistry(), OnStateChange: func(e component.Exports) {}, } - args := Arguments{ - ForwardTo: []loki.LogsReceiver{ch1, ch2}, - Stages: stagesCfg.Stages, - } - c, err := New(opts, args) + res.c, err = New(opts, args) require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go c.Run(ctx) - var lastSend atomic.Value - // Drain received logs + res.wgRun.Add(1) + go func() { + res.c.Run(ctx) + res.wgRun.Done() + }() + + return &res +} + +// Continuously receive the logs from both channels +func (r *testFrequentUpdate) drainLogs() { + drainLogs := func() { + r.lastSend.Store(time.Now()) + } + + r.wgReceive.Add(1) go func() { for { select { - case <-ch1.Chan(): - lastSend.Store(time.Now()) - case <-ch2.Chan(): - lastSend.Store(time.Now()) + case <-r.stopReceiving: + r.wgReceive.Done() + return + case <-r.receiver1.Chan(): + drainLogs() + case <-r.receiver2.Chan(): + drainLogs() } } }() +} - // Continuously send entries to both channels +// Continuously send entries to both channels +func (r *testFrequentUpdate) sendLogs() { + r.wgSend.Add(1) go func() { - for { + for r.keepSending.Load() { ts := time.Now() logEntry := loki.Entry{ Labels: model.LabelSet{"filename": "/var/log/pods/agent/agent/1.log", "foo": "bar"}, @@ -465,29 +511,142 @@ func TestDeadlockWithFrequentUpdates(t *testing.T) { Line: logline, }, } - c.receiver.Chan() <- logEntry + select { + case r.c.receiver.Chan() <- logEntry: + default: + // continue + } } + r.wgSend.Done() }() +} - // Call Updates - args1 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch1}, - Stages: stagesCfg.Stages, - } - args2 := Arguments{ - ForwardTo: []loki.LogsReceiver{ch2}, - Stages: stagesCfg.Stages, - } +func (r *testFrequentUpdate) updateContinuously(cfg1, cfg2 string) { + var args1 Arguments + err := river.Unmarshal([]byte(cfg1), &args1) + require.NoError(r.t, err) + args1.ForwardTo = []loki.LogsReceiver{r.receiver1} + + var args2 Arguments + err = river.Unmarshal([]byte(cfg2), &args2) + require.NoError(r.t, err) + args2.ForwardTo = []loki.LogsReceiver{r.receiver2} + + r.wgUpdate.Add(1) go func() { - for { - c.Update(args1) - c.Update(args2) + for r.keepUpdating.Load() { + r.c.Update(args1) + r.c.Update(args2) } + r.wgUpdate.Done() }() +} + +func TestDeadlockWithFrequentUpdates(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + cfg1 := `stage.json { + expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } + drop_malformed = true + } + stage.json { + expressions = { "user" = "" } + source = "extra" + } + stage.labels { + values = { + stream = "", + user = "", + ts = "timestamp", + } + } + forward_to = []` + + cfg2 := `stage.json { + expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } + drop_malformed = true + } + stage.labels { + values = { + stream = "", + ts = "timestamp", + } + } + forward_to = []` + + r := startTestFrequentUpdate(t, `forward_to = []`) + + // Continuously send entries to both channels + r.sendLogs() + + // Continuously receive entries on both channels + r.drainLogs() + + // Call Updates + r.updateContinuously(cfg1, cfg2) // Run everything for a while time.Sleep(1 * time.Second) - require.WithinDuration(t, time.Now(), lastSend.Load().(time.Time), 300*time.Millisecond) + require.WithinDuration(t, time.Now(), r.lastSend.Load().(time.Time), 300*time.Millisecond) + + // Clean up 
+ r.stop() +} + +// Make sure there are no goroutine leaks when the config is updated. +// Goroutine leaks often cause memory leaks. +func TestLeakyUpdate(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start")) + + tester := newTester(t) + defer tester.stop() + + forwardArgs := ` + // This will be filled later + forward_to = []` + + numLogsToSend := 1 + + cfg1 := ` + stage.metrics { + metric.counter { + name = "paulin_test1" + action = "inc" + match_all = true + } + }` + forwardArgs + + cfg2 := ` + stage.metrics { + metric.counter { + name = "paulin_test2" + action = "inc" + match_all = true + } + }` + forwardArgs + + metricsTemplate1 := ` + # HELP loki_process_custom_paulin_test1 + # TYPE loki_process_custom_paulin_test1 counter + loki_process_custom_paulin_test1{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + metricsTemplate2 := ` + # HELP loki_process_custom_paulin_test2 + # TYPE loki_process_custom_paulin_test2 counter + loki_process_custom_paulin_test2{filename="/var/log/pods/agent/agent/1.log",foo="bar"} %d + ` + + metrics1 := fmt.Sprintf(metricsTemplate1, numLogsToSend) + metrics2 := fmt.Sprintf(metricsTemplate2, numLogsToSend) + + tester.updateAndTest(numLogsToSend, cfg1, "", metrics1) + tester.updateAndTest(numLogsToSend, cfg2, "", metrics2) + + for i := 0; i < 100; i++ { + tester.updateAndTest(numLogsToSend, cfg1, "", metrics1) + tester.updateAndTest(numLogsToSend, cfg2, "", metrics2) + } } func TestMetricsStageRefresh(t *testing.T) {
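
The core of the fix, and what TestLeakyUpdate and TestDeadlockWithFrequentUpdates exercise, is a shutdown ordering rule: stop the producer (the pipeline-wrapped entryHandler) before telling the consumer (handleOut) to exit via its dedicated shutdown channel, and only then wait on the consumer's WaitGroup. The sketch below is illustrative only and not part of the patch; the producer type, entry type, and channel names are invented for the example. It shows why the ordering matters: the producer forwards entries with blocking sends, so tearing down the consumer first could leave the producer's Stop hanging on a send that nobody reads.

package main

import "sync"

type entry string

// producer stands in for the pipeline-wrapped entryHandler: it forwards
// everything from in to out with blocking sends.
type producer struct {
	in  chan entry
	out chan entry
	wg  sync.WaitGroup
}

func newProducer(out chan entry) *producer {
	p := &producer{in: make(chan entry), out: out}
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		// Blocking sends: if nothing drains out, this goroutine (and hence
		// Stop below) can hang forever.
		for e := range p.in {
			p.out <- e
		}
	}()
	return p
}

// Stop mirrors entryHandler.Stop: close the input and wait for the
// forwarding goroutine to finish draining.
func (p *producer) Stop() {
	close(p.in)
	p.wg.Wait()
}

func main() {
	out := make(chan entry)
	shutdownCh := make(chan struct{}) // plays the role of shutdownCh in Run
	wgOut := &sync.WaitGroup{}        // plays the role of wgOut in Run

	p := newProducer(out)

	// Consumer, analogous to handleOut: drain out until shutdownCh is closed.
	wgOut.Add(1)
	go func() {
		defer wgOut.Done()
		for {
			select {
			case <-shutdownCh:
				return
			case <-out:
				// The real component fans the entry out to its receivers here.
			}
		}
	}()

	p.in <- entry("example log line")

	// Shutdown order matters: stop the producer while the consumer is still
	// draining, then signal the consumer and wait for it. Closing shutdownCh
	// before p.Stop() could strand the producer on `p.out <- e`.
	p.Stop()
	close(shutdownCh)
	wgOut.Wait()
}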