Skip to content

Commit

Permalink
Add an option to configure the exporter buffer of the BatchProcessor (#…
Browse files Browse the repository at this point in the history
…5877)

resolve: #5238

---------

Co-authored-by: Tyler Yahn <[email protected]>
Co-authored-by: Damien Mathieu <[email protected]>
  • Loading branch information
3 people authored Oct 10, 2024
1 parent eb9279b commit a7d5c1a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `TraceBasedFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747, #5862)
- Add `WithExportBufferSize` option to log batch processor.(#5877)

### Changed

Expand Down
21 changes: 19 additions & 2 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const (
dfltExpInterval = time.Second
dfltExpTimeout = 30 * time.Second
dfltExpMaxBatchSize = 512
dfltExpBufferSize = 1

envarMaxQSize = "OTEL_BLRP_MAX_QUEUE_SIZE"
envarExpInterval = "OTEL_BLRP_SCHEDULE_DELAY"
Expand Down Expand Up @@ -119,8 +120,7 @@ func NewBatchProcessor(exporter Exporter, opts ...BatchProcessorOption) *BatchPr
exporter = newChunkExporter(exporter, cfg.expMaxBatchSize.Value)

b := &BatchProcessor{
// TODO: explore making the size of this configurable.
exporter: newBufferExporter(exporter, 1),
exporter: newBufferExporter(exporter, cfg.expBufferSize.Value),

q: newQueue(cfg.maxQSize.Value),
batchSize: cfg.expMaxBatchSize.Value,
Expand Down Expand Up @@ -349,6 +349,7 @@ type batchConfig struct {
expInterval setting[time.Duration]
expTimeout setting[time.Duration]
expMaxBatchSize setting[int]
expBufferSize setting[int]
}

func newBatchConfig(options []BatchProcessorOption) batchConfig {
Expand Down Expand Up @@ -382,6 +383,10 @@ func newBatchConfig(options []BatchProcessorOption) batchConfig {
clampMax[int](c.maxQSize.Value),
fallback[int](dfltExpMaxBatchSize),
)
c.expBufferSize = c.expBufferSize.Resolve(
clearLessThanOne[int](),
fallback[int](dfltExpBufferSize),
)

return c
}
Expand Down Expand Up @@ -458,3 +463,15 @@ func WithExportMaxBatchSize(size int) BatchProcessorOption {
return cfg
})
}

// WithExportBufferSize sets the batch buffer size.
// Batches will be temporarily kept in a memory buffer until they are exported.
//
// By default, a value of 1 will be used.
// The default value is also used when the provided value is less than one.
func WithExportBufferSize(size int) BatchProcessorOption {
return batchOptionFunc(func(cfg batchConfig) batchConfig {
cfg.expBufferSize = newSetting(size)
return cfg
})
}
11 changes: 11 additions & 0 deletions sdk/log/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestNewBatchConfig(t *testing.T) {
expInterval: newSetting(dfltExpInterval),
expTimeout: newSetting(dfltExpTimeout),
expMaxBatchSize: newSetting(dfltExpMaxBatchSize),
expBufferSize: newSetting(dfltExpBufferSize),
},
},
{
Expand All @@ -79,12 +80,14 @@ func TestNewBatchConfig(t *testing.T) {
WithExportInterval(time.Microsecond),
WithExportTimeout(time.Hour),
WithExportMaxBatchSize(2),
WithExportBufferSize(3),
},
want: batchConfig{
maxQSize: newSetting(10),
expInterval: newSetting(time.Microsecond),
expTimeout: newSetting(time.Hour),
expMaxBatchSize: newSetting(2),
expBufferSize: newSetting(3),
},
},
{
Expand All @@ -100,6 +103,7 @@ func TestNewBatchConfig(t *testing.T) {
expInterval: newSetting(100 * time.Millisecond),
expTimeout: newSetting(1000 * time.Millisecond),
expMaxBatchSize: newSetting(1),
expBufferSize: newSetting(dfltExpBufferSize),
},
},
{
Expand All @@ -109,12 +113,14 @@ func TestNewBatchConfig(t *testing.T) {
WithExportInterval(-1 * time.Microsecond),
WithExportTimeout(-1 * time.Hour),
WithExportMaxBatchSize(-2),
WithExportBufferSize(-2),
},
want: batchConfig{
maxQSize: newSetting(dfltMaxQSize),
expInterval: newSetting(dfltExpInterval),
expTimeout: newSetting(dfltExpTimeout),
expMaxBatchSize: newSetting(dfltExpMaxBatchSize),
expBufferSize: newSetting(dfltExpBufferSize),
},
},
{
Expand All @@ -130,6 +136,7 @@ func TestNewBatchConfig(t *testing.T) {
expInterval: newSetting(dfltExpInterval),
expTimeout: newSetting(dfltExpTimeout),
expMaxBatchSize: newSetting(dfltExpMaxBatchSize),
expBufferSize: newSetting(dfltExpBufferSize),
},
},
{
Expand All @@ -146,25 +153,29 @@ func TestNewBatchConfig(t *testing.T) {
WithExportInterval(time.Microsecond),
WithExportTimeout(time.Hour),
WithExportMaxBatchSize(2),
WithExportBufferSize(2),
},
want: batchConfig{
maxQSize: newSetting(3),
expInterval: newSetting(time.Microsecond),
expTimeout: newSetting(time.Hour),
expMaxBatchSize: newSetting(2),
expBufferSize: newSetting(2),
},
},
{
name: "BatchLessThanOrEqualToQSize",
options: []BatchProcessorOption{
WithMaxQueueSize(1),
WithExportMaxBatchSize(10),
WithExportBufferSize(3),
},
want: batchConfig{
maxQSize: newSetting(1),
expInterval: newSetting(dfltExpInterval),
expTimeout: newSetting(dfltExpTimeout),
expMaxBatchSize: newSetting(1),
expBufferSize: newSetting(3),
},
},
}
Expand Down

0 comments on commit a7d5c1a

Please sign in to comment.