diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go index f6dc6094331..c22e9f9d6f1 100644 --- a/plugin/storage/grpc/config/config.go +++ b/plugin/storage/grpc/config/config.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "os/exec" - "runtime" "time" "github.com/hashicorp/go-hclog" @@ -56,7 +55,15 @@ type Configuration struct { // ClientPluginServices defines services plugin can expose and its capabilities type ClientPluginServices struct { shared.PluginServices - Capabilities shared.PluginCapabilities + Capabilities shared.PluginCapabilities + killPluginClient func() +} + +func (c *ClientPluginServices) Close() error { + if c.killPluginClient != nil { + c.killPluginClient() + } + return nil } // PluginBuilder is used to create storage plugins. Implemented by Configuration. @@ -154,10 +161,6 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra GRPCDialOptions: opts, }) - runtime.SetFinalizer(client, func(c *plugin.Client) { - c.Kill() - }) - rpcClient, err := client.Client() if err != nil { return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err) @@ -200,7 +203,8 @@ func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.Tra ArchiveStore: archiveStoragePlugin, StreamingSpanWriter: streamingSpanWriterPlugin, }, - Capabilities: capabilities, + Capabilities: capabilities, + killPluginClient: client.Kill, }, nil } diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go index 8a9d360018a..0009ebcb6c8 100644 --- a/plugin/storage/grpc/factory.go +++ b/plugin/storage/grpc/factory.go @@ -15,6 +15,7 @@ package grpc import ( + "errors" "flag" "fmt" "io" @@ -53,6 +54,8 @@ type Factory struct { archiveStore shared.ArchiveStoragePlugin streamingSpanWriter shared.StreamingSpanWriterPlugin capabilities shared.PluginCapabilities + + servicesCloser io.Closer } // NewFactory creates a new Factory. @@ -108,6 +111,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.archiveStore = services.ArchiveStore f.capabilities = services.Capabilities f.streamingSpanWriter = services.StreamingSpanWriter + f.servicesCloser = services logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration)) return nil } @@ -164,5 +168,10 @@ func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) { // Close closes the resources held by the factory func (f *Factory) Close() error { - return f.builder.Close() + errs := []error{} + if f.servicesCloser != nil { + errs = append(errs, f.servicesCloser.Close()) + } + errs = append(errs, f.builder.Close()) + return errors.Join(errs...) } diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go index 46358b62b02..ef0ac89e70f 100644 --- a/plugin/storage/integration/grpc_test.go +++ b/plugin/storage/integration/grpc_test.go @@ -74,11 +74,15 @@ func (s *GRPCStorageIntegrationTestSuite) initialize(t *testing.T) { s.CleanUp = s.cleanUp } -func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) { +func (s *GRPCStorageIntegrationTestSuite) close(t *testing.T) { require.NoError(t, s.factory.Close()) if s.useRemoteStorage { s.remoteStorage.Close(t) } +} + +func (s *GRPCStorageIntegrationTestSuite) cleanUp(t *testing.T) { + s.close(t) s.initialize(t) } @@ -108,6 +112,7 @@ func TestGRPCStorage(t *testing.T) { flags: flags, } s.initialize(t) + defer s.close(t) s.RunAll(t) } @@ -124,6 +129,7 @@ func TestGRPCStreamingWriter(t *testing.T) { flags: flags, } s.initialize(t) + defer s.close(t) s.RunAll(t) } @@ -139,5 +145,6 @@ func TestGRPCRemoteStorage(t *testing.T) { useRemoteStorage: true, } s.initialize(t) + defer s.close(t) s.RunAll(t) }