Skip to content

Commit

Permalink
[chore] [exporter/signalfx] Remove redundant traces shim (open-teleme…
Browse files Browse the repository at this point in the history
…try#31058)

Also, remove `ActiveServiceTracker.spansProcessed` field as redundant
dependency on the shim
  • Loading branch information
dmitryax authored Feb 6, 2024
1 parent 633aea2 commit 0dc5c1f
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 325 deletions.
2 changes: 1 addition & 1 deletion exporter/signalfxexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func createTracesExporter(
ctx,
set,
cfg,
tracker.AddSpans,
tracker.ProcessTraces,
exporterhelper.WithStart(tracker.Start),
exporterhelper.WithShutdown(tracker.Shutdown))
}
Expand Down
57 changes: 0 additions & 57 deletions exporter/signalfxexporter/internal/apm/tracetracker/shims.go

This file was deleted.

61 changes: 34 additions & 27 deletions exporter/signalfxexporter/internal/apm/tracetracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package tracetracker // import "github.com/open-telemetry/opentelemetry-collecto
import (
"context"
"strings"
"sync/atomic"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/correlations"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/log"
)
Expand Down Expand Up @@ -55,9 +58,6 @@ type ActiveServiceTracker struct {
// correlationClient is the client used for updating infrastructure correlation properties
correlationClient correlations.CorrelationClient

// Internal metrics
spansProcessed int64

// Map of dimensions to sync to with the key being the span attribute to lookup and the value being
// the dimension to sync to.
dimsToSyncSource map[string]string
Expand Down Expand Up @@ -119,29 +119,35 @@ func New(
return a
}

// AddSpansGeneric accepts a list of trace spans and uses them to update the
// ProcessTraces accepts a list of trace spans and uses them to update the
// current list of active services. This is thread-safe.
func (a *ActiveServiceTracker) AddSpansGeneric(_ context.Context, spans SpanList) {
func (a *ActiveServiceTracker) ProcessTraces(_ context.Context, traces ptrace.Traces) {
// Take current time once since this is a system call.
now := a.timeNow()

for i := 0; i < spans.Len(); i++ {
a.processEnvironment(spans.At(i), now)
a.processService(spans.At(i), now)
for i := 0; i < traces.ResourceSpans().Len(); i++ {
a.processEnvironment(traces.ResourceSpans().At(i).Resource(), now)
a.processService(traces.ResourceSpans().At(i).Resource(), now)
}

// Protected by lock above
atomic.AddInt64(&a.spansProcessed, int64(spans.Len()))
}

func (a *ActiveServiceTracker) processEnvironment(span Span, now time.Time) {
if span.NumTags() == 0 {
func (a *ActiveServiceTracker) processEnvironment(res pcommon.Resource, now time.Time) {
attrs := res.Attributes()
if attrs.Len() == 0 {
return
}
environment, environmentFound := span.Environment()

// If spans are coming in with no environment, we use the same fallback value that is being set on the backend.
if !environmentFound || strings.TrimSpace(environment) == "" {
// Determine the environment value from the incoming spans.
// First check "deployment.environment" attribute.
// Then, try "environment" attribute (SignalFx schema).
// Otherwise, use the same fallback value as set on the backend.
var environment string
if env, ok := attrs.Get(conventions.AttributeDeploymentEnvironment); ok {
environment = env.Str()
} else if env, ok = attrs.Get("environment"); ok {
environment = env.Str()
}
if strings.TrimSpace(environment) == "" {
environment = fallbackEnvironment
}

Expand Down Expand Up @@ -175,29 +181,30 @@ func (a *ActiveServiceTracker) processEnvironment(span Span, now time.Time) {
for sourceAttr, dimName := range a.dimsToSyncSource {
sourceAttr := sourceAttr
dimName := dimName
if dimValue, ok := span.Tag(sourceAttr); ok {
if val, ok := attrs.Get(sourceAttr); ok {
// Note that the value is not set on the cache key. We only send the first environment received for a
// given pod/container, and we never delete the values set on the container/pod dimension.
// So we only need to cache the dim name and dim value that have been associated with an environment.
if exists := a.tenantEnvironmentCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: dimValue}, now); !exists {
if exists := a.tenantEnvironmentCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: val.Str()}, now); !exists {
a.correlationClient.Correlate(&correlations.Correlation{
Type: correlations.Environment,
DimName: dimName,
DimValue: dimValue,
DimValue: val.Str(),
Value: environment,
}, func(cor *correlations.Correlation, err error) {
if err == nil {
a.tenantEnvironmentCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: dimValue}, now)
a.tenantEnvironmentCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: val.Str()}, now)
}
})
}
}
}
}

func (a *ActiveServiceTracker) processService(span Span, now time.Time) {
func (a *ActiveServiceTracker) processService(res pcommon.Resource, now time.Time) {
// Can't do anything if the spans don't have a local service name
service, ok := span.ServiceName()
serviceNameAttr, ok := res.Attributes().Get(conventions.AttributeServiceName)
service := serviceNameAttr.Str()
if !ok || service == "" {
return
}
Expand Down Expand Up @@ -234,19 +241,19 @@ func (a *ActiveServiceTracker) processService(span Span, now time.Time) {
for sourceAttr, dimName := range a.dimsToSyncSource {
sourceAttr := sourceAttr
dimName := dimName
if dimValue, ok := span.Tag(sourceAttr); ok {
if val, ok := res.Attributes().Get(sourceAttr); ok {
// Note that the value is not set on the cache key. We only send the first service received for a
// given pod/container, and we never delete the values set on the container/pod dimension.
// So we only need to cache the dim name and dim value that have been associated with a service.
if exists := a.tenantServiceCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: dimValue}, now); !exists {
if exists := a.tenantServiceCache.UpdateIfExists(&CacheKey{dimName: dimName, dimValue: val.Str()}, now); !exists {
a.correlationClient.Correlate(&correlations.Correlation{
Type: correlations.Service,
DimName: dimName,
DimValue: dimValue,
DimValue: val.Str(),
Value: service,
}, func(cor *correlations.Correlation, err error) {
if err == nil {
a.tenantServiceCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: dimValue}, now)
a.tenantServiceCache.UpdateOrCreate(&CacheKey{dimName: dimName, dimValue: val.Str()}, now)
}
})
}
Expand Down
80 changes: 34 additions & 46 deletions exporter/signalfxexporter/internal/apm/tracetracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/correlations"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter/internal/apm/log"
Expand All @@ -26,17 +29,15 @@ func advanceTime(a *ActiveServiceTracker, minutes int64) {
a.timeNow = func() time.Time { return newNow }
}

// mergeStringMaps merges n maps with a later map's keys overriding earlier maps
func mergeStringMaps(maps ...map[string]string) map[string]string {
ret := map[string]string{}

// newResourceWithAttrs creates a new resource with the given attributes.
func newResourceWithAttrs(maps ...map[string]string) pcommon.Resource {
res := pcommon.NewResource()
for _, m := range maps {
for k, v := range m {
ret[k] = v
res.Attributes().PutStr(k, v)
}
}

return ret
return res
}

func TestExpiration(t *testing.T) {
Expand All @@ -46,32 +47,24 @@ func TestExpiration(t *testing.T) {
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
setTime(a, time.Unix(100, 0))

a.AddSpansGeneric(context.Background(), fakeSpanList{
{
serviceName: "one",
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment1"}),
},
{
serviceName: "two",
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment2"}),
},
{
serviceName: "three",
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment3"}),
},
})
fakeTraces := ptrace.NewTraces()
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "one", "environment": "environment1"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "two", "environment": "environment2"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "three", "environment": "environment3"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
a.ProcessTraces(context.Background(), fakeTraces)

assert.Equal(t, int64(3), a.hostServiceCache.ActiveCount, "activeServiceCount is not properly tracked")
assert.Equal(t, int64(3), a.hostEnvironmentCache.ActiveCount, "activeEnvironmentCount is not properly tracked")

advanceTime(a, 4)

a.AddSpansGeneric(context.Background(), fakeSpanList{
{
serviceName: "two",
tags: mergeStringMaps(hostIDDims, map[string]string{"environment": "environment2"}),
},
})
fakeTraces = ptrace.NewTraces()
newResourceWithAttrs(hostIDDims, map[string]string{conventions.AttributeServiceName: "two", "environment": "environment2"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
a.ProcessTraces(context.Background(), fakeTraces)

advanceTime(a, 2)
a.Purge()
Expand Down Expand Up @@ -148,11 +141,12 @@ func TestCorrelationEmptyEnvironment(t *testing.T) {
a := New(log.Nil, 5*time.Minute, correlationClient, hostIDDims, DefaultDimsToSyncSource)
wg.Wait() // wait for the initial fetch of hostIDDims to complete

a.AddSpansGeneric(context.Background(), fakeSpanList{
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
{tags: mergeStringMaps(hostIDDims, containerLevelIDDims)},
})
fakeTraces := ptrace.NewTraces()
fakeResource := newResourceWithAttrs(hostIDDims, containerLevelIDDims)
fakeResource.CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
fakeResource.CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
fakeResource.CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
a.ProcessTraces(context.Background(), fakeTraces)

cors := correlationClient.getCorrelations()
assert.Equal(t, 4, len(cors), "expected 4 correlations to be made")
Expand Down Expand Up @@ -194,20 +188,14 @@ func TestCorrelationUpdates(t *testing.T) {

setTime(a, time.Unix(100, 0))

a.AddSpansGeneric(context.Background(), fakeSpanList{
{
serviceName: "one",
tags: mergeStringMaps(hostIDDims, mergeStringMaps(containerLevelIDDims, map[string]string{"environment": "environment1"})),
},
{
serviceName: "two",
tags: mergeStringMaps(hostIDDims, mergeStringMaps(containerLevelIDDims, map[string]string{"environment": "environment2"})),
},
{
serviceName: "three",
tags: mergeStringMaps(hostIDDims, mergeStringMaps(containerLevelIDDims, map[string]string{"environment": "environment3"})),
},
})
fakeTraces := ptrace.NewTraces()
newResourceWithAttrs(containerLevelIDDims, map[string]string{conventions.AttributeServiceName: "one", "environment": "environment1"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
newResourceWithAttrs(containerLevelIDDims, map[string]string{conventions.AttributeServiceName: "two", "environment": "environment2"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
newResourceWithAttrs(containerLevelIDDims, map[string]string{conventions.AttributeServiceName: "three", "environment": "environment3"}).
CopyTo(fakeTraces.ResourceSpans().AppendEmpty().Resource())
a.ProcessTraces(context.Background(), fakeTraces)

assert.Equal(t, int64(3), a.hostServiceCache.ActiveCount, "activeServiceCount is not properly tracked")
assert.Equal(t, int64(3), a.hostEnvironmentCache.ActiveCount, "activeEnvironmentCount is not properly tracked")
Expand Down
6 changes: 3 additions & 3 deletions exporter/signalfxexporter/internal/correlation/correlation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ func newCorrelationClient(cfg *Config, accessToken configopaque.String, params e
}, nil
}

// AddSpans processes the provided spans to correlate the services and environment observed
// ProcessTraces processes the provided spans to correlate the services and environment observed
// to the resources (host, pods, etc.) emitting the spans.
func (cor *Tracker) AddSpans(ctx context.Context, traces ptrace.Traces) error {
func (cor *Tracker) ProcessTraces(ctx context.Context, traces ptrace.Traces) error {
if cor == nil || traces.ResourceSpans().Len() == 0 {
return nil
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func (cor *Tracker) AddSpans(ctx context.Context, traces ptrace.Traces) error {
})

if cor.traceTracker != nil {
cor.traceTracker.AddSpansGeneric(ctx, spanListWrap{traces.ResourceSpans()})
cor.traceTracker.ProcessTraces(ctx, traces)
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func TestTrackerAddSpans(t *testing.T) {
attr.PutStr("host.name", "localhost")

// Add empty first, should ignore.
assert.NoError(t, tracker.AddSpans(context.Background(), ptrace.NewTraces()))
assert.NoError(t, tracker.ProcessTraces(context.Background(), ptrace.NewTraces()))
assert.Nil(t, tracker.traceTracker)

assert.NoError(t, tracker.AddSpans(context.Background(), traces))
assert.NoError(t, tracker.ProcessTraces(context.Background(), traces))

assert.NotNil(t, tracker.traceTracker, "trace tracker should be set")

Expand Down
Loading

0 comments on commit 0dc5c1f

Please sign in to comment.