Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ForceFlush() method to SpanProcessor interface #1166

Merged
merged 6 commits into from
Sep 20, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- In the `go.opentelemetry.io/otel/api/trace` package, `NewTracerConfig` was added to construct new `TracerConfig`s.
This addition was made to conform with our project option conventions. (#1155)
- Instrumentation library information was added to the Zipkin exporter. (#1119)
- The `SpanProcessor` interface now has a `ForceFlush()` method. (#1166)
- More semantic conventions for k8s as resource attributes. (#1167)

### Changed
Expand Down
30 changes: 23 additions & 7 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ type BatchSpanProcessor struct {
queue chan *export.SpanData
dropped uint32

batch []*export.SpanData
timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
batch []*export.SpanData
batchMutex sync.Mutex
timer *time.Timer
stopWait sync.WaitGroup
stopOnce sync.Once
stopCh chan struct{}
}

var _ SpanProcessor = (*BatchSpanProcessor)(nil)
Expand Down Expand Up @@ -131,6 +132,11 @@ func (bsp *BatchSpanProcessor) Shutdown() {
})
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *BatchSpanProcessor) ForceFlush() {
bsp.exportSpans()
}

func WithMaxQueueSize(size int) BatchSpanProcessorOption {
return func(o *BatchSpanProcessorOptions) {
o.MaxQueueSize = size
Expand Down Expand Up @@ -159,6 +165,9 @@ func WithBlocking() BatchSpanProcessorOption {
func (bsp *BatchSpanProcessor) exportSpans() {
bsp.timer.Reset(bsp.o.BatchTimeout)

bsp.batchMutex.Lock()
defer bsp.batchMutex.Unlock()

if len(bsp.batch) > 0 {
if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil {
global.Handle(err)
Expand All @@ -180,8 +189,11 @@ func (bsp *BatchSpanProcessor) processQueue() {
case <-bsp.timer.C:
bsp.exportSpans()
case sd := <-bsp.queue:
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()
if shouldExport {
if !bsp.timer.Stop() {
<-bsp.timer.C
}
Expand All @@ -202,8 +214,12 @@ func (bsp *BatchSpanProcessor) drainQueue() {
return
}

bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
if len(bsp.batch) == bsp.o.MaxExportBatchSize {
shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
bsp.batchMutex.Unlock()

if shouldExport {
bsp.exportSpans()
}
default:
Expand Down
48 changes: 48 additions & 0 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
// These should not panic.
bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown()
}

Expand Down Expand Up @@ -180,6 +181,53 @@ func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
}
}

func TestBatchSpanProcessorForceFlush(t *testing.T) {
option := testOption{
name: "ForceFlush()",
o: []sdktrace.BatchSpanProcessorOption{
sdktrace.WithBatchTimeout(10 * time.Second),
sdktrace.WithMaxQueueSize(2000),
sdktrace.WithMaxExportBatchSize(2000),
},
wantNumSpans: 205,
wantBatchCount: 1,
genNumSpans: 205,
}

te := testBatchExporter{}
tp := basicProvider(t)
ssp := createAndRegisterBatchSP(option, &te)
if ssp == nil {
t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
}
tp.RegisterSpanProcessor(ssp)
tr := tp.Tracer("BatchSpanProcessorWithOptions")

generateSpan(t, true, tr, option)

ssp.ForceFlush()

gotNumOfSpans := te.len()
if 0 == gotNumOfSpans {
t.Errorf("number of flushed spans is zero")
}

tp.UnregisterSpanProcessor(ssp)

gotNumOfSpans = te.len()
if option.wantNumSpans != gotNumOfSpans {
t.Errorf("number of exported span: got %+v, want %+v\n",
gotNumOfSpans, option.wantNumSpans)
}

gotBatchCount := te.getBatchCount()
if gotBatchCount < option.wantBatchCount {
t.Errorf("number batches: got %+v, want >= %+v\n",
gotBatchCount, option.wantBatchCount)
t.Errorf("Batches %v\n", te.sizes)
}
}

func createAndRegisterBatchSP(option testOption, te *testBatchExporter) *sdktrace.BatchSpanProcessor {
// Always use blocking queue to avoid flaky tests.
options := append(option.o, sdktrace.WithBlocking())
Expand Down
4 changes: 4 additions & 0 deletions sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,7 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
}

// ForceFlush does nothing as there is no data to flush.
func (ssp *SimpleSpanProcessor) ForceFlush() {
}
6 changes: 6 additions & 0 deletions sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type SpanProcessor interface {
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()

// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when
// using a FaaS provider that may suspend the process after an invocation, but before
// the Processor can export the completed spans.
ForceFlush()
}

type spanProcessorMap map[SpanProcessor]*sync.Once
3 changes: 3 additions & 0 deletions sdk/trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ func (t *testSpanProcesor) Shutdown() {
t.shutdownCount++
}

func (t *testSpanProcesor) ForceFlush() {
}

func TestRegisterSpanProcessort(t *testing.T) {
name := "Register span processor before span starts"
tp := basicProvider(t)
Expand Down