From e36ac10db0f606b22ce10013cdaa40a42b2cff87 Mon Sep 17 00:00:00 2001 From: Alexander Tumin Date: Sat, 27 Jun 2020 22:46:50 +0300 Subject: [PATCH] Zipkin-specific observer support closes #156 --- observer.go | 53 ++++++++++++++++++++++++++++++++++++++++++ span.go | 59 ++++++++++++++++++++++++++++++++++++++++------- tracer.go | 15 ++++++++++-- tracer_options.go | 12 ++++++++-- tracer_test.go | 54 +++++++++++++++++++++++++++++++++++++------ 5 files changed, 174 insertions(+), 19 deletions(-) create mode 100644 observer.go diff --git a/observer.go b/observer.go new file mode 100644 index 0000000..d0c7102 --- /dev/null +++ b/observer.go @@ -0,0 +1,53 @@ +package zipkintracer + +import ( + "time" + + "github.com/openzipkin/zipkin-go" + "github.com/openzipkin/zipkin-go/model" +) + +// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect +// options used during zipkin.Span creation +type ZipkinStartSpanOptions struct { + // Parent span context reference, if any + Parent *model.SpanContext + + // Span's start time + StartTime time.Time + + // Kind clarifies context of timestamp, duration and remoteEndpoint in a span. + Kind model.Kind + + // Tags used during span creation + Tags map[string]string + + // RemoteEndpoint used during span creation + RemoteEndpoint *model.Endpoint +} + +// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans +type ZipkinObserver interface { + // OnStartSpan is called when new Span is created. Creates and returns span observer. + // If the observer is not interested in the given span, it must return nil. + OnStartSpan(sp zipkin.Span, operationName string, options ZipkinStartSpanOptions) ZipkinSpanObserver +} + +// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about +// other Span events. +type ZipkinSpanObserver interface { + // Callback called from zipkin.Span.SetName() + OnSetName(operationName string) + + // Callback called from zipkin.Span.SetTag() + OnSetTag(key, value string) + + // Callback called from zipkin.Span.Annotate() + OnAnnotate(t time.Time, annotation string) + + // Callback called from zipkin.Span.FinishedWithDuration() + OnFinishedWithDuration(duration time.Duration) + + // Callback called from zipkin.Span.Finish() + OnFinish() +} diff --git a/span.go b/span.go index fe7915d..63264de 100644 --- a/span.go +++ b/span.go @@ -31,10 +31,11 @@ type FinisherWithDuration interface { } type spanImpl struct { - tracer *tracerImpl - zipkinSpan zipkin.Span - startTime time.Time - observer otobserver.SpanObserver + tracer *tracerImpl + zipkinSpan zipkin.Span + startTime time.Time + observer otobserver.SpanObserver + zipkinObserver ZipkinSpanObserver } func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { @@ -42,6 +43,10 @@ func (s *spanImpl) SetOperationName(operationName string) opentracing.Span { s.observer.OnSetOperationName(operationName) } + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetName(operationName) + } + s.zipkinSpan.SetName(operationName) return s } @@ -67,7 +72,13 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span { return s } - s.zipkinSpan.Tag(key, fmt.Sprint(value)) + tagValue := fmt.Sprint(value) + + if s.zipkinObserver != nil { + s.zipkinObserver.OnSetTag(key, tagValue) + } + + s.zipkinSpan.Tag(key, tagValue) return s } @@ -78,7 +89,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) { } for _, field := range fields { - s.zipkinSpan.Annotate(time.Now(), field.String()) + t := time.Now() + fieldValue := field.String() + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(t, fieldValue) + } + + s.zipkinSpan.Annotate(t, fieldValue) } } @@ -88,6 +106,10 @@ func (s *spanImpl) LogFields(fields ...log.Field) { func (s *spanImpl) logFields(t time.Time, fields ...log.Field) { for _, field := range fields { + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(t, field.String()) + } + s.zipkinSpan.Annotate(t, field.String()) } } @@ -110,7 +132,13 @@ func (s *spanImpl) Log(ld opentracing.LogData) { ld.Timestamp = time.Now() } - s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload)) + annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload) + + if s.zipkinObserver != nil { + s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation) + } + + s.zipkinSpan.Annotate(ld.Timestamp, annotation) } func (s *spanImpl) Finish() { @@ -118,6 +146,10 @@ func (s *spanImpl) Finish() { s.observer.OnFinish(opentracing.FinishOptions{}) } + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinish() + } + s.zipkinSpan.Finish() } @@ -135,10 +167,21 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) { if !ok { return } - f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime)) + + dur := opts.FinishTime.Sub(s.startTime) + + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinishedWithDuration(dur) + } + + f.FinishedWithDuration(dur) return } + if s.zipkinObserver != nil { + s.zipkinObserver.OnFinish() + } + s.Finish() } diff --git a/tracer.go b/tracer.go index b48d92a..5580350 100644 --- a/tracer.go +++ b/tracer.go @@ -58,11 +58,14 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp zopts := make([]zipkin.SpanOption, 0) + var zipkinStartSpanOptions ZipkinStartSpanOptions + // Parent if len(startSpanOptions.References) > 0 { parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext) if ok { zopts = append(zopts, zipkin.Parent(model.SpanContext(parent))) + zipkinStartSpanOptions.Parent = (*model.SpanContext)(&parent) } } @@ -71,9 +74,10 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp if !startSpanOptions.StartTime.IsZero() { zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime)) startTime = startSpanOptions.StartTime + zipkinStartSpanOptions.StartTime = startSpanOptions.StartTime } - zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...) + zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &zipkinStartSpanOptions)...) newSpan := t.zipkinTracer.StartSpan(operationName, zopts...) @@ -87,10 +91,14 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp sp.observer = observer } + if t.opts.zipkinObserver != nil { + sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, zipkinStartSpanOptions) + } + return sp } -func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { +func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption { zopts := make([]zipkin.SpanOption, 0) tags := map[string]string{} @@ -112,6 +120,7 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { mKind == model.Producer || mKind == model.Consumer { zopts = append(zopts, zipkin.Kind(mKind)) + options.Kind = mKind } else { tags["span.kind"] = kind } @@ -151,10 +160,12 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption { if len(tags) > 0 { zopts = append(zopts, zipkin.Tags(tags)) + options.Tags = tags } if !remoteEndpoint.Empty() { zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint)) + options.RemoteEndpoint = remoteEndpoint } return zopts diff --git a/tracer_options.go b/tracer_options.go index 190a692..ae2e312 100644 --- a/tracer_options.go +++ b/tracer_options.go @@ -31,8 +31,9 @@ const ( // TracerOptions allows creating a customized Tracer. type TracerOptions struct { - observer otobserver.Observer - b3InjectOpt B3InjectOption + observer otobserver.Observer + b3InjectOpt B3InjectOption + zipkinObserver ZipkinObserver } // TracerOption allows for functional options. @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption { } } +// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver +func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption { + return func(opts *TracerOptions) { + opts.zipkinObserver = zipkinObserver + } +} + // WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption { return func(opts *TracerOptions) { diff --git a/tracer_test.go b/tracer_test.go index 0c991d0..00aac43 100644 --- a/tracer_test.go +++ b/tracer_test.go @@ -38,7 +38,9 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) { {"span.kind": ext.SpanKindRPCServerEnum}, } for _, tags := range tagCases { - opts := parseTagsAsZipkinOptions(tags) + var zipkinStartSpanOptions ZipkinStartSpanOptions + + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -52,12 +54,18 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) { if want, have := model.Server, spans[0].Kind; want != have { t.Errorf("unexpected kind value, want %s, have %s", want, have) } + + if want, have := model.Server, zipkinStartSpanOptions.Kind; want != have { + t.Errorf("unexpected start options kind value, want %s, have %s", want, have) + } } } func TestOTKindTagIsCantBeParsed(t *testing.T) { + var zipkinStartSpanOptions ZipkinStartSpanOptions + tags := map[string]interface{}{"span.kind": "banana"} - opts := parseTagsAsZipkinOptions(tags) + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -75,13 +83,29 @@ func TestOTKindTagIsCantBeParsed(t *testing.T) { if want, have := "banana", spans[0].Tags["span.kind"]; want != have { t.Errorf("unexpected tag value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %v", zipkinStartSpanOptions.Tags) + } + + if want, have := "banana", zipkinStartSpanOptions.Tags["span.kind"]; want != have { + t.Errorf("unexpected start options tags[span.kind] value, want %s, have %s", want, have) + } } func TestOptionsFromOTTags(t *testing.T) { + var zipkinStartSpanOptions ZipkinStartSpanOptions + + const ( + sServiceA = "service_a" + sValue = "value" + sKey = "key" + ) + tags := map[string]interface{}{} - tags[string(ext.PeerService)] = "service_a" - tags["key"] = "value" - opts := parseTagsAsZipkinOptions(tags) + tags[string(ext.PeerService)] = sServiceA + tags[sKey] = sValue + opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions) rec := recorder.NewReporter() tr, _ := zipkin.NewTracer(rec) @@ -92,11 +116,27 @@ func TestOptionsFromOTTags(t *testing.T) { t.Fatalf("unexpected number of spans, want %d, have %d", want, have) } - if want, have := "service_a", spans[0].RemoteEndpoint.ServiceName; want != have { + if want, have := sServiceA, spans[0].RemoteEndpoint.ServiceName; want != have { t.Errorf("unexpected remote service name, want %s, have %s", want, have) } - if want, have := "value", spans[0].Tags["key"]; want != have { + if want, have := sValue, spans[0].Tags[sKey]; want != have { t.Errorf("unexpected tag value, want %s, have %s", want, have) } + + if zipkinStartSpanOptions.Tags == nil { + t.Errorf("unexpected start options tags value, want non-nil map, have %s", zipkinStartSpanOptions.Tags) + } + + if want, have := sValue, zipkinStartSpanOptions.Tags[sKey]; want != have { + t.Errorf("unexpected start options tags[key] value, want %s, have %s", want, have) + } + + if zipkinStartSpanOptions.RemoteEndpoint == nil { + t.Errorf("unexpected start options remote endpoint value, want non-nil instance, have %v", zipkinStartSpanOptions.RemoteEndpoint) + } + + if want, have := sServiceA, zipkinStartSpanOptions.RemoteEndpoint.ServiceName; want != have { + t.Errorf("unexpected start options remote service name, want %s, have %s", want, have) + } }