Skip to content

Commit

Permalink
Replace es-spanstore tracing instrumentation with OpenTelemetry (#4596)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
* Part of #3381 
* This PR adds OTEL Provider to es-spanstore component

## Short description of the changes
- Replaces OT tracer with OTEL tracer to support jtracer pkg

---------

Signed-off-by: Afzal Ansari <[email protected]>
Signed-off-by: Afzal <[email protected]>
Co-authored-by: Afzal <[email protected]>
  • Loading branch information
afzal442 and afzalbin64 authored Jul 30, 2023
1 parent 287562c commit de3cf99
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 62 deletions.
2 changes: 2 additions & 0 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/otel"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand Down Expand Up @@ -104,6 +105,7 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}
otel.SetTracerProvider(tracer.OTEL)

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/otel"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand Down Expand Up @@ -76,6 +77,7 @@ func main() {
if err != nil {
logger.Fatal("Failed to create tracer:", zap.Error(err))
}
otel.SetTracerProvider(jtracer.OTEL)
queryOpts, err := new(app.QueryOptions).InitFromViper(v, logger)
if err != nil {
logger.Fatal("Failed to configure query service", zap.Error(err))
Expand Down
34 changes: 20 additions & 14 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/es"
Expand Down Expand Up @@ -50,6 +52,7 @@ type Factory struct {

metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

Expand All @@ -64,6 +67,7 @@ func NewFactory() *Factory {
return &Factory{
Options: NewOptions(primaryNamespace, archiveNamespace),
newClientFn: config.NewClient,
tracer: otel.GetTracerProvider(),
}
}

Expand Down Expand Up @@ -108,49 +112,48 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
return createSpanReader(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.metricsFactory, f.logger, f.primaryClient, f.primaryConfig, false)
return createSpanWriter(f.primaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return createDependencyReader(f.logger, f.primaryClient, f.primaryConfig)
return createDependencyReader(f.primaryClient, f.primaryConfig, f.logger)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
return createSpanReader(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.metricsFactory, f.logger, f.archiveClient, f.archiveConfig, true)
return createSpanWriter(f.archiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
}

func createSpanReader(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
tp trace.TracerProvider,
) (spanstore.Reader, error) {
if cfg.UseILM && !cfg.UseReadWriteAliases {
return nil, fmt.Errorf("--es.use-ilm must always be used in conjunction with --es.use-aliases to ensure ES writers and readers refer to the single index mapping")
}
return esSpanStore.NewSpanReader(esSpanStore.SpanReaderParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
MaxDocCount: cfg.MaxDocCount,
MaxSpanAge: cfg.MaxSpanAge,
IndexPrefix: cfg.IndexPrefix,
Expand All @@ -162,15 +165,18 @@ func createSpanReader(
UseReadWriteAliases: cfg.UseReadWriteAliases,
Archive: archive,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
MetricsFactory: mFactory,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
}), nil
}

func createSpanWriter(
mFactory metrics.Factory,
logger *zap.Logger,
client es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
) (spanstore.Writer, error) {
var tags []string
var err error
Expand All @@ -197,8 +203,6 @@ func createSpanWriter(
}
writer := esSpanStore.NewSpanWriter(esSpanStore.SpanWriterParams{
Client: client,
Logger: logger,
MetricsFactory: mFactory,
IndexPrefix: cfg.IndexPrefix,
SpanIndexDateLayout: cfg.IndexDateLayoutSpans,
ServiceIndexDateLayout: cfg.IndexDateLayoutServices,
Expand All @@ -207,6 +211,8 @@ func createSpanWriter(
TagDotReplacement: cfg.Tags.DotReplacement,
Archive: archive,
UseReadWriteAliases: cfg.UseReadWriteAliases,
Logger: logger,
MetricsFactory: mFactory,
})

// Creating a template here would conflict with the one created for ILM resulting to no index rollover
Expand All @@ -220,9 +226,9 @@ func createSpanWriter(
}

func createDependencyReader(
logger *zap.Logger,
client es.Client,
cfg *config.Configuration,
logger *zap.Logger,
) (dependencystore.Reader, error) {
reader := esDepStore.NewDependencyStore(esDepStore.DependencyStoreParams{
Client: client,
Expand Down
60 changes: 35 additions & 25 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"time"

"github.com/olivere/elastic"
"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -93,7 +93,6 @@ var (
// SpanReader can query for and load traces from ElasticSearch
type SpanReader struct {
client es.Client
logger *zap.Logger
// The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day,
// this will be rounded down to UTC 00:00 of that day.
maxSpanAge time.Duration
Expand All @@ -109,15 +108,15 @@ type SpanReader struct {
sourceFn sourceFn
maxDocCount int
useReadWriteAliases bool
logger *zap.Logger
tracer trace.Tracer
}

// SpanReaderParams holds constructor params for NewSpanReader
type SpanReaderParams struct {
Client es.Client
Logger *zap.Logger
MaxSpanAge time.Duration
MaxDocCount int
MetricsFactory metrics.Factory
IndexPrefix string
SpanIndexDateLayout string
ServiceIndexDateLayout string
Expand All @@ -127,6 +126,9 @@ type SpanReaderParams struct {
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
MetricsFactory metrics.Factory
Logger *zap.Logger
Tracer trace.Tracer
}

// NewSpanReader returns a new SpanReader with a metrics.
Expand All @@ -139,7 +141,6 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
}
return &SpanReader{
client: p.Client,
logger: p.Logger,
maxSpanAge: maxSpanAge,
serviceOperationStorage: NewServiceOperationStorage(p.Client, p.Logger, 0), // the decorator takes care of metrics
spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex),
Expand All @@ -153,6 +154,8 @@ func NewSpanReader(p SpanReaderParams) *SpanReader {
sourceFn: getSourceFn(p.Archive, p.MaxDocCount),
maxDocCount: p.MaxDocCount,
useReadWriteAliases: p.UseReadWriteAliases,
logger: p.Logger,
tracer: p.Tracer,
}
}

Expand Down Expand Up @@ -238,8 +241,8 @@ func indexNames(prefix, index string) string {

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetTrace")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "GetTrace")
defer span.End()
currentTime := time.Now()
traces, err := s.multiRead(ctx, []model.TraceID{traceID}, currentTime.Add(-s.maxSpanAge), currentTime)
if err != nil {
Expand Down Expand Up @@ -283,8 +286,8 @@ func (s *SpanReader) unmarshalJSONSpan(esSpanRaw *elastic.SearchHit) (*dbmodel.S

// GetServices returns all services traced by Jaeger, ordered by frequency
func (s *SpanReader) GetServices(ctx context.Context) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetServices")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "GetService")
defer span.End()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency)
return s.serviceOperationStorage.getServices(ctx, jaegerIndices, s.maxDocCount)
Expand All @@ -295,8 +298,8 @@ func (s *SpanReader) GetOperations(
ctx context.Context,
query spanstore.OperationQueryParameters,
) ([]spanstore.Operation, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "GetOperations")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "GetOperations")
defer span.End()
currentTime := time.Now()
jaegerIndices := s.timeRangeIndices(s.serviceIndexPrefix, s.serviceIndexDateLayout, currentTime.Add(-s.maxSpanAge), currentTime, s.serviceIndexRolloverFrequency)
operations, err := s.serviceOperationStorage.getOperations(ctx, jaegerIndices, query.ServiceName, s.maxDocCount)
Expand Down Expand Up @@ -329,8 +332,8 @@ func bucketToStringArray(buckets []*elastic.AggregationBucketKeyItem) ([]string,

// FindTraces retrieves traces that match the traceQuery
func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]*model.Trace, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "FindTraces")
defer span.End()

uniqueTraceIDs, err := s.FindTraceIDs(ctx, traceQuery)
if err != nil {
Expand All @@ -341,8 +344,8 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace

// FindTraceIDs retrieves traces IDs that match the traceQuery
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs")
defer span.Finish()
ctx, span := s.tracer.Start(ctx, "FindTraceIDs")
defer span.End()

if err := validateQuery(traceQuery); err != nil {
return nil, err
Expand All @@ -360,9 +363,16 @@ func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
}

func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, startTime, endTime time.Time) ([]*model.Trace, error) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "multiRead")
childSpan.LogFields(otlog.Object("trace_ids", traceIDs))
defer childSpan.Finish()
ctx, childSpan := s.tracer.Start(ctx, "multiRead")
defer childSpan.End()

if childSpan.IsRecording() {
tracesIDs := make([]string, len(traceIDs))
for i, traceID := range traceIDs {
tracesIDs[i] = traceID.String()
}
childSpan.SetAttributes(attribute.Key("trace_ids").StringSlice(tracesIDs))
}

if len(traceIDs) == 0 {
return []*model.Trace{}, nil
Expand Down Expand Up @@ -503,8 +513,8 @@ func validateQuery(p *spanstore.TraceQueryParameters) error {
}

func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs")
defer childSpan.Finish()
ctx, childSpan := s.tracer.Start(ctx, "findTraceIDs")
defer childSpan.End()
// Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this.
// {
// "size": 0,
Expand Down Expand Up @@ -686,7 +696,7 @@ func (s *SpanReader) buildObjectQuery(field string, k string, v string) elastic.
return elastic.NewBoolQuery().Must(keyQuery)
}

func logErrorToSpan(span opentracing.Span, err error) {
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
func logErrorToSpan(span trace.Span, err error) {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
Loading

0 comments on commit de3cf99

Please sign in to comment.