diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index 4d36b8366e8b..ddce4f3c0c53 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -118,10 +118,19 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config. return err } - return p.processBlocks(ctx, bqs, data) + start = time.Now() + res := p.processBlocks(ctx, bqs, data) + duration = time.Since(start) + + for _, t := range tasks { + FromContext(t.ctx).AddProcessingTime(duration) + } + + return res } func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error { + defer func() { for i := range bqs { if bqs[i] == nil { @@ -188,7 +197,7 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie for _, task := range tasks { stats := FromContext(task.ctx) - stats.AddProcessingTime(duration) + stats.AddTotalProcessingTime(duration) stats.IncProcessedBlocks() } diff --git a/pkg/bloomgateway/stats.go b/pkg/bloomgateway/stats.go index 75a3ab7b0ee8..09f78841e544 100644 --- a/pkg/bloomgateway/stats.go +++ b/pkg/bloomgateway/stats.go @@ -8,11 +8,15 @@ import ( ) type Stats struct { - Status string - NumTasks, NumFilters int - ChunksRequested, ChunksFiltered, SeriesRequested, SeriesFiltered int - QueueTime, MetasFetchTime, BlocksFetchTime, ProcessingTime, PostProcessingTime *atomic.Duration - ProcessedBlocks *atomic.Int32 + Status string + NumTasks, NumFilters int + ChunksRequested, ChunksFiltered int + SeriesRequested, SeriesFiltered int + QueueTime *atomic.Duration + MetasFetchTime, BlocksFetchTime *atomic.Duration + ProcessingTime, TotalProcessingTime *atomic.Duration + PostProcessingTime *atomic.Duration + ProcessedBlocks *atomic.Int32 } type statsKey int @@ -22,13 +26,14 @@ var ctxKey = statsKey(0) // ContextWithEmptyStats returns a context with empty stats. func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) { stats := &Stats{ - Status: "unknown", - ProcessedBlocks: atomic.NewInt32(0), - QueueTime: atomic.NewDuration(0), - MetasFetchTime: atomic.NewDuration(0), - BlocksFetchTime: atomic.NewDuration(0), - ProcessingTime: atomic.NewDuration(0), - PostProcessingTime: atomic.NewDuration(0), + Status: "unknown", + ProcessedBlocks: atomic.NewInt32(0), + QueueTime: atomic.NewDuration(0), + MetasFetchTime: atomic.NewDuration(0), + BlocksFetchTime: atomic.NewDuration(0), + ProcessingTime: atomic.NewDuration(0), + TotalProcessingTime: atomic.NewDuration(0), + PostProcessingTime: atomic.NewDuration(0), } ctx = context.WithValue(ctx, ctxKey, stats) return stats, ctx @@ -110,6 +115,13 @@ func (s *Stats) AddProcessingTime(t time.Duration) { s.ProcessingTime.Add(t) } +func (s *Stats) AddTotalProcessingTime(t time.Duration) { + if s == nil { + return + } + s.TotalProcessingTime.Add(t) +} + func (s *Stats) AddPostProcessingTime(t time.Duration) { if s == nil { return