[exporterhelper] Fix potential deadlock in the batch sender (#10315)
Concurrent handling of flush timeouts can deadlock when a batch is sent by reaching the minimum size at the same time the flush timeout fires. The deadlock can happen between the following lines:

- https://github.com/open-telemetry/opentelemetry-collector/blob/115bc8e28e009ca93565dc4deb4cf6608fa63622/exporter/exporterhelper/batch_sender.go#L131
- https://github.com/open-telemetry/opentelemetry-collector/blob/115bc8e28e009ca93565dc4deb4cf6608fa63622/exporter/exporterhelper/batch_sender.go#L87
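
A minimal sketch of the failure mode (reduced, hypothetical code, not the collector's sources): a sender goroutine holds the batch mutex while blocking on a send to the unbuffered reset channel, while the timer goroutine, whose timer just fired, blocks on the same mutex and so never reaches its channel receive.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	resetTimerCh := make(chan struct{}) // unbuffered, like the resetTimerCh removed below

	// "Timer" goroutine: its flush timer has just fired, so it tries to take
	// the lock before it would ever get back to receiving on resetTimerCh.
	go func() {
		mu.Lock() // blocks while the sender below holds the lock
		mu.Unlock()
	}()

	// "Sender" goroutine: a batch reached the minimum size, so it exports the
	// batch and signals a timer reset -- while still holding the lock.
	done := make(chan struct{})
	go func() {
		mu.Lock()
		resetTimerCh <- struct{}{} // blocks forever: the only receiver is stuck on mu.Lock()
		mu.Unlock()
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("no deadlock")
	case <-time.After(time.Second):
		fmt.Println("deadlock: sender waits on the channel send, timer waits on the mutex")
	}
}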

Co-authored-by: Carson Ip <[email protected]>
dmitryax and carsonip authored Jun 5, 2024
1 parent 1e44a9c commit 9c3481b
Showing 3 changed files with 114 additions and 18 deletions.
19 changes: 19 additions & 0 deletions .chloggen/batchseder-fix-potential-deadlock.yaml
@@ -0,0 +1,19 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix potential deadlock in the batch sender

# One or more tracking issues or pull requests related to the change
issues: [10315]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
change_logs: [user]
28 changes: 10 additions & 18 deletions exporter/exporterhelper/batch_sender.go
@@ -33,10 +33,9 @@ type batchSender struct {
 	concurrencyLimit uint64
 	activeRequests   atomic.Uint64
 
-	resetTimerCh chan struct{}
-
 	mu          sync.Mutex
 	activeBatch *batch
+	lastFlushed time.Time
 
 	logger *zap.Logger
 
@@ -57,7 +56,6 @@ func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings,
 		shutdownCh:         nil,
 		shutdownCompleteCh: make(chan struct{}),
 		stopped:            &atomic.Bool{},
-		resetTimerCh:       make(chan struct{}),
 	}
 	return bs
 }
@@ -85,16 +83,17 @@ func (bs *batchSender) Start(_ context.Context, _ component.Host) error {
 				return
 			case <-timer.C:
 				bs.mu.Lock()
+				nextFlush := bs.cfg.FlushTimeout
 				if bs.activeBatch.request != nil {
-					bs.exportActiveBatch()
+					sinceLastFlush := time.Since(bs.lastFlushed)
+					if sinceLastFlush >= bs.cfg.FlushTimeout {
+						bs.exportActiveBatch()
+					} else {
+						nextFlush = bs.cfg.FlushTimeout - sinceLastFlush
+					}
 				}
 				bs.mu.Unlock()
-				timer.Reset(bs.cfg.FlushTimeout)
-			case <-bs.resetTimerCh:
-				if !timer.Stop() {
-					<-timer.C
-				}
-				timer.Reset(bs.cfg.FlushTimeout)
+				timer.Reset(nextFlush)
 			}
 		}
 	}()
@@ -123,15 +122,10 @@ func (bs *batchSender) exportActiveBatch() {
 		b.err = bs.nextSender.send(b.ctx, b.request)
 		close(b.done)
 	}(bs.activeBatch)
+	bs.lastFlushed = time.Now()
 	bs.activeBatch = newEmptyBatch()
 }
 
-func (bs *batchSender) resetTimer() {
-	if !bs.stopped.Load() {
-		bs.resetTimerCh <- struct{}{}
-	}
-}
-
 // isActiveBatchReady returns true if the active batch is ready to be exported.
 // The batch is ready if it has reached the minimum size or the concurrency limit is reached.
 // Caller must hold the lock.
@@ -168,7 +162,6 @@ func (bs *batchSender) sendMergeSplitBatch(ctx context.Context, req Request) err
 		batch := bs.activeBatch
 		if bs.isActiveBatchReady() || len(reqs) > 1 {
 			bs.exportActiveBatch()
-			bs.resetTimer()
 		}
 		bs.mu.Unlock()
 		<-batch.done
@@ -208,7 +201,6 @@ func (bs *batchSender) sendMergeBatch(ctx context.Context, req Request) error {
 	batch := bs.activeBatch
 	if bs.isActiveBatchReady() {
 		bs.exportActiveBatch()
-		bs.resetTimer()
 	}
 	bs.mu.Unlock()
 	<-batch.done
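Why this removes the deadlock (an editorial reading of the diff above, not text from the commit): with resetTimerCh gone, bs.mu is the only coordination point between senders and the timer goroutine. A size-triggered flush now just records bs.lastFlushed under the lock, and when the timer fires it either flushes, if a full FlushTimeout has elapsed since the last flush, or re-arms itself for the remaining FlushTimeout - sinceLastFlush. No goroutine ever blocks on a channel send while holding bs.mu, so the lock-then-send cycle sketched earlier can no longer form.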
85 changes: 85 additions & 0 deletions exporter/exporterhelper/batch_sender_test.go
@@ -535,6 +535,91 @@ func TestBatchSenderWithTimeout(t *testing.T) {
assert.EqualValues(t, 12, sink.itemsCount.Load())
}

func TestBatchSenderTimerResetNoConflict(t *testing.T) {
delayBatchMergeFunc := func(_ context.Context, r1 Request, r2 Request) (Request, error) {
time.Sleep(30 * time.Millisecond)
if r1 == nil {
return r2, nil
}
fr1 := r1.(*fakeRequest)
fr2 := r2.(*fakeRequest)
if fr2.mergeErr != nil {
return nil, fr2.mergeErr
}
return &fakeRequest{
items: fr1.items + fr2.items,
sink: fr1.sink,
exportErr: fr2.exportErr,
delay: fr1.delay + fr2.delay,
}, nil
}
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 8
bCfg.FlushTimeout = 50 * time.Millisecond
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc)))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
sink := newFakeRequestSink()

// Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer
go func() {
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
}()
time.Sleep(30 * time.Millisecond)
go func() {
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
}()

// The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
assert.EqualValues(c, 8, sink.itemsCount.Load())
}, 200*time.Millisecond, 10*time.Millisecond)

require.NoError(t, be.Shutdown(context.Background()))
}

func TestBatchSenderTimerFlush(t *testing.T) {
bCfg := exporterbatcher.NewDefaultConfig()
bCfg.MinSizeItems = 8
bCfg.FlushTimeout = 100 * time.Millisecond
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender,
WithBatcher(bCfg, WithRequestBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)))
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
sink := newFakeRequestSink()
time.Sleep(50 * time.Millisecond)

// Send 2 concurrent requests that should be merged in one batch and sent immediately
go func() {
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
}()
go func() {
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
}()
assert.EventuallyWithT(t, func(c *assert.CollectT) {
assert.LessOrEqual(c, uint64(1), sink.requestsCount.Load())
assert.EqualValues(c, 8, sink.itemsCount.Load())
}, 30*time.Millisecond, 5*time.Millisecond)

// Send another request that should be flushed after 100ms instead of 50ms since last flush
go func() {
require.NoError(t, be.send(context.Background(), &fakeRequest{items: 4, sink: sink}))
}()

// Confirm that it is not flushed in 50ms
time.Sleep(60 * time.Millisecond)
assert.LessOrEqual(t, uint64(1), sink.requestsCount.Load())
assert.EqualValues(t, 8, sink.itemsCount.Load())

// Confirm that it is flushed after 100ms (using 60+50=110 here to be safe)
time.Sleep(50 * time.Millisecond)
assert.LessOrEqual(t, uint64(2), sink.requestsCount.Load())
assert.EqualValues(t, 12, sink.itemsCount.Load())
require.NoError(t, be.Shutdown(context.Background()))
}

func queueBatchExporter(t *testing.T, batchOption Option) *baseExporter {
be, err := newBaseExporter(defaultSettings, defaultDataType, newNoopObsrepSender, batchOption,
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]()))
