Skip to content

Commit

Permalink
Revert "Remove CMux Server And Handling of Requests On Same Port"
Browse files Browse the repository at this point in the history
This reverts commit 61cbdd7.

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 committed Oct 5, 2024
1 parent b640f72 commit 82044f5
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 33 deletions.
112 changes: 87 additions & 25 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net"
"net/http"
"strings"
"sync"

"github.com/soheilhy/cmux"
Expand All @@ -36,18 +37,19 @@ import (
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

var ErrSameGRPCandHTTPPort = errors.New("cannot use same port for grpc and http server")

// Server runs HTTP, Mux and a grpc server
type Server struct {
querySvc *querysvc.QueryService
queryOptions *QueryOptions

grpcConn net.Listener
httpConn net.Listener
grpcServer *grpc.Server
httpServer *httpServer
bgFinished sync.WaitGroup
conn net.Listener
grpcConn net.Listener
httpConn net.Listener
cmuxServer cmux.CMux
grpcServer *grpc.Server
httpServer *httpServer
separatePorts bool
bgFinished sync.WaitGroup
telemetery.Setting
}

Expand All @@ -71,8 +73,8 @@ func NewServer(
}
separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0"

if !separatePorts {
return nil, ErrSameGRPCandHTTPPort
if (options.HTTP.TLSSetting != nil || options.GRPC.TLSSetting != nil) && !separatePorts {
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
Expand All @@ -86,11 +88,12 @@ func NewServer(
}

return &Server{
querySvc: querySvc,
queryOptions: options,
grpcServer: grpcServer,
httpServer: httpServer,
Setting: telset,
querySvc: querySvc,
queryOptions: options,
grpcServer: grpcServer,
httpServer: httpServer,
separatePorts: separatePorts,
Setting: telset,
}, nil
}

Expand Down Expand Up @@ -233,32 +236,70 @@ func (hS httpServer) Close() error {
}

// initListener initialises listeners of the server
func (s *Server) initListener(ctx context.Context) error {
var err error
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
if err != nil {
return err
func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
if err != nil {
return nil, err
}

s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
if err != nil {
return nil, err
}
s.Logger.Info(
"Query server started",
zap.String("http_addr", s.HTTPAddr()),
zap.String("grpc_addr", s.GRPCAddr()),
)
return nil, nil
}

s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
// old behavior using cmux
conn, err := net.Listen("tcp", s.queryOptions.HTTP.Endpoint)
if err != nil {
return err
return nil, err
}

s.conn = conn

var tcpPort int
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
}

s.Logger.Info(
"Query server started",
zap.String("http_addr", s.HTTPAddr()),
zap.String("grpc_addr", s.GRPCAddr()),
zap.Int("port", tcpPort),
zap.String("addr", s.queryOptions.HTTP.Endpoint))

// cmux server acts as a reverse-proxy between HTTP and GRPC backends.
cmuxServer := cmux.New(s.conn)

s.grpcConn = cmuxServer.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
return nil
s.httpConn = cmuxServer.Match(cmux.Any())

return cmuxServer, nil
}

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start(ctx context.Context) error {
err := s.initListener(ctx)
cmuxServer, err := s.initListener(ctx)
if err != nil {
return fmt.Errorf("query server failed to initialize listener: %w", err)
}
s.cmuxServer = cmuxServer

var tcpPort int
if !s.separatePorts {
if port, err := netutils.GetPort(s.conn.Addr()); err == nil {
tcpPort = port
}
}

var httpPort int
if port, err := netutils.GetPort(s.httpConn.Addr()); err == nil {
Expand Down Expand Up @@ -303,6 +344,23 @@ func (s *Server) Start(ctx context.Context) error {
s.Logger.Info("GRPC server stopped", zap.Int("port", grpcPort), zap.String("addr", s.queryOptions.GRPC.NetAddr.Endpoint))
}()

// Start cmux server concurrently.
if !s.separatePorts {
s.bgFinished.Add(1)
go func() {
defer s.bgFinished.Done()
s.Logger.Info("Starting CMUX server", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))

err := cmuxServer.Serve()
// TODO: find a way to avoid string comparison. Even though cmux has ErrServerClosed, it's not returned here.
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
s.Logger.Error("Could not start multiplexed server", zap.Error(err))
s.ReportStatus(componentstatus.NewFatalErrorEvent(err))
return
}
s.Logger.Info("CMUX server stopped", zap.Int("port", tcpPort), zap.String("addr", s.queryOptions.HTTP.Endpoint))
}()
}
return nil
}

Expand All @@ -326,6 +384,10 @@ func (s *Server) Close() error {
s.Logger.Info("Stopping gRPC server")
s.grpcServer.Stop()

if !s.separatePorts {
s.Logger.Info("Closing CMux server")
s.cmuxServer.Close()
}
s.bgFinished.Wait()

s.Logger.Info("Server stopped")
Expand Down
32 changes: 24 additions & 8 deletions cmd/query/app/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ func TestServerSinglePort(t *testing.T) {
hostPort := ports.PortToHostPort(ports.QueryHTTP)
querySvc := makeQuerySvc()
telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC())
_, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
&QueryOptions{
BearerTokenPropagation: true,
HTTP: confighttp.ServerConfig{
Expand All @@ -672,7 +672,24 @@ func TestServerSinglePort(t *testing.T) {
},
tenancy.NewManager(&tenancy.Options{}),
telset)
require.ErrorIs(t, err, ErrSameGRPCandHTTPPort)
require.NoError(t, err)
require.NoError(t, server.Start(context.Background()))
t.Cleanup(func() {
require.NoError(t, server.Close())
})

client := newGRPCClient(t, hostPort)
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})

// using generous timeout since grpc.NewClient no longer does a handshake.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

res, err := client.GetServices(ctx, &api_v2.GetServicesRequest{})
require.NoError(t, err)
assert.Equal(t, querySvc.expectedServices, res.Services)
}

func TestServerGracefulExit(t *testing.T) {
Expand All @@ -682,19 +699,18 @@ func TestServerGracefulExit(t *testing.T) {
assert.Equal(t, 0, logs.Len(), "Expected initial ObservedLogs to have zero length.")

flagsSvc.Logger = zap.New(zapCore)
httpHostPort := ports.PortToHostPort(ports.QueryHTTP)
grpcHostPort := ports.PortToHostPort(ports.QueryGRPC)
hostPort := ports.PortToHostPort(ports.QueryAdminHTTP)

querySvc := makeQuerySvc()
telset := initTelSet(flagsSvc.Logger, jtracer.NoOp(), flagsSvc.HC())
server, err := NewServer(context.Background(), componenttest.NewNopHost(), querySvc.qs, nil,
&QueryOptions{
HTTP: confighttp.ServerConfig{
Endpoint: httpHostPort,
Endpoint: hostPort,
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: grpcHostPort,
Endpoint: hostPort,
Transport: "tcp",
},
},
Expand All @@ -705,7 +721,7 @@ func TestServerGracefulExit(t *testing.T) {

// Wait for servers to come up before we can call .Close()
{
client := newGRPCClient(t, grpcHostPort)
client := newGRPCClient(t, hostPort)
t.Cleanup(func() {
require.NoError(t, client.conn.Close())
})
Expand Down Expand Up @@ -789,7 +805,7 @@ func TestServerHTTPTenancy(t *testing.T) {
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ":8081",
Endpoint: ":8080",
Transport: "tcp",
},
},
Expand Down

0 comments on commit 82044f5

Please sign in to comment.