Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpcclient: Make keepalive time and timeout into config parameters #56

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
* [CHANGE] Cache: Switch Memcached backend to use `github.com/grafana/gomemcache` repository instead of `github.com/bradfitz/gomemcache`. #248
* [CHANGE] Multierror: Implement `Is(error) bool`. This allows to use `multierror.MultiError` with `errors.Is()`. `MultiError` will in turn call `errors.Is()` on every error value. #254
* [CHANGE] Cache: Remove the `context.Context` argument from the `Cache.Store` method and rename the method to `Cache.StoreAsync`. #273
* [CHANGE] grpcclient: Make ping time and timeout into `Config.DialOption` arguments (with same defaults as corresponding `google.golang.org/grpc/keepalive.ClientParameters.Time*` parameters). #56
* [FEATURE] Cache: Add support for configuring a Redis cache backend. #268 #271 #276
* [FEATURE] Add support for waiting on the rate limiter using the new `WaitN` method. #279
* [ENHANCEMENT] Add configuration to customize backoff for the gRPC clients.
Expand Down
13 changes: 10 additions & 3 deletions crypto/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,16 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) {
return config, nil
}

// GetGRPCDialOptions creates GRPC DialOptions for TLS
func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) {
if !enabled {
// WithInsecure wraps grpc.WithInsecure.
//
// Stubbable for tests.
var WithInsecure = func() grpc.DialOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we wanna to expose it? Looks weird to me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's technically necessary for grpcclient tests to be able to stub calling out to the grpc package. I'll look again into whether there's another way to do this (dependency injection?).

return grpc.WithInsecure()
}

// GetGRPCDialOptions creates GRPC DialOptions for TLS.
func (cfg *ClientConfig) GetGRPCDialOptions(tlsEnabled bool) ([]grpc.DialOption, error) {
if !tlsEnabled {
return []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}, nil
}

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.6
github.com/grafana/gomemcache v0.0.0-20230316202710-a081dae0aba9
github.com/hashicorp/consul/api v1.15.3
github.com/hashicorp/go-cleanhttp v0.5.2
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
43 changes: 32 additions & 11 deletions grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,19 @@ func (cfg *Config) CallOptions() []grpc.CallOption {
return opts
}

// DialOption returns the config as a grpc.DialOptions.
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled)
// DialOption returns the config as a slice of grpc.DialOptions.
//
// keepaliveTime is the number of seconds after which the client will ping the server in case of inactivity.
// See `google.golang.org/grpc/keepalive.ClientParameters.Time` for reference.
//
// keepaliveTimeout is the number of seconds the client waits after pinging the server, and if no activity is
// seen after that, the connection is closed. See `google.golang.org/grpc/keepalive.ClientParameters.Timeout`
// for reference.
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor, keepaliveTime, keepaliveTimeout int64) ([]grpc.DialOption, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think keepaliveTime and keepaliveTimeout should be time.Duration.

opts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled)
if err != nil {
return nil, err
}
opts = append(opts, tlsOpts...)

if cfg.BackoffOnRatelimits {
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...)
Expand Down Expand Up @@ -120,13 +125,29 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep

return append(
opts,
grpc.WithDefaultCallOptions(cfg.CallOptions()...),
grpc.WithChainUnaryInterceptor(unaryClientInterceptors...),
grpc.WithChainStreamInterceptor(streamClientInterceptors...),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 20,
Timeout: time.Second * 10,
withDefaultCallOptions(cfg.CallOptions()...),
withUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)),
withStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)),
withKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(keepaliveTime) * time.Second,
Timeout: time.Duration(keepaliveTimeout) * time.Second,
PermitWithoutStream: true,
}),
), nil
}

var withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using global variables for these, can they be passed to dialOption as parameters? (Default publicly-exported DialOption would just use grpc-functions). This avoids needing setup and cleanup, and can be hidden into “fakeDialOption” method or similar.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pstibrany I can try.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this feedback hasn't been addressed, am I wrong?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pracucci you're right! I was deferring addressing this until consensus on whether the PR should be closed or not.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pracucci @pstibrany the drawback I see with stubbing these dependencies via arguments is that it adds complexity. The tests will no longer call Config.DialOption itself, and introduces the risk of having a bug in Config.DialOption where it passes the wrong dependencies. The test coverage will also be lower with this technique.

