Skip to content

Commit

Permalink
Remove CMux Server And Handling of Requests On Same Port
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 committed Oct 5, 2024
1 parent 168a374 commit b640f72
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 111 deletions.
112 changes: 25 additions & 87 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"io"
"net"
"net/http"
"strings"
"sync"

"github.com/soheilhy/cmux"
Expand All @@ -37,19 +36,18 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

var ErrSameGRPCandHTTPPort = errors.New("cannot use same port for grpc and http server")

// Server runs HTTP, Mux and a grpc server
type Server struct {
querySvc *querysvc.QueryService
queryOptions *QueryOptions

conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *httpServer
separatePorts bool
bgFinished sync.WaitGroup
grpcConn net.Listener
httpConn net.Listener
grpcServer *grpc.Server
httpServer *httpServer
bgFinished sync.WaitGroup
telemetery.Setting
}

Expand All @@ -73,8 +71,8 @@ func NewServer(
}
separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0"

if (options.HTTP.TLSSetting != nil || options.GRPC.TLSSetting != nil) && !separatePorts {
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
if !separatePorts {
return nil, ErrSameGRPCandHTTPPort
}

grpcServer, err := createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
Expand All @@ -88,12 +86,11 @@ func NewServer(
}

return &Server{
querySvc: querySvc,
queryOptions: options,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: separatePorts,
Setting: telset,
querySvc: querySvc,
queryOptions: options,
grpcServer: grpcServer,
httpServer: httpServer,
Setting: telset,
}, nil
}

Expand Down Expand Up @@ -236,70 +233,32 @@ func (hS httpServer) Close() error {
}

// initListener initialises listeners of the server
func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
if err != nil {
return nil, err
}

s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
if err != nil {
return nil, err
}
s.Logger.Info(
"Query server started",
zap.String("http_addr", s.HTTPAddr()),
zap.String("grpc_addr", s.GRPCAddr()),
)
return nil, nil
}

// old behavior using cmux
conn, err := net.Listen("tcp", s.queryOptions.HTTP.Endpoint)
func (s *Server) initListener(ctx context.Context) error {
var err error
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
if err != nil {
return nil, err
return err
}

s.conn = conn

var tcpPort int
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
if err != nil {
return err
}

s.Logger.Info(
"Query server started",
zap.Int("port", tcpPort),
zap.String("addr", s.queryOptions.HTTP.Endpoint))

// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

s.grpcConn = cmuxServer.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
zap.String("http_addr", s.HTTPAddr()),
zap.String("grpc_addr", s.GRPCAddr()),
)
s.httpConn = cmuxServer.Match(cmux.Any())
return nil

return cmuxServer, nil
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start(ctx context.Context) error {
cmuxServer, err := s.initListener(ctx)
err := s.initListener(ctx)
if err != nil {
return fmt.Errorf("query server failed to initialize listener: %w", err)
}
s.cmuxServer = cmuxServer

var tcpPort int
if !s.separatePorts {
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
}
}

var httpPort int
if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil {
Expand Down Expand Up @@ -344,23 +303,6 @@ func (s *Server) Start(ctx context.Context) error {
s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))
}()

// Start cmux server concurrently.
if !s.separatePorts {
s.bgFinished.Add(1)
go func() {
defer s.bgFinished.Done()
s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))

err := cmuxServer.Serve()
// TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.Logger.Error("Could not start multiplexed server", zap.Error(err))
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
return
}
s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
}()
}
return nil
}

Expand All @@ -384,10 +326,6 @@ func (s *Server) Close() error {
s.Logger.Info("Stopping gRPC server")
s.grpcServer.Stop()

if !s.separatePorts {
s.Logger.Info("Closing CMux server")
s.cmuxServer.Close()
}
s.bgFinished.Wait()

s.Logger.Info("Server stopped")
Expand Down
32 changes: 8 additions & 24 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func TestServerSinglePort(t *testing.T) {
hostPort := ports.PortToHostPort(ports.QueryHTTP)
querySvc := makeQuerySvc()
telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC())
server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
_, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
&QueryOptions{
BearerTokenPropagation: true,
HTTP: confighttp.ServerConfig{
Expand All @@ -672,24 +672,7 @@ func TestServerSinglePort(t *testing.T) {
},
tenancy.NewManager(&tenancy.Options{}),
telset)
require.NoError(t, err)
require.NoError(t, server.Start(context.Background()))
t.Cleanup(func() {
require.NoError(t, server.Close())
})

client := newGRPCClient(t, hostPort)
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})

// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
require.NoError(t, err)
assert.Equal(t, querySvc.expectedServices, res.Services)
require.ErrorIs(t, err, ErrSameGRPCandHTTPPort)
}

func TestServerGracefulExit(t *testing.T) {
Expand All @@ -699,18 +682,19 @@ func TestServerGracefulExit(t *testing.T) {
assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.")

flagsSvc.Logger = zap.New(zapCore)
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)
httpHostPort := ports.PortToHostPort(ports.QueryHTTP)
grpcHostPort := ports.PortToHostPort(ports.QueryGRPC)

querySvc := makeQuerySvc()
telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC())
server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
&QueryOptions{
HTTP: confighttp.ServerConfig{
Endpoint: hostPort,
Endpoint: httpHostPort,
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: hostPort,
Endpoint: grpcHostPort,
Transport: "tcp",
},
},
Expand All @@ -721,7 +705,7 @@ func TestServerGracefulExit(t *testing.T) {

// Wait for servers to come up before we can call .Close()
{
client := newGRPCClient(t, hostPort)
client := newGRPCClient(t, grpcHostPort)
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})
Expand Down Expand Up @@ -805,7 +789,7 @@ func TestServerHTTPTenancy(t *testing.T) {
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ":8080",
Endpoint: ":8081",
Transport: "tcp",
},
},
Expand Down

0 comments on commit b640f72

Please sign in to comment.