Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace cassandra-spanstore tracing instrumentation withOTEL #4599

Merged
merged 6 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/cassandra"
Expand Down Expand Up @@ -57,6 +59,7 @@ type Factory struct {
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

primaryConfig config.SessionBuilder
primarySession cassandra.Session
Expand All @@ -67,6 +70,7 @@ type Factory struct {
// NewFactory creates a new Factory.
func NewFactory() *Factory {
return &Factory{
tracer: otel.GetTracerProvider(),
Options: NewOptions(primaryStorageConfig, archiveStorageConfig),
}
}
Expand Down Expand Up @@ -120,7 +124,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -143,7 +147,7 @@ func (f *Factory) CreateArchiveSpanReader() (spanstore.Reader, error) {
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger), nil
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader")), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand Down
7 changes: 6 additions & 1 deletion plugin/storage/cassandra/savetracetest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/jaegertracing/jaeger/model"
cascfg "github.com/jaegertracing/jaeger/pkg/cassandra/config"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -44,8 +45,12 @@ func main() {
if err != nil {
logger.Fatal("Cannot create Cassandra session", zap.Error(err))
}
tracer, err := jtracer.New("cassandra")
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}
spanStore := cSpanStore.NewSpanWriter(cqlSession, time.Hour*12, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger)
spanReader := cSpanStore.NewSpanReader(cqlSession, noScope, logger, tracer.OTEL.Tracer("cSpanStore.SpanReader"))
ctx := context.Background()
if err = spanStore.WriteSpan(ctx, getSomeSpan()); err != nil {
logger.Fatal("Failed to save", zap.Error(err))
Expand Down
73 changes: 42 additions & 31 deletions plugin/storage/cassandra/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go"
ottag "github.com/opentracing/opentracing-go/ext"
otlog "github.com/opentracing/opentracing-go/log"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -110,13 +111,15 @@ type SpanReader struct {
operationNamesReader operationNamesReader
metrics spanReaderMetrics
logger *zap.Logger
tracer trace.Tracer
}

// NewSpanReader returns a new SpanReader.
func NewSpanReader(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
tracer trace.Tracer,
) *SpanReader {
readFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "read", Tags: nil})
serviceNamesStorage := NewServiceNamesStorage(session, 0, metricsFactory, logger)
Expand All @@ -134,6 +137,7 @@ func NewSpanReader(
queryServiceNameIndex: casMetrics.NewTable(readFactory, "service_name_index"),
},
logger: logger,
tracer: tracer,
}
}

Expand All @@ -151,9 +155,11 @@ func (s *SpanReader) GetOperations(
}

func (s *SpanReader) readTrace(ctx context.Context, traceID dbmodel.TraceID) (*model.Trace, error) {
span, ctx := startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.Finish()
span.LogFields(otlog.String("event", "searching"), otlog.Object("trace_id", traceID))
ctx, span := s.startSpanForQuery(ctx, "readTrace", querySpanByTraceID)
defer span.End()
span.SetAttributes(
attribute.Key("trace_id").String(traceID.String()),
)
afzal442 marked this conversation as resolved.
Show resolved Hide resolved

trace, err := s.readTraceInSpan(ctx, traceID)
logErrorToSpan(span, err)
Expand Down Expand Up @@ -305,13 +311,16 @@ func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.Tra
}

func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.Finish()
ctx, span := s.startSpanForQuery(ctx, "queryByTagsAndLogs", queryByTag)
defer span.End()

