Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [query] use otel's implementation for constructing http and grpc servers #6055

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ func startQuery(
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)

server, err := queryApp.NewServer(qs, metricsQueryService, qOpts, tm, telset)
// TODO: replace componenttest.NewNopHost() with solution from https://github.com/jaegertracing/jaeger/issues/6049
server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (*server) Dependencies() []component.ID {
return []component.ID{jaegerstorage.ID}
}

func (s *server) Start(_ context.Context, host component.Host) error {
func (s *server) Start(ctx context.Context, host component.Host) error {
mf := otelmetrics.NewFactory(s.telset.MeterProvider)
baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"})
queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"})
Expand Down Expand Up @@ -100,11 +100,11 @@ func (s *server) Start(_ context.Context, host component.Host) error {
ReportStatus: func(event *componentstatus.Event) {
componentstatus.ReportStatus(host, event)
},
Host: host,
}

// TODO contextcheck linter complains about next line that context is not passed. It is not wrong.
//nolint
s.server, err = queryApp.NewServer(
ctx,
// TODO propagate healthcheck updates up to the collector's runtime
qs,
mqs,
Expand Down
188 changes: 143 additions & 45 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,14 @@
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
Expand Down Expand Up @@ -54,7 +60,9 @@
}

// NewServer creates and initializes Server
func NewServer(querySvc *querysvc.QueryService,
func NewServer(
ctx context.Context,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
options *QueryOptions,
tm *tenancy.Manager,
Expand All @@ -74,12 +82,13 @@
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, telset)
legacy := !separatePorts
grpcServer, err := createGRPCServer(ctx, legacy, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}

httpServer, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, telset)
httpServer, err := createHTTPServer(ctx, legacy, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}
Expand All @@ -94,29 +103,13 @@
}, nil
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.GRPC.TLSSetting != nil {
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(context.Background())
if err != nil {
return nil, err
}

creds := credentials.NewTLS(tlsCfg)

grpcOpts = append(grpcOpts, grpc.Creds(creds))
}
if tm.Enabled {
grpcOpts = append(grpcOpts,
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
)
}

server := grpc.NewServer(grpcOpts...)
func registerGRPCServer(
server *grpc.Server,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
telset telemetery.Setting,
) {
reflection.Register(server)

handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{
Logger: telset.Logger,
})
Expand All @@ -131,7 +124,67 @@
healthServer.SetServingStatus("jaeger.api_v3.QueryService", grpc_health_v1.HealthCheckResponse_SERVING)

grpc_health_v1.RegisterHealthServer(server, healthServer)
return server, nil
}

func createGRPCServer(
ctx context.Context,
legacy bool,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
options *QueryOptions,
tm *tenancy.Manager,
telset telemetery.Setting,
) (*grpc.Server, error) {
var grpcServer *grpc.Server
if legacy {
var grpcOpts []grpc.ServerOption
if options.GRPC.TLSSetting != nil {
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(ctx)
if err != nil {
return nil, err

Check warning on line 144 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L142-L144

Added lines #L142 - L144 were not covered by tests
}

creds := credentials.NewTLS(tlsCfg)

Check warning on line 147 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L147

Added line #L147 was not covered by tests

grpcOpts = append(grpcOpts, grpc.Creds(creds))

Check warning on line 149 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L149

Added line #L149 was not covered by tests
}
if tm.Enabled {
grpcOpts = append(grpcOpts,
//nolint
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
)
}

grpcServer = grpc.NewServer(grpcOpts...)
} else {
var grpcOpts []configgrpc.ToServerOption
if tm.Enabled {
grpcOpts = append(grpcOpts,

Check warning on line 163 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L163

Added line #L163 was not covered by tests
//nolint
configgrpc.WithGrpcServerOption(grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm))),
configgrpc.WithGrpcServerOption(grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm))),
)

Check warning on line 167 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L165-L167

Added lines #L165 - L167 were not covered by tests
}
var err error
grpcServer, err = options.GRPC.ToServer(
ctx,
telset.Host,
component.TelemetrySettings{
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
},
grpcOpts...)
if err != nil {
return nil, err
}
}
registerGRPCServer(grpcServer, querySvc, metricsQuerySvc, telset)

return grpcServer, nil
}

type httpServer struct {
Expand All @@ -141,24 +194,25 @@

var _ io.Closer = (*httpServer)(nil)

func createHTTPServer(
func createHTTPRouter(
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tm *tenancy.Manager,
telset telemetery.Setting,
) (*httpServer, error) {
) *mux.Router {
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(telset.Logger),
HandlerOptions.Tracer(telset.TracerProvider),
HandlerOptions.MetricsQueryService(metricsQuerySvc),
}

apiHandler := NewAPIHandler(
querySvc,
tm,
apiHandlerOptions...)

r := NewRouter()

if queryOpts.BasePath != "/" {
r = r.PathPrefix(queryOpts.BasePath).Subrouter()
}
Expand All @@ -169,33 +223,77 @@
Logger: telset.Logger,
Tracer: telset.TracerProvider,
}).RegisterRoutes(r)

apiHandler.RegisterRoutes(r)

return r
}

func createHTTPServer(
ctx context.Context,
legacy bool,
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tm *tenancy.Manager,
telset telemetery.Setting,
) (*httpServer, error) {
r := createHTTPRouter(querySvc, metricsQuerySvc, queryOpts, tm, telset)
var handler http.Handler = r
handler = responseHeadersHandler(handler, queryOpts.HTTP.ResponseHeaders)
if queryOpts.BearerTokenPropagation {
handler = bearertoken.PropagationHandler(telset.Logger, handler)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(telset.Logger, true)
handler = recoveryHandler(handler)
var server *httpServer
if legacy {
handler = responseHeadersHandler(handler, queryOpts.HTTP.ResponseHeaders)
handler = handlers.CompressHandler(handler)

errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel)
server = &httpServer{
Server: &http.Server{
Handler: handler,
ErrorLog: errorLog,
ReadHeaderTimeout: 2 * time.Second,
},
}

errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel)
server := &httpServer{
Server: &http.Server{
Handler: recoveryHandler(handler),
ErrorLog: errorLog,
ReadHeaderTimeout: 2 * time.Second,
},
}

if queryOpts.HTTP.TLSSetting != nil {
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(context.Background()) // This checks if the certificates are correctly provided
if queryOpts.HTTP.TLSSetting != nil {
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided
if err != nil {
return nil, err

Check warning on line 264 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L262-L264

Added lines #L262 - L264 were not covered by tests
}
server.TLSConfig = tlsCfg

Check warning on line 266 in cmd/query/app/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/query/app/server.go#L266

Added line #L266 was not covered by tests
}
} else {
s, err := queryOpts.HTTP.ToServer(
ctx,
telset.Host,
component.TelemetrySettings{
Logger: telset.Logger,
TracerProvider: telset.TracerProvider,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
},
handler,
)
if err != nil {
return nil, err
}
server.TLSConfig = tlsCfg
}
server = &httpServer{
Server: s,
}

// TODO: remove after ToListener is implemented
if queryOpts.HTTP.TLSSetting != nil {
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(ctx) // This checks if the certificates are correctly provided
if err != nil {
return nil, err
}
server.TLSConfig = tlsCfg
}
}
server.staticHandlerCloser = RegisterStaticHandler(r, telset.Logger, queryOpts, querySvc.GetCapabilities())

return server, nil
Expand Down
Loading
Loading