
Commit

(batchprocessor): Multi/single-shard consistent start method and field names (#11314)

#### Description
There are two implementations of the `batcher` interface with several
inconsistencies. Notably, the single-shard case would start a goroutine
before `Start()` was called, which can be confusing when the goroutine
leak checker notices it. This change makes the single-shard batcher wait
until `Start()` to create its single shard. The confusingly named
`batcher` field becomes `single`, and a new `processor` field is added
for use in `start()` to create the new shard.
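
For reference, a minimal sketch of the reworked single-shard type, with names taken from the diff below (surrounding types such as `batchProcessor` and `shard` are elided):

```go
// singleShardBatcher defers shard creation to start(), so no goroutine
// exists before the processor's Start() is called.
type singleShardBatcher struct {
	processor *batchProcessor // used by start() to create the shard
	single    *shard          // nil until start() runs
}

func (sb *singleShardBatcher) start(context.Context) error {
	sb.single = sb.processor.newShard(nil)
	sb.single.start()
	return nil
}
```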

For the multi-shard batcher, `start()` does nothing, but the struct
confusingly embedded the `batchProcessor`. The embedded field is replaced
with a named `processor` field for consistency with the single-shard case.
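
The multi-shard side keeps its behavior; only the field changes shape (again as in the diff below, where `start()` is a no-op):

```go
// multiShardBatcher names the processor field instead of embedding
// batchProcessor, mirroring singleShardBatcher.
type multiShardBatcher struct {
	processor *batchProcessor
	batchers  sync.Map
	// lock, size, etc. unchanged
}

func (mb *multiShardBatcher) start(context.Context) error {
	return nil
}
```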

#### Link to tracking issue
Part of #11308.
jmacd authored Oct 1, 2024
1 parent 3c7c005 commit dd054f1
Showing 1 changed file with 33 additions and 14 deletions.
47 changes: 33 additions & 14 deletions processor/batchprocessor/batch_processor.go
@@ -61,12 +61,19 @@ type batchProcessor struct {

	telemetry *batchProcessorTelemetry

	// batcher will be either *singletonBatcher or *multiBatcher
	batcher batcher
}

// batcher describes a *singletonBatcher or *multiBatcher.
type batcher interface {
	// start initializes background resources used by this batcher.
	start(ctx context.Context) error

	// consume incorporates a new item of data into the pending batch.
	consume(ctx context.Context, data any) error

	// currentMetadataCardinality returns the number of shards.
	currentMetadataCardinality() int
}

@@ -135,12 +142,13 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
		metadataLimit: int(cfg.MetadataCardinalityLimit),
	}
	if len(bp.metadataKeys) == 0 {
		s := bp.newShard(nil)
		s.start()
		bp.batcher = &singleShardBatcher{batcher: s}
		bp.batcher = &singleShardBatcher{
			processor: bp,
			single: nil, // created in start
		}
	} else {
		bp.batcher = &multiShardBatcher{
			batchProcessor: bp,
			processor: bp,
		}
	}

@@ -172,8 +180,8 @@ func (bp *batchProcessor) Capabilities() consumer.Capabilities {
}

// Start is invoked during service startup.
func (bp *batchProcessor) Start(context.Context, component.Host) error {
	return nil
func (bp *batchProcessor) Start(ctx context.Context, _ component.Host) error {
	return bp.batcher.start(ctx)
}

// Shutdown is invoked during service shutdown.
@@ -281,11 +289,18 @@ func (b *shard) sendItems(trigger trigger) {
// singleShardBatcher is used when metadataKeys is empty, to avoid the
// additional lock and map operations used in multiBatcher.
type singleShardBatcher struct {
	batcher *shard
	processor *batchProcessor
	single *shard
}

func (sb *singleShardBatcher) start(context.Context) error {
	sb.single = sb.processor.newShard(nil)
	sb.single.start()
	return nil
}

func (sb *singleShardBatcher) consume(_ context.Context, data any) error {
	sb.batcher.newItem <- data
	sb.single.newItem <- data
	return nil
}

@@ -295,22 +310,26 @@ func (sb *singleShardBatcher) currentMetadataCardinality() int {

// multiBatcher is used when metadataKeys is not empty.
type multiShardBatcher struct {
	*batchProcessor
	batchers sync.Map
	processor *batchProcessor
	batchers  sync.Map

	// Guards the size and the storing logic to ensure no more than limit items are stored.
	// If we are willing to allow "some" extra items than the limit this can be removed and size can be made atomic.
	lock sync.Mutex
	size int
}

func (mb *multiShardBatcher) start(context.Context) error {
	return nil
}

func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
	// Get each metadata key value, form the corresponding
	// attribute set for use as a map lookup key.
	info := client.FromContext(ctx)
	md := map[string][]string{}
	var attrs []attribute.KeyValue
	for _, k := range mb.metadataKeys {
	for _, k := range mb.processor.metadataKeys {
		// Lookup the value in the incoming metadata, copy it
		// into the outgoing metadata, and create a unique
		// value for the attributeSet.
@@ -327,15 +346,15 @@ func (mb *multiShardBatcher) consume(ctx context.Context, data any) error {
	b, ok := mb.batchers.Load(aset)
	if !ok {
		mb.lock.Lock()
		if mb.metadataLimit != 0 && mb.size >= mb.metadataLimit {
		if mb.processor.metadataLimit != 0 && mb.size >= mb.processor.metadataLimit {
			mb.lock.Unlock()
			return errTooManyBatchers
		}

		// aset.ToSlice() returns the sorted, deduplicated,
		// and name-downcased list of attributes.
		var loaded bool
		b, loaded = mb.batchers.LoadOrStore(aset, mb.newShard(md))
		b, loaded = mb.batchers.LoadOrStore(aset, mb.processor.newShard(md))
		if !loaded {
			// Start the goroutine only if we added the object to the map, otherwise is already started.
			b.(*shard).start()
