Skip to content

Commit

Permalink
Zipkin-specific observer support
Browse files Browse the repository at this point in the history
closes #156
  • Loading branch information
iamtakingiteasy committed Jun 27, 2020
1 parent 223664c commit e36ac10
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 19 deletions.
53 changes: 53 additions & 0 deletions observer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package zipkintracer

import (
"time"

"github.com/openzipkin/zipkin-go"
"github.com/openzipkin/zipkin-go/model"
)

// ZipkinStartSpanOptions allows ZipkinObserver.OnStartSpan() to inspect
// options used during zipkin.Span creation
type ZipkinStartSpanOptions struct {
// Parent span context reference, if any
Parent *model.SpanContext

// Span's start time
StartTime time.Time

// Kind clarifies context of timestamp, duration and remoteEndpoint in a span.
Kind model.Kind

// Tags used during span creation
Tags map[string]string

// RemoteEndpoint used during span creation
RemoteEndpoint *model.Endpoint
}

// ZipkinObserver may be registered with a Tracer to receive notifications about new Spans
type ZipkinObserver interface {
// OnStartSpan is called when new Span is created. Creates and returns span observer.
// If the observer is not interested in the given span, it must return nil.
OnStartSpan(sp zipkin.Span, operationName string, options ZipkinStartSpanOptions) ZipkinSpanObserver
}

// ZipkinSpanObserver is created by the ZipkinObserver and receives notifications about
// other Span events.
type ZipkinSpanObserver interface {
// Callback called from zipkin.Span.SetName()
OnSetName(operationName string)

// Callback called from zipkin.Span.SetTag()
OnSetTag(key, value string)

// Callback called from zipkin.Span.Annotate()
OnAnnotate(t time.Time, annotation string)

// Callback called from zipkin.Span.FinishedWithDuration()
OnFinishedWithDuration(duration time.Duration)

// Callback called from zipkin.Span.Finish()
OnFinish()
}
59 changes: 51 additions & 8 deletions span.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,22 @@ type FinisherWithDuration interface {
}

type spanImpl struct {
tracer *tracerImpl
zipkinSpan zipkin.Span
startTime time.Time
observer otobserver.SpanObserver
tracer *tracerImpl
zipkinSpan zipkin.Span
startTime time.Time
observer otobserver.SpanObserver
zipkinObserver ZipkinSpanObserver
}

func (s *spanImpl) SetOperationName(operationName string) opentracing.Span {
if s.observer != nil {
s.observer.OnSetOperationName(operationName)
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetName(operationName)
}

s.zipkinSpan.SetName(operationName)
return s
}
Expand All @@ -67,7 +72,13 @@ func (s *spanImpl) SetTag(key string, value interface{}) opentracing.Span {
return s
}

s.zipkinSpan.Tag(key, fmt.Sprint(value))
tagValue := fmt.Sprint(value)

if s.zipkinObserver != nil {
s.zipkinObserver.OnSetTag(key, tagValue)
}

s.zipkinSpan.Tag(key, tagValue)
return s
}

Expand All @@ -78,7 +89,14 @@ func (s *spanImpl) LogKV(keyValues ...interface{}) {
}

for _, field := range fields {
s.zipkinSpan.Annotate(time.Now(), field.String())
t := time.Now()
fieldValue := field.String()

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, fieldValue)
}

s.zipkinSpan.Annotate(t, fieldValue)
}
}

Expand All @@ -88,6 +106,10 @@ func (s *spanImpl) LogFields(fields ...log.Field) {

func (s *spanImpl) logFields(t time.Time, fields ...log.Field) {
for _, field := range fields {
if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(t, field.String())
}

s.zipkinSpan.Annotate(t, field.String())
}
}
Expand All @@ -110,14 +132,24 @@ func (s *spanImpl) Log(ld opentracing.LogData) {
ld.Timestamp = time.Now()
}

s.zipkinSpan.Annotate(ld.Timestamp, fmt.Sprintf("%s:%s", ld.Event, ld.Payload))
annotation := fmt.Sprintf("%s:%s", ld.Event, ld.Payload)

