Skip to content

Commit

Permalink
Split Changes Into Two Separate Flows
Browse files Browse the repository at this point in the history
Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 committed Oct 5, 2024
1 parent 7f949dd commit 8602c9e
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 51 deletions.
3 changes: 2 additions & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,9 @@ func startQuery(
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)

allowSamePort := true
// TODO: replace componenttest.NewNopHost() with solution from https://github.com/jaegertracing/jaeger/issues/6049
server, err := queryApp.NewServer(context.Background(), componenttest.NewNopHost(), qs, metricsQueryService, qOpts, tm, telset)
server, err := queryApp.NewServer(context.Background(), componenttest.NewNopHost(), qs, metricsQueryService, qOpts, tm, telset, allowSamePort)
if err != nil {
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
Expand Down
1 change: 1 addition & 0 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
&s.config.QueryOptions,
tm,
telset,
false,
)
if err != nil {
return fmt.Errorf("could not create jaeger-query: %w", err)
Expand Down
183 changes: 169 additions & 14 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"net/http"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/soheilhy/cmux"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
Expand All @@ -21,7 +23,9 @@ import (
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
Expand All @@ -31,12 +35,15 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/pkg/bearertoken"
"github.com/jaegertracing/jaeger/pkg/netutils"
"github.com/jaegertracing/jaeger/pkg/recoveryhandler"
"github.com/jaegertracing/jaeger/pkg/telemetery"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
)

var ErrSameGRPCandHTTPPort = errors.New("cannot use same port for grpc and http server")

// Server runs HTTP, Mux and a grpc server
type Server struct {
querySvc *querysvc.QueryService
Expand All @@ -62,6 +69,7 @@ func NewServer(
options *QueryOptions,
tm *tenancy.Manager,
telset telemetery.Setting,
allowSamePort bool,
) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTP.Endpoint)
if err != nil {
Expand All @@ -72,19 +80,37 @@ func NewServer(
return nil, fmt.Errorf("invalid gRPC server host:port: %w", err)
}
separatePorts := grpcPort != httpPort || grpcPort == "0" || httpPort == "0"
if !separatePorts && !allowSamePort {
return nil, ErrSameGRPCandHTTPPort
}

if (options.HTTP.TLSSetting != nil || options.GRPC.TLSSetting != nil) && !separatePorts {
return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead")
}

grpcServer, err := createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}
var grpcServer *grpc.Server
var httpServer *httpServer
if separatePorts {
grpcServer, err = createGRPCServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}

httpServer, err := createHTTPServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
httpServer, err = createHTTPServer(ctx, host, querySvc, metricsQuerySvc, options, tm, telset)
if err != nil {
return nil, err
}
} else {
telset.Logger.Error("using the same port for gRPC and HTTP is deprecated; please use dedicated host ports intead")
grpcServer, err = createGRPCServerDeprecated(querySvc, metricsQuerySvc, options, tm, telset)

Check failure on line 105 in cmd/query/app/server.go

View workflow job for this annotation

GitHub Actions / lint

Function `createGRPCServerDeprecated->NewGuardingStreamInterceptor->NewGuardingStreamInterceptor$1` should pass the context parameter (contextcheck)
if err != nil {
return nil, err
}

httpServer, err = createHTTPServerDeprecated(querySvc, metricsQuerySvc, options, tm, telset)

Check failure on line 110 in cmd/query/app/server.go

View workflow job for this annotation

GitHub Actions / lint

Function `createHTTPServerDeprecated` should pass the context parameter (contextcheck)
if err != nil {
return nil, err
}
}

return &Server{
Expand Down Expand Up @@ -228,6 +254,106 @@ func createHTTPServer(
return httpServer, nil
}

func createGRPCServerDeprecated(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, telset telemetery.Setting) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.GRPC.TLSSetting != nil {
tlsCfg, err := options.GRPC.TLSSetting.LoadTLSConfig(context.Background())
if err != nil {
return nil, err
}

creds := credentials.NewTLS(tlsCfg)

grpcOpts = append(grpcOpts, grpc.Creds(creds))
}
if tm.Enabled {
grpcOpts = append(grpcOpts,
grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)),
grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)),
)
}

server := grpc.NewServer(grpcOpts...)
reflection.Register(server)

handler := NewGRPCHandler(querySvc, metricsQuerySvc, GRPCHandlerOptions{
Logger: telset.Logger,
})
healthServer := health.NewServer()

api_v2.RegisterQueryServiceServer(server, handler)
metrics.RegisterMetricsQueryServiceServer(server, handler)
api_v3.RegisterQueryServiceServer(server, &apiv3.Handler{QueryService: querySvc})

