From e712689e5e01d32417630e165784f44641997acd Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Wed, 3 Apr 2024 07:43:36 -0700 Subject: [PATCH] Fix merge --- sdk/log/batch.go | 111 ++++++++--------------------------------------- 1 file changed, 18 insertions(+), 93 deletions(-) diff --git a/sdk/log/batch.go b/sdk/log/batch.go index bddb979ef75..d915dfb963a 100644 --- a/sdk/log/batch.go +++ b/sdk/log/batch.go @@ -8,7 +8,6 @@ package log // import "go.opentelemetry.io/otel/sdk/log" import ( - "container/ring" "context" "errors" "slices" @@ -117,7 +116,7 @@ func (b *BatchingProcessor) poll(interval time.Duration) (done chan struct{}) { return } - qLen := b.q.TryFlush(buf, func(r []Record) bool { + qLen := b.q.TryDequeue(buf, func(r []Record) bool { ok := b.exporter.EnqueueExport(r) if ok { buf = slices.Clone(buf) @@ -193,6 +192,9 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error { } // queue holds a queue of logging records. +// +// When the queue becomes full, the oldest records in the queue are +// overwritten. type queue struct { sync.Mutex @@ -211,6 +213,9 @@ func newQueue(size int) *queue { // Enqueue adds r to the queue. The queue size, including the addition of r, is // returned. +// +// If enqueueing r will exceed the capacity of q, the oldest Record held in q +// will be dropped and r retained. func (q *queue) Enqueue(r Record) int { q.Lock() defer q.Unlock() @@ -227,12 +232,15 @@ func (q *queue) Enqueue(r Record) int { return q.len } -// TryFlush attempts to flush up to len(buf) Records. The available Records -// will be assigned into buf and passed to flush. If flush fails, returning -// false, the Records will not be removed from the queue. If flush succeeds, -// returning true, the flushed Records are removed from the queue. The number +// TryDequeue attempts to dequeue up to len(buf) Records. The available Records +// will be assigned into buf and passed to write. If write fails, returning +// false, the Records will not be removed from the queue. If write succeeds, +// returning true, the dequeued Records are removed from the queue. The number // of Records remaining in the queue are returned. -func (q *queue) TryFlush(buf []Record, flush func([]Record) bool) int { +// +// When write is called the lock of q is held. The write function must not call +// other methods of this q that acquire the lock. +func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { q.Lock() defer q.Unlock() @@ -244,7 +252,7 @@ func (q *queue) TryFlush(buf []Record, flush func([]Record) bool) int { q.read = q.read.Next() } - if flush(buf[:n]) { + if write(buf[:n]) { q.len -= n } else { q.read = origRead @@ -252,6 +260,8 @@ func (q *queue) TryFlush(buf []Record, flush func([]Record) bool) int { return q.len } +// Flush returns all the Records held in the queue and resets it to be +// empty. func (q *queue) Flush() []Record { q.Lock() defer q.Unlock() @@ -340,91 +350,6 @@ func (r *ring) Do(f func(Record)) { } } -// queue holds a queue of logging records. -// -// When the queue becomes full, the oldest records in the queue are -// overwritten. -type queue struct { - sync.Mutex - - cap, len int - read, write *ring.Ring -} - -func newQueue(size int) *queue { - r := ring.New(size) - return &queue{ - cap: size, - read: r, - write: r, - } -} - -// Enqueue adds r to the queue. The queue size, including the addition of r, is -// returned. -// -// If enqueueing r will exceed the capacity of q, the oldest Record held in q -// will be dropped and r retained. -func (q *queue) Enqueue(r Record) int { - q.Lock() - defer q.Unlock() - - q.write.Value = r - q.write = q.write.Next() - - q.len++ - if q.len > q.cap { - // Overflow. Advance read to be the new "oldest". - q.len = q.cap - q.read = q.read.Next() - } - return q.len -} - -// TryDequeue attempts to dequeue up to len(buf) Records. The available Records -// will be assigned into buf and passed to write. If write fails, returning -// false, the Records will not be removed from the queue. If write succeeds, -// returning true, the dequeued Records are removed from the queue. The number -// of Records remaining in the queue are returned. -// -// When write is called the lock of q is held. The write function must not call -// other methods of this q that acquire the lock. -func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int { - q.Lock() - defer q.Unlock() - - origRead := q.read - - n := min(len(buf), q.len) - for i := 0; i < n; i++ { - buf[i] = q.read.Value.(Record) - q.read = q.read.Next() - } - - if write(buf[:n]) { - q.len -= n - } else { - q.read = origRead - } - return q.len -} - -// Flush returns all the Records held in the queue and resets it to be -// empty. -func (q *queue) Flush() []Record { - q.Lock() - defer q.Unlock() - - out := make([]Record, q.len) - for i := range out { - out[i] = q.read.Value.(Record) - q.read = q.read.Next() - } - q.len = 0 - - return out -} - type batchingConfig struct { maxQSize setting[int] expInterval setting[time.Duration]