Skip to content

Commit

Permalink
Make gRPC ping time and timeout into parameters
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Jan 4, 2022
1 parent 0b2e3ec commit 6788324
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [CHANGE] Remove package `math`. #104
* [CHANGE] time: Remove time package. #103
* [CHANGE] grpcutil: Convert Resolver into concrete type. #105
* [CHANGE] grpcclient: Make ping time and timeout into configuration parameters. #56
* [ENHANCEMENT] Add middleware package. #38
* [ENHANCEMENT] Add the ring package #45
* [ENHANCEMENT] Add limiter package. #41
Expand Down
11 changes: 9 additions & 2 deletions crypto/tls/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,17 @@ func (cfg *ClientConfig) GetTLSConfig() (*tls.Config, error) {
return config, nil
}

// GetGRPCDialOptions creates GRPC DialOptions for TLS
// WithInsecure wraps grpc.WithInsecure.
//
// Stubbable for tests.
var WithInsecure = func() grpc.DialOption {
return grpc.WithInsecure()
}

// GetGRPCDialOptions creates GRPC DialOptions for TLS.
func (cfg *ClientConfig) GetGRPCDialOptions(enabled bool) ([]grpc.DialOption, error) {
if !enabled {
return []grpc.DialOption{grpc.WithInsecure()}, nil
return []grpc.DialOption{WithInsecure()}, nil
}

tlsConfig, err := cfg.GetTLSConfig()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.0
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.6
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0
github.com/hashicorp/consul/api v1.9.1
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
41 changes: 32 additions & 9 deletions grpcclient/grpcclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ type Config struct {
GRPCCompression string `yaml:"grpc_compression"`
RateLimit float64 `yaml:"rate_limit"`
RateLimitBurst int `yaml:"rate_limit_burst"`
// KeepaliveTime is the number of seconds after which the client will ping the server in case of inactivity.
//
// See `google.golang.org/grpc/keepalive.ClientParameters.Time` for reference.
KeepaliveTime int64 `yaml:"ping_time"`
// KeepaliveTimeOut is the number of seconds the client waits after pinging the server, and if no activity is seen
// after that, the connection is closed.
//
// See `google.golang.org/grpc/keepalive.ClientParameters.Timeout` for reference.
KeepaliveTimeout int64 `yaml:"ping_timeout"`

BackoffOnRatelimits bool `yaml:"backoff_on_ratelimits"`
BackoffConfig backoff.Config `yaml:"backoff_config"`
Expand Down Expand Up @@ -74,12 +83,10 @@ func (cfg *Config) CallOptions() []grpc.CallOption {

// DialOption returns the config as a grpc.DialOptions.
func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientInterceptor, streamClientInterceptors []grpc.StreamClientInterceptor) ([]grpc.DialOption, error) {
var opts []grpc.DialOption
tlsOpts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled)
opts, err := cfg.TLS.GetGRPCDialOptions(cfg.TLSEnabled)
if err != nil {
return nil, err
}
opts = append(opts, tlsOpts...)

if cfg.BackoffOnRatelimits {
unaryClientInterceptors = append([]grpc.UnaryClientInterceptor{NewBackoffRetry(cfg.BackoffConfig)}, unaryClientInterceptors...)
Expand All @@ -91,13 +98,29 @@ func (cfg *Config) DialOption(unaryClientInterceptors []grpc.UnaryClientIntercep

return append(
opts,
grpc.WithDefaultCallOptions(cfg.CallOptions()...),
grpc.WithUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)),
grpc.WithStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: time.Second * 20,
Timeout: time.Second * 10,
withDefaultCallOptions(cfg.CallOptions()...),
withUnaryInterceptor(middleware.ChainUnaryClient(unaryClientInterceptors...)),
withStreamInterceptor(middleware.ChainStreamClient(streamClientInterceptors...)),
withKeepaliveParams(keepalive.ClientParameters{
Time: time.Duration(cfg.KeepaliveTime) * time.Second,
Timeout: time.Duration(cfg.KeepaliveTimeout) * time.Second,
PermitWithoutStream: true,
}),
), nil
}

var withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption {
return grpc.WithDefaultCallOptions(cos...)
}

var withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption {
return grpc.WithUnaryInterceptor(f)
}

var withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption {
return grpc.WithStreamInterceptor(f)
}

var withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption {
return grpc.WithKeepaliveParams(kp)
}
121 changes: 121 additions & 0 deletions grpcclient/grpcclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package grpcclient

import (
"testing"
"time"

"github.com/google/go-cmp/cmp"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

"github.com/grafana/dskit/crypto/tls"
)

type fakeDialOption struct {
grpc.EmptyDialOption

callOpts []grpc.CallOption
isInsecure bool
unaryClientInterceptor grpc.UnaryClientInterceptor
streamClientInterceptor grpc.StreamClientInterceptor
keepaliveParams keepalive.ClientParameters
}

func (o fakeDialOption) Equal(other fakeDialOption) bool {
if len(o.callOpts) != len(other.callOpts) {
return false
}

for i, arg := range o.callOpts {
if maxRecv, ok := arg.(grpc.MaxRecvMsgSizeCallOption); ok {
if maxRecv != other.callOpts[i].(grpc.MaxRecvMsgSizeCallOption) {
return false
}
continue
}
if maxSend, ok := arg.(grpc.MaxSendMsgSizeCallOption); ok {
if maxSend != other.callOpts[i].(grpc.MaxSendMsgSizeCallOption) {
return false
}
continue
}
}

hasUnaryInterceptor := o.unaryClientInterceptor != nil
otherHasUnaryInterceptor := other.unaryClientInterceptor != nil
hasStreamInterceptor := o.streamClientInterceptor != nil
otherHasStreamInterceptor := other.streamClientInterceptor != nil

return o.isInsecure == other.isInsecure && hasUnaryInterceptor == otherHasUnaryInterceptor &&
hasStreamInterceptor == otherHasStreamInterceptor && o.keepaliveParams == other.keepaliveParams
}

func TestConfig(t *testing.T) {
origWithDefaultCallOptions := withDefaultCallOptions
origWithUnaryInterceptor := withUnaryInterceptor
origWithStreamInterceptor := withStreamInterceptor
origWithKeepaliveParams := withKeepaliveParams
origWithInsecure := tls.WithInsecure
t.Cleanup(func() {
withDefaultCallOptions = origWithDefaultCallOptions
withUnaryInterceptor = origWithUnaryInterceptor
withStreamInterceptor = origWithStreamInterceptor
withKeepaliveParams = origWithKeepaliveParams
tls.WithInsecure = origWithInsecure
})

withDefaultCallOptions = func(cos ...grpc.CallOption) grpc.DialOption {
t.Log("Received call options", "options", cos)
return fakeDialOption{callOpts: cos}
}
withUnaryInterceptor = func(f grpc.UnaryClientInterceptor) grpc.DialOption {
t.Log("Received unary client interceptor", f)
return fakeDialOption{unaryClientInterceptor: f}
}
withStreamInterceptor = func(f grpc.StreamClientInterceptor) grpc.DialOption {
t.Log("Received stream client interceptor", f)
return fakeDialOption{streamClientInterceptor: f}
}
withKeepaliveParams = func(kp keepalive.ClientParameters) grpc.DialOption {
t.Log("Received keepalive params", kp)
return fakeDialOption{
keepaliveParams: kp,
}
}
tls.WithInsecure = func() grpc.DialOption {
return fakeDialOption{isInsecure: true}
}

cfg := Config{
KeepaliveTime: 10,
KeepaliveTimeout: 20,
}
expOpts := []grpc.DialOption{
fakeDialOption{isInsecure: true},
fakeDialOption{callOpts: []grpc.CallOption{
grpc.MaxCallRecvMsgSize(0),
grpc.MaxCallSendMsgSize(0),
}},
fakeDialOption{
unaryClientInterceptor: middleware.ChainUnaryClient(),
},
fakeDialOption{
streamClientInterceptor: middleware.ChainStreamClient(),
},
fakeDialOption{
keepaliveParams: keepalive.ClientParameters{
Time: 10 * time.Second,
Timeout: 20 * time.Second,
PermitWithoutStream: true,
},
},
}

opts, err := cfg.DialOption(nil, nil)
require.NoError(t, err)

assert.Empty(t, cmp.Diff(expOpts, opts))
}

0 comments on commit 6788324

Please sign in to comment.