diff --git a/exporter/internal/queue/bounded_memory_queue.go b/exporter/internal/queue/bounded_memory_queue.go
index 98e1b281176..ae62704ff5b 100644
--- a/exporter/internal/queue/bounded_memory_queue.go
+++ b/exporter/internal/queue/bounded_memory_queue.go
@@ -53,6 +53,11 @@ func (q *boundedMemoryQueue[T]) Consume(consumeFunc func(context.Context, T) err
     return true
 }

+// Should be called to remove the item at the given index from the queue once processing is finished.
+// For the in-memory queue, this function is a no-op.
+func (q *boundedMemoryQueue[T]) OnProcessingFinished(uint64, error) {
+}
+
 // Shutdown closes the queue channel to initiate draining of the queue.
 func (q *boundedMemoryQueue[T]) Shutdown(context.Context) error {
     q.sizedChannel.shutdown()
diff --git a/exporter/internal/queue/persistent_queue.go b/exporter/internal/queue/persistent_queue.go
index 71fb910c78f..31133b4b2d6 100644
--- a/exporter/internal/queue/persistent_queue.go
+++ b/exporter/internal/queue/persistent_queue.go
@@ -195,15 +195,15 @@ func (pq *persistentQueue[T]) restoreQueueSizeFromStorage(ctx context.Context) (
 func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error) bool {
     for {
         var (
-            req                  T
-            onProcessingFinished func(error)
-            consumed             bool
+            index    uint64
+            req      T
+            consumed bool
         )

         // If we are stopped we still process all the other events in the channel before, but we
         // return fast in the `getNextItem`, so we will free the channel fast and get to the stop.
         _, ok := pq.sizedChannel.pop(func(permanentQueueEl) int64 {
-            req, onProcessingFinished, consumed = pq.getNextItem(context.Background())
+            index, req, consumed = pq.getNextItem(context.Background())
             if !consumed {
                 return 0
             }
@@ -213,7 +213,8 @@ func (pq *persistentQueue[T]) Consume(consumeFunc func(context.Context, T) error
             return false
         }
         if consumed {
-            onProcessingFinished(consumeFunc(context.Background(), req))
+            consumeErr := consumeFunc(context.Background(), req)
+            pq.OnProcessingFinished(index, consumeErr)
             return true
         }
     }
@@ -303,20 +304,21 @@ func (pq *persistentQueue[T]) putInternal(ctx context.Context, req T) error {
     return nil
 }

-// getNextItem pulls the next available item from the persistent storage along with a callback function that should be
-// called after the item is processed to clean up the storage. If no new item is available, returns false.
-func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error), bool) {
+// getNextItem pulls the next available item from the persistent storage along with its index. Once processing is
+// finished, OnProcessingFinished should be called with that index to clean up the storage. If no new item is
+// available, returns false.
+func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (uint64, T, bool) {
     pq.mu.Lock()
     defer pq.mu.Unlock()

     var request T

     if pq.stopped {
-        return request, nil, false
+        return 0, request, false
     }

     if pq.readIndex == pq.writeIndex {
-        return request, nil, false
+        return 0, request, false
     }

     index := pq.readIndex
@@ -340,45 +342,49 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
             pq.logger.Error("Error deleting item from queue", zap.Error(err))
         }

-        return request, nil, false
+        return 0, request, false
     }

     // Increase the reference count, so the client is not closed while the request is being processed.
     // The client cannot be closed because we hold the lock since last we checked `stopped`.
     pq.refClient++
-    return request, func(consumeErr error) {
-        // Delete the item from the persistent storage after it was processed.
-        pq.mu.Lock()
-        // Always unref client even if the consumer is shutdown because we always ref it for every valid request.
-        defer func() {
-            if err = pq.unrefClient(ctx); err != nil {
-                pq.logger.Error("Error closing the storage client", zap.Error(err))
-            }
-            pq.mu.Unlock()
-        }()
-        if experr.IsShutdownErr(consumeErr) {
-            // The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
-            // TODO: Handle partially delivered requests by updating their values in the storage.
-            return
-        }
+    return index, request, true
+}

-        if err = pq.itemDispatchingFinish(ctx, index); err != nil {
-            pq.logger.Error("Error deleting item from queue", zap.Error(err))
+// Should be called to remove the item at the given index from the queue once processing is finished.
+func (pq *persistentQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
+    // Delete the item from the persistent storage after it was processed.
+    pq.mu.Lock()
+    // Always unref client even if the consumer is shutdown because we always ref it for every valid request.
+    defer func() {
+        if err := pq.unrefClient(context.Background()); err != nil {
+            pq.logger.Error("Error closing the storage client", zap.Error(err))
         }
+        pq.mu.Unlock()
+    }()

-        // Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
-        // in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
-        if (pq.readIndex % 10) == 0 {
-            if qsErr := pq.backupQueueSize(ctx); qsErr != nil {
-                pq.logger.Error("Error writing queue size to storage", zap.Error(err))
-            }
+    if experr.IsShutdownErr(consumeErr) {
+        // The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
+        // TODO: Handle partially delivered requests by updating their values in the storage.
+        return
+    }
+
+    if err := pq.itemDispatchingFinish(context.Background(), index); err != nil {
+        pq.logger.Error("Error deleting item from queue", zap.Error(err))
+    }
+
+    // Back up the queue size to storage on every 10 reads. The stored value is used to recover the queue size
+    // in case if the collector is killed. The recovered queue size is allowed to be inaccurate.
+    if (pq.readIndex % 10) == 0 {
+        if qsErr := pq.backupQueueSize(context.Background()); qsErr != nil {
+            pq.logger.Error("Error writing queue size to storage", zap.Error(qsErr))
         }
+    }

-        // Ensure the used size and the channel size are in sync.
-        pq.sizedChannel.syncSize()
+    // Ensure the used size and the channel size are in sync.
+    pq.sizedChannel.syncSize()

-    }, true
 }

 // retrieveAndEnqueueNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go
index 88c65de666a..b5a7cfa3d86 100644
--- a/exporter/internal/queue/persistent_queue_test.go
+++ b/exporter/internal/queue/persistent_queue_test.go
@@ -463,19 +463,19 @@ func TestPersistentQueue_CurrentlyProcessedItems(t *testing.T) {
     requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{})

     // Takes index 0 in process.
-    readReq, _, found := ps.getNextItem(context.Background())
+    _, readReq, found := ps.getNextItem(context.Background())
     require.True(t, found)
     assert.Equal(t, req, readReq)
     requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

     // This takes item 1 to process.
-    secondReadReq, onProcessingFinished, found := ps.getNextItem(context.Background())
+    secondIndex, secondReadReq, found := ps.getNextItem(context.Background())
     require.True(t, found)
     assert.Equal(t, req, secondReadReq)
     requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0, 1})

     // Lets mark item 1 as finished, it will remove it from the currently dispatched items list.
-    onProcessingFinished(nil)
+    ps.OnProcessingFinished(secondIndex, nil)
     requireCurrentlyDispatchedItemsEqual(t, ps, []uint64{0})

     // Reload the storage. Since items 0 was not finished, this should be re-enqueued at the end.
@@ -736,12 +736,12 @@ func TestPersistentQueue_ShutdownWhileConsuming(t *testing.T) {

     require.NoError(t, ps.Offer(context.Background(), newTracesRequest(5, 10)))

-    _, onProcessingFinished, ok := ps.getNextItem(context.Background())
+    index, _, ok := ps.getNextItem(context.Background())
     require.True(t, ok)
     assert.False(t, ps.client.(*mockStorageClient).isClosed())

     require.NoError(t, ps.Shutdown(context.Background()))
     assert.False(t, ps.client.(*mockStorageClient).isClosed())
-    onProcessingFinished(nil)
+    ps.OnProcessingFinished(index, nil)
     assert.True(t, ps.client.(*mockStorageClient).isClosed())
 }
diff --git a/exporter/internal/queue/queue.go b/exporter/internal/queue/queue.go
index 35bc504579e..54e8346a575 100644
--- a/exporter/internal/queue/queue.go
+++ b/exporter/internal/queue/queue.go
@@ -33,6 +33,8 @@ type Queue[T any] interface {
     Size() int
     // Capacity returns the capacity of the queue.
     Capacity() int
+    // Should be called to remove the item at the given index from the queue once processing is finished.
+    OnProcessingFinished(index uint64, consumeErr error)
 }

 type itemsCounter interface {
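
Reviewer note (illustrative sketch, not part of the patch): the change splits consumption into two steps, fetching an item together with its index via getNextItem and reporting the outcome via OnProcessingFinished. The standalone Go program below shows that contract with a toy queue; toyQueue, its fields, and the consume callback are hypothetical stand-ins for illustration only, not code from this repository.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Queue mirrors the subset of the exporter's internal Queue[T] interface touched by this change.
type Queue[T any] interface {
	OnProcessingFinished(index uint64, consumeErr error)
}

// toyQueue is a hypothetical stand-in for persistentQueue[T]: it tracks which indexes are
// currently dispatched, the way the real queue tracks dispatched items in storage.
type toyQueue[T any] struct {
	items      map[uint64]T
	dispatched map[uint64]bool
}

var _ Queue[string] = (*toyQueue[string])(nil)

// getNextItem mimics the new (index, item, ok) shape of persistentQueue.getNextItem.
func (q *toyQueue[T]) getNextItem() (uint64, T, bool) {
	var zero T
	for index, item := range q.items {
		if !q.dispatched[index] {
			q.dispatched[index] = true
			return index, item, true
		}
	}
	return 0, zero, false
}

// OnProcessingFinished removes the item at the given index on success. On error it keeps the
// item (still marked dispatched) so a later recovery pass could re-enqueue it, loosely
// mirroring how the persistent queue keeps items around when consumption fails at shutdown.
func (q *toyQueue[T]) OnProcessingFinished(index uint64, consumeErr error) {
	if consumeErr != nil {
		return
	}
	delete(q.items, index)
	delete(q.dispatched, index)
}

func main() {
	q := &toyQueue[string]{
		items:      map[uint64]string{0: "batch A", 1: "batch B"},
		dispatched: map[uint64]bool{},
	}

	// A hypothetical consumer callback, standing in for the exporter's consumeFunc.
	consume := func(_ context.Context, item string) error {
		if item == "batch B" {
			return errors.New("exporter temporarily unavailable")
		}
		return nil
	}

	// The same two-step pattern persistentQueue.Consume now follows:
	// fetch (index, item), process it, then report the result for that index.
	for {
		index, item, ok := q.getNextItem()
		if !ok {
			break
		}
		q.OnProcessingFinished(index, consume(context.Background(), item))
	}

	fmt.Printf("items still held by the queue: %d\n", len(q.items))
}

The property the interface relies on is the call order: every item handed out is eventually answered by exactly one OnProcessingFinished call for its index, which is what lets the persistent queue decide between deleting the item from storage and keeping it for redelivery, while the in-memory queue can treat the call as a no-op.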