if s.zipkinObserver != nil {
s.zipkinObserver.OnAnnotate(ld.Timestamp, annotation)
}

s.zipkinSpan.Annotate(ld.Timestamp, annotation)
}

func (s *spanImpl) Finish() {
if s.observer != nil {
s.observer.OnFinish(opentracing.FinishOptions{})
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.zipkinSpan.Finish()
}

Expand All @@ -135,10 +167,21 @@ func (s *spanImpl) FinishWithOptions(opts opentracing.FinishOptions) {
if !ok {
return
}
f.FinishedWithDuration(opts.FinishTime.Sub(s.startTime))

dur := opts.FinishTime.Sub(s.startTime)

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinishedWithDuration(dur)
}

f.FinishedWithDuration(dur)
return
}

if s.zipkinObserver != nil {
s.zipkinObserver.OnFinish()
}

s.Finish()
}

Expand Down
15 changes: 13 additions & 2 deletions tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp

zopts := make([]zipkin.SpanOption, 0)

var zipkinStartSpanOptions ZipkinStartSpanOptions

// Parent
if len(startSpanOptions.References) > 0 {
parent, ok := (startSpanOptions.References[0].ReferencedContext).(SpanContext)
if ok {
zopts = append(zopts, zipkin.Parent(model.SpanContext(parent)))
zipkinStartSpanOptions.Parent = (*model.SpanContext)(&parent)
}
}

Expand All @@ -71,9 +74,10 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp
if !startSpanOptions.StartTime.IsZero() {
zopts = append(zopts, zipkin.StartTime(startSpanOptions.StartTime))
startTime = startSpanOptions.StartTime
zipkinStartSpanOptions.StartTime = startSpanOptions.StartTime
}

zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags)...)
zopts = append(zopts, parseTagsAsZipkinOptions(startSpanOptions.Tags, &zipkinStartSpanOptions)...)

newSpan := t.zipkinTracer.StartSpan(operationName, zopts...)

Expand All @@ -87,10 +91,14 @@ func (t *tracerImpl) StartSpan(operationName string, opts ...opentracing.StartSp
sp.observer = observer
}

if t.opts.zipkinObserver != nil {
sp.zipkinObserver = t.opts.zipkinObserver.OnStartSpan(sp.zipkinSpan, operationName, zipkinStartSpanOptions)
}

return sp
}

