Skip to content

Commit

Permalink
Add the bufferExporter
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 1, 2024
1 parent 2f73208 commit 0d9af64
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 0 deletions.
111 changes: 111 additions & 0 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
Expand Down Expand Up @@ -127,3 +131,110 @@ func (e exportData) respond(err error) {
}
}
}

type bufferExporter struct {
Exporter

input chan exportData
inputMu sync.Mutex

done chan struct{}
stopped atomic.Bool
}

func newBufferExporter(exporter Exporter, size int) *bufferExporter {
input := make(chan exportData, size)
return &bufferExporter{
Exporter: exporter,

input: input,
done: exportSync(input, exporter),
}
}

var errStopped = errors.New("exporter stopped")

func (e *bufferExporter) enqueue(ctx context.Context, records []Record, rCh chan<- error) error {
data := exportData{ctx, records, rCh}

e.inputMu.Lock()
defer e.inputMu.Unlock()

// Check stopped before enqueueing now that e.inputMu is held. This
// prevents sends on a closed chan when Shutdown is called concurrently.
if e.stopped.Load() {
return errStopped
}

select {
case e.input <- data:
case <-ctx.Done():
return ctx.Err()

Check warning on line 172 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L171-L172

Added lines #L171 - L172 were not covered by tests
}
return nil
}

func (e *bufferExporter) EnqueueExport(ctx context.Context, records []Record) bool {
if len(records) == 0 {
// Nothing to enqueue, do not waste input space.
return true

Check warning on line 180 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L180

Added line #L180 was not covered by tests
}
return e.enqueue(ctx, records, nil) == nil
}

func (e *bufferExporter) Export(ctx context.Context, records []Record) error {
if len(records) == 0 {
return nil

Check warning on line 187 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L187

Added line #L187 was not covered by tests
}

resp := make(chan error, 1)
err := e.enqueue(ctx, records, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return fmt.Errorf("%w: dropping %d records", err, len(records))

Check warning on line 196 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L196

Added line #L196 was not covered by tests
}

select {
case err := <-resp:
return err
case <-ctx.Done():
return ctx.Err()

Check warning on line 203 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L202-L203

Added lines #L202 - L203 were not covered by tests
}
}

func (e *bufferExporter) ForceFlush(ctx context.Context) error {
resp := make(chan error, 1)
err := e.enqueue(ctx, nil, resp)
if err != nil {
if errors.Is(err, errStopped) {
return nil
}
return err

Check warning on line 214 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L214

Added line #L214 was not covered by tests
}

select {
case <-resp:
case <-ctx.Done():
return ctx.Err()

Check warning on line 220 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L217-L220

Added lines #L217 - L220 were not covered by tests
}
return e.Exporter.ForceFlush(ctx)

Check warning on line 222 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L222

Added line #L222 was not covered by tests
}

func (e *bufferExporter) Shutdown(ctx context.Context) error {
if e.stopped.Swap(true) {
return nil
}
e.inputMu.Lock()
defer e.inputMu.Unlock()

// No more sends will be made.
close(e.input)
select {
case <-e.done:
case <-ctx.Done():
return errors.Join(ctx.Err(), e.Shutdown(ctx))

Check warning on line 237 in sdk/log/exporter.go

View check run for this annotation

Codecov / codecov/patch

sdk/log/exporter.go#L236-L237

Added lines #L236 - L237 were not covered by tests
}
return e.Exporter.Shutdown(ctx)
}
53 changes: 53 additions & 0 deletions sdk/log/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,56 @@ func TestTimeoutExporter(t *testing.T) {
close(out)
})
}

func TestBufferExporter(t *testing.T) {
t.Run("ConcurrentSafe", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 10)

const goRoutines = 10
ctx := context.Background()
records := make([]Record, 10)

stop := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < goRoutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stop:
return
default:
_ = e.EnqueueExport(ctx, records)
_ = e.Export(ctx, records)
_ = e.ForceFlush(ctx)
}
}
}()
}

assert.Eventually(t, func() bool {
return exp.ExportN() > 0
}, 2*time.Second, time.Microsecond)

assert.NoError(t, e.Shutdown(ctx))
close(stop)
wg.Wait()
})

t.Run("Shutdown", func(t *testing.T) {
t.Run("Multiple", func(t *testing.T) {
exp := newTestExporter(nil)
t.Cleanup(exp.Stop)
e := newBufferExporter(exp, 10)

assert.NoError(t, e.Shutdown(context.Background()))
assert.Equal(t, 1, exp.ShutdownN(), "first Shutdown")

assert.NoError(t, e.Shutdown(context.Background()))
assert.Equal(t, 1, exp.ShutdownN(), "second Shutdown")
})
})
}

0 comments on commit 0d9af64

Please sign in to comment.