feat(bloom-gw): Add metrics.go style log line to bloom gateway `FilterChunks` call (#12354)

This PR adds a "metrics.go"-style log line to the bloom gateway's `FilterChunkRefs` call that records latencies for queueing, meta fetching, block fetching, processing, and post-processing, as well as the number of chunks and series requested and filtered.

This log line should give operators a better understanding of where time is spent on individual requests.

Example log line:

```
ts=2024-03-26T13:16:11.869619964Z caller=spanlogger.go:109 component=bloom-gateway tenant=XXX method=bloomgateway.FilterChunkRefs user=XXX traceID=6239d49e1e88f88645bd7f1f6f1b85c8 level=info status=success tasks=1 series_requested=35 series_filtered=31 chunks_requested=103 chunks_filtered=93 queue_time=4.108128936s metas_fetch_time=22.040408ms blocks_fetch_time=1.284575ms processing_time=30.215863002s post_processing_time=16.084µs
```
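The counters and timings in this line come from the new `Stats` struct added in `pkg/bloomgateway/stats.go` (shown below); the surrounding `ts`, `caller`, `tenant`, `method`, and `traceID` fields are contributed by the span logger. As a minimal sketch of how such a logfmt line is rendered, using go-kit's logfmt logger with illustrative values rather than real request data:

```go
package main

import (
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)

	// Mirrors the key/value pairs that Stats.KVArgs() returns.
	_ = level.Info(logger).Log(
		"status", "success",
		"tasks", 1,
		"series_requested", 35,
		"series_filtered", 31,
		"chunks_requested", 103,
		"chunks_filtered", 93,
		"queue_time", 4108*time.Millisecond,
		"processing_time", 30*time.Second,
		"post_processing_time", 16*time.Microsecond,
	)
	// level=info status=success tasks=1 series_requested=35 ... post_processing_time=16µs
}
```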

Signed-off-by: Christian Haudum <[email protected]>
chaudum authored Mar 26, 2024
1 parent 5c6a031 commit 9af191f
Showing 4 changed files with 149 additions and 35 deletions.
46 changes: 27 additions & 19 deletions pkg/bloomgateway/bloomgateway.go
@@ -56,7 +56,6 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/queue"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
@@ -206,34 +205,45 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
log.With(g.logger, "tenant", tenantID),
"bloomgateway.FilterChunkRefs",
)
defer sp.Finish()

stats, ctx := ContextWithEmptyStats(ctx)
defer func() {
level.Info(sp).Log(stats.KVArgs()...)
sp.Finish()
}()

// start time == end time --> empty response
if req.From.Equal(req.Through) {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
}, nil
}

// start time > end time --> error response
if req.Through.Before(req.From) {
stats.Status = labelFailure
return nil, errors.New("from time must not be after through time")
}

filters := v1.ExtractTestableLineFilters(req.Plan.AST)
stats.NumFilters = len(filters)
g.metrics.receivedFilters.Observe(float64(len(filters)))

// Shortcut if request does not contain filters
if len(filters) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
}

seriesByDay := partitionRequest(req)
stats.NumTasks = len(seriesByDay)

// no tasks --> empty response
if len(seriesByDay) == 0 {
stats.Status = labelSuccess
return &logproto.FilterChunkRefResponse{
ChunkRefs: []*logproto.GroupedChunkRefs{},
}, nil
Expand All @@ -255,15 +265,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))

level.Debug(sp).Log(
"msg", "created task for day",
"task", task.ID,
"day", seriesForDay.day,
"interval", seriesForDay.interval.String(),
"nSeries", len(seriesForDay.series),
"filters", JoinFunc(filters, ";", func(e syntax.LineFilterExpr) string { return e.String() }),
)
tasks = append(tasks, task)
}

@@ -285,13 +286,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
_ = g.pendingTasks.Inc()
}); err != nil {
stats.Status = labelFailure
return nil, errors.Wrap(err, "failed to enqueue task")
}
// TODO(owen-d): use `concurrency` lib, bound parallelism
go g.consumeTask(ctx, task, tasksCh)
}

sp.Log("enqueue_duration", time.Since(queueStart).String())
sp.Log("msg", "enqueued tasks", "duration", time.Since(queueStart).String())

remaining := len(tasks)

@@ -305,10 +307,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for remaining > 0 {
select {
case <-ctx.Done():
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(sp).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses)
Expand All @@ -318,7 +322,10 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

sp.Log("msg", "received all responses")

start := time.Now()
filtered := filterChunkRefs(req, responses)
duration := time.Since(start)
stats.AddPostProcessingTime(duration)

// free up the responses
for _, resp := range responses {
@@ -335,13 +342,14 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks))

level.Info(sp).Log(
"msg", "return filtered chunk refs",
"requested_series", preFilterSeries,
"filtered_series", preFilterSeries-postFilterSeries,
"requested_chunks", preFilterChunks,
"filtered_chunks", preFilterChunks-postFilterChunks,
)
stats.Status = "success"
stats.SeriesRequested = preFilterSeries
stats.SeriesFiltered = preFilterSeries - postFilterSeries
stats.ChunksRequested = preFilterChunks
stats.ChunksFiltered = preFilterChunks - postFilterChunks

sp.Log("msg", "return filtered chunk refs")

return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil
}

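The `FilterChunkRefs` change above follows a request-scoped stats pattern: initialise empty stats at the RPC entry point, stash them in the context, and emit a single summary line from a deferred function so it fires on every return path, including errors and cancellation. Here is a runnable sketch of that pattern under illustrative names (only the `Status` field is modelled; the real struct carries the full set of counters and timings):

```go
package main

import (
	"context"
	"errors"
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// Illustrative stand-ins for ContextWithEmptyStats/FromContext in this PR.
type reqStats struct{ Status string }

type ctxKey struct{}

func withStats(ctx context.Context) (*reqStats, context.Context) {
	s := &reqStats{Status: "unknown"}
	return s, context.WithValue(ctx, ctxKey{}, s)
}

func statsFromContext(ctx context.Context) *reqStats {
	s, _ := ctx.Value(ctxKey{}).(*reqStats)
	return s
}

func handle(ctx context.Context, logger log.Logger, fail bool) error {
	stats, ctx := withStats(ctx)
	// The deferred log fires on every return path, so each request emits
	// exactly one summary line, whatever the outcome.
	defer func() {
		_ = level.Info(logger).Log("status", stats.Status)
	}()

	// Sub-components further down the call chain read the stats back out
	// of the context; see the worker and processor changes below.
	_ = statsFromContext(ctx)

	if fail {
		stats.Status = "failure"
		return errors.New("boom")
	}
	stats.Status = "success"
	return nil
}

func main() {
	logger := log.NewLogfmtLogger(os.Stdout)
	_ = handle(context.Background(), logger, false) // level=info status=success
	_ = handle(context.Background(), logger, true)  // level=info status=failure
}
```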
47 changes: 31 additions & 16 deletions pkg/bloomgateway/processor.go
@@ -45,7 +45,7 @@ func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.M
"msg", "process tasks with bounds",
"tenant", tenant,
"tasks", len(tasks),
"bounds", JoinFunc(bounds, ",", func(e v1.FingerprintBounds) string { return e.String() }),
"bounds", len(bounds),
)

for ts, tasks := range group(tasks, func(t Task) config.DayTime { return t.table }) {
@@ -73,23 +73,30 @@ func (p *processor) processTasks(ctx context.Context, tenant string, day config.
Interval: interval,
Keyspace: v1.NewBounds(minFpRange.Min, maxFpRange.Max),
}

start := time.Now()
metas, err := p.store.FetchMetas(ctx, metaSearch)
duration := time.Since(start)
level.Debug(p.logger).Log("msg", "fetched metas", "count", len(metas), "duration", duration, "err", err)

for _, t := range tasks {
FromContext(t.ctx).AddMetasFetchTime(duration)
}

if err != nil {
return err
}

blocksRefs := bloomshipper.BlocksForMetas(metas, interval, keyspaces)
level.Info(p.logger).Log("msg", "blocks for metas", "num_metas", len(metas), "num_blocks", len(blocksRefs))
return p.processBlocks(ctx, partitionTasks(tasks, blocksRefs))
}

func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) error {
data := partitionTasks(tasks, blocksRefs)

refs := make([]bloomshipper.BlockRef, 0, len(data))
for _, block := range data {
refs = append(refs, block.ref)
}

start := time.Now()
start = time.Now()
bqs, err := p.store.FetchBlocks(
ctx,
refs,
@@ -101,12 +108,21 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
// the underlying bloom []byte outside of iteration
bloomshipper.WithPool(true),
)
level.Debug(p.logger).Log("msg", "fetch blocks", "count", len(bqs), "duration", time.Since(start), "err", err)
duration = time.Since(start)
level.Debug(p.logger).Log("msg", "fetched blocks", "count", len(refs), "duration", duration, "err", err)

for _, t := range tasks {
FromContext(t.ctx).AddBlocksFetchTime(duration)
}

if err != nil {
return err
}

return p.processBlocks(ctx, bqs, data)
}

func (p *processor) processBlocks(ctx context.Context, bqs []*bloomshipper.CloseableBlockQuerier, data []blockWithTasks) error {
defer func() {
for i := range bqs {
if bqs[i] == nil {
@@ -124,13 +140,6 @@ func (p *processor) processBlocks(ctx context.Context, data []blockWithTasks) er
}

block := data[i]
level.Debug(p.logger).Log(
"msg", "process block with tasks",
"job", i+1,
"of_jobs", len(bqs),
"block", block.ref,
"num_tasks", len(block.tasks),
)

if !block.ref.Bounds.Equal(bq.Bounds) {
return errors.Errorf("block and querier bounds differ: %s vs %s", block.ref.Bounds, bq.Bounds)
@@ -178,10 +187,16 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie

start := time.Now()
err = fq.Run()
duration := time.Since(start)

if err != nil {
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(time.Since(start).Seconds())
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelFailure).Observe(duration.Seconds())
} else {
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(time.Since(start).Seconds())
p.metrics.blockQueryLatency.WithLabelValues(p.id, labelSuccess).Observe(duration.Seconds())
}

for _, task := range tasks {
FromContext(task.ctx).AddProcessingTime(duration)
}

return err
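In `processTasks` above, a single `FetchMetas` or `FetchBlocks` call serves many tasks at once, so the measured duration is credited in full to every waiting task rather than divided between them, and `atomic.Duration` keeps the accumulation safe when several workers report into the same request's stats concurrently. A small sketch of that fan-out pattern, with an illustrative task shape:

```go
package main

import (
	"fmt"
	"time"

	"go.uber.org/atomic"
)

// Illustrative stand-in for the per-request stats a task's context points at.
type taskStats struct{ MetasFetchTime atomic.Duration }

type task struct{ stats *taskStats }

func main() {
	tasks := []task{{stats: &taskStats{}}, {stats: &taskStats{}}}

	// One fetch is issued on behalf of all tasks...
	start := time.Now()
	time.Sleep(10 * time.Millisecond) // stand-in for store.FetchMetas
	duration := time.Since(start)

	// ...and every task that waited on it records the full duration.
	for _, t := range tasks {
		t.stats.MetasFetchTime.Add(duration)
	}

	fmt.Println(tasks[0].stats.MetasFetchTime.Load()) // >= 10ms
}
```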
90 changes: 90 additions & 0 deletions pkg/bloomgateway/stats.go
@@ -0,0 +1,90 @@
package bloomgateway

import (
"context"
"time"

"go.uber.org/atomic"
)

type Stats struct {
Status string
NumTasks, NumFilters int
ChunksRequested, ChunksFiltered, SeriesRequested, SeriesFiltered int
QueueTime, MetasFetchTime, BlocksFetchTime, ProcessingTime, PostProcessingTime atomic.Duration
}

type statsKey int

var ctxKey = statsKey(0)

// ContextWithEmptyStats returns a context with empty stats.
func ContextWithEmptyStats(ctx context.Context) (*Stats, context.Context) {
stats := &Stats{Status: "unknown"}
ctx = context.WithValue(ctx, ctxKey, stats)
return stats, ctx
}

// FromContext gets the Stats out of the Context. Returns nil if stats have not
// been initialised in the context.
func FromContext(ctx context.Context) *Stats {
o := ctx.Value(ctxKey)
if o == nil {
return nil
}
return o.(*Stats)
}

func (s *Stats) KVArgs() []any {
if s == nil {
return []any{}
}
return []any{
"status", s.Status,
"tasks", s.NumTasks,
"series_requested", s.SeriesRequested,
"series_filtered", s.SeriesFiltered,
"chunks_requested", s.ChunksRequested,
"chunks_filtered", s.ChunksFiltered,
"queue_time", s.QueueTime.Load(),
"metas_fetch_time", s.MetasFetchTime.Load(),
"blocks_fetch_time", s.BlocksFetchTime.Load(),
"processing_time", s.ProcessingTime.Load(),
"post_processing_time", s.PostProcessingTime.Load(),
}
}

func (s *Stats) AddQueueTime(t time.Duration) {
if s == nil {
return
}
s.QueueTime.Add(t)
}

func (s *Stats) AddMetasFetchTime(t time.Duration) {
if s == nil {
return
}
s.MetasFetchTime.Add(t)
}

func (s *Stats) AddBlocksFetchTime(t time.Duration) {
if s == nil {
return
}
s.BlocksFetchTime.Add(t)
}

func (s *Stats) AddProcessingTime(t time.Duration) {
if s == nil {
return
}
s.ProcessingTime.Add(t)
}

func (s *Stats) AddPostProcessingTime(t time.Duration) {
if s == nil {
return
}
s.PostProcessingTime.Add(t)
}
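A short usage sketch for the helpers above (assuming the usual `context`, `fmt`, and `time` imports). Note the design choice: `FromContext` returns nil when stats were never initialised, and every `Add*` method no-ops on a nil receiver, so call sites deep in workers never need a nil check:

```go
// e.g. as an example test within package bloomgateway
func ExampleStats() {
	stats, ctx := ContextWithEmptyStats(context.Background())

	// Deep inside a worker, only the context needs to be threaded through.
	FromContext(ctx).AddQueueTime(150 * time.Millisecond)

	// A context without stats is harmless: FromContext returns nil and the
	// nil-receiver Add* methods simply return.
	FromContext(context.Background()).AddQueueTime(time.Second)

	fmt.Println(stats.QueueTime.Load())
	// Output: 150ms
}
```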
1 change: 1 addition & 0 deletions pkg/bloomgateway/worker.go
@@ -103,6 +103,7 @@ func (w *worker) running(_ context.Context) error {
level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
_ = w.pending.Dec()
w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))
tasks = append(tasks, task)

first, last := getFirstLast(task.series)
