Skip to content

Commit

Permalink
mergeBatchFunc moved to request
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-sili committed Oct 17, 2024
1 parent 527df61 commit 39122c8
Show file tree
Hide file tree
Showing 17 changed files with 276 additions and 389 deletions.
24 changes: 0 additions & 24 deletions exporter/exporterbatcher/batch_func.go

This file was deleted.

8 changes: 0 additions & 8 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,3 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
// WithBatcher applies the given batcher configuration to the exporter by
// delegating to internal.WithBatcher.
// NOTE(review): a doc comment may already exist above this diff hunk — confirm before merging.
func WithBatcher(cfg exporterbatcher.Config) Option {
	return internal.WithBatcher(cfg)
}

// WithBatchFuncs enables setting custom merge and merge-split functions for batching.
// Experimental: this API is at an early stage of development and may change without
// backward compatibility until
// https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithBatchFuncs(mergeFunc exporterbatcher.BatchMergeFunc[Request], mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[Request]) Option {
	return internal.WithBatchFuncs(mergeFunc, mergeSplitFunc)
}
1 change: 1 addition & 0 deletions exporter/exporterhelper/exporterhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import "go.opentelemetry.io/collector/exporter/internal"
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type Request = internal.Request
type BatchRequest = internal.BatchRequest

// RequestErrorHandler is an optional interface that can be implemented by Request to provide a way to handle partial
// temporary failures. For example, if some items failed to process and can be retried, this interface allows to
Expand Down
18 changes: 1 addition & 17 deletions exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ type BaseExporter struct {

Signal pipeline.Signal

BatchMergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
BatchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[internal.Request]

Marshaler exporterqueue.Marshaler[internal.Request]
Unmarshaler exporterqueue.Unmarshaler[internal.Request]

Expand Down Expand Up @@ -104,10 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe
}

if be.BatcherCfg.Enabled {
bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc)
if bs.mergeFunc == nil || bs.mergeSplitFunc == nil {
err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters"))
}
bs := NewBatchSender(be.BatcherCfg, be.Set)
be.BatchSender = bs
}

Expand Down Expand Up @@ -298,16 +292,6 @@ func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Op
}
}

// WithBatchFuncs is used to set the functions for merging and splitting batches for OTLP-based exporters.
// It must be provided as the first option when creating a new exporter helper.
func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) Option {
	return func(o *BaseExporter) error {
		// Store both callbacks on the exporter; NewBaseExporter later requires
		// both to be non-nil when batching is enabled.
		o.BatchMergeFunc = mf
		o.BatchMergeSplitfunc = msf
		return nil
	}
}

func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
if err != nil {
require.Equal(t, codes.Error, sd.Status().Code, "SpanData %v", sd)
Expand Down
36 changes: 19 additions & 17 deletions exporter/exporterhelper/internal/batch_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
// - concurrencyLimit is reached.
type BatchSender struct {
BaseRequestSender
cfg exporterbatcher.Config
mergeFunc exporterbatcher.BatchMergeFunc[internal.Request]
mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request]
cfg exporterbatcher.Config

// concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher.
// If this number is reached and all the goroutines are busy, the batch will be sent right away.
Expand All @@ -46,14 +44,11 @@ type BatchSender struct {
}

// newBatchSender returns a new batch consumer component.
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings,
mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) *BatchSender {
func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender {
bs := &BatchSender{
activeBatch: newEmptyBatch(),
cfg: cfg,
logger: set.Logger,
mergeFunc: mf,
mergeSplitFunc: msf,
shutdownCh: nil,
shutdownCompleteCh: make(chan struct{}),
stopped: &atomic.Bool{},
Expand Down Expand Up @@ -104,7 +99,7 @@ func (bs *BatchSender) Start(_ context.Context, _ component.Host) error {

type batch struct {
ctx context.Context
request internal.Request
request internal.BatchRequest
done chan struct{}
err error

Expand Down Expand Up @@ -147,19 +142,26 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error {
}

if bs.cfg.MaxSizeItems > 0 {
return bs.sendMergeSplitBatch(ctx, req)
return bs.sendMergeSplitBatch(ctx, req.(internal.BatchRequest))
}
return bs.sendMergeBatch(ctx, req)
return bs.sendMergeBatch(ctx, req.(internal.BatchRequest))
}

// sendMergeSplitBatch sends the request to the batch which may be split into multiple requests.
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error {
func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.BatchRequest) error {
bs.mu.Lock()

reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req)
if err != nil || len(reqs) == 0 {
var reqs []internal.BatchRequest
var mergeSplitErr error
if bs.activeBatch.request == nil {
reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil)
} else {
reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req)
}

if mergeSplitErr != nil || len(reqs) == 0 {
bs.mu.Unlock()
return err
return mergeSplitErr
}

bs.activeRequests.Add(1)
Expand Down Expand Up @@ -196,12 +198,12 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req
}

// sendMergeBatch sends the request to the batch and waits for the batch to be exported.
func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) error {
func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.BatchRequest) error {
bs.mu.Lock()

if bs.activeBatch.request != nil {
var err error
req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req)
req, err = bs.activeBatch.request.Merge(ctx, req)
if err != nil {
bs.mu.Unlock()
return err
Expand All @@ -224,7 +226,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request)
// The context is only set once and is not updated after the first call.
// Merging the context would be complex and require an additional goroutine to handle the context cancellation.
// We take the approach of using the context from the first request since it's likely to have the shortest timeout.
func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.Request) {
func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.BatchRequest) {
if bs.activeBatch.request == nil {
bs.activeBatch.ctx = ctx
}
Expand Down
Loading

0 comments on commit 39122c8

Please sign in to comment.