Skip to content

Commit

Permalink
chore(blooms): records more bloom iteration stats (#12889)
Browse files Browse the repository at this point in the history
(cherry picked from commit 1951322)
  • Loading branch information
owen-d authored and grafana-delivery-bot[bot] committed May 6, 2024
1 parent 93aaf29 commit 199132c
Show file tree
Hide file tree
Showing 7 changed files with 243 additions and 52 deletions.
24 changes: 17 additions & 7 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
util_log "github.com/grafana/loki/v3/pkg/util/log"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/spanlogger"
)

var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")
Expand All @@ -87,7 +89,7 @@ type Gateway struct {

queue *queue.RequestQueue
activeUsers *util.ActiveUsersCleanupService
bloomStore bloomshipper.Store
bloomStore bloomshipper.StoreWithMetrics

pendingTasks *atomic.Int64

Expand All @@ -109,7 +111,7 @@ func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
}

// New returns a new instance of the Bloom Gateway.
func New(cfg Config, store bloomshipper.Store, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
func New(cfg Config, store bloomshipper.StoreWithMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
utillog.WarnExperimentalUse("Bloom Gateway", logger)
g := &Gateway{
cfg: cfg,
Expand Down Expand Up @@ -203,13 +205,15 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, err
}

logger := log.With(g.logger, "tenant", tenantID)

sp, ctx := opentracing.StartSpanFromContext(ctx, "bloomgateway.FilterChunkRefs")
stats, ctx := ContextWithEmptyStats(ctx)
logger := spanlogger.FromContextWithFallback(
ctx,
util_log.WithContext(ctx, g.logger),
)

defer func() {
level.Info(logger).Log(stats.KVArgs()...)
sp.LogKV(stats.KVArgs()...)
sp.Finish()
}()

Expand Down Expand Up @@ -319,6 +323,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
preFilterChunks += len(series.Refs)
}

combinedRecorder := v1.NewBloomRecorder(ctx, "combined")
for remaining > 0 {
select {
case <-ctx.Done():
Expand All @@ -330,10 +335,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses)
combinedRecorder.Merge(task.recorder)
remaining--
}
}

combinedRecorder.Report(util_log.WithContext(ctx, g.logger), g.bloomStore.BloomMetrics())
sp.LogKV("msg", "received all responses")

start := time.Now()
Expand All @@ -348,7 +355,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk

postFilterSeries := len(filtered)

for _, group := range req.Refs {
for _, group := range filtered {
postFilterChunks += len(group.Refs)
}
g.metrics.requestedSeries.Observe(float64(preFilterSeries))
Expand Down Expand Up @@ -421,7 +428,10 @@ func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
// TODO(owen-d): improve perf. This can be faster with a more specialized impl
// NB(owen-d): `req` is mutated in place for performance, but `responses` is not
// Removals of the outputs must be sorted.
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs {
func filterChunkRefs(
req *logproto.FilterChunkRefRequest,
responses [][]v1.Output,
) []*logproto.GroupedChunkRefs {
res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))

// dedupe outputs, merging the same series.
Expand Down
28 changes: 19 additions & 9 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,15 @@ type Task struct {

// log enqueue time so we can observe the time spent in the queue
enqueueTime time.Time

// recorder
recorder *v1.BloomRecorder
}

func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task {
return Task{
tenant: tenantID,
recorder: v1.NewBloomRecorder(ctx, "task"),
err: new(wrappedError),
resCh: make(chan v1.Output),
filters: filters,
Expand Down Expand Up @@ -113,6 +117,7 @@ func (t Task) CloseWithError(err error) {
// Copy returns a copy of the existing task but with a new slice of grouped chunk refs
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
return Task{
recorder: t.recorder,
tenant: t.tenant,
err: t.err,
resCh: t.resCh,
Expand All @@ -126,22 +131,26 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
}
}

func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] {
func (t Task) RequestIter(
tokenizer *v1.NGramTokenizer,
) v1.Iterator[v1.Request] {
return &requestIterator{
series: v1.NewSliceIter(t.series),
search: v1.FiltersToBloomTest(tokenizer, t.filters...),
channel: t.resCh,
curr: v1.Request{},
recorder: t.recorder,
series: v1.NewSliceIter(t.series),
search: v1.FiltersToBloomTest(tokenizer, t.filters...),
channel: t.resCh,
curr: v1.Request{},
}
}

var _ v1.Iterator[v1.Request] = &requestIterator{}

type requestIterator struct {
series v1.Iterator[*logproto.GroupedChunkRefs]
search v1.BloomTest
channel chan<- v1.Output
curr v1.Request
recorder *v1.BloomRecorder
series v1.Iterator[*logproto.GroupedChunkRefs]
search v1.BloomTest
channel chan<- v1.Output
curr v1.Request
}

// At implements v1.Iterator.
Expand All @@ -162,6 +171,7 @@ func (it *requestIterator) Next() bool {
}
group := it.series.At()
it.curr = v1.Request{
Recorder: it.recorder,
Fp: model.Fingerprint(group.Fingerprint),
Chks: convertToChunkRefs(group.Refs),
Search: it.search,
Expand Down
15 changes: 9 additions & 6 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/grafana/dskit/concurrency"
Expand Down Expand Up @@ -155,11 +154,15 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))

for _, task := range tasks {
if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
md, _ := blockQuerier.Metadata()
blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
sp.LogKV("process block", blk.String(), "series", len(task.series))
}
// NB(owen-d): can be helpful for debugging, but is noisy
// and don't feel like threading this through a configuration

// if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
// md, _ := blockQuerier.Metadata()
// blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
// blockID := blk.String()
// sp.LogKV("process block", blockID, "series", len(task.series))
// }

it := v1.NewPeekingIter(task.RequestIter(tokenizer))
iters = append(iters, it)
Expand Down
5 changes: 5 additions & 0 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/atomic"

"github.com/grafana/loki/v3/pkg/logql/syntax"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
Expand All @@ -40,6 +41,10 @@ type dummyStore struct {
err error
}

func (s *dummyStore) BloomMetrics() *v1.Metrics {
return v1.NewMetrics(nil)
}

func (s *dummyStore) ResolveMetas(_ context.Context, _ bloomshipper.MetaSearchParams) ([][]bloomshipper.MetaRef, []*bloomshipper.Fetcher, error) {
time.Sleep(s.delay)

Expand Down
Loading

0 comments on commit 199132c

Please sign in to comment.