healthServer.SetServingStatus("jaeger.api_v2.QueryService", grpc_health_v1.HealthCheckResponse_SERVING)
healthServer.SetServingStatus("jaeger.api_v2.metrics.MetricsQueryService", grpc_health_v1.HealthCheckResponse_SERVING)
healthServer.SetServingStatus("jaeger.api_v3.QueryService", grpc_health_v1.HealthCheckResponse_SERVING)

grpc_health_v1.RegisterHealthServer(server, healthServer)
return server, nil
}

func createHTTPServerDeprecated(
querySvc *querysvc.QueryService,
metricsQuerySvc querysvc.MetricsQueryService,
queryOpts *QueryOptions,
tm *tenancy.Manager,
telset telemetery.Setting,
) (*httpServer, error) {
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(telset.Logger),
HandlerOptions.Tracer(telset.TracerProvider),
HandlerOptions.MetricsQueryService(metricsQuerySvc),
}

apiHandler := NewAPIHandler(
querySvc,
tm,
apiHandlerOptions...)
r := NewRouter()
if queryOpts.BasePath != "/" {
r = r.PathPrefix(queryOpts.BasePath).Subrouter()
}

(&apiv3.HTTPGateway{
QueryService: querySvc,
TenancyMgr: tm,
Logger: telset.Logger,
Tracer: telset.TracerProvider,
}).RegisterRoutes(r)

apiHandler.RegisterRoutes(r)
var handler http.Handler = r
handler = responseHeadersHandler(handler, queryOpts.HTTP.ResponseHeaders)
if queryOpts.BearerTokenPropagation {
handler = bearertoken.PropagationHandler(telset.Logger, handler)
}
handler = handlers.CompressHandler(handler)
recoveryHandler := recoveryhandler.NewRecoveryHandler(telset.Logger, true)

errorLog, _ := zap.NewStdLogAt(telset.Logger, zapcore.ErrorLevel)
server := &httpServer{
Server: &http.Server{
Handler: recoveryHandler(handler),
ErrorLog: errorLog,
ReadHeaderTimeout: 2 * time.Second,
},
}

if queryOpts.HTTP.TLSSetting != nil {
tlsCfg, err := queryOpts.HTTP.TLSSetting.LoadTLSConfig(context.Background()) // This checks if the certificates are correctly provided
if err != nil {
return nil, err
}
server.TLSConfig = tlsCfg
}

server.staticHandlerCloser = RegisterStaticHandler(r, telset.Logger, queryOpts, querySvc.GetCapabilities())

return server, nil
}

func (hS httpServer) Close() error {
var errs []error
errs = append(errs, hS.Server.Close())
Expand All @@ -236,15 +362,36 @@ func (hS httpServer) Close() error {
}

// initListener initialises listeners of the server
func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {
func (s *Server) initListener(ctx context.Context) error {

Check failure on line 365 in cmd/query/app/server.go

View workflow job for this annotation

GitHub Actions / lint

empty-lines: extra empty line at the end of a block (revive)
var err error
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
if err != nil {
return err
}

s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
if err != nil {
return err
}
s.Logger.Info(
"Query server started",
zap.String("http_addr", s.HTTPAddr()),
zap.String("grpc_addr", s.GRPCAddr()),
)
return nil

}

// initListener initialises listeners of the server
func (s *Server) initListenerDeprecated() (cmux.CMux, error) {
if s.separatePorts { // use separate ports and listeners each for gRPC and HTTP requests
var err error
s.grpcConn, err = s.queryOptions.GRPC.NetAddr.Listen(ctx)
s.grpcConn, err = net.Listen("tcp", s.queryOptions.GRPC.NetAddr.Endpoint)
if err != nil {
return nil, err
}

s.httpConn, err = s.queryOptions.HTTP.ToListener(ctx)
s.httpConn, err = net.Listen("tcp", s.queryOptions.HTTP.Endpoint)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -288,11 +435,19 @@ func (s *Server) initListener(ctx context.Context) (cmux.CMux, error) {

// Start http, GRPC and cmux servers concurrently
func (s *Server) Start(ctx context.Context) error {
cmuxServer, err := s.initListener(ctx)
if err != nil {
return fmt.Errorf("query server failed to initialize listener: %w", err)
var cmuxServer cmux.CMux
if !s.separatePorts {
cmuxServer, err := s.initListenerDeprecated()
if err != nil {
return fmt.Errorf("query server failed to initialize listener: %w", err)
}
s.cmuxServer = cmuxServer
} else {
err := s.initListener(ctx)
if err != nil {
return fmt.Errorf("query server failed to initialize listener: %w", err)
}
}
s.cmuxServer = cmuxServer

var tcpPort int
if !s.separatePorts {
Expand Down
Loading

0 comments on commit 8602c9e

Please sign in to comment.