diff --git a/plugin/storage/cassandra/spanstore/reader.go b/plugin/storage/cassandra/spanstore/reader.go index 3a4a0519ffb..ec12099515b 100644 --- a/plugin/storage/cassandra/spanstore/reader.go +++ b/plugin/storage/cassandra/spanstore/reader.go @@ -21,10 +21,10 @@ import ( "fmt" "time" - "github.com/opentracing/opentracing-go" - ottag "github.com/opentracing/opentracing-go/ext" - otlog "github.com/opentracing/opentracing-go/log" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -158,10 +158,13 @@ func (s *SpanReader) GetOperations( return s.operationNamesReader(query) } -func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) { - span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID) - defer span.Finish() - span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID)) +func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID, tracer trace.Tracer) (*model.Trace, error) { + ctx, span := startSpanForQuery(ctx, "readTrace", querySpanByTraceID, tracer) + defer span.End() + span.SetAttributes( + attribute.Key("event").String("searching"), + attribute.Key("trace_id").String(traceID.String()), + ) trace, err := s.readTraceInSpan(ctx, traceID) logErrorToSpan(span, err) @@ -217,7 +220,7 @@ func (s *SpanReader) readTraceInSpan(ctx context.Context, traceID dbmodel.TraceI // GetTrace takes a traceID and returns a Trace associated with that traceID func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID)) + return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID), s.tracer) } func validateQuery(p *spanstore.TraceQueryParameters) error { @@ -295,7 +298,7 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return nil, err } if len(traceQuery.Tags) > 0 { - tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery) + tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery, s.tracer) if err != nil { return nil, err } @@ -307,19 +310,22 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra return traceIds, nil } if len(traceQuery.Tags) > 0 { - return s.queryByTagsAndLogs(ctx, traceQuery) + return s.queryByTagsAndLogs(ctx, traceQuery, s.tracer) } return s.queryByService(ctx, traceQuery) } -func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag) - defer span.Finish() +func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters, tracer trace.Tracer) (dbmodel.UniqueTraceIDs, error) { + ctx, span := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag, tracer) + defer span.End() results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags)) for k, v := range tq.Tags { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag") - childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v)) + _, childSpan := s.tracer.Start(ctx, "queryByTag") + childSpan.SetAttributes( + attribute.Key("tag.key").String(k), + attribute.Key("tag.value").String(v), + ) query := s.session.Query( queryByTag, tq.ServiceName, @@ -330,7 +336,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace tq.NumTraces*limitMultiple, ).PageSize(0) t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex) - childSpan.Finish() + defer childSpan.End() if err != nil { return nil, err } @@ -340,8 +346,8 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace } func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration) - defer span.Finish() + ctx, span := startSpanForQuery(ctx, "queryByDuration", queryByDuration, s.tracer) + defer span.End() results := dbmodel.UniqueTraceIDs{} @@ -357,8 +363,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize) for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) { - childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket") - childSpan.LogFields(otlog.String("timeBucket", timeBucket.String())) + _, childSpan := s.tracer.Start(ctx, "queryForTimeBucket") + childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String())) query := s.session.Query( queryByDuration, timeBucket, @@ -368,7 +374,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. maxDurationMicros, traceQuery.NumTraces*limitMultiple) t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex) - childSpan.Finish() + childSpan.End() if err != nil { return nil, err } @@ -384,8 +390,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore. } func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName) - defer span.Finish() + _, span := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName, s.tracer) + defer span.End() query := s.session.Query( queryByServiceAndOperationName, tq.ServiceName, @@ -398,8 +404,8 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa } func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) { - span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName) - defer span.Finish() + _, span := startSpanForQuery(ctx, "queryByService", queryByServiceName, s.tracer) + defer span.End() query := s.session.Query( queryByServiceName, tq.ServiceName, @@ -410,7 +416,7 @@ func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQuer return s.executeQuery(span, query, s.metrics.queryServiceNameIndex) } -func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) { +func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) { start := time.Now() i := query.Iter() retMe := dbmodel.UniqueTraceIDs{} @@ -422,25 +428,27 @@ func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics.Emit(err, time.Since(start)) if err != nil { logErrorToSpan(span, err) - span.LogFields(otlog.String("query", query.String())) + span.SetAttributes(attribute.Key("query").String(query.String())) s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String())) return nil, err } return retMe, nil } -func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) { - span, ctx := opentracing.StartSpanFromContext(ctx, name) - ottag.DBStatement.Set(span, query) - ottag.DBType.Set(span, "cassandra") - ottag.Component.Set(span, "gocql") - return span, ctx +func startSpanForQuery(ctx context.Context, name, query string, tp trace.Tracer) (context.Context, trace.Span) { + ctx, span := tp.Start(ctx, name) + span.SetAttributes( + attribute.Key(semconv.DBStatementKey).String(query), + attribute.Key(semconv.DBSystemKey).String("cassandra"), + attribute.Key("component").String("gocql"), + ) + return ctx, span } -func logErrorToSpan(span opentracing.Span, err error) { +func logErrorToSpan(span trace.Span, err error) { if err == nil { return } - ottag.Error.Set(span, true) - span.LogFields(otlog.Error(err)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) }