Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add export sync #5105

Merged
merged 7 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions sdk/log/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"

"go.opentelemetry.io/otel"
)

// Exporter handles the delivery of log records to external receivers.
Expand Down Expand Up @@ -50,3 +52,51 @@ func (noopExporter) Export(context.Context, []Record) error { return nil }
func (noopExporter) Shutdown(context.Context) error { return nil }

func (noopExporter) ForceFlush(context.Context) error { return nil }

// exportSync exports all data from input using exporter in a spawned
// goroutine. The returned chan will be closed when the spawned goroutine
// completes.
func exportSync(input <-chan exportData, exporter Exporter) (done chan struct{}) {
done = make(chan struct{})
go func() {
defer close(done)
for data := range input {
data.DoExport(exporter.Export)
}
}()
return done
}

// exportData is data related to an export.
type exportData struct {
ctx context.Context
records []Record

// respCh is the channel any error returned from the export will be sent
// on. If this is nil, and the export error is non-nil, the error will
// passed to the OTel error handler.
respCh chan<- error
}

// DoExport calls exportFn with the data contained in e. The error response
// will be returned on e's respCh if not nil. The error will be handled by the
// default OTel error handle if it is not nil and respCh is nil or full.
func (e exportData) DoExport(exportFn func(context.Context, []Record) error) {
dashpole marked this conversation as resolved.
Show resolved Hide resolved
if len(e.records) == 0 {
e.respond(nil)
return
}

e.respond(exportFn(e.ctx, e.records))
}

func (e exportData) respond(err error) {
select {
case e.respCh <- err:
default:
// e.respCh is nil or busy, default to otel.Handler.
if err != nil {
otel.Handle(err)
}
}
}
198 changes: 198 additions & 0 deletions sdk/log/exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package log

import (
"context"
"slices"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/log"
)

type instruction struct {
Record *[]Record
Flush chan [][]Record
}

type testExporter struct {
// Err is the error returned by all methods of the testExporter.
Err error

// Counts of method calls.
exportN, shutdownN, forceFlushN *int32

input chan instruction
done chan struct{}
}

func newTestExporter(err error) *testExporter {
e := &testExporter{
Err: err,
exportN: new(int32),
shutdownN: new(int32),
forceFlushN: new(int32),
input: make(chan instruction),
}
e.done = run(e.input)

return e
}

func run(input chan instruction) chan struct{} {
done := make(chan struct{})
go func() {
defer close(done)

var records [][]Record
for in := range input {
if in.Record != nil {
records = append(records, *in.Record)
}
if in.Flush != nil {
cp := slices.Clone(records)
records = records[:0]
in.Flush <- cp
}
}
}()
return done
}

func (e *testExporter) Records() [][]Record {
out := make(chan [][]Record, 1)
e.input <- instruction{Flush: out}
return <-out
}

func (e *testExporter) Export(ctx context.Context, r []Record) error {
atomic.AddInt32(e.exportN, 1)
e.input <- instruction{Record: &r}
return e.Err
}

func (e *testExporter) ExportN() int {
return int(atomic.LoadInt32(e.exportN))
}

func (e *testExporter) Stop() {
close(e.input)
<-e.done
}

func (e *testExporter) Shutdown(ctx context.Context) error {
atomic.AddInt32(e.shutdownN, 1)
return e.Err
}

func (e *testExporter) ShutdownN() int {
return int(atomic.LoadInt32(e.shutdownN))
}

func (e *testExporter) ForceFlush(ctx context.Context) error {
atomic.AddInt32(e.forceFlushN, 1)
return e.Err
}

func (e *testExporter) ForceFlushN() int {
return int(atomic.LoadInt32(e.forceFlushN))
}

func TestExportSync(t *testing.T) {
eventuallyDone := func(t *testing.T, done chan struct{}) {
assert.Eventually(t, func() bool {
select {
case <-done:
return true
default:
return false
}
}, 2*time.Second, time.Microsecond)
}

t.Run("ErrorHandler", func(t *testing.T) {
var got error
handler := otel.ErrorHandlerFunc(func(err error) { got = err })
otel.SetErrorHandler(handler)

in := make(chan exportData, 1)
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
done := exportSync(in, exp)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

in <- exportData{
ctx: context.Background(),
records: make([]Record, 1),
}
}()

wg.Wait()
close(in)
eventuallyDone(t, done)

assert.ErrorIs(t, got, assert.AnError, "error not passed to ErrorHandler")
})

t.Run("ConcurrentSafe", func(t *testing.T) {
in := make(chan exportData, 1)
exp := newTestExporter(assert.AnError)
t.Cleanup(exp.Stop)
done := exportSync(in, exp)

const goRoutines = 10
var wg sync.WaitGroup
wg.Add(goRoutines)
for i := 0; i < goRoutines; i++ {
go func(n int) {
defer wg.Done()

var r Record
r.SetBody(log.IntValue(n))

resp := make(chan error, 1)
in <- exportData{
ctx: context.Background(),
records: []Record{r},
respCh: resp,
}

assert.ErrorIs(t, <-resp, assert.AnError)
}(i)
}

// Empty records should be ignored.
in <- exportData{ctx: context.Background()}

wg.Wait()

close(in)
eventuallyDone(t, done)

assert.Equal(t, goRoutines, exp.ExportN(), "Export calls")

want := make([]log.Value, goRoutines)
for i := range want {
want[i] = log.IntValue(i)
}
records := exp.Records()
got := make([]log.Value, len(records))
for i := range got {
if assert.Len(t, records[i], 1, "number of records exported") {
got[i] = records[i][0].Body()
}
}
assert.ElementsMatch(t, want, got, "record bodies")
})
}