From 8602c9efa1919b5642720dd49e2799abaf725763 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 5 Oct 2024 18:58:37 -0400 Subject: [PATCH] Split Changes Into Two Separate Flows Signed-off-by: Mahad Zaryab --- cmd/all-in-one/main.go | 3 +- .../internal/extension/jaegerquery/server.go | 1 + cmd/query/app/server.go | 183 ++++++++++++++++-- cmd/query/app/server_test.go | 55 ++---- cmd/query/main.go | 2 +- 5 files changed, 193 insertions(+), 51 deletions(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 2551614d382..eb43406a5bb 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -275,8 +275,9 @@ func startQuery( spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) + allowSamePort := true // TODO: replace componenttest.NewNopHost() with solution from https://github.com/jaegertracing/jaeger/issues/6049 - server, err := queryApp.NewServer(context.Background(), componenttest.NewNopHost(), qs, metricsQueryService, qOpts, tm, telset) + server, err := queryApp.NewServer(context.Background(), componenttest.NewNopHost(), qs, metricsQueryService, qOpts, tm, telset, allowSamePort) if err != nil { svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err)) } diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 53e83bc19f3..f85e63a9c61 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -111,6 +111,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error { &s.config.QueryOptions, tm, telset, + false, ) if err != nil { return fmt.Errorf("could not create jaeger-query: %w", err) diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 92f71ba3c2d..fa4db4d3e68 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -12,7 +12,9 @@ import ( "net/http" "strings" "sync" + "time" + "github.com/gorilla/handlers" "github.com/soheilhy/cmux" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" @@ -21,7 +23,9 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" @@ -31,12 +35,15 @@ import ( "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/pkg/bearertoken" "github.com/jaegertracing/jaeger/pkg/netutils" + "github.com/jaegertracing/jaeger/pkg/recoveryhandler" "github.com/jaegertracing/jaeger/pkg/telemetery" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" ) +var ErrSameGRPCandHTTPPort = errors.New("cannot use same port for grpc and http server") + // Server runs HTTP, Mux and a grpc server type Server struct { querySvc *querysvc.QueryService @@ -62,6 +69,7 @@ func NewServer( options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting, + allowSamePort bool, ) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTP.Endpoint) if err != nil { @@ -72,19 +80,37 @@ func NewServer( return nil, fmt.Errorf("invalid gRPC server host:port: %w", err) } separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0" + if !separatePorts && !allowSamePort { + return nil, ErrSameGRPCandHTTPPort + } if (options.HTTP.TLSSetting != nil || options.GRPC.TLSSetting != nil) && !separatePorts { return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } - grpcServer, err := createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset) - if err != nil { - return nil, err - } + var grpcServer *grpc.Server + var httpServer *httpServer + if separatePorts { + grpcServer, err = createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset) + if err != nil { + return nil, err + } - httpServer, err := createHTTPServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset) - if err != nil { - return nil, err + httpServer, err = createHTTPServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset) + if err != nil { + return nil, err + } + } else { + telset.Logger.Error("using the same port for gRPC and HTTP is deprecated; please use dedicated host ports intead") + grpcServer, err = createGRPCServerDeprecated(querySvc, metricsQuerySvc, options, tm, telset) + if err != nil { + return nil, err + } + + httpServer, err = createHTTPServerDeprecated(querySvc, metricsQuerySvc, options, tm, telset) + if err != nil { + return nil, err + } } return &Server{ @@ -228,6 +254,106 @@ func createHTTPServer( return httpServer, nil } +func createGRPCServerDeprecated(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) { + var grpcOpts []grpc.ServerOption + + if options.GRPC.TLSSetting != nil { + tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(context.Background()) + if err != nil { + return nil, err + } + + creds := credentials.NewTLS(tlsCfg) + + grpcOpts = append(grpcOpts, grpc.Creds(creds)) + } + if tm.Enabled { + grpcOpts = append(grpcOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)), + ) + } + + server := grpc.NewServer(grpcOpts...) + reflection.Register(server) + + handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{ + Logger: telset.Logger, + }) + healthServer := health.NewServer() + + api_v2.RegisterQueryServiceServer(server, handler) + metrics.RegisterMetricsQueryServiceServer(server, handler) + api_v3.RegisterQueryServiceServer(server, &apiv3.Handler{QueryService: querySvc}) + + healthServer.SetServingStatus("jaeger.api_v2.QueryService", grpc_health_v1.HealthCheckResponse_SERVING) + healthServer.SetServingStatus("jaeger.api_v2.metrics.MetricsQueryService", grpc_health_v1.HealthCheckResponse_SERVING) + healthServer.SetServingStatus("jaeger.api_v3.QueryService", grpc_health_v1.HealthCheckResponse_SERVING) + + grpc_health_v1.RegisterHealthServer(server, healthServer) + return server, nil +} + +func createHTTPServerDeprecated( + querySvc *querysvc.QueryService, + metricsQuerySvc querysvc.MetricsQueryService, + queryOpts *QueryOptions, + tm *tenancy.Manager, + telset telemetery.Setting, +) (*httpServer, error) { + apiHandlerOptions := []HandlerOption{ + HandlerOptions.Logger(telset.Logger), + HandlerOptions.Tracer(telset.TracerProvider), + HandlerOptions.MetricsQueryService(metricsQuerySvc), + } + + apiHandler := NewAPIHandler( + querySvc, + tm, + apiHandlerOptions...) + r := NewRouter() + if queryOpts.BasePath != "/" { + r = r.PathPrefix(queryOpts.BasePath).Subrouter() + } + + (&apiv3.HTTPGateway{ + QueryService: querySvc, + TenancyMgr: tm, + Logger: telset.Logger, + Tracer: telset.TracerProvider, + }).RegisterRoutes(r) + + apiHandler.RegisterRoutes(r) + var handler http.Handler = r + handler = responseHeadersHandler(handler, queryOpts.HTTP.ResponseHeaders) + if queryOpts.BearerTokenPropagation { + handler = bearertoken.PropagationHandler(telset.Logger, handler) + } + handler = handlers.CompressHandler(handler) + recoveryHandler := recoveryhandler.NewRecoveryHandler(telset.Logger, true) + + errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel) + server := &httpServer{ + Server: &http.Server{ + Handler: recoveryHandler(handler), + ErrorLog: errorLog, + ReadHeaderTimeout: 2 * time.Second, + }, + } + + if queryOpts.HTTP.TLSSetting != nil { + tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(context.Background()) // This checks if the certificates are correctly provided + if err != nil { + return nil, err + } + server.TLSConfig = tlsCfg + } + + server.staticHandlerCloser = RegisterStaticHandler(r, telset.Logger, queryOpts, querySvc.GetCapabilities()) + + return server, nil +} + func (hS httpServer) Close() error { var errs []error errs = append(errs, hS.Server.Close()) @@ -236,15 +362,36 @@ func (hS httpServer) Close() error { } // initListener initialises listeners of the server -func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { +func (s *Server) initListener(ctx context.Context) error { + var err error + s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx) + if err != nil { + return err + } + + s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx) + if err != nil { + return err + } + s.Logger.Info( + "Query server started", + zap.String("http_addr", s.HTTPAddr()), + zap.String("grpc_addr", s.GRPCAddr()), + ) + return nil + +} + +// initListener initialises listeners of the server +func (s *Server) initListenerDeprecated() (cmux.CMux, error) { if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests var err error - s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx) + s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPC.NetAddr.Endpoint) if err != nil { return nil, err } - s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx) + s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTP.Endpoint) if err != nil { return nil, err } @@ -288,11 +435,19 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { // Start http, GRPC and cmux servers concurrently func (s *Server) Start(ctx context.Context) error { - cmuxServer, err := s.initListener(ctx) - if err != nil { - return fmt.Errorf("query server failed to initialize listener: %w", err) + var cmuxServer cmux.CMux + if !s.separatePorts { + cmuxServer, err := s.initListenerDeprecated() + if err != nil { + return fmt.Errorf("query server failed to initialize listener: %w", err) + } + s.cmuxServer = cmuxServer + } else { + err := s.initListener(ctx) + if err != nil { + return fmt.Errorf("query server failed to initialize listener: %w", err) + } } - s.cmuxServer = cmuxServer var tcpPort int if !s.separatePorts { diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 61bf047c0ba..0032316ca40 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -75,7 +75,7 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { HTTP: confighttp.ServerConfig{Endpoint: ":8080", TLSSetting: &tlsCfg}, GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8080", Transport: "tcp"}, TLSSetting: &tlsCfg}, }, - tenancy.NewManager(&tenancy.Options{}), telset) + tenancy.NewManager(&tenancy.Options{}), telset, false) require.Error(t, err) } @@ -93,7 +93,7 @@ func TestCreateTLSGrpcServerError(t *testing.T) { HTTP: confighttp.ServerConfig{Endpoint: ":8080"}, GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8081", Transport: "tcp"}, TLSSetting: &tlsCfg}, }, - tenancy.NewManager(&tenancy.Options{}), telset) + tenancy.NewManager(&tenancy.Options{}), telset, false) require.Error(t, err) } @@ -110,7 +110,7 @@ func TestCreateTLSHttpServerError(t *testing.T) { &QueryOptions{ HTTP: confighttp.ServerConfig{Endpoint: ":8080", TLSSetting: &tlsCfg}, GRPC: configgrpc.ServerConfig{NetAddr: confignet.AddrConfig{Endpoint: ":8081", Transport: "tcp"}}, - }, tenancy.NewManager(&tenancy.Options{}), telset) + }, tenancy.NewManager(&tenancy.Options{}), telset, false) require.Error(t, err) } @@ -393,7 +393,7 @@ func TestServerHTTPTLS(t *testing.T) { querySvc := makeQuerySvc() server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), - telset) + telset, false) require.NoError(t, err) require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { @@ -532,7 +532,7 @@ func TestServerGRPCTLS(t *testing.T) { telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, serverOptions, tenancy.NewManager(&tenancy.Options{}), - telset) + telset, false) require.NoError(t, err) require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { @@ -586,7 +586,7 @@ func TestServerBadHostPort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - telset) + telset, false) require.Error(t, err) _, err = NewServer(context.Background(), componenttest.NewNopHost(), &querysvc.QueryService{}, nil, @@ -603,7 +603,7 @@ func TestServerBadHostPort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - telset) + telset, false) require.Error(t, err) } @@ -643,6 +643,7 @@ func TestServerInUseHostPort(t *testing.T) { }, tenancy.NewManager(&tenancy.Options{}), telset, + false, ) require.NoError(t, err) require.Error(t, server.Start(context.Background())) @@ -657,7 +658,7 @@ func TestServerSinglePort(t *testing.T) { hostPort := ports.PortToHostPort(ports.QueryHTTP) querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) - server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, + _, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, &QueryOptions{ BearerTokenPropagation: true, HTTP: confighttp.ServerConfig{ @@ -671,25 +672,8 @@ func TestServerSinglePort(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - telset) - require.NoError(t, err) - require.NoError(t, server.Start(context.Background())) - t.Cleanup(func() { - require.NoError(t, server.Close()) - }) - - client := newGRPCClient(t, hostPort) - t.Cleanup(func() { - require.NoError(t, client.conn.Close()) - }) - - // using generous timeout since grpc.NewClient no longer does a handshake. - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) - require.NoError(t, err) - assert.Equal(t, querySvc.expectedServices, res.Services) + telset, false) + require.ErrorIs(t, err, ErrSameGRPCandHTTPPort) } func TestServerGracefulExit(t *testing.T) { @@ -699,29 +683,30 @@ func TestServerGracefulExit(t *testing.T) { assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.") flagsSvc.Logger = zap.New(zapCore) - hostPort := ports.PortToHostPort(ports.QueryAdminHTTP) + httpHostPort := ports.PortToHostPort(ports.QueryHTTP) + grpcHostPort := ports.PortToHostPort(ports.QueryGRPC) querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{ - Endpoint: hostPort, + Endpoint: httpHostPort, }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: hostPort, + Endpoint: grpcHostPort, Transport: "tcp", }, }, }, - tenancy.NewManager(&tenancy.Options{}), telset) + tenancy.NewManager(&tenancy.Options{}), telset, false) require.NoError(t, err) require.NoError(t, server.Start(context.Background())) // Wait for servers to come up before we can call .Close() { - client := newGRPCClient(t, hostPort) + client := newGRPCClient(t, grpcHostPort) t.Cleanup(func() { require.NoError(t, client.conn.Close()) }) @@ -759,7 +744,7 @@ func TestServerHandlesPortZero(t *testing.T) { }, }, tenancy.NewManager(&tenancy.Options{}), - telset) + telset, false) require.NoError(t, err) require.NoError(t, server.Start(context.Background())) defer server.Close() @@ -805,7 +790,7 @@ func TestServerHTTPTenancy(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":8080", + Endpoint: ":8081", Transport: "tcp", }, }, @@ -815,7 +800,7 @@ func TestServerHTTPTenancy(t *testing.T) { querySvc.spanReader.On("FindTraces", mock.Anything, mock.Anything).Return([]*model.Trace{mockTrace}, nil).Once() telset := initTelSet(zaptest.NewLogger(t), jtracer.NoOp(), healthcheck.New()) server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, - nil, serverOptions, tenancyMgr, telset) + nil, serverOptions, tenancyMgr, telset, false) require.NoError(t, err) require.NoError(t, server.Start(context.Background())) t.Cleanup(func() { diff --git a/cmd/query/main.go b/cmd/query/main.go index 209a654b425..4e6b0c352fe 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -110,7 +110,7 @@ func main() { ReportStatus: telemetery.HCAdapter(svc.HC()), } // TODO: replace componenttest.NewNopHost() with solution from https://github.com/jaegertracing/jaeger/issues/6049 - server, err := app.NewServer(context.Background(), componenttest.NewNopHost(), queryService, metricsQueryService, queryOpts, tm, telset) + server, err := app.NewServer(context.Background(), componenttest.NewNopHost(), queryService, metricsQueryService, queryOpts, tm, telset, true) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) }