From 82044f55ad709d500f7a4e1ab823cc3c6edeff70 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab Date: Sat, 5 Oct 2024 18:05:34 -0400 Subject: [PATCH] Revert "Remove CMux Server And Handling of Requests On Same Port" This reverts commit 61cbdd732317cc17f683b6ecf6624adbf834f78d. Signed-off-by: Mahad Zaryab --- cmd/query/app/server.go | 112 +++++++++++++++++++++++++++-------- cmd/query/app/server_test.go | 32 +++++++--- 2 files changed, 111 insertions(+), 33 deletions(-) diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index 762f1b3af6e..92f71ba3c2d 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -10,6 +10,7 @@ import ( "io" "net" "net/http" + "strings" "sync" "github.com/soheilhy/cmux" @@ -36,18 +37,19 @@ import ( "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" ) -var ErrSameGRPCandHTTPPort = errors.New("cannot use same port for grpc and http server") - // Server runs HTTP, Mux and a grpc server type Server struct { querySvc *querysvc.QueryService queryOptions *QueryOptions - grpcConn net.Listener - httpConn net.Listener - grpcServer *grpc.Server - httpServer *httpServer - bgFinished sync.WaitGroup + conn net.Listener + grpcConn net.Listener + httpConn net.Listener + cmuxServer cmux.CMux + grpcServer *grpc.Server + httpServer *httpServer + separatePorts bool + bgFinished sync.WaitGroup telemetery.Setting } @@ -71,8 +73,8 @@ func NewServer( } separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0" - if !separatePorts { - return nil, ErrSameGRPCandHTTPPort + if (options.HTTP.TLSSetting != nil || options.GRPC.TLSSetting != nil) && !separatePorts { + return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } grpcServer, err := createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset) @@ -86,11 +88,12 @@ func NewServer( } return &Server{ - querySvc: querySvc, - queryOptions: options, - grpcServer: grpcServer, - httpServer: httpServer, - Setting: telset, + querySvc: querySvc, + queryOptions: options, + grpcServer: grpcServer, + httpServer: httpServer, + separatePorts: separatePorts, + Setting: telset, }, nil } @@ -233,32 +236,70 @@ func (hS httpServer) Close() error { } // initListener initialises listeners of the server -func (s *Server) initListener(ctx context.Context) error { - var err error - s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx) - if err != nil { - return err +func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) { + if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests + var err error + s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx) + if err != nil { + return nil, err + } + + s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx) + if err != nil { + return nil, err + } + s.Logger.Info( + "Query server started", + zap.String("http_addr", s.HTTPAddr()), + zap.String("grpc_addr", s.GRPCAddr()), + ) + return nil, nil } - s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx) + // old behavior using cmux + conn, err := net.Listen("tcp", s.queryOptions.HTTP.Endpoint) if err != nil { - return err + return nil, err } + + s.conn = conn + + var tcpPort int + if port, err := netutils.GetPort(s.conn.Addr()); err == nil { + tcpPort = port + } + s.Logger.Info( "Query server started", - zap.String("http_addr", s.HTTPAddr()), - zap.String("grpc_addr", s.GRPCAddr()), + zap.Int("port", tcpPort), + zap.String("addr", s.queryOptions.HTTP.Endpoint)) + + // cmux server acts as a reverse-proxy between HTTP and GRPC backends. + cmuxServer := cmux.New(s.conn) + + s.grpcConn = cmuxServer.MatchWithWriters( + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"), ) - return nil + s.httpConn = cmuxServer.Match(cmux.Any()) + return cmuxServer, nil } // Start http, GRPC and cmux servers concurrently func (s *Server) Start(ctx context.Context) error { - err := s.initListener(ctx) + cmuxServer, err := s.initListener(ctx) if err != nil { return fmt.Errorf("query server failed to initialize listener: %w", err) } + s.cmuxServer = cmuxServer + + var tcpPort int + if !s.separatePorts { + if port, err := netutils.GetPort(s.conn.Addr()); err == nil { + tcpPort = port + } + } var httpPort int if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil { @@ -303,6 +344,23 @@ func (s *Server) Start(ctx context.Context) error { s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint)) }() + // Start cmux server concurrently. + if !s.separatePorts { + s.bgFinished.Add(1) + go func() { + defer s.bgFinished.Done() + s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) + + err := cmuxServer.Serve() + // TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here. + if err != nil && !strings.Contains(err.Error(), "use of closed network connection") { + s.Logger.Error("Could not start multiplexed server", zap.Error(err)) + s.ReportStatus(componentstatus.NewFatalErrorEvent(err)) + return + } + s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint)) + }() + } return nil } @@ -326,6 +384,10 @@ func (s *Server) Close() error { s.Logger.Info("Stopping gRPC server") s.grpcServer.Stop() + if !s.separatePorts { + s.Logger.Info("Closing CMux server") + s.cmuxServer.Close() + } s.bgFinished.Wait() s.Logger.Info("Server stopped") diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 32934dac64e..61bf047c0ba 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -657,7 +657,7 @@ func TestServerSinglePort(t *testing.T) { hostPort := ports.PortToHostPort(ports.QueryHTTP) querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) - _, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, + server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, &QueryOptions{ BearerTokenPropagation: true, HTTP: confighttp.ServerConfig{ @@ -672,7 +672,24 @@ func TestServerSinglePort(t *testing.T) { }, tenancy.NewManager(&tenancy.Options{}), telset) - require.ErrorIs(t, err, ErrSameGRPCandHTTPPort) + require.NoError(t, err) + require.NoError(t, server.Start(context.Background())) + t.Cleanup(func() { + require.NoError(t, server.Close()) + }) + + client := newGRPCClient(t, hostPort) + t.Cleanup(func() { + require.NoError(t, client.conn.Close()) + }) + + // using generous timeout since grpc.NewClient no longer does a handshake. + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{}) + require.NoError(t, err) + assert.Equal(t, querySvc.expectedServices, res.Services) } func TestServerGracefulExit(t *testing.T) { @@ -682,19 +699,18 @@ func TestServerGracefulExit(t *testing.T) { assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.") flagsSvc.Logger = zap.New(zapCore) - httpHostPort := ports.PortToHostPort(ports.QueryHTTP) - grpcHostPort := ports.PortToHostPort(ports.QueryGRPC) + hostPort := ports.PortToHostPort(ports.QueryAdminHTTP) querySvc := makeQuerySvc() telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC()) server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil, &QueryOptions{ HTTP: confighttp.ServerConfig{ - Endpoint: httpHostPort, + Endpoint: hostPort, }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: grpcHostPort, + Endpoint: hostPort, Transport: "tcp", }, }, @@ -705,7 +721,7 @@ func TestServerGracefulExit(t *testing.T) { // Wait for servers to come up before we can call .Close() { - client := newGRPCClient(t, grpcHostPort) + client := newGRPCClient(t, hostPort) t.Cleanup(func() { require.NoError(t, client.conn.Close()) }) @@ -789,7 +805,7 @@ func TestServerHTTPTenancy(t *testing.T) { }, GRPC: configgrpc.ServerConfig{ NetAddr: confignet.AddrConfig{ - Endpoint: ":8081", + Endpoint: ":8080", Transport: "tcp", }, },