If you still prefer the injection via method arguments technique, I can change to that. Just want to clarify the problems I see with it.

return grpc.WithDefaultCallOptions(cos...)
}

var withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption {
return grpc.WithUnaryInterceptor(f)
}

var withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption {
return grpc.WithStreamInterceptor(f)
}

var withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption {
return grpc.WithKeepaliveParams(kp)
}
120 changes: 120 additions & 0 deletions grpcclient/grpcclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package grpcclient

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/grafana/dskit/crypto/tls"
)

type fakeDialOption struct {
grpc.EmptyDialOption

callOpts []grpc.CallOption
isInsecure bool
unaryClientInterceptor grpc.UnaryClientInterceptor
streamClientInterceptor grpc.StreamClientInterceptor
keepaliveParams keepalive.ClientParameters
}

func (o fakeDialOption) Equal(other fakeDialOption) bool {
if len(o.callOpts) != len(other.callOpts) {
return false
}

for i, arg := range o.callOpts {
if maxRecv, ok := arg.(grpc.MaxRecvMsgSizeCallOption); ok {
if maxRecv != other.callOpts[i].(grpc.MaxRecvMsgSizeCallOption) {
return false
}
continue
}
if maxSend, ok := arg.(grpc.MaxSendMsgSizeCallOption); ok {
if maxSend != other.callOpts[i].(grpc.MaxSendMsgSizeCallOption) {
return false
}
continue
}
}

hasUnaryInterceptor := o.unaryClientInterceptor != nil
otherHasUnaryInterceptor := other.unaryClientInterceptor != nil
hasStreamInterceptor := o.streamClientInterceptor != nil
otherHasStreamInterceptor := other.streamClientInterceptor != nil

return o.isInsecure == other.isInsecure && hasUnaryInterceptor == otherHasUnaryInterceptor &&
hasStreamInterceptor == otherHasStreamInterceptor && o.keepaliveParams == other.keepaliveParams
}

func TestConfig(t *testing.T) {
origWithDefaultCallOptions := withDefaultCallOptions
origWithUnaryInterceptor := withUnaryInterceptor
origWithStreamInterceptor := withStreamInterceptor
origWithKeepaliveParams := withKeepaliveParams
origWithInsecure := tls.WithInsecure
t.Cleanup(func() {
withDefaultCallOptions = origWithDefaultCallOptions
withUnaryInterceptor = origWithUnaryInterceptor
withStreamInterceptor = origWithStreamInterceptor
withKeepaliveParams = origWithKeepaliveParams
tls.WithInsecure = origWithInsecure
})

withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption {
t.Log("Received call options", "options", cos)
return fakeDialOption{callOpts: cos}
}
withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption {
t.Log("Received unary client interceptor", f)
return fakeDialOption{unaryClientInterceptor: f}
}
withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption {
t.Log("Received stream client interceptor", f)
return fakeDialOption{streamClientInterceptor: f}
}
withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption {
t.Log("Received keepalive params", kp)
return fakeDialOption{
keepaliveParams: kp,
}
}
tls.WithInsecure = func() grpc.DialOption {
return fakeDialOption{isInsecure: true}
}

cfg := Config{}
const keepaliveTime = 10
const keepaliveTimeout = 20
expOpts := []grpc.DialOption{
fakeDialOption{isInsecure: true},
fakeDialOption{callOpts: []grpc.CallOption{
grpc.MaxCallRecvMsgSize(0),
grpc.MaxCallSendMsgSize(0),
}},
fakeDialOption{
unaryClientInterceptor: middleware.ChainUnaryClient(),
},
fakeDialOption{
streamClientInterceptor: middleware.ChainStreamClient(),
},
fakeDialOption{
keepaliveParams: keepalive.ClientParameters{
Time: keepaliveTime * time.Second,
Timeout: keepaliveTimeout * time.Second,
PermitWithoutStream: true,
},
},
}

opts, err := cfg.DialOption(nil, nil, keepaliveTime, keepaliveTimeout)
require.NoError(t, err)

assert.Empty(t, cmp.Diff(expOpts, opts))
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
}