Skip to content

Commit

Permalink
Fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 3, 2024
1 parent 2bfbbc6 commit e712689
Showing 1 changed file with 18 additions and 93 deletions.
111 changes: 18 additions & 93 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"container/ring"
"context"
"errors"
"slices"
Expand Down Expand Up @@ -117,7 +116,7 @@ func (b *BatchingProcessor) poll(interval time.Duration) (done chan struct{}) {
return
}

qLen := b.q.TryFlush(buf, func(r []Record) bool {
qLen := b.q.TryDequeue(buf, func(r []Record) bool {
ok := b.exporter.EnqueueExport(r)
if ok {
buf = slices.Clone(buf)
Expand Down Expand Up @@ -193,6 +192,9 @@ func (b *BatchingProcessor) ForceFlush(ctx context.Context) error {
}

// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are
// overwritten.
type queue struct {
sync.Mutex

Expand All @@ -211,6 +213,9 @@ func newQueue(size int) *queue {

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
q.Lock()
defer q.Unlock()
Expand All @@ -227,12 +232,15 @@ func (q *queue) Enqueue(r Record) int {
return q.len
}

// TryFlush attempts to flush up to len(buf) Records. The available Records
// will be assigned into buf and passed to flush. If flush fails, returning
// false, the Records will not be removed from the queue. If flush succeeds,
// returning true, the flushed Records are removed from the queue. The number
// TryDequeue attempts to dequeue up to len(buf) Records. The available Records
// will be assigned into buf and passed to write. If write fails, returning
// false, the Records will not be removed from the queue. If write succeeds,
// returning true, the dequeued Records are removed from the queue. The number
// of Records remaining in the queue are returned.
func (q *queue) TryFlush(buf []Record, flush func([]Record) bool) int {
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
q.Lock()
defer q.Unlock()

Expand All @@ -244,14 +252,16 @@ func (q *queue) TryFlush(buf []Record, flush func([]Record) bool) int {
q.read = q.read.Next()
}

if flush(buf[:n]) {
if write(buf[:n]) {
q.len -= n
} else {
q.read = origRead
}
return q.len
}

// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
q.Lock()
defer q.Unlock()
Expand Down Expand Up @@ -340,91 +350,6 @@ func (r *ring) Do(f func(Record)) {
}
}

// queue holds a queue of logging records.
//
// When the queue becomes full, the oldest records in the queue are
// overwritten.
type queue struct {
sync.Mutex

cap, len int
read, write *ring.Ring
}

func newQueue(size int) *queue {
r := ring.New(size)
return &queue{
cap: size,
read: r,
write: r,
}
}

// Enqueue adds r to the queue. The queue size, including the addition of r, is
// returned.
//
// If enqueueing r will exceed the capacity of q, the oldest Record held in q
// will be dropped and r retained.
func (q *queue) Enqueue(r Record) int {
q.Lock()
defer q.Unlock()

q.write.Value = r
q.write = q.write.Next()

q.len++
if q.len > q.cap {
// Overflow. Advance read to be the new "oldest".
q.len = q.cap
q.read = q.read.Next()
}
return q.len
}

// TryDequeue attempts to dequeue up to len(buf) Records. The available Records
// will be assigned into buf and passed to write. If write fails, returning
// false, the Records will not be removed from the queue. If write succeeds,
// returning true, the dequeued Records are removed from the queue. The number
// of Records remaining in the queue are returned.
//
// When write is called the lock of q is held. The write function must not call
// other methods of this q that acquire the lock.
func (q *queue) TryDequeue(buf []Record, write func([]Record) bool) int {
q.Lock()
defer q.Unlock()

origRead := q.read

n := min(len(buf), q.len)
for i := 0; i < n; i++ {
buf[i] = q.read.Value.(Record)
q.read = q.read.Next()
}

if write(buf[:n]) {
q.len -= n
} else {
q.read = origRead
}
return q.len
}

// Flush returns all the Records held in the queue and resets it to be
// empty.
func (q *queue) Flush() []Record {
q.Lock()
defer q.Unlock()

out := make([]Record, q.len)
for i := range out {
out[i] = q.read.Value.(Record)
q.read = q.read.Next()
}
q.len = 0

return out
}

type batchingConfig struct {
maxQSize setting[int]
expInterval setting[time.Duration]
Expand Down

0 comments on commit e712689

Please sign in to comment.