results := make([]dbmodel.UniqueTraceIDs, 0, len(tq.Tags))
for k, v := range tq.Tags {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryByTag")
childSpan.LogFields(otlog.String("tag.key", k), otlog.String("tag.value", v))
_, childSpan := s.tracer.Start(ctx, "queryByTag")
childSpan.SetAttributes(
attribute.Key("tag.key").String(k),
attribute.Key("tag.value").String(v),
)
query := s.session.Query(
queryByTag,
tq.ServiceName,
Expand All @@ -322,7 +331,7 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
tq.NumTraces*limitMultiple,
).PageSize(0)
t, err := s.executeQuery(childSpan, query, s.metrics.queryTagIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -332,8 +341,8 @@ func (s *SpanReader) queryByTagsAndLogs(ctx context.Context, tq *spanstore.Trace
}

func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, ctx := startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.Finish()
ctx, span := s.startSpanForQuery(ctx, "queryByDuration", queryByDuration)
defer span.End()

results := dbmodel.UniqueTraceIDs{}

Expand All @@ -349,8 +358,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
endTimeByHour := traceQuery.StartTimeMax.Round(durationBucketSize)

for timeBucket := endTimeByHour; timeBucket.After(startTimeByHour) || timeBucket.Equal(startTimeByHour); timeBucket = timeBucket.Add(-1 * durationBucketSize) {
childSpan, _ := opentracing.StartSpanFromContext(ctx, "queryForTimeBucket")
childSpan.LogFields(otlog.String("timeBucket", timeBucket.String()))
_, childSpan := s.tracer.Start(ctx, "queryForTimeBucket")
childSpan.SetAttributes(attribute.Key("timeBucket").String(timeBucket.String()))
query := s.session.Query(
queryByDuration,
timeBucket,
Expand All @@ -360,7 +369,7 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
maxDurationMicros,
traceQuery.NumTraces*limitMultiple)
t, err := s.executeQuery(childSpan, query, s.metrics.queryDurationIndex)
childSpan.Finish()
childSpan.End()
if err != nil {
return nil, err
}
Expand All @@ -376,8 +385,8 @@ func (s *SpanReader) queryByDuration(ctx context.Context, traceQuery *spanstore.
}

func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.Finish()
_, span := s.startSpanForQuery(ctx, "queryByServiceNameAndOperation", queryByServiceAndOperationName)
defer span.End()
query := s.session.Query(
queryByServiceAndOperationName,
tq.ServiceName,
Expand All @@ -390,8 +399,8 @@ func (s *SpanReader) queryByServiceNameAndOperation(ctx context.Context, tq *spa
}

func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQueryParameters) (dbmodel.UniqueTraceIDs, error) {
span, _ := startSpanForQuery(ctx, "queryByService", queryByServiceName)
defer span.Finish()
_, span := s.startSpanForQuery(ctx, "queryByService", queryByServiceAndOperationName)
defer span.End()
query := s.session.Query(
queryByServiceName,
tq.ServiceName,
Expand All @@ -402,7 +411,7 @@ func (s *SpanReader) queryByService(ctx context.Context, tq *spanstore.TraceQuer
return s.executeQuery(span, query, s.metrics.queryServiceNameIndex)
}

func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
func (s *SpanReader) executeQuery(span trace.Span, query cassandra.Query, tableMetrics *casMetrics.Table) (dbmodel.UniqueTraceIDs, error) {
start := time.Now()
i := query.Iter()
retMe := dbmodel.UniqueTraceIDs{}
Expand All @@ -414,25 +423,27 @@ func (s *SpanReader) executeQuery(span opentracing.Span, query cassandra.Query,
tableMetrics.Emit(err, time.Since(start))
if err != nil {
logErrorToSpan(span, err)
span.LogFields(otlog.String("query", query.String()))
span.SetAttributes(attribute.Key("query").String(query.String()))
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
s.logger.Error("Failed to exec query", zap.Error(err), zap.String("query", query.String()))
return nil, err
}
return retMe, nil
}

func startSpanForQuery(ctx context.Context, name, query string) (opentracing.Span, context.Context) {
span, ctx := opentracing.StartSpanFromContext(ctx, name)
ottag.DBStatement.Set(span, query)
ottag.DBType.Set(span, "cassandra")
ottag.Component.Set(span, "gocql")
return span, ctx
func (s *SpanReader) startSpanForQuery(ctx context.Context, name, query string) (context.Context, trace.Span) {
ctx, span := s.tracer.Start(ctx, name)
span.SetAttributes(
attribute.Key(semconv.DBStatementKey).String(query),
attribute.Key(semconv.DBSystemKey).String("cassandra"),
attribute.Key("component").String("gocql"),
)
return ctx, span
}

func logErrorToSpan(span opentracing.Span, err error) {
func logErrorToSpan(span trace.Span, err error) {
if err == nil {
return
}
ottag.Error.Set(span, true)
span.LogFields(otlog.Error(err))
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
4 changes: 3 additions & 1 deletion plugin/storage/cassandra/spanstore/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/cassandra"
"github.com/jaegertracing/jaeger/pkg/cassandra/mocks"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage/spanstore"
Expand All @@ -52,11 +53,12 @@ func withSpanReader(fn func(r *spanReaderTest)) {
query.On("Exec").Return(nil)
logger, logBuffer := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(0)
tracer := jtracer.NoOp().OTEL
afzal442 marked this conversation as resolved.
Show resolved Hide resolved
r := &spanReaderTest{
session: session,
logger: logger,
logBuffer: logBuffer,
reader: NewSpanReader(session, metricsFactory, logger),
reader: NewSpanReader(session, metricsFactory, logger, tracer.Tracer("test")),
}
fn(r)
}
Expand Down
Loading