Skip to content

Commit

Permalink
Replace Jaeger SDK with OTEL SDK + OT Bridge (#4574)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
* Part of #3381 
* This PR adds OTEL Provider to jtracer pkg

## Short description of the changes
- Upgrades jaeger tracing to support `otel tracer`

---------

Signed-off-by: Afzal Ansari <[email protected]>
Signed-off-by: Afzal <[email protected]>
Co-authored-by: Afzal <[email protected]>
  • Loading branch information
afzal442 and afzalbin64 authored Jul 20, 2023
1 parent 984a512 commit 2c1bf07
Show file tree
Hide file tree
Showing 12 changed files with 164 additions and 129 deletions.
14 changes: 10 additions & 4 deletions cmd/all-in-one/all_in_one_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"io"
"net/http"
"strings"
"testing"
"time"

Expand All @@ -46,12 +47,13 @@ const (
agentURL = "http://" + agentHostPort

getServicesURL = queryURL + "/api/services"
getTraceURL = queryURL + "/api/traces?service=jaeger-query&tag=jaeger-debug-id:debug"
getSamplingStrategyURL = agentURL + "/sampling?service=whatever"

getServicesAPIV3URL = queryURL + "/api/v3/services"
)

var getTraceURL = queryURL + "/api/traces/"

var httpClient = &http.Client{
Timeout: time.Second,
}
Expand All @@ -70,11 +72,15 @@ func TestAllInOne(t *testing.T) {
func createTrace(t *testing.T) {
req, err := http.NewRequest(http.MethodGet, getServicesURL, nil)
require.NoError(t, err)
req.Header.Add("jaeger-debug-id", "debug")

resp, err := httpClient.Do(req)
require.NoError(t, err)
resp.Body.Close()
defer resp.Body.Close()
traceResponse := resp.Header.Get("traceresponse")
parts := strings.Split(traceResponse, "-")
require.Len(t, parts, 4) // [version] [trace-id] [child-id] [trace-flags]
traceID := parts[1]
getTraceURL += traceID
}

type response struct {
Expand Down Expand Up @@ -157,5 +163,5 @@ func getServicesAPIV3(t *testing.T) {
jsonpb := runtime.JSONPb{}
err = jsonpb.Unmarshal(body, &servicesResponse)
require.NoError(t, err)
assert.Equal(t, []string{"jaeger-query"}, servicesResponse.GetServices())
assert.Equal(t, []string{"jaeger-all-in-one"}, servicesResponse.GetServices())
}
50 changes: 14 additions & 36 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
package main

import (
"context"
"fmt"
"io"
"log"
"os"

"github.com/opentracing/opentracing-go"
"github.com/spf13/cobra"
"github.com/spf13/viper"
jaegerClientConfig "github.com/uber/jaeger-client-go/config"
jaegerClientZapLog "github.com/uber/jaeger-client-go/log/zap"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand All @@ -43,7 +41,6 @@ import (
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/internal/metrics/expvar"
"github.com/jaegertracing/jaeger/internal/metrics/fork"
"github.com/jaegertracing/jaeger/internal/metrics/jlibadapter"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
Expand Down Expand Up @@ -103,7 +100,10 @@ by default uses only in-memory database.`,
svc.MetricsFactory.Namespace(metrics.NSOptions{Name: "jaeger"}))
version.NewInfoMetrics(metricsFactory)

tracerCloser := initTracer(svc)
tracer, err := jtracer.New("jaeger-all-in-one")
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
Expand Down Expand Up @@ -197,7 +197,7 @@ by default uses only in-memory database.`,
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
metricsFactory, tm,
metricsFactory, tm, tracer,
)

svc.RunAndThen(func() {
Expand All @@ -213,7 +213,9 @@ by default uses only in-memory database.`,
if err := storageFactory.Close(); err != nil {
logger.Error("Failed to close storage factory", zap.Error(err))
}
_ = tracerCloser.Close()
if err := tracer.Close(context.Background()); err != nil {
logger.Error("Error shutting down tracer provider", zap.Error(err))
}
})
return nil
},
Expand Down Expand Up @@ -271,48 +273,24 @@ func startQuery(
metricsQueryService querysvc.MetricsQueryService,
baseFactory metrics.Factory,
tm *tenancy.Manager,
jt *jtracer.JTracer,
) *queryApp.Server {
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"}))
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
jtracer := jtracer.OT(opentracing.GlobalTracer())
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jtracer)
server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, jt)
if err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
svc.Logger.Fatal("Could not create jaeger-query", zap.Error(err))
}
go func() {
for s := range server.HealthCheckStatus() {
svc.SetHealthCheckStatus(s)
}
}()
if err := server.Start(); err != nil {
svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err))
svc.Logger.Fatal("Could not start jaeger-query", zap.Error(err))
}
return server
}

func initTracer(svc *flags.Service) io.Closer {
logger := svc.Logger
traceCfg := &jaegerClientConfig.Configuration{
ServiceName: "jaeger-query",
Sampler: &jaegerClientConfig.SamplerConfig{
Type: "const",
Param: 1.0,
},
RPCMetrics: true,
}
traceCfg, err := traceCfg.FromEnv()
if err != nil {
logger.Fatal("Failed to read tracer configuration", zap.Error(err))
}
tracer, closer, err := traceCfg.NewTracer(
jaegerClientConfig.Metrics(jlibadapter.NewAdapter(svc.MetricsFactory)),
jaegerClientConfig.Logger(jaegerClientZapLog.NewLogger(logger)),
)
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}
opentracing.SetGlobalTracer(tracer)
return closer
return server
}

func createMetricsQueryService(
Expand Down
6 changes: 3 additions & 3 deletions cmd/query/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ type GRPCHandler struct {
queryService *querysvc.QueryService
metricsQueryService querysvc.MetricsQueryService
logger *zap.Logger
tracer jtracer.JTracer
tracer *jtracer.JTracer
nowFn func() time.Time
}

// GRPCHandlerOptions contains optional members of GRPCHandler.
type GRPCHandlerOptions struct {
Logger *zap.Logger
Tracer jtracer.JTracer
Tracer *jtracer.JTracer
NowFn func() time.Time
}

Expand All @@ -73,7 +73,7 @@ func NewGRPCHandler(queryService *querysvc.QueryService,
options.Logger = zap.NewNop()
}

if options.Tracer.OT == nil {
if options.Tracer == nil {
options.Tracer = jtracer.NoOp()
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ type grpcClient struct {
conn *grpc.ClientConn
}

func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer jtracer.JTracer, tenancyMgr *tenancy.Manager) (*grpc.Server, net.Addr) {
func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer *jtracer.JTracer, tenancyMgr *tenancy.Manager) (*grpc.Server, net.Addr) {
lis, _ := net.Listen("tcp", ":0")
var grpcOpts []grpc.ServerOption
if tenancyMgr.Enabled {
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/handler_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (handlerOptions) QueryLookbackDuration(queryLookbackDuration time.Duration)
}

// Tracer creates a HandlerOption that initializes OpenTracing tracer
func (handlerOptions) Tracer(tracer jtracer.JTracer) HandlerOption {
func (handlerOptions) Tracer(tracer *jtracer.JTracer) HandlerOption {
return func(apiHandler *APIHandler) {
apiHandler.tracer = tracer
}
Expand Down
32 changes: 23 additions & 9 deletions cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (

"github.com/gogo/protobuf/proto"
"github.com/gorilla/mux"
"github.com/opentracing-contrib/go-stdlib/nethttp"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
Expand Down Expand Up @@ -88,7 +89,7 @@ type APIHandler struct {
basePath string
apiPrefix string
logger *zap.Logger
tracer jtracer.JTracer
tracer *jtracer.JTracer
}

// NewAPIHandler returns an APIHandler
Expand All @@ -111,7 +112,7 @@ func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.Manager, opt
if aH.logger == nil {
aH.logger = zap.NewNop()
}
if aH.tracer.OT == nil {
if aH.tracer == nil {
aH.tracer = jtracer.NoOp()
}
return aH
Expand Down Expand Up @@ -146,12 +147,10 @@ func (aH *APIHandler) handleFunc(
if aH.tenancyMgr.Enabled {
handler = tenancy.ExtractTenantHTTPHandler(aH.tenancyMgr, handler)
}
traceMiddleware := nethttp.Middleware(
aH.tracer.OT,
handler,
nethttp.OperationNameFunc(func(r *http.Request) string {
return route
}))
traceMiddleware := otelhttp.NewHandler(
otelhttp.WithRouteTag(route, traceResponseHandler(handler)),
route,
otelhttp.WithTracerProvider(aH.tracer.OTEL))
return router.HandleFunc(route, traceMiddleware.ServeHTTP)
}

Expand Down Expand Up @@ -523,3 +522,18 @@ func (aH *APIHandler) writeJSON(w http.ResponseWriter, r *http.Request, response
aH.handleError(w, fmt.Errorf("failed writing HTTP response: %w", err), http.StatusInternalServerError)
}
}

// Returns a handler that generates a traceresponse header.
// https://github.com/w3c/trace-context/blob/main/spec/21-http_response_header_format.md
func traceResponseHandler(handler http.Handler) http.Handler {
// We use the standard TraceContext propagator, since the formats are identical.
// But the propagator uses "traceparent" header name, so we inject it into a map
// `carrier` and then use the result to set the "tracereponse" header.
var prop propagation.TraceContext
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
carrier := make(map[string]string)
prop.Inject(r.Context(), propagation.MapCarrier(carrier))
w.Header().Add("traceresponse", carrier["traceparent"])
handler.ServeHTTP(w, r)
})
}
22 changes: 13 additions & 9 deletions cmd/query/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ import (
testHttp "github.com/stretchr/testify/http"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/sdk/trace/tracetest"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand Down Expand Up @@ -303,12 +304,15 @@ func TestGetTrace(t *testing.T) {
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.suffix, func(t *testing.T) {
reporter := jaeger.NewInMemoryReporter()
jaegerTracer, jaegerCloser := jaeger.NewTracer("test", jaeger.NewConstSampler(true), reporter)
jTracer := jtracer.OT(jaegerTracer)
defer jaegerCloser.Close()

ts := initializeTestServer(HandlerOptions.Tracer(jTracer))
exporter := tracetest.NewInMemoryExporter()
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSyncer(exporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
)
jTracer := jtracer.JTracer{OTEL: tracerProvider}
defer tracerProvider.Shutdown(context.Background())

ts := initializeTestServer(HandlerOptions.Tracer(&jTracer))
defer ts.server.Close()

ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), model.NewTraceID(0, 0x123456abc)).
Expand All @@ -319,8 +323,8 @@ func TestGetTrace(t *testing.T) {
assert.NoError(t, err)
assert.Len(t, response.Errors, 0)

assert.Len(t, reporter.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, "/api/traces/{traceID}", reporter.GetSpans()[0].(*jaeger.Span).OperationName())
assert.Len(t, exporter.GetSpans(), 1, "HTTP request was traced and span reported")
assert.Equal(t, "/api/traces/{traceID}", exporter.GetSpans()[0].Name)

traces := extractTraces(t, &response)
assert.Len(t, traces[0].Spans, 2)
Expand Down
8 changes: 4 additions & 4 deletions cmd/query/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Server struct {
querySvc *querysvc.QueryService
queryOptions *QueryOptions

tracer jtracer.JTracer // TODO make part of flags.Service
tracer *jtracer.JTracer // TODO make part of flags.Service

conn net.Listener
grpcConn net.Listener
Expand All @@ -65,7 +65,7 @@ type Server struct {
}

// NewServer creates and initializes Server
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer jtracer.JTracer) (*Server, error) {
func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer) (*Server, error) {
_, httpPort, err := net.SplitHostPort(options.HTTPHostPort)
if err != nil {
return nil, err
Expand Down Expand Up @@ -107,7 +107,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status {
return s.unavailableChannel
}

func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer jtracer.JTracer) (*grpc.Server, error) {
func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.Manager, logger *zap.Logger, tracer *jtracer.JTracer) (*grpc.Server, error) {
var grpcOpts []grpc.ServerOption

if options.TLSGRPC.Enabled {
Expand Down Expand Up @@ -148,7 +148,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.
return server, nil
}

func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.Manager, tracer jtracer.JTracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) {
func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.Manager, tracer *jtracer.JTracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) {
apiHandlerOptions := []HandlerOption{
HandlerOptions.Logger(logger),
HandlerOptions.Tracer(tracer),
Expand Down
Loading

0 comments on commit 2c1bf07

Please sign in to comment.