Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace cassandra-spanstore tracing instrumentation withOTEL #4599

Merged
merged 6 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
Expand Down Expand Up @@ -57,6 +59,7 @@ type Factory struct {
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primarySession cassandra.Session
Expand All @@ -67,6 +70,7 @@ type Factory struct {
// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -143,7 +147,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand Down
4 changes: 3 additions & 1 deletion plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"time"

"go.opentelemetry.io/otel"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -44,8 +45,9 @@ func main() {
if err != nil {
logger.Fatal("Cannot create Cassandra session", zap.Error(err))
}
tracer := otel.GetTracerProvider()
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
spanStore := cSpanStore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.Tracer("cSpanStore.SpanReader"))
ctx := context.Background()
if err = spanStore.WriteSpan(ctx, getSomeSpan()); err != nil {
logger.Fatal("Failed to save", zap.Error(err))
Expand Down
88 changes: 52 additions & 36 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
"fmt"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -110,14 +112,19 @@
operationNamesReader operationNamesReader
metrics spanReaderMetrics
logger *zap.Logger
tracer trace.Tracer
}

// NewSpanReader returns a new SpanReader.
func NewSpanReader(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
tracer trace.Tracer,
) *SpanReader {
if tracer == nil {
tracer = otel.GetTracerProvider().Tracer("cSpanStore.SpanReader")
}

Check warning on line 127 in plugin/storage/cassandra/spanstore/reader.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/spanstore/reader.go#L126-L127

Added lines #L126 - L127 were not covered by tests
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil})
serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger)
operationNamesStorage := NewOperationNamesStorage(session, 0, metricsFactory, logger)
Expand All @@ -134,6 +141,7 @@
queryServiceNameIndex: casMetrics.NewTable(readFactory, "service_name_index"),
},
logger: logger,
tracer: tracer,
}
}

Expand All @@ -150,10 +158,13 @@
return s.operationNamesReader(query)
}

func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.Finish()
span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID))
func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID, tracer trace.Tracer) (*model.Trace, error) {
ctx, span := startSpanForQuery(ctx, "readTrace", querySpanByTraceID, tracer)
defer span.End()
span.SetAttributes(
attribute.Key("event").String("searching"),
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
attribute.Key("trace_id").String(traceID.String()),
)

trace, err := s.readTraceInSpan(ctx, traceID)
logErrorToSpan(span, err)
Expand Down Expand Up @@ -209,7 +220,7 @@

// GetTrace takes a traceID and returns a Trace associated with that traceID
func (s *SpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID))
return s.readTrace(ctx, dbmodel.TraceIDFromDomain(traceID), s.tracer)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
}

func validateQuery(p *spanstore.TraceQueryParameters) error {
Expand Down Expand Up @@ -287,7 +298,7 @@
return nil, err
}
if len(traceQuery.Tags) > 0 {
tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery)
tagTraceIds, err := s.queryByTagsAndLogs(ctx, traceQuery, s.tracer)
if err != nil {
return nil, err
}
Expand All @@ -299,19 +310,22 @@
return traceIds, nil
}
if len(traceQuery.Tags) > 0 {
return s.queryByTagsAndLogs(ctx, traceQuery)
return s.queryByTagsAndLogs(ctx, traceQuery, s.tracer)
}
return s.queryByService(ctx, traceQuery)
}

func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.Finish()
func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters, tracer trace.Tracer) (dbmodel.UniqueTraceIDs, error) {
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
ctx, span := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag, tracer)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
defer span.End()

results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags))
for k, v := range tq.Tags {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag")
childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v))
_, childSpan := s.tracer.Start(ctx, "queryByTag")
childSpan.SetAttributes(
attribute.Key("tag.key").String(k),
attribute.Key("tag.value").String(v),
)
query := s.session.Query(
queryByTag,
tq.ServiceName,
Expand All @@ -322,7 +336,7 @@
tq.NumTraces*limitMultiple,
).PageSize(0)
t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex)
childSpan.Finish()
defer childSpan.End()
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand All @@ -332,8 +346,8 @@
}

func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.Finish()
ctx, span := startSpanForQuery(ctx, "queryByDuration", queryByDuration, s.tracer)
defer span.End()

results := dbmodel.UniqueTraceIDs{}

Expand All @@ -349,8 +363,8 @@
endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize)

for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket")
childSpan.LogFields(otlog.String("timeBucket", timeBucket.String()))
_, childSpan := s.tracer.Start(ctx, "queryForTimeBucket")
childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String()))
query := s.session.Query(
queryByDuration,
timeBucket,
Expand All @@ -360,7 +374,7 @@
maxDurationMicros,
traceQuery.NumTraces*limitMultiple)
t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -376,8 +390,8 @@
}

func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.Finish()
_, span := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName, s.tracer)
defer span.End()
query := s.session.Query(
queryByServiceAndOperationName,
tq.ServiceName,
Expand All @@ -390,8 +404,8 @@
}

func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName)
defer span.Finish()
_, span := startSpanForQuery(ctx, "queryByService", queryByServiceName, s.tracer)
defer span.End()
query := s.session.Query(
queryByServiceName,
tq.ServiceName,
Expand All @@ -402,7 +416,7 @@
return s.executeQuery(span, query, s.metrics.queryServiceNameIndex)
}

func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
start := time.Now()
i := query.Iter()
retMe := dbmodel.UniqueTraceIDs{}
Expand All @@ -414,25 +428,27 @@
tableMetrics.Emit(err, time.Since(start))
if err != nil {
logErrorToSpan(span, err)
span.LogFields(otlog.String("query", query.String()))
span.SetAttributes(attribute.Key("query").String(query.String()))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String()))
return nil, err
}
return retMe, nil
}

func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, name)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "cassandra")
ottag.Component.Set(span, "gocql")
return span, ctx
func startSpanForQuery(ctx context.Context, name, query string, tp trace.Tracer) (context.Context, trace.Span) {
ctx, span := tp.Start(ctx, name)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("cassandra"),
attribute.Key("component").String("gocql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
func logErrorToSpan(span trace.Span, err error) {
if err == nil {
return
}
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
4 changes: 3 additions & 1 deletion plugin/storage/cassandra/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -52,11 +53,12 @@ func withSpanReader(fn func(r *spanReaderTest)) {
query.On("Exec").Return(nil)
logger, logBuffer := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(0)
tracer := jtracer.NoOp().OTEL
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
r := &spanReaderTest{
session: session,
logger: logger,
logBuffer: logBuffer,
reader: NewSpanReader(session, metricsFactory, logger),
reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")),
}
fn(r)
}
Expand Down