From 0d9af64de6d4c24d6b2b0384a41225a6c4a9ffd8 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 1 Apr 2024 09:23:08 -0700 Subject: [PATCH] Add the `bufferExporter` --- sdk/log/exporter.go | 111 +++++++++++++++++++++++++++++++++++++++ sdk/log/exporter_test.go | 53 +++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/sdk/log/exporter.go b/sdk/log/exporter.go index 9f85f8a1fd92..90c4359a7169 100644 --- a/sdk/log/exporter.go +++ b/sdk/log/exporter.go @@ -5,6 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( "context" + "errors" + "fmt" + "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel" @@ -127,3 +131,110 @@ func (e exportData) respond(err error) { } } } + +type bufferExporter struct { + Exporter + + input chan exportData + inputMu sync.Mutex + + done chan struct{} + stopped atomic.Bool +} + +func newBufferExporter(exporter Exporter, size int) *bufferExporter { + input := make(chan exportData, size) + return &bufferExporter{ + Exporter: exporter, + + input: input, + done: exportSync(input, exporter), + } +} + +var errStopped = errors.New("exporter stopped") + +func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error { + data := exportData{ctx, records, rCh} + + e.inputMu.Lock() + defer e.inputMu.Unlock() + + // Check stopped before enqueueing now that e.inputMu is held. This + // prevents sends on a closed chan when Shutdown is called concurrently. + if e.stopped.Load() { + return errStopped + } + + select { + case e.input <- data: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +func (e *bufferExporter) EnqueueExport(ctx context.Context, records []Record) bool { + if len(records) == 0 { + // Nothing to enqueue, do not waste input space. + return true + } + return e.enqueue(ctx, records, nil) == nil +} + +func (e *bufferExporter) Export(ctx context.Context, records []Record) error { + if len(records) == 0 { + return nil + } + + resp := make(chan error, 1) + err := e.enqueue(ctx, records, resp) + if err != nil { + if errors.Is(err, errStopped) { + return nil + } + return fmt.Errorf("%w: dropping %d records", err, len(records)) + } + + select { + case err := <-resp: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +func (e *bufferExporter) ForceFlush(ctx context.Context) error { + resp := make(chan error, 1) + err := e.enqueue(ctx, nil, resp) + if err != nil { + if errors.Is(err, errStopped) { + return nil + } + return err + } + + select { + case <-resp: + case <-ctx.Done(): + return ctx.Err() + } + return e.Exporter.ForceFlush(ctx) +} + +func (e *bufferExporter) Shutdown(ctx context.Context) error { + if e.stopped.Swap(true) { + return nil + } + e.inputMu.Lock() + defer e.inputMu.Unlock() + + // No more sends will be made. + close(e.input) + select { + case <-e.done: + case <-ctx.Done(): + return errors.Join(ctx.Err(), e.Shutdown(ctx)) + } + return e.Exporter.Shutdown(ctx) +} diff --git a/sdk/log/exporter_test.go b/sdk/log/exporter_test.go index 3c37b83ad384..778281ee7803 100644 --- a/sdk/log/exporter_test.go +++ b/sdk/log/exporter_test.go @@ -243,3 +243,56 @@ func TestTimeoutExporter(t *testing.T) { close(out) }) } + +func TestBufferExporter(t *testing.T) { + t.Run("ConcurrentSafe", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 10) + + const goRoutines = 10 + ctx := context.Background() + records := make([]Record, 10) + + stop := make(chan struct{}) + var wg sync.WaitGroup + for i := 0; i < goRoutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stop: + return + default: + _ = e.EnqueueExport(ctx, records) + _ = e.Export(ctx, records) + _ = e.ForceFlush(ctx) + } + } + }() + } + + assert.Eventually(t, func() bool { + return exp.ExportN() > 0 + }, 2*time.Second, time.Microsecond) + + assert.NoError(t, e.Shutdown(ctx)) + close(stop) + wg.Wait() + }) + + t.Run("Shutdown", func(t *testing.T) { + t.Run("Multiple", func(t *testing.T) { + exp := newTestExporter(nil) + t.Cleanup(exp.Stop) + e := newBufferExporter(exp, 10) + + assert.NoError(t, e.Shutdown(context.Background())) + assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown") + + assert.NoError(t, e.Shutdown(context.Background())) + assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown") + }) + }) +}