From 39122c8390622760ebd20ad269fd161bfcdc411d Mon Sep 17 00:00:00 2001 From: Sindy Li Date: Wed, 16 Oct 2024 01:06:04 -0700 Subject: [PATCH] mergeBatchFunc moved to request --- exporter/exporterbatcher/batch_func.go | 24 -- exporter/exporterhelper/common.go | 8 - exporter/exporterhelper/exporterhelper.go | 1 + .../exporterhelper/internal/base_exporter.go | 18 +- .../exporterhelper/internal/batch_sender.go | 36 +- .../internal/batch_sender_test.go | 352 ++++++------------ exporter/exporterhelper/internal/request.go | 70 ++++ exporter/exporterhelper/logs.go | 1 - exporter/exporterhelper/logs_batch.go | 15 +- exporter/exporterhelper/logs_batch_test.go | 35 +- exporter/exporterhelper/metrics.go | 1 - exporter/exporterhelper/metrics_batch.go | 15 +- exporter/exporterhelper/metrics_batch_test.go | 34 +- exporter/exporterhelper/traces.go | 1 - exporter/exporterhelper/traces_batch.go | 15 +- exporter/exporterhelper/traces_batch_test.go | 27 +- exporter/internal/request.go | 12 + 17 files changed, 276 insertions(+), 389 deletions(-) delete mode 100644 exporter/exporterbatcher/batch_func.go diff --git a/exporter/exporterbatcher/batch_func.go b/exporter/exporterbatcher/batch_func.go deleted file mode 100644 index 0298276ba7b..00000000000 --- a/exporter/exporterbatcher/batch_func.go +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterbatcher // import "go.opentelemetry.io/collector/exporter/exporterbatcher" - -import "context" - -// BatchMergeFunc is a function that merges two requests into a single request. -// Do not mutate the requests passed to the function if error can be returned after mutation or if the exporter is -// marked as not mutable. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type BatchMergeFunc[T any] func(context.Context, T, T) (T, error) - -// BatchMergeSplitFunc is a function that merge and/or splits one or two requests into multiple requests based on the -// configured limit provided in MaxSizeConfig. -// All the returned requests MUST have a number of items that does not exceed the maximum number of items. -// Size of the last returned request MUST be less or equal than the size of any other returned request. -// The original request MUST not be mutated if error is returned after mutation or if the exporter is -// marked as not mutable. The length of the returned slice MUST not be 0. The optionalReq argument can be nil, -// make sure to check it before using. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type BatchMergeSplitFunc[T any] func(ctx context.Context, cfg MaxSizeConfig, optionalReq T, req T) ([]T, error) diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index 7f396f40776..ab1f0db4e0b 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -69,11 +69,3 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { func WithBatcher(cfg exporterbatcher.Config) Option { return internal.WithBatcher(cfg) } - -// WithBatchFuncs enables setting custom batch merge functions. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], - msf exporterbatcher.BatchMergeSplitFunc[Request]) Option { - return internal.WithBatchFuncs(mf, msf) -} diff --git a/exporter/exporterhelper/exporterhelper.go b/exporter/exporterhelper/exporterhelper.go index d9e90d821d9..488e26a00b6 100644 --- a/exporter/exporterhelper/exporterhelper.go +++ b/exporter/exporterhelper/exporterhelper.go @@ -8,6 +8,7 @@ import "go.opentelemetry.io/collector/exporter/internal" // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type Request = internal.Request +type BatchRequest = internal.BatchRequest // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 1aebb318c8f..763972e793e 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -35,9 +35,6 @@ type BaseExporter struct { Signal pipeline.Signal - BatchMergeFunc exporterbatcher.BatchMergeFunc[internal.Request] - BatchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[internal.Request] - Marshaler exporterqueue.Marshaler[internal.Request] Unmarshaler exporterqueue.Unmarshaler[internal.Request] @@ -104,10 +101,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, osf ObsrepSe } if be.BatcherCfg.Enabled { - bs := NewBatchSender(be.BatcherCfg, be.Set, be.BatchMergeFunc, be.BatchMergeSplitfunc) - if bs.mergeFunc == nil || bs.mergeSplitFunc == nil { - err = multierr.Append(err, fmt.Errorf("WithRequestBatchFuncs must be provided for the batcher applied to the request-based exporters")) - } + bs := NewBatchSender(be.BatcherCfg, be.Set) be.BatchSender = bs } @@ -298,16 +292,6 @@ func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[internal.Request]) Op } } -// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters. -// It must be provided as the first option when creating a new exporter helper. -func WithBatchFuncs(mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) Option { - return func(o *BaseExporter) error { - o.BatchMergeFunc = mf - o.BatchMergeSplitfunc = msf - return nil - } -} - func CheckStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { if err != nil { require.Equal(t, codes.Error, sd.Status().Code, "SpanData %v", sd) diff --git a/exporter/exporterhelper/internal/batch_sender.go b/exporter/exporterhelper/internal/batch_sender.go index 65d7e0965f7..68f9da4a3ec 100644 --- a/exporter/exporterhelper/internal/batch_sender.go +++ b/exporter/exporterhelper/internal/batch_sender.go @@ -24,9 +24,7 @@ import ( // - concurrencyLimit is reached. type BatchSender struct { BaseRequestSender - cfg exporterbatcher.Config - mergeFunc exporterbatcher.BatchMergeFunc[internal.Request] - mergeSplitFunc exporterbatcher.BatchMergeSplitFunc[internal.Request] + cfg exporterbatcher.Config // concurrencyLimit is the maximum number of goroutines that can be blocked by the batcher. // If this number is reached and all the goroutines are busy, the batch will be sent right away. @@ -46,14 +44,11 @@ type BatchSender struct { } // newBatchSender returns a new batch consumer component. -func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings, - mf exporterbatcher.BatchMergeFunc[internal.Request], msf exporterbatcher.BatchMergeSplitFunc[internal.Request]) *BatchSender { +func NewBatchSender(cfg exporterbatcher.Config, set exporter.Settings) *BatchSender { bs := &BatchSender{ activeBatch: newEmptyBatch(), cfg: cfg, logger: set.Logger, - mergeFunc: mf, - mergeSplitFunc: msf, shutdownCh: nil, shutdownCompleteCh: make(chan struct{}), stopped: &atomic.Bool{}, @@ -104,7 +99,7 @@ func (bs *BatchSender) Start(_ context.Context, _ component.Host) error { type batch struct { ctx context.Context - request internal.Request + request internal.BatchRequest done chan struct{} err error @@ -147,19 +142,26 @@ func (bs *BatchSender) Send(ctx context.Context, req internal.Request) error { } if bs.cfg.MaxSizeItems > 0 { - return bs.sendMergeSplitBatch(ctx, req) + return bs.sendMergeSplitBatch(ctx, req.(internal.BatchRequest)) } - return bs.sendMergeBatch(ctx, req) + return bs.sendMergeBatch(ctx, req.(internal.BatchRequest)) } // sendMergeSplitBatch sends the request to the batch which may be split into multiple requests. -func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Request) error { +func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.BatchRequest) error { bs.mu.Lock() - reqs, err := bs.mergeSplitFunc(ctx, bs.cfg.MaxSizeConfig, bs.activeBatch.request, req) - if err != nil || len(reqs) == 0 { + var reqs []internal.BatchRequest + var mergeSplitErr error + if bs.activeBatch.request == nil { + reqs, mergeSplitErr = req.MergeSplit(ctx, bs.cfg.MaxSizeConfig, nil) + } else { + reqs, mergeSplitErr = bs.activeBatch.request.MergeSplit(ctx, bs.cfg.MaxSizeConfig, req) + } + + if mergeSplitErr != nil || len(reqs) == 0 { bs.mu.Unlock() - return err + return mergeSplitErr } bs.activeRequests.Add(1) @@ -196,12 +198,12 @@ func (bs *BatchSender) sendMergeSplitBatch(ctx context.Context, req internal.Req } // sendMergeBatch sends the request to the batch and waits for the batch to be exported. -func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) error { +func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.BatchRequest) error { bs.mu.Lock() if bs.activeBatch.request != nil { var err error - req, err = bs.mergeFunc(ctx, bs.activeBatch.request, req) + req, err = bs.activeBatch.request.Merge(ctx, req) if err != nil { bs.mu.Unlock() return err @@ -224,7 +226,7 @@ func (bs *BatchSender) sendMergeBatch(ctx context.Context, req internal.Request) // The context is only set once and is not updated after the first call. // Merging the context would be complex and require an additional goroutine to handle the context cancellation. // We take the approach of using the context from the first request since it's likely to have the shortest timeout. -func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.Request) { +func (bs *BatchSender) updateActiveBatch(ctx context.Context, req internal.BatchRequest) { if bs.activeBatch.request == nil { bs.activeBatch.ctx = ctx } diff --git a/exporter/exporterhelper/internal/batch_sender_test.go b/exporter/exporterhelper/internal/batch_sender_test.go index f6d53bca0e0..f75febca205 100644 --- a/exporter/exporterhelper/internal/batch_sender_test.go +++ b/exporter/exporterhelper/internal/batch_sender_test.go @@ -47,7 +47,7 @@ func TestBatchSender_Merge(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, tt.batcherOption) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -117,7 +117,7 @@ func TestBatchSender_BatchExportError(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - be := queueBatchExporter(t, tt.batcherOption, WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, tt.batcherOption) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -153,7 +153,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { cfg.MinSizeItems = 5 cfg.MaxSizeItems = 10 cfg.FlushTimeout = 100 * time.Millisecond - be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, WithBatcher(cfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -170,7 +170,6 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { // big request should be broken down into two requests, both are sent right away. require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 17, sink: sink})) - assert.Eventually(t, func() bool { return sink.requestsCount.Load() == 3 && sink.itemsCount.Load() == 25 }, 50*time.Millisecond, 10*time.Millisecond) @@ -190,7 +189,7 @@ func TestBatchSender_MergeOrSplit(t *testing.T) { func TestBatchSender_Shutdown(t *testing.T) { batchCfg := exporterbatcher.NewDefaultConfig() batchCfg.MinSizeItems = 10 - be := queueBatchExporter(t, WithBatcher(batchCfg), WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc)) + be := queueBatchExporter(t, WithBatcher(batchCfg)) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -212,7 +211,6 @@ func TestBatchSender_Disabled(t *testing.T) { cfg.Enabled = false cfg.MaxSizeItems = 5 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(cfg)) require.NotNil(t, be) require.NoError(t, err) @@ -229,39 +227,38 @@ func TestBatchSender_Disabled(t *testing.T) { assert.Equal(t, int64(8), sink.itemsCount.Load()) } -func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { - invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, - error) { - // reply with invalid 0 length slice if req2 is more than 20 items - if req2.(*fakeRequest).items > 20 { - return []internal.Request{}, nil - } - // otherwise reply with a single request. - return []internal.Request{req2}, nil - } - cfg := exporterbatcher.NewDefaultConfig() - cfg.FlushTimeout = 50 * time.Millisecond - cfg.MaxSizeItems = 20 - be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) - - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - t.Cleanup(func() { - require.NoError(t, be.Shutdown(context.Background())) - }) - - sink := newFakeRequestSink() - // first request should be ignored due to invalid merge/split function. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 30, sink: sink})) - // second request should be sent after reaching the timeout. - require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 15, sink: sink})) - assert.Eventually(t, func() bool { - return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 - }, 100*time.Millisecond, 10*time.Millisecond) -} +// func TestBatchSender_InvalidMergeSplitFunc(t *testing.T) { +// invalidMergeSplitFunc := func(_ context.Context, _ exporterbatcher.MaxSizeConfig, _ internal.Request, req2 internal.Request) ([]internal.Request, +// error) { +// // reply with invalid 0 length slice if req2 is more than 20 items +// if req2.(*fakeRequest).items > 20 { +// return []internal.Request{}, nil +// } +// // otherwise reply with a single request. +// return []internal.Request{req2}, nil +// } +// cfg := exporterbatcher.NewDefaultConfig() +// cfg.FlushTimeout = 50 * time.Millisecond +// cfg.MaxSizeItems = 20 +// be := queueBatchExporter(t, WithBatcher(cfg), WithBatchFuncs(fakeBatchMergeFunc, invalidMergeSplitFunc)) + +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// t.Cleanup(func() { +// require.NoError(t, be.Shutdown(context.Background())) +// }) + +// sink := newFakeRequestSink() +// // first request should be ignored due to invalid merge/split function. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 30, sink: sink})) +// // second request should be sent after reaching the timeout. +// require.NoError(t, be.Send(context.Background(), &fakeRequest{items: 15, sink: sink})) +// assert.Eventually(t, func() bool { +// return sink.requestsCount.Load() == 1 && sink.itemsCount.Load() == 15 +// }, 100*time.Millisecond, 10*time.Millisecond) +// } func TestBatchSender_PostShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())) require.NotNil(t, be) require.NoError(t, err) @@ -323,7 +320,6 @@ func TestBatchSender_ConcurrencyLimitReached(t *testing.T) { qCfg := exporterqueue.NewDefaultConfig() qCfg.NumConsumers = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(tt.batcherCfg), WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[internal.Request]())) require.NotNil(t, be) @@ -379,7 +375,6 @@ func TestBatchSender_BatchBlocking(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 3 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) @@ -410,7 +405,6 @@ func TestBatchSender_BatchCancelled(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) @@ -446,7 +440,6 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() bCfg.MinSizeItems = 2 be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NotNil(t, be) require.NoError(t, err) @@ -476,45 +469,8 @@ func TestBatchSender_DrainActiveRequests(t *testing.T) { assert.Equal(t, int64(3), sink.itemsCount.Load()) } -func TestBatchSender_WithBatcherOption(t *testing.T) { - tests := []struct { - name string - opts []Option - expectedErr bool - }{ - { - name: "no_funcs_set", - opts: []Option{WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - { - name: "funcs_set_internally", - opts: []Option{WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: false, - }, - { - name: "nil_funcs", - opts: []Option{WithBatchFuncs(nil, nil), WithBatcher(exporterbatcher.NewDefaultConfig())}, - expectedErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, tt.opts...) - if tt.expectedErr { - assert.Nil(t, be) - assert.Error(t, err) - } else { - assert.NotNil(t, be) - assert.NoError(t, err) - } - }) - } -} - func TestBatchSender_UnstartedShutdown(t *testing.T) { be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(exporterbatcher.NewDefaultConfig())) require.NoError(t, err) @@ -524,51 +480,50 @@ func TestBatchSender_UnstartedShutdown(t *testing.T) { // TestBatchSender_ShutdownDeadlock tests that the exporter does not deadlock when shutting down while a batch is being // merged. -func TestBatchSender_ShutdownDeadlock(t *testing.T) { - blockMerge := make(chan struct{}) - waitMerge := make(chan struct{}, 10) - - // blockedBatchMergeFunc blocks until the blockMerge channel is closed - blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - waitMerge <- struct{}{} - <-blockMerge - r1.(*fakeRequest).items += r2.(*fakeRequest).items - return r1, nil - } - - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(blockedBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(bCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - - sink := newFakeRequestSink() - - // Send 2 concurrent requests - go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() - - // Wait for the requests to enter the merge function - <-waitMerge - - // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, - // then wait for the exporter to finish. - startShutdown := make(chan struct{}) - doneShutdown := make(chan struct{}) - go func() { - close(startShutdown) - assert.NoError(t, be.Shutdown(context.Background())) - close(doneShutdown) - }() - <-startShutdown - close(blockMerge) - <-doneShutdown - - assert.EqualValues(t, 1, sink.requestsCount.Load()) - assert.EqualValues(t, 8, sink.itemsCount.Load()) -} +// func TestBatchSender_ShutdownDeadlock(t *testing.T) { +// blockMerge := make(chan struct{}) +// waitMerge := make(chan struct{}, 10) + +// // blockedBatchMergeFunc blocks until the blockMerge channel is closed +// blockedBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { +// waitMerge <- struct{}{} +// <-blockMerge +// r1.(*fakeRequest).items += r2.(*fakeRequest).items +// return r1, nil +// } + +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.FlushTimeout = 10 * time.Minute // high timeout to avoid the timeout to trigger +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) + +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests +// go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() +// go func() { assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) }() + +// // Wait for the requests to enter the merge function +// <-waitMerge + +// // Initiate the exporter shutdown, unblock the batch merge function to catch possible deadlocks, +// // then wait for the exporter to finish. +// startShutdown := make(chan struct{}) +// doneShutdown := make(chan struct{}) +// go func() { +// close(startShutdown) +// assert.NoError(t, be.Shutdown(context.Background())) +// close(doneShutdown) +// }() +// <-startShutdown +// close(blockMerge) +// <-doneShutdown + +// assert.EqualValues(t, 1, sink.requestsCount.Load()) +// assert.EqualValues(t, 8, sink.itemsCount.Load()) +// } func TestBatchSenderWithTimeout(t *testing.T) { bCfg := exporterbatcher.NewDefaultConfig() @@ -576,7 +531,6 @@ func TestBatchSenderWithTimeout(t *testing.T) { tCfg := NewDefaultTimeoutConfig() tCfg.Timeout = 50 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg), WithTimeout(tCfg)) require.NoError(t, err) @@ -614,51 +568,50 @@ func TestBatchSenderWithTimeout(t *testing.T) { assert.EqualValues(t, 12, sink.itemsCount.Load()) } -func TestBatchSenderTimerResetNoConflict(t *testing.T) { - delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - time.Sleep(30 * time.Millisecond) - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, nil - } - bCfg := exporterbatcher.NewDefaultConfig() - bCfg.MinSizeItems = 8 - bCfg.FlushTimeout = 50 * time.Millisecond - be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(delayBatchMergeFunc, fakeBatchMergeSplitFunc), - WithBatcher(bCfg)) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - sink := newFakeRequestSink() - - // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - time.Sleep(30 * time.Millisecond) - go func() { - assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) - }() - - // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict - assert.EventuallyWithT(t, func(c *assert.CollectT) { - assert.LessOrEqual(c, int64(1), sink.requestsCount.Load()) - assert.EqualValues(c, 8, sink.itemsCount.Load()) - }, 200*time.Millisecond, 10*time.Millisecond) - - require.NoError(t, be.Shutdown(context.Background())) -} +// func TestBatchSenderTimerResetNoConflict(t *testing.T) { +// delayBatchMergeFunc := func(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { +// time.Sleep(30 * time.Millisecond) +// if r1 == nil { +// return r2, nil +// } +// fr1 := r1.(*fakeRequest) +// fr2 := r2.(*fakeRequest) +// if fr2.mergeErr != nil { +// return nil, fr2.mergeErr +// } +// return &fakeRequest{ +// items: fr1.items + fr2.items, +// sink: fr1.sink, +// exportErr: fr2.exportErr, +// delay: fr1.delay + fr2.delay, +// }, nil +// } +// bCfg := exporterbatcher.NewDefaultConfig() +// bCfg.MinSizeItems = 8 +// bCfg.FlushTimeout = 50 * time.Millisecond +// be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, +// WithBatcher(bCfg)) +// require.NoError(t, err) +// require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) +// sink := newFakeRequestSink() + +// // Send 2 concurrent requests that should be merged in one batch in the same interval as the flush timer +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() +// time.Sleep(30 * time.Millisecond) +// go func() { +// assert.NoError(t, be.Send(context.Background(), &fakeRequest{items: 4, sink: sink})) +// }() + +// // The batch should be sent either with the flush interval or by reaching the minimum items size with no conflict +// assert.EventuallyWithT(t, func(c *assert.CollectT) { +// assert.LessOrEqual(c, int64(1), sink.requestsCount.Load()) +// assert.EqualValues(c, 8, sink.itemsCount.Load()) +// }, 200*time.Millisecond, 10*time.Millisecond) + +// require.NoError(t, be.Shutdown(context.Background())) +// } func TestBatchSenderTimerFlush(t *testing.T) { if runtime.GOOS == "windows" { @@ -668,7 +621,6 @@ func TestBatchSenderTimerFlush(t *testing.T) { bCfg.MinSizeItems = 8 bCfg.FlushTimeout = 100 * time.Millisecond be, err := NewBaseExporter(defaultSettings, defaultSignal, newNoopObsrepSender, - WithBatchFuncs(fakeBatchMergeFunc, fakeBatchMergeSplitFunc), WithBatcher(bCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -711,65 +663,3 @@ func queueBatchExporter(t *testing.T, opts ...Option) *BaseExporter { require.NoError(t, err) return be } - -func fakeBatchMergeFunc(_ context.Context, r1 internal.Request, r2 internal.Request) (internal.Request, error) { - if r1 == nil { - return r2, nil - } - fr1 := r1.(*fakeRequest) - fr2 := r2.(*fakeRequest) - if fr2.mergeErr != nil { - return nil, fr2.mergeErr - } - return &fakeRequest{ - items: fr1.items + fr2.items, - sink: fr1.sink, - exportErr: fr2.exportErr, - delay: fr1.delay + fr2.delay, - }, nil -} - -func fakeBatchMergeSplitFunc(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, r1 internal.Request, r2 internal.Request) ([]internal.Request, error) { - maxItems := cfg.MaxSizeItems - if maxItems == 0 { - r, err := fakeBatchMergeFunc(ctx, r1, r2) - return []internal.Request{r}, err - } - - if r2.(*fakeRequest).mergeErr != nil { - return nil, r2.(*fakeRequest).mergeErr - } - - fr2 := r2.(*fakeRequest) - fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} - var res []internal.Request - - // fill fr1 to maxItems if it's not nil - if r1 != nil { - fr1 := r1.(*fakeRequest) - fr1 = &fakeRequest{items: fr1.items, sink: fr1.sink, exportErr: fr1.exportErr, delay: fr1.delay} - if fr2.items <= maxItems-fr1.items { - fr1.items += fr2.items - if fr2.exportErr != nil { - fr1.exportErr = fr2.exportErr - } - return []internal.Request{fr1}, nil - } - // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases - fr2.items -= maxItems - fr1.items - fr1.items = maxItems - res = append(res, fr1) - } - - // split fr2 to maxItems - for { - if fr2.items <= maxItems { - res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) - break - } - res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) - fr2.items -= maxItems - } - - return res, nil -} diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 33a1915c65d..d1578d854ca 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/exporter/exporterbatcher" "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -54,6 +55,75 @@ func (r *fakeRequest) ItemsCount() int { return r.items } +func (r *fakeRequest) Merge(_ context.Context, + r2 internal.BatchRequest) (internal.BatchRequest, error) { + if r == nil { + return r2, nil + } + fr2 := r2.(*fakeRequest) + if fr2.mergeErr != nil { + return nil, fr2.mergeErr + } + return &fakeRequest{ + items: r.items + fr2.items, + sink: r.sink, + exportErr: fr2.exportErr, + delay: r.delay + fr2.delay, + }, nil +} + +func (r *fakeRequest) MergeSplit(ctx context.Context, cfg exporterbatcher.MaxSizeConfig, + r2 internal.BatchRequest) ([]internal.BatchRequest, error) { + if r.mergeErr != nil { + return nil, r.mergeErr + } + + maxItems := cfg.MaxSizeItems + if maxItems == 0 { + r, err := r.Merge(ctx, r2) + return []internal.BatchRequest{r}, err + } + + var fr2 *fakeRequest + if r2 == nil { + fr2 = &fakeRequest{sink: r.sink, exportErr: r.exportErr, delay: r.delay} + } else { + if r2.(*fakeRequest).mergeErr != nil { + return nil, r2.(*fakeRequest).mergeErr + } + fr2 = r2.(*fakeRequest) + fr2 = &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay} + } + var res []internal.BatchRequest + + // fill fr1 to maxItems if it's not nil + + r = &fakeRequest{items: r.items, sink: r.sink, exportErr: r.exportErr, delay: r.delay} + if fr2.items <= maxItems-r.items { + r.items += fr2.items + if fr2.exportErr != nil { + r.exportErr = fr2.exportErr + } + return []internal.BatchRequest{r}, nil + } + // if split is needed, we don't propagate exportErr from fr2 to fr1 to test more cases + fr2.items -= maxItems - r.items + r.items = maxItems + res = append(res, r) + + // split fr2 to maxItems + for { + if fr2.items <= maxItems { + res = append(res, &fakeRequest{items: fr2.items, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + break + } + res = append(res, &fakeRequest{items: maxItems, sink: fr2.sink, exportErr: fr2.exportErr, delay: fr2.delay}) + fr2.items -= maxItems + } + + return res, nil +} + type FakeRequestConverter struct { MetricsError error TracesError error diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 4f5b977b2e5..772a5673e24 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -86,7 +86,6 @@ func NewLogs( } logsOpts := []Option{ internal.WithMarshaler(logsRequestMarshaler), internal.WithUnmarshaler(newLogsRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeLogs, mergeSplitLogs), } return NewLogsRequest(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...) } diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go index 296538bc0e0..cd1ed33c243 100644 --- a/exporter/exporterhelper/logs_batch.go +++ b/exporter/exporterhelper/logs_batch.go @@ -12,24 +12,23 @@ import ( ) // mergeLogs merges two logs requests into one. -func mergeLogs(_ context.Context, r1 Request, r2 Request) (Request, error) { - lr1, ok1 := r1.(*logsRequest) +func (req *logsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { lr2, ok2 := r2.(*logsRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - lr2.ld.ResourceLogs().MoveAndAppendTo(lr1.ld.ResourceLogs()) - return lr1, nil + lr2.ld.ResourceLogs().MoveAndAppendTo(req.ld.ResourceLogs()) + return req, nil } // mergeSplitLogs splits and/or merges the logs into multiple requests based on the MaxSizeConfig. -func mergeSplitLogs(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { +func (req *logsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { var ( - res []Request + res []BatchRequest destReq *logsRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []Request{r1, r2} { + for _, req := range []BatchRequest{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go index f5e10b5bcc9..64a8b49a83a 100644 --- a/exporter/exporterhelper/logs_batch_test.go +++ b/exporter/exporterhelper/logs_batch_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/exporter/internal" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/testdata" ) @@ -18,7 +19,7 @@ import ( func TestMergeLogs(t *testing.T) { lr1 := &logsRequest{ld: testdata.GenerateLogs(2)} lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} - res, err := mergeLogs(context.Background(), lr1, lr2) + res, err := lr1.Merge(context.Background(), lr2) require.NoError(t, err) assert.Equal(t, 5, res.(*logsRequest).ld.LogRecordCount()) } @@ -26,7 +27,7 @@ func TestMergeLogs(t *testing.T) { func TestMergeLogsInvalidInput(t *testing.T) { lr1 := &tracesRequest{td: testdata.GenerateTraces(2)} lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} - _, err := mergeLogs(context.Background(), lr1, lr2) + _, err := lr1.Merge(context.Background(), lr2) assert.Error(t, err) } @@ -34,8 +35,8 @@ func TestMergeSplitLogs(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - lr1 Request - lr2 Request + lr1 internal.BatchRequest + lr2 internal.BatchRequest expected []*logsRequest }{ { @@ -45,13 +46,6 @@ func TestMergeSplitLogs(t *testing.T) { lr2: &logsRequest{ld: plog.NewLogs()}, expected: []*logsRequest{{ld: plog.NewLogs()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - lr1: nil, - lr2: nil, - expected: []*logsRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -60,17 +54,10 @@ func TestMergeSplitLogs(t *testing.T) { expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, }, { - name: "first_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - lr1: nil, - lr2: &logsRequest{ld: testdata.GenerateLogs(5)}, - expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, - }, - { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - lr1: nil, - lr2: &logsRequest{ld: plog.NewLogs()}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: nil, expected: []*logsRequest{{ld: plog.NewLogs()}}, }, { @@ -87,7 +74,7 @@ func TestMergeSplitLogs(t *testing.T) { { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - lr1: nil, + lr1: &logsRequest{ld: plog.NewLogs()}, lr2: &logsRequest{ld: testdata.GenerateLogs(10)}, expected: []*logsRequest{ {ld: testdata.GenerateLogs(4)}, @@ -132,7 +119,7 @@ func TestMergeSplitLogs(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitLogs(context.Background(), tt.cfg, tt.lr1, tt.lr2) + res, err := tt.lr1.MergeSplit(context.Background(), tt.cfg, tt.lr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i, r := range res { @@ -146,7 +133,7 @@ func TestMergeSplitLogs(t *testing.T) { func TestMergeSplitLogsInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &logsRequest{ld: testdata.GenerateLogs(3)} - _, err := mergeSplitLogs(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{}, r2) assert.Error(t, err) } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 64557029ce7..b2da8895f98 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -86,7 +86,6 @@ func NewMetrics( } metricsOpts := []Option{ internal.WithMarshaler(metricsRequestMarshaler), internal.WithUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeMetrics, mergeSplitMetrics), } return NewMetricsRequest(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...) } diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go index 1a6448c8496..6331467b46b 100644 --- a/exporter/exporterhelper/metrics_batch.go +++ b/exporter/exporterhelper/metrics_batch.go @@ -12,24 +12,23 @@ import ( ) // mergeMetrics merges two metrics requests into one. -func mergeMetrics(_ context.Context, r1 Request, r2 Request) (Request, error) { - mr1, ok1 := r1.(*metricsRequest) +func (req *metricsRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { mr2, ok2 := r2.(*metricsRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - mr2.md.ResourceMetrics().MoveAndAppendTo(mr1.md.ResourceMetrics()) - return mr1, nil + mr2.md.ResourceMetrics().MoveAndAppendTo(req.md.ResourceMetrics()) + return req, nil } // mergeSplitMetrics splits and/or merges the metrics into multiple requests based on the MaxSizeConfig. -func mergeSplitMetrics(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { +func (req *metricsRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { var ( - res []Request + res []BatchRequest destReq *metricsRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []Request{r1, r2} { + for _, req := range []BatchRequest{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go index 860a1eee9c3..6edfdfe59c9 100644 --- a/exporter/exporterhelper/metrics_batch_test.go +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -18,7 +18,7 @@ import ( func TestMergeMetrics(t *testing.T) { mr1 := &metricsRequest{md: testdata.GenerateMetrics(2)} mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - res, err := mergeMetrics(context.Background(), mr1, mr2) + res, err := mr1.Merge(context.Background(), mr2) require.NoError(t, err) assert.Equal(t, 5, res.(*metricsRequest).md.MetricCount()) } @@ -26,7 +26,7 @@ func TestMergeMetrics(t *testing.T) { func TestMergeMetricsInvalidInput(t *testing.T) { mr1 := &tracesRequest{td: testdata.GenerateTraces(2)} mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mergeMetrics(context.Background(), mr1, mr2) + _, err := mr1.Merge(context.Background(), mr2) assert.Error(t, err) } @@ -34,8 +34,8 @@ func TestMergeSplitMetrics(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - mr1 Request - mr2 Request + mr1 BatchRequest + mr2 BatchRequest expected []*metricsRequest }{ { @@ -45,13 +45,6 @@ func TestMergeSplitMetrics(t *testing.T) { mr2: &metricsRequest{md: pmetric.NewMetrics()}, expected: []*metricsRequest{{md: pmetric.NewMetrics()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - mr1: nil, - mr2: nil, - expected: []*metricsRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -60,17 +53,10 @@ func TestMergeSplitMetrics(t *testing.T) { expected: []*metricsRequest{{md: testdata.GenerateMetrics(5)}}, }, { - name: "first_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - mr1: nil, - mr2: &metricsRequest{md: testdata.GenerateMetrics(5)}, - expected: []*metricsRequest{{md: testdata.GenerateMetrics(5)}}, - }, - { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - mr1: nil, - mr2: &metricsRequest{md: pmetric.NewMetrics()}, + mr1: &metricsRequest{md: pmetric.NewMetrics()}, + mr2: nil, expected: []*metricsRequest{{md: pmetric.NewMetrics()}}, }, { @@ -87,7 +73,7 @@ func TestMergeSplitMetrics(t *testing.T) { { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 14}, - mr1: nil, + mr1: &metricsRequest{md: pmetric.NewMetrics()}, mr2: &metricsRequest{md: testdata.GenerateMetrics(15)}, // 15 metrics, 30 data points expected: []*metricsRequest{ {md: testdata.GenerateMetrics(7)}, // 7 metrics, 14 data points @@ -133,7 +119,7 @@ func TestMergeSplitMetrics(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitMetrics(context.Background(), tt.cfg, tt.mr1, tt.mr2) + res, err := tt.mr1.MergeSplit(context.Background(), tt.cfg, tt.mr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { @@ -146,7 +132,7 @@ func TestMergeSplitMetrics(t *testing.T) { func TestMergeSplitMetricsInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mergeSplitMetrics(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r1, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) assert.Error(t, err) } diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 407af781feb..7d7bedbd289 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -86,7 +86,6 @@ func NewTraces( } tracesOpts := []Option{ internal.WithMarshaler(tracesRequestMarshaler), internal.WithUnmarshaler(newTraceRequestUnmarshalerFunc(pusher)), - internal.WithBatchFuncs(mergeTraces, mergeSplitTraces), } return NewTracesRequest(ctx, set, requestFromTraces(pusher), append(tracesOpts, options...)...) } diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go index 1bdada95b7b..c2743e12356 100644 --- a/exporter/exporterhelper/traces_batch.go +++ b/exporter/exporterhelper/traces_batch.go @@ -12,24 +12,23 @@ import ( ) // mergeTraces merges two traces requests into one. -func mergeTraces(_ context.Context, r1 Request, r2 Request) (Request, error) { - tr1, ok1 := r1.(*tracesRequest) +func (req *tracesRequest) Merge(_ context.Context, r2 BatchRequest) (BatchRequest, error) { tr2, ok2 := r2.(*tracesRequest) - if !ok1 || !ok2 { + if !ok2 { return nil, errors.New("invalid input type") } - tr2.td.ResourceSpans().MoveAndAppendTo(tr1.td.ResourceSpans()) - return tr1, nil + tr2.td.ResourceSpans().MoveAndAppendTo(req.td.ResourceSpans()) + return req, nil } // mergeSplitTraces splits and/or merges the traces into multiple requests based on the MaxSizeConfig. -func mergeSplitTraces(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { +func (req *tracesRequest) MergeSplit(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r2 BatchRequest) ([]BatchRequest, error) { var ( - res []Request + res []BatchRequest destReq *tracesRequest capacityLeft = cfg.MaxSizeItems ) - for _, req := range []Request{r1, r2} { + for _, req := range []BatchRequest{req, r2} { if req == nil { continue } diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go index d88591b3091..1ff4434916c 100644 --- a/exporter/exporterhelper/traces_batch_test.go +++ b/exporter/exporterhelper/traces_batch_test.go @@ -18,7 +18,7 @@ import ( func TestMergeTraces(t *testing.T) { tr1 := &tracesRequest{td: testdata.GenerateTraces(2)} tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} - res, err := mergeTraces(context.Background(), tr1, tr2) + res, err := tr1.Merge(context.Background(), tr2) require.NoError(t, err) assert.Equal(t, 5, res.(*tracesRequest).td.SpanCount()) } @@ -26,7 +26,7 @@ func TestMergeTraces(t *testing.T) { func TestMergeTracesInvalidInput(t *testing.T) { tr1 := &logsRequest{ld: testdata.GenerateLogs(2)} tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} - _, err := mergeTraces(context.Background(), tr1, tr2) + _, err := tr1.Merge(context.Background(), tr2) assert.Error(t, err) } @@ -34,8 +34,8 @@ func TestMergeSplitTraces(t *testing.T) { tests := []struct { name string cfg exporterbatcher.MaxSizeConfig - tr1 Request - tr2 Request + tr1 BatchRequest + tr2 BatchRequest expected []*tracesRequest }{ { @@ -45,13 +45,6 @@ func TestMergeSplitTraces(t *testing.T) { tr2: &tracesRequest{td: ptrace.NewTraces()}, expected: []*tracesRequest{{td: ptrace.NewTraces()}}, }, - { - name: "both_requests_nil", - cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - tr1: nil, - tr2: nil, - expected: []*tracesRequest{}, - }, { name: "first_request_empty", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, @@ -67,10 +60,10 @@ func TestMergeSplitTraces(t *testing.T) { expected: []*tracesRequest{{td: testdata.GenerateTraces(5)}}, }, { - name: "first_nil_second_empty", + name: "first_empty_second_nil", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, - tr1: nil, - tr2: &tracesRequest{td: ptrace.NewTraces()}, + tr1: &tracesRequest{td: ptrace.NewTraces()}, + tr2: nil, expected: []*tracesRequest{{td: ptrace.NewTraces()}}, }, { @@ -87,7 +80,7 @@ func TestMergeSplitTraces(t *testing.T) { { name: "split_only", cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, - tr1: nil, + tr1: &tracesRequest{td: ptrace.NewTraces()}, tr2: &tracesRequest{td: testdata.GenerateTraces(10)}, expected: []*tracesRequest{ {td: testdata.GenerateTraces(4)}, @@ -133,7 +126,7 @@ func TestMergeSplitTraces(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := mergeSplitTraces(context.Background(), tt.cfg, tt.tr1, tt.tr2) + res, err := tt.tr1.MergeSplit(context.Background(), tt.cfg, tt.tr2) require.NoError(t, err) assert.Equal(t, len(tt.expected), len(res)) for i := range res { @@ -146,7 +139,7 @@ func TestMergeSplitTraces(t *testing.T) { func TestMergeSplitTracesInvalidInput(t *testing.T) { r1 := &tracesRequest{td: testdata.GenerateTraces(2)} r2 := &metricsRequest{md: testdata.GenerateMetrics(3)} - _, err := mergeSplitTraces(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r1, r2) + _, err := r1.MergeSplit(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r2) assert.Error(t, err) } diff --git a/exporter/internal/request.go b/exporter/internal/request.go index 1b82e23504d..3666e1552b1 100644 --- a/exporter/internal/request.go +++ b/exporter/internal/request.go @@ -5,6 +5,8 @@ package internal // import "go.opentelemetry.io/collector/exporter/internal" import ( "context" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" ) // Request represents a single request that can be sent to an external endpoint. @@ -19,6 +21,16 @@ type Request interface { ItemsCount() int } +// BatchRequest represents a single request that can be sent to an external endpoint. It can be merged with +// another BatchRequest and/or split into multiple given a size limit. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type BatchRequest interface { + Request + Merge(context.Context, BatchRequest) (BatchRequest, error) + MergeSplit(context.Context, exporterbatcher.MaxSizeConfig, BatchRequest) ([]BatchRequest, error) +} + // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to // return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned.