Skip to content

Commit

Permalink
Fix memory leak in loki.process on config update (#7004)
Browse files Browse the repository at this point in the history
* Cleanup loki.process on update

* Fix goroutine leaks in other unit tests

* Stop handleOut only after the pipeline shut down
  • Loading branch information
ptodev authored Aug 29, 2024
1 parent eabae1a commit b74f174
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 66 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ internal API changes are not present.
Main (unreleased)
-----------------

### Bugfixes

- Fix a memory leak which would occur any time `loki.process` had its configuration reloaded. (@ptodev)

### Other changes

- Change the Docker base image for Linux containers to `ubuntu:noble`. (@amontalban)
Expand Down
23 changes: 15 additions & 8 deletions internal/component/loki/process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,24 @@ func New(o component.Options, args Arguments) (*Component, error) {

// Run implements component.Component.
func (c *Component) Run(ctx context.Context) error {
shutdownCh := make(chan struct{})
wgOut := &sync.WaitGroup{}
defer func() {
c.mut.RLock()
if c.entryHandler != nil {
c.entryHandler.Stop()
// Stop handleOut only after the entryHandler has stopped.
// If handleOut stops first, entryHandler might get stuck on a channel send.
close(shutdownCh)
wgOut.Wait()
}
close(c.processIn)
c.mut.RUnlock()
}()
wg := &sync.WaitGroup{}
wg.Add(2)
wg.Add(1)
go c.handleIn(ctx, wg)
go c.handleOut(ctx, wg)
wgOut.Add(1)
go c.handleOut(shutdownCh, wgOut)

wg.Wait()
return nil
Expand Down Expand Up @@ -127,8 +133,9 @@ func (c *Component) Update(args component.Arguments) error {
if err != nil {
return err
}
c.entryHandler = loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.processIn = pipeline.Wrap(c.entryHandler).Chan()
entryHandler := loki.NewEntryHandler(c.processOut, func() { pipeline.Cleanup() })
c.entryHandler = pipeline.Wrap(entryHandler)
c.processIn = c.entryHandler.Chan()
c.stages = newArgs.Stages
}

Expand Down Expand Up @@ -158,19 +165,19 @@ func (c *Component) handleIn(ctx context.Context, wg *sync.WaitGroup) {
}
}

func (c *Component) handleOut(ctx context.Context, wg *sync.WaitGroup) {
func (c *Component) handleOut(shutdownCh chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-ctx.Done():
case <-shutdownCh:
return
case entry := <-c.processOut:
c.fanoutMut.RLock()
fanout := c.fanout
c.fanoutMut.RUnlock()
for _, f := range fanout {
select {
case <-ctx.Done():
case <-shutdownCh:
return
case f.Chan() <- entry:
// no-op
Expand Down
Loading

0 comments on commit b74f174

Please sign in to comment.