diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go
index b4ccd0bee075..ab2637b91d8e 100644
--- a/pkg/bloomgateway/bloomgateway.go
+++ b/pkg/bloomgateway/bloomgateway.go
@@ -63,7 +63,9 @@ import (
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
 	"github.com/grafana/loki/v3/pkg/util"
 	"github.com/grafana/loki/v3/pkg/util/constants"
+	util_log "github.com/grafana/loki/v3/pkg/util/log"
 	utillog "github.com/grafana/loki/v3/pkg/util/log"
+	"github.com/grafana/loki/v3/pkg/util/spanlogger"
 )
 
 var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
@@ -87,7 +89,7 @@ type Gateway struct {
 	queue       *queue.RequestQueue
 	activeUsers *util.ActiveUsersCleanupService
 
-	bloomStore bloomshipper.Store
+	bloomStore bloomshipper.StoreWithMetrics
 
 	pendingTasks *atomic.Int64
 
@@ -109,7 +111,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
 }
 
 // New returns a new instance of the Bloom Gateway.
-func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
+func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
 	utillog.WarnExperimentalUse("Bloom Gateway", logger)
 	g := &Gateway{
 		cfg:    cfg,
@@ -203,13 +205,15 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		return nil, err
 	}
 
-	logger := log.With(g.logger, "tenant", tenantID)
-
 	sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
 	stats, ctx := ContextWithEmptyStats(ctx)
+	logger := spanlogger.FromContextWithFallback(
+		ctx,
+		util_log.WithContext(ctx, g.logger),
+	)
+
 	defer func() {
 		level.Info(logger).Log(stats.KVArgs()...)
-		sp.LogKV(stats.KVArgs()...)
 		sp.Finish()
 	}()
 
@@ -319,6 +323,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 		preFilterChunks += len(series.Refs)
 	}
 
+	combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
 	for remaining > 0 {
 		select {
 		case <-ctx.Done():
@@ -330,10 +335,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 				return nil, errors.Wrap(task.Err(), "request failed")
 			}
 			responses = append(responses, task.responses)
+			combinedRecorder.Merge(task.recorder)
 			remaining--
 		}
 	}
+	combinedRecorder.Report(util_log.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics())
 
 	sp.LogKV("msg", "received all responses")
 
 	start := time.Now()
@@ -348,7 +355,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
 	postFilterSeries := len(filtered)
 
-	for _, group := range req.Refs {
+	for _, group := range filtered {
 		postFilterChunks += len(group.Refs)
 	}
 	g.metrics.requestedSeries.Observe(float64(preFilterSeries))
@@ -421,7 +428,10 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
 // TODO(owen-d): improve perf. This can be faster with a more specialized impl
 // NB(owen-d): `req` is mutated in place for performance, but `responses` is not
 // Removals of the outputs must be sorted.
-func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs {
+func filterChunkRefs(
+	req *logproto.FilterChunkRefRequest,
+	responses [][]v1.Output,
+) []*logproto.GroupedChunkRefs {
 	res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
 
 	// dedupe outputs, merging the same series.
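Note on the gateway changes above: each queued task now carries its own recorder (see multiplexing.go below); the gateway merges every completed task's recorder into a request-level "combined" recorder and reports once per request. A minimal, self-contained sketch of that aggregation flow using only the exported recorder API from this patch — the task fan-out and channel plumbing of FilterChunkRefs are elided:

```go
package main

import (
	"context"
	"os"

	"github.com/go-kit/log"

	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

func main() {
	ctx := context.Background()
	logger := log.NewLogfmtLogger(os.Stderr)

	// One recorder per queued task, as newTask does below.
	taskA := v1.NewBloomRecorder(ctx, "task")
	taskB := v1.NewBloomRecorder(ctx, "task")

	// One request-level recorder, as FilterChunkRefs does above.
	combined := v1.NewBloomRecorder(ctx, "combined")
	combined.Merge(taskA)
	combined.Merge(taskB)

	// Report logs the totals and, when metrics is non-nil, increments the
	// bloom_recorder_* counters. v1.NewMetrics(nil) builds an unregistered
	// metrics set, mirroring what dummyStore does in processor_test.go.
	combined.Report(logger, v1.NewMetrics(nil))
}
```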
diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go
index b3d80d4a0fb0..08afeffcbf70 100644
--- a/pkg/bloomgateway/multiplexing.go
+++ b/pkg/bloomgateway/multiplexing.go
@@ -69,11 +69,15 @@ type Task struct {
 
 	// log enqueue time so we can observe the time spent in the queue
 	enqueueTime time.Time
+
+	// recorder
+	recorder *v1.BloomRecorder
 }
 
 func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task {
 	return Task{
 		tenant:   tenantID,
+		recorder: v1.NewBloomRecorder(ctx, "task"),
 		err:      new(wrappedError),
 		resCh:    make(chan v1.Output),
 		filters:  filters,
@@ -113,6 +117,7 @@ func (t Task) CloseWithError(err error) {
 // Copy returns a copy of the existing task but with a new slice of grouped chunk refs
 func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
 	return Task{
+		recorder: t.recorder,
 		tenant:   t.tenant,
 		err:      t.err,
 		resCh:    t.resCh,
@@ -126,22 +131,26 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
 	}
 }
 
-func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] {
+func (t Task) RequestIter(
+	tokenizer *v1.NGramTokenizer,
+) v1.Iterator[v1.Request] {
 	return &requestIterator{
-		series:  v1.NewSliceIter(t.series),
-		search:  v1.FiltersToBloomTest(tokenizer, t.filters...),
-		channel: t.resCh,
-		curr:    v1.Request{},
+		recorder: t.recorder,
+		series:   v1.NewSliceIter(t.series),
+		search:   v1.FiltersToBloomTest(tokenizer, t.filters...),
+		channel:  t.resCh,
+		curr:     v1.Request{},
 	}
 }
 
 var _ v1.Iterator[v1.Request] = &requestIterator{}
 
 type requestIterator struct {
-	series  v1.Iterator[*logproto.GroupedChunkRefs]
-	search  v1.BloomTest
-	channel chan<- v1.Output
-	curr    v1.Request
+	recorder *v1.BloomRecorder
+	series   v1.Iterator[*logproto.GroupedChunkRefs]
+	search   v1.BloomTest
+	channel  chan<- v1.Output
+	curr     v1.Request
 }
 
 // At implements v1.Iterator.
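Worth noting: Task.Copy above deliberately copies the recorder *pointer* rather than allocating a new recorder, so every per-block copy of a task feeds the same parent recorder, and the atomics inside keep concurrent workers safe. A tiny sketch of that sharing pattern with hypothetical names (counter/task stand in for v1.BloomRecorder/Task):

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

// counter mimics the role of *v1.BloomRecorder: a pointer shared by all
// copies of a task, updated with atomics so concurrent workers are safe.
type counter struct{ chunksFiltered *atomic.Int64 }

type task struct {
	series []int
	rec    *counter
}

// copyWith mirrors Task.Copy above: a new series slice, the same recorder pointer.
func (t task) copyWith(series []int) task {
	return task{series: series, rec: t.rec}
}

func main() {
	parent := task{rec: &counter{chunksFiltered: atomic.NewInt64(0)}}

	// Split into per-block subtasks; each records into the shared counter.
	for _, sub := range []task{parent.copyWith([]int{1}), parent.copyWith([]int{2, 3})} {
		sub.rec.chunksFiltered.Add(int64(len(sub.series)))
	}

	fmt.Println(parent.rec.chunksFiltered.Load()) // 3: both copies fed the parent
}
```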
@@ -162,6 +171,7 @@ func (it *requestIterator) Next() bool {
 	}
 	group := it.series.At()
 	it.curr = v1.Request{
+		Recorder: it.recorder,
 		Fp:       model.Fingerprint(group.Fingerprint),
 		Chks:     convertToChunkRefs(group.Refs),
 		Search:   it.search,
diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go
index 947296d5712c..1e8452ded5d6 100644
--- a/pkg/bloomgateway/processor.go
+++ b/pkg/bloomgateway/processor.go
@@ -7,7 +7,6 @@ import (
 
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/opentracing/opentracing-go"
 	"github.com/pkg/errors"
 
 	"github.com/grafana/dskit/concurrency"
@@ -155,11 +154,15 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
 
 	iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
 	for _, task := range tasks {
-		if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
-			md, _ := blockQuerier.Metadata()
-			blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
-			sp.LogKV("process block", blk.String(), "series", len(task.series))
-		}
+		// NB(owen-d): can be helpful for debugging, but is noisy
+		// and don't feel like threading this through a configuration
+
+		// if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
+		// 	md, _ := blockQuerier.Metadata()
+		// 	blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
+		// 	blockID := blk.String()
+		// 	sp.LogKV("process block", blockID, "series", len(task.series))
+		// }
 
 		it := v1.NewPeekingIter(task.RequestIter(tokenizer))
 		iters = append(iters, it)
diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go
index b86dbf8006b7..f9dc847f588b 100644
--- a/pkg/bloomgateway/processor_test.go
+++ b/pkg/bloomgateway/processor_test.go
@@ -15,6 +15,7 @@ import (
 	"go.uber.org/atomic"
 
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
+	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/v3/pkg/storage/chunk/client"
 	"github.com/grafana/loki/v3/pkg/storage/config"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
@@ -40,6 +41,10 @@ type dummyStore struct {
 	err error
 }
 
+func (s *dummyStore) BloomMetrics() *v1.Metrics {
+	return v1.NewMetrics(nil)
+}
+
 func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) {
 	time.Sleep(s.delay)
 
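The dummyStore change above is needed because the gateway constructor now demands a bloomshipper.StoreWithMetrics. A hedged sketch of how another test double could satisfy the new method, assuming StoreWithMetrics is Store plus BloomMetrics() (the embedded interface inherits the remaining Store methods; calling them on this stub would panic):

```go
package main

import (
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

// metricsStore embeds the Store interface to inherit its method set and
// adds only the method introduced in this patch.
type metricsStore struct{ bloomshipper.Store }

func (s metricsStore) BloomMetrics() *v1.Metrics { return v1.NewMetrics(nil) }

// Compile-time check; this is the constraint New() now places on its store.
var _ bloomshipper.StoreWithMetrics = metricsStore{}

func main() {}
```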
diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go
index 0ee608cede8e..ed920072b8ca 100644
--- a/pkg/storage/bloom/v1/fuse.go
+++ b/pkg/storage/bloom/v1/fuse.go
@@ -1,10 +1,15 @@
 package v1
 
 import (
+	"context"
+
 	"github.com/efficientgo/core/errors"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/common/model"
+	"go.uber.org/atomic"
+
+	"github.com/grafana/loki/v3/pkg/util/spanlogger"
 )
 
 type Request struct {
@@ -12,6 +17,102 @@ type Request struct {
 	Chks     ChunkRefs
 	Search   BloomTest
 	Response chan<- Output
+	Recorder *BloomRecorder
+}
+
+// BloomRecorder records the results of a bloom search
+func NewBloomRecorder(ctx context.Context, id string) *BloomRecorder {
+	return &BloomRecorder{
+		ctx:            ctx,
+		id:             id,
+		seriesFound:    atomic.NewInt64(0),
+		chunksFound:    atomic.NewInt64(0),
+		seriesSkipped:  atomic.NewInt64(0),
+		chunksSkipped:  atomic.NewInt64(0),
+		seriesMissed:   atomic.NewInt64(0),
+		chunksMissed:   atomic.NewInt64(0),
+		chunksFiltered: atomic.NewInt64(0),
+	}
+}
+
+type BloomRecorder struct {
+	ctx context.Context
+	id  string
+	// exists in the bloom+queried
+	seriesFound, chunksFound *atomic.Int64
+	// exists in bloom+skipped
+	seriesSkipped, chunksSkipped *atomic.Int64
+	// not found in bloom
+	seriesMissed, chunksMissed *atomic.Int64
+	// filtered out
+	chunksFiltered *atomic.Int64
+}
+
+func (r *BloomRecorder) Merge(other *BloomRecorder) {
+	r.seriesFound.Add(other.seriesFound.Load())
+	r.chunksFound.Add(other.chunksFound.Load())
+	r.seriesSkipped.Add(other.seriesSkipped.Load())
+	r.chunksSkipped.Add(other.chunksSkipped.Load())
+	r.seriesMissed.Add(other.seriesMissed.Load())
+	r.chunksMissed.Add(other.chunksMissed.Load())
+	r.chunksFiltered.Add(other.chunksFiltered.Load())
+}
+
+func (r *BloomRecorder) Report(logger log.Logger, metrics *Metrics) {
+	logger = spanlogger.FromContextWithFallback(r.ctx, logger)
+
+	var (
+		seriesFound     = r.seriesFound.Load()
+		seriesSkipped   = r.seriesSkipped.Load()
+		seriesMissed    = r.seriesMissed.Load()
+		seriesRequested = seriesFound + seriesSkipped + seriesMissed
+
+		chunksFound     = r.chunksFound.Load()
+		chunksSkipped   = r.chunksSkipped.Load()
+		chunksMissed    = r.chunksMissed.Load()
+		chunksFiltered  = r.chunksFiltered.Load()
+		chunksRequested = chunksFound + chunksSkipped + chunksMissed
+	)
+	level.Debug(logger).Log(
+		"recorder_msg", "bloom search results",
+		"recorder_id", r.id,
+
+		"recorder_series_requested", seriesRequested,
+		"recorder_series_found", seriesFound,
+		"recorder_series_skipped", seriesSkipped,
+		"recorder_series_missed", seriesMissed,
+
+		"recorder_chunks_requested", chunksRequested,
+		"recorder_chunks_found", chunksFound,
+		"recorder_chunks_skipped", chunksSkipped,
+		"recorder_chunks_missed", chunksMissed,
+		"recorder_chunks_filtered", chunksFiltered,
+	)
+
+	if metrics != nil {
+		metrics.recorderSeries.WithLabelValues(recorderRequested).Add(float64(seriesRequested))
+		metrics.recorderSeries.WithLabelValues(recorderFound).Add(float64(seriesFound))
+		metrics.recorderSeries.WithLabelValues(recorderSkipped).Add(float64(seriesSkipped))
+		metrics.recorderSeries.WithLabelValues(recorderMissed).Add(float64(seriesMissed))
+
+		metrics.recorderChunks.WithLabelValues(recorderRequested).Add(float64(chunksRequested))
+		metrics.recorderChunks.WithLabelValues(recorderFound).Add(float64(chunksFound))
+		metrics.recorderChunks.WithLabelValues(recorderSkipped).Add(float64(chunksSkipped))
+		metrics.recorderChunks.WithLabelValues(recorderMissed).Add(float64(chunksMissed))
+		metrics.recorderChunks.WithLabelValues(recorderFiltered).Add(float64(chunksFiltered))
+	}
+}
+
+func (r *BloomRecorder) record(
+	seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, chunksFiltered int,
+) {
+	r.seriesFound.Add(int64(seriesFound))
+	r.chunksFound.Add(int64(chunksFound))
+	r.seriesSkipped.Add(int64(seriesSkipped))
+	r.chunksSkipped.Add(int64(chunksSkipped))
+	r.seriesMissed.Add(int64(seriesMissed))
+	r.chunksMissed.Add(int64(chunksMissed))
+	r.chunksFiltered.Add(int64(chunksFiltered))
 }
 
 // Output represents a chunk that failed to pass all searches
@@ -59,8 +160,50 @@ func NewFusedQuerier(bq *BlockQuerier, inputs []PeekingIterator[Request], logger
 	}
 }
 
-func (fq *FusedQuerier) noRemovals(batch []Request, fp model.Fingerprint) {
+func (fq *FusedQuerier) recordMissingFp(
+	batch []Request,
+	fp model.Fingerprint,
+) {
+	fq.noRemovals(batch, fp, func(input Request) {
+		input.Recorder.record(
+			0, 0, // found
+			0, 0, // skipped
+			1, len(input.Chks), // missed
+			0, // chunks filtered
+		)
+	})
+}
+
+func (fq *FusedQuerier) recordSkippedFp(
+	batch []Request,
+	fp model.Fingerprint,
+) {
+	fq.noRemovals(batch, fp, func(input Request) {
+		input.Recorder.record(
+			0, 0, // found
+			1, len(input.Chks), // skipped
+			0, 0, // missed
+			0, // chunks filtered
+		)
+	})
+}
+
+func (fq *FusedQuerier) noRemovals(
+	batch []Request,
+	fp model.Fingerprint,
+	fn func(Request),
+) {
 	for _, input := range batch {
+		if fp != input.Fp {
+			// should not happen, but log just in case
+			level.Error(fq.logger).Log(
+				"msg", "fingerprint mismatch",
+				"expected", fp,
+				"actual", input.Fp,
+				"block", "TODO",
+			)
+		}
+		fn(input)
 		input.Response <- Output{
 			Fp:       fp,
 			Removals: nil,
@@ -94,7 +237,7 @@ func (fq *FusedQuerier) Run() error {
 		if series.Fingerprint != fp {
 			// fingerprint not found, can't remove chunks
 			level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.series.Err())
-			fq.noRemovals(nextBatch, fp)
+			fq.recordMissingFp(nextBatch, fp)
 			continue
 		}
 
@@ -103,51 +246,49 @@ func (fq *FusedQuerier) Run() error {
 		if skip {
 			// could not seek to the desired bloom,
 			// likely because the page was too large to load
-			fq.noRemovals(nextBatch, fp)
+			fq.recordSkippedFp(nextBatch, fp)
 			continue
 		}
 
 		if !fq.bq.blooms.Next() {
 			// fingerprint not found, can't remove chunks
 			level.Debug(fq.logger).Log("msg", "fingerprint not found", "fp", series.Fingerprint, "err", fq.bq.blooms.Err())
-			fq.noRemovals(nextBatch, fp)
+			fq.recordMissingFp(nextBatch, fp)
 			continue
 		}
 
 		bloom := fq.bq.blooms.At()
 		// test every input against this chunk
 		for _, input := range nextBatch {
-			_, inBlooms := input.Chks.Compare(series.Chunks, true)
-
-			// First, see if the search passes the series level bloom before checking for chunks individually
-			if !input.Search.Matches(bloom) {
-				// We return all the chunks that were the intersection of the query
-				// because they for sure do not match the search and don't
-				// need to be downloaded
-				input.Response <- Output{
-					Fp:       fp,
-					Removals: inBlooms,
-				}
-				continue
-			}
+			missing, inBlooms := input.Chks.Compare(series.Chunks, true)
 
-			// TODO(owen-d): pool
-			var removals ChunkRefs
+			var (
+				// TODO(owen-d): pool
+				removals ChunkRefs
+				// TODO(salvacorts): pool tokenBuf
+				tokenBuf  []byte
+				prefixLen int
+			)
 
-			// TODO(salvacorts): pool tokenBuf
-			var tokenBuf []byte
-			var prefixLen int
-
-			for _, chk := range inBlooms {
-				// Get buf to concatenate the chunk and search token
-				tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
-				if !input.Search.MatchesWithPrefixBuf(bloom, tokenBuf, prefixLen) {
-					removals = append(removals, chk)
-					continue
+			// First, see if the search passes the series level bloom before checking for chunks individually
+			if matchedSeries := input.Search.Matches(bloom); !matchedSeries {
+				removals = inBlooms
+			} else {
+				for _, chk := range inBlooms {
+					// Get buf to concatenate the chunk and search token
+					tokenBuf, prefixLen = prefixedToken(schema.NGramLen(), chk, tokenBuf)
+					if !input.Search.MatchesWithPrefixBuf(bloom, tokenBuf, prefixLen) {
+						removals = append(removals, chk)
+					}
 				}
-				// Otherwise, the chunk passed all the searches
 			}
 
+			input.Recorder.record(
+				1, len(inBlooms), // found
+				0, 0, // skipped
+				0, len(missing), // missed
+				len(removals), // filtered
+			)
 			input.Response <- Output{
 				Fp:       fp,
 				Removals: removals,
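To make the record call in Run() above concrete, here is a worked example of the bookkeeping for a single request whose series exists in the block, with assumed counts: Compare() splits 5 requested chunks into 1 missing and 4 inBlooms, and 2 of the 4 fail the line-filter tests:

```go
package main

import "fmt"

func main() {
	// Assumed inputs for one request in Run() above.
	var (
		seriesFound, chunksFound     = 1, 4 // series in bloom; len(inBlooms)
		seriesSkipped, chunksSkipped = 0, 0 // bloom page was loadable
		seriesMissed, chunksMissed   = 0, 1 // len(missing) from Compare()
		chunksFiltered               = 2    // len(removals): failed the search
	)

	// The recorder never stores "requested" directly; Report() derives it,
	// so the found/skipped/missed buckets must stay mutually exclusive.
	// Filtered chunks are a subset of found, not a fourth bucket.
	seriesRequested := seriesFound + seriesSkipped + seriesMissed
	chunksRequested := chunksFound + chunksSkipped + chunksMissed

	fmt.Println(seriesRequested, chunksRequested, chunksFiltered) // 1 5 2
}
```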
diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go
index b86d6259ebfa..a0dc23001e93 100644
--- a/pkg/storage/bloom/v1/fuse_test.go
+++ b/pkg/storage/bloom/v1/fuse_test.go
@@ -89,6 +89,7 @@ func TestFusedQuerier(t *testing.T) {
 		for j := 0; j < n; j++ {
 			idx := numSeries/nReqs*i + j
 			reqs = append(reqs, Request{
+				Recorder: NewBloomRecorder(context.Background(), "unknown"),
 				Fp:       data[idx].Series.Fingerprint,
 				Chks:     data[idx].Series.Chunks,
 				Response: ch,
@@ -282,6 +283,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou
 			idx = numSeries - 1
 		}
 		reqs = append(reqs, Request{
+			Recorder: NewBloomRecorder(context.Background(), "unknown"),
 			Fp:       data[idx].Series.Fingerprint,
 			Chks:     data[idx].Series.Chunks,
 			Response: ch,
diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go
index 700acfc05c67..4c6b4cee1132 100644
--- a/pkg/storage/bloom/v1/metrics.go
+++ b/pkg/storage/bloom/v1/metrics.go
@@ -28,6 +28,9 @@ type Metrics struct {
 	pagesSkipped *prometheus.CounterVec
 	bytesRead    *prometheus.CounterVec
 	bytesSkipped *prometheus.CounterVec
+
+	recorderSeries *prometheus.CounterVec
+	recorderChunks *prometheus.CounterVec
 }
 
 const (
@@ -52,6 +55,12 @@ const (
 
 	bloomCreationTypeIndexed = "indexed"
 	bloomCreationTypeSkipped = "skipped"
+
+	recorderRequested = "requested"
+	recorderFound     = "found"
+	recorderSkipped   = "skipped"
+	recorderMissed    = "missed"
+	recorderFiltered  = "filtered"
 )
 
 func NewMetrics(r prometheus.Registerer) *Metrics {
@@ -148,5 +157,16 @@ func NewMetrics(r prometheus.Registerer) *Metrics {
 			Name:      "bloom_bytes_skipped_total",
 			Help:      "Number of bytes skipped during query iteration",
 		}, []string{"type", "reason"}),
+
+		recorderSeries: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace: constants.Loki,
+			Name:      "bloom_recorder_series_total",
+			Help:      "Number of series reported by the bloom query recorder. Type can be requested (total), found (existed in blooms), skipped (due to page too large configurations, etc), missed (not found in blooms)",
+		}, []string{"type"}),
+		recorderChunks: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
+			Namespace: constants.Loki,
+			Name:      "bloom_recorder_chunks_total",
+			Help:      "Number of chunks reported by the bloom query recorder. Type can be requested (total), found (existed in blooms), skipped (due to page too large configurations, etc), missed (not found in blooms), filtered (filtered out)",
+		}, []string{"type"}),
 	}
 }
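For completeness, a short sketch of how the new counters surface once wired to a real registry. In Loki proper the *v1.Metrics value comes from the bloom store via BloomMetrics(), so constructing it directly as below is illustrative only:

```go
package main

import (
	"context"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
)

func main() {
	reg := prometheus.NewRegistry()
	metrics := v1.NewMetrics(reg) // registers loki_bloom_recorder_*_total among others

	rec := v1.NewBloomRecorder(context.Background(), "combined")
	// ... Merge() per-task recorders here ...
	rec.Report(log.NewLogfmtLogger(os.Stderr), metrics)

	// Each Report adds to the "requested", "found", "skipped", "missed"
	// (and, for chunks, "filtered") label values of the two new counters;
	// here everything increments by zero since nothing was recorded.
}
```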