func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
func parseTagsAsZipkinOptions(t map[string]interface{}, options *ZipkinStartSpanOptions) []zipkin.SpanOption {
zopts := make([]zipkin.SpanOption, 0)

tags := map[string]string{}
Expand All @@ -112,6 +120,7 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {
mKind == model.Producer ||
mKind == model.Consumer {
zopts = append(zopts, zipkin.Kind(mKind))
options.Kind = mKind
} else {
tags["span.kind"] = kind
}
Expand Down Expand Up @@ -151,10 +160,12 @@ func parseTagsAsZipkinOptions(t map[string]interface{}) []zipkin.SpanOption {

if len(tags) > 0 {
zopts = append(zopts, zipkin.Tags(tags))
options.Tags = tags
}

if !remoteEndpoint.Empty() {
zopts = append(zopts, zipkin.RemoteEndpoint(remoteEndpoint))
options.RemoteEndpoint = remoteEndpoint
}

return zopts
Expand Down
12 changes: 10 additions & 2 deletions tracer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (

// TracerOptions allows creating a customized Tracer.
type TracerOptions struct {
observer otobserver.Observer
b3InjectOpt B3InjectOption
observer otobserver.Observer
b3InjectOpt B3InjectOption
zipkinObserver ZipkinObserver
}

// TracerOption allows for functional options.
Expand All @@ -46,6 +47,13 @@ func WithObserver(observer otobserver.Observer) TracerOption {
}
}

// WithZipkinObserver assigns an initialized zipkin observer to opts.zipkinObserver
func WithZipkinObserver(zipkinObserver ZipkinObserver) TracerOption {
return func(opts *TracerOptions) {
opts.zipkinObserver = zipkinObserver
}
}

// WithB3InjectOption sets the B3 injection style if using the native OpenTracing HTTPHeadersCarrier
func WithB3InjectOption(b3InjectOption B3InjectOption) TracerOption {
return func(opts *TracerOptions) {
Expand Down
54 changes: 47 additions & 7 deletions tracer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) {
{"span.kind": ext.SpanKindRPCServerEnum},
}
for _, tags := range tagCases {
opts := parseTagsAsZipkinOptions(tags)
var zipkinStartSpanOptions ZipkinStartSpanOptions

opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions)

rec := recorder.NewReporter()
tr, _ := zipkin.NewTracer(rec)
Expand All @@ -52,12 +54,18 @@ func TestOTKindTagIsParsedSuccessfuly(t *testing.T) {
if want, have := model.Server, spans[0].Kind; want != have {
t.Errorf("unexpected kind value, want %s, have %s", want, have)
}

if want, have := model.Server, zipkinStartSpanOptions.Kind; want != have {
t.Errorf("unexpected start options kind value, want %s, have %s", want, have)
}
}
}

func TestOTKindTagIsCantBeParsed(t *testing.T) {
var zipkinStartSpanOptions ZipkinStartSpanOptions

tags := map[string]interface{}{"span.kind": "banana"}
opts := parseTagsAsZipkinOptions(tags)
opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions)

rec := recorder.NewReporter()
tr, _ := zipkin.NewTracer(rec)
Expand All @@ -75,13 +83,29 @@ func TestOTKindTagIsCantBeParsed(t *testing.T) {
if want, have := "banana", spans[0].Tags["span.kind"]; want != have {
t.Errorf("unexpected tag value, want %s, have %s", want, have)
}

if zipkinStartSpanOptions.Tags == nil {
t.Errorf("unexpected start options tags value, want non-nil map, have %v", zipkinStartSpanOptions.Tags)
}

if want, have := "banana", zipkinStartSpanOptions.Tags["span.kind"]; want != have {
t.Errorf("unexpected start options tags[span.kind] value, want %s, have %s", want, have)
}
}

func TestOptionsFromOTTags(t *testing.T) {
var zipkinStartSpanOptions ZipkinStartSpanOptions

const (
sServiceA = "service_a"
sValue = "value"
sKey = "key"
)

tags := map[string]interface{}{}
tags[string(ext.PeerService)] = "service_a"
tags["key"] = "value"
opts := parseTagsAsZipkinOptions(tags)
tags[string(ext.PeerService)] = sServiceA
tags[sKey] = sValue
opts := parseTagsAsZipkinOptions(tags, &zipkinStartSpanOptions)

rec := recorder.NewReporter()
tr, _ := zipkin.NewTracer(rec)
Expand All @@ -92,11 +116,27 @@ func TestOptionsFromOTTags(t *testing.T) {
t.Fatalf("unexpected number of spans, want %d, have %d", want, have)
}

if want, have := "service_a", spans[0].RemoteEndpoint.ServiceName; want != have {
if want, have := sServiceA, spans[0].RemoteEndpoint.ServiceName; want != have {
t.Errorf("unexpected remote service name, want %s, have %s", want, have)
}

if want, have := "value", spans[0].Tags["key"]; want != have {
if want, have := sValue, spans[0].Tags[sKey]; want != have {
t.Errorf("unexpected tag value, want %s, have %s", want, have)
}

if zipkinStartSpanOptions.Tags == nil {
t.Errorf("unexpected start options tags value, want non-nil map, have %s", zipkinStartSpanOptions.Tags)
}

if want, have := sValue, zipkinStartSpanOptions.Tags[sKey]; want != have {
t.Errorf("unexpected start options tags[key] value, want %s, have %s", want, have)
}

if zipkinStartSpanOptions.RemoteEndpoint == nil {
t.Errorf("unexpected start options remote endpoint value, want non-nil instance, have %v", zipkinStartSpanOptions.RemoteEndpoint)
}

if want, have := sServiceA, zipkinStartSpanOptions.RemoteEndpoint.ServiceName; want != have {
t.Errorf("unexpected start options remote service name, want %s, have %s", want, have)
}
}

0 comments on commit e36ac10

Please sign in to comment.