Skip to content

Commit

Permalink
feat: add support for setting custom grpc interceptors for requests s…
Browse files Browse the repository at this point in the history
…ent to index-gateway (#12193)

(cherry picked from commit 25f15cb)
  • Loading branch information
sandeepsukhani committed Mar 13, 2024
1 parent f6cfff5 commit 832898f
Showing 1 changed file with 23 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
Expand Down Expand Up @@ -72,6 +75,9 @@ type IndexGatewayClientConfig struct {
// LogGatewayRequests configures if requests sent to the gateway should be logged or not.
// The log messages are of type debug and contain the address of the gateway and the relevant tenant.
LogGatewayRequests bool `yaml:"log_gateway_requests"`

GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`
}

// RegisterFlagsWithPrefix register client-specific flags with the given prefix.
Expand Down Expand Up @@ -136,7 +142,7 @@ func NewGatewayClient(cfg IndexGatewayClientConfig, r prometheus.Registerer, lim
done: make(chan struct{}),
}

dialOpts, err := cfg.GRPCClientConfig.DialOption(grpcclient.Instrument(sgClient.storeGatewayClientRequestDuration))
dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(cfg, sgClient.storeGatewayClientRequestDuration))
if err != nil {
return nil, errors.Wrap(err, "index gateway grpc dial option")
}
Expand Down Expand Up @@ -458,3 +464,19 @@ func (b *grpcIter) RangeValue() []byte {
func (b *grpcIter) Value() []byte {
return b.Rows[b.i].Value
}

func instrumentation(cfg IndexGatewayClientConfig, clientRequestDuration *prometheus.HistogramVec) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()))
unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor)
unaryInterceptors = append(unaryInterceptors, middleware.UnaryClientInstrumentInterceptor(clientRequestDuration))

var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()))
streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor)
streamInterceptors = append(streamInterceptors, middleware.StreamClientInstrumentInterceptor(clientRequestDuration))

return unaryInterceptors, streamInterceptors
}

0 comments on commit 832898f

Please sign in to comment.