Skip to content

Commit

Permalink
Fix goroutine leaks in grpc integration tests (#5340)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Solves part of #5006

## Description of the changes
- The key issue was that the grpc client was only being killed in a
`runtime.SetFinalizer` - i.e. when it is GCed. I think in the tests this
was not shutting down before goleak had decided that the goroutine had
leaked.
- This change switches to a more conventional approach of killing the
client as part of the Close method and then ensuring this is called in
each of the tests.

## How was this change tested?
- While this change does not include a call to activate goleak, it _was_
tested with this. The PR does not include goleak detection because there
are still other violations related to elasticsearch in this package.

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [ ] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Will Sewell <[email protected]>
Signed-off-by: Yuri Shkuro <[email protected]>
Co-authored-by: Yuri Shkuro <[email protected]>
  • Loading branch information
WillSewell and yurishkuro authored Apr 11, 2024
1 parent 4681e36 commit a9a7bc2
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 9 deletions.
18 changes: 11 additions & 7 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"os/exec"
"runtime"
"time"

"github.com/hashicorp/go-hclog"
Expand Down Expand Up @@ -56,7 +55,15 @@ type Configuration struct {
// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Capabilities shared.PluginCapabilities
Capabilities shared.PluginCapabilities
killPluginClient func()
}

func (c *ClientPluginServices) Close() error {
if c.killPluginClient != nil {
c.killPluginClient()
}
return nil
}

// PluginBuilder is used to create storage plugins. Implemented by Configuration.
Expand Down Expand Up @@ -154,10 +161,6 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra
GRPCDialOptions: opts,
})

runtime.SetFinalizer(client, func(c *plugin.Client) {
c.Kill()
})

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
Expand Down Expand Up @@ -200,7 +203,8 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
Capabilities: capabilities,
killPluginClient: client.Kill,
}, nil
}

Expand Down
11 changes: 10 additions & 1 deletion plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"errors"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -53,6 +54,8 @@ type Factory struct {
archiveStore shared.ArchiveStoragePlugin
streamingSpanWriter shared.StreamingSpanWriterPlugin
capabilities shared.PluginCapabilities

servicesCloser io.Closer
}

// NewFactory creates a new Factory.
Expand Down Expand Up @@ -108,6 +111,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.archiveStore = services.ArchiveStore
f.capabilities = services.Capabilities
f.streamingSpanWriter = services.StreamingSpanWriter
f.servicesCloser = services
logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration))
return nil
}
Expand Down Expand Up @@ -164,5 +168,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {

// Close closes the resources held by the factory
func (f *Factory) Close() error {
return f.builder.Close()
errs := []error{}
if f.servicesCloser != nil {
errs = append(errs, f.servicesCloser.Close())
}
errs = append(errs, f.builder.Close())
return errors.Join(errs...)
}
9 changes: 8 additions & 1 deletion plugin/storage/integration/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,15 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) {
s.CleanUp = s.cleanUp
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
func (s *GRPCStorageIntegrationTestSuite) close(t *testing.T) {
require.NoError(t, s.factory.Close())
if s.useRemoteStorage {
s.remoteStorage.Close(t)
}
}

func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) {
s.close(t)
s.initialize(t)
}

Expand Down Expand Up @@ -108,6 +112,7 @@ func TestGRPCStorage(t *testing.T) {
flags: flags,
}
s.initialize(t)
defer s.close(t)
s.RunAll(t)
}

Expand All @@ -124,6 +129,7 @@ func TestGRPCStreamingWriter(t *testing.T) {
flags: flags,
}
s.initialize(t)
defer s.close(t)
s.RunAll(t)
}

Expand All @@ -139,5 +145,6 @@ func TestGRPCRemoteStorage(t *testing.T) {
useRemoteStorage: true,
}
s.initialize(t)
defer s.close(t)
s.RunAll(t)
}

0 comments on commit a9a7bc2

Please sign in to comment.