From 5168b07eb39bf52ae0fdcc3bc9c428224f7f23be Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 24 Jan 2024 17:49:11 +0100 Subject: [PATCH] fixup! Bloom Gateway: Partition requests into multiple tasks Signed-off-by: Christian Haudum --- pkg/bloomgateway/bloomgateway.go | 3 +-- pkg/bloomgateway/worker.go | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index ed33eb3097c7a..6f81bbe6335d0 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -340,8 +340,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk expectedResponses += len(seriesWithBounds.series) } - // no chunks in bounds => empty response - if req.From.Equal(req.Through) { + if len(tasks) == 0 { return &logproto.FilterChunkRefResponse{ ChunkRefs: []*logproto.GroupedChunkRefs{}, }, nil diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index b5e5cd9c08e48..e2146ed9fae72 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -180,13 +180,13 @@ func (w *worker) running(ctx context.Context) error { continue } - boundedRefs := partitionFingerprintRange(tasks, blockRefs) + tasksForBlocks := partitionFingerprintRange(tasks, blockRefs) blockRefs = blockRefs[:0] - for _, b := range boundedRefs { + for _, b := range tasksForBlocks { blockRefs = append(blockRefs, b.blockRef) } - err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, boundedRefs) + err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks) if err != nil { for _, t := range tasks { t.ErrCh <- err