Skip to content

Commit

Permalink
(chore) Bloom Gateway: Avoid unnecessary merging of tasks prior to passing them to the FusedQuerier (#11809)
Browse files Browse the repository at this point in the history

**What this PR does / why we need it**:

The `requestIterator` (`v1.Iterator[v1.Request]`) can be built directly
from the task and does not require any unnecessary merging of tasks,
since this is done by the `v1.FusedQuerier` already.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jan 29, 2024
1 parent de83ae0 commit c01a823
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 60 deletions.
79 changes: 31 additions & 48 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,64 +84,47 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
}
}

// taskMergeIterator implements v1.Iterator
type taskMergeIterator struct {
curr v1.Request
heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]]
tasks []Task
day model.Time
tokenizer *v1.NGramTokenizer
err error
// RequestIter builds a v1.Iterator[v1.Request] directly from the task:
// one request per series, all sharing the task's converted line-filter
// searches and its result channel. Merging across tasks is left to the
// caller (the v1.FusedQuerier handles it).
func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] {
	searches := convertToSearches(t.filters, tokenizer)
	return &requestIterator{
		series:   v1.NewSliceIter(t.series),
		searches: searches,
		channel:  t.ResCh,
	}
}

func newTaskMergeIterator(day model.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] {
it := &taskMergeIterator{
tasks: tasks,
curr: v1.Request{},
day: day,
tokenizer: tokenizer,
}
it.init()
return v1.NewPeekingIter[v1.Request](it)
// Compile-time check that requestIterator satisfies v1.Iterator[v1.Request].
var _ v1.Iterator[v1.Request] = &requestIterator{}

// requestIterator yields one v1.Request per series of a single task.
type requestIterator struct {
	series   v1.Iterator[*logproto.GroupedChunkRefs] // series of the task, iterated in slice order
	searches [][]byte                                // pre-converted line-filter searches, shared by every request
	channel  chan<- v1.Output                        // the task's result channel, attached to every request
	curr     v1.Request                              // request built by the last successful Next()
}

func (it *taskMergeIterator) init() {
sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks))
for i := range it.tasks {
iter := v1.NewSliceIterWithIndex(it.tasks[i].series, i)
sequences = append(sequences, iter)
}
it.heap = v1.NewHeapIterator(
func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool {
return i.Value().Fingerprint < j.Value().Fingerprint
},
sequences...,
)
it.err = nil
// At implements v1.Iterator. It returns the request built by the most
// recent successful call to Next().
func (it *requestIterator) At() v1.Request {
	return it.curr
}

// Err implements v1.Iterator. Iteration over an in-memory slice cannot
// fail, so the error is always nil.
func (it *requestIterator) Err() error {
	return nil
}

func (it *taskMergeIterator) Next() bool {
ok := it.heap.Next()
// Next implements v1.Iterator.
func (it *requestIterator) Next() bool {
ok := it.series.Next()
if !ok {
return false
}

group := it.heap.At()
task := it.tasks[group.Index()]

group := it.series.At()
it.curr = v1.Request{
Fp: model.Fingerprint(group.Value().Fingerprint),
Chks: convertToChunkRefs(group.Value().Refs),
Searches: convertToSearches(task.filters, it.tokenizer),
Response: task.ResCh,
Fp: model.Fingerprint(group.Fingerprint),
Chks: convertToChunkRefs(group.Refs),
Searches: it.searches,
Response: it.channel,
}
return true
}

// At implements v1.Iterator. It returns the request built by the most
// recent successful call to Next().
func (it *taskMergeIterator) At() v1.Request {
	return it.curr
}

// Err implements v1.Iterator. It returns the error recorded while
// iterating, if any.
func (it *taskMergeIterator) Err() error {
	return it.err
}
24 changes: 18 additions & 6 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package bloomgateway

import (
"math"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
)

Expand All @@ -32,7 +34,6 @@ func TestTask(t *testing.T) {
from, through := task.Bounds()
require.Equal(t, ts.Add(-1*time.Hour), from)
require.Equal(t, ts, through)
require.Equal(t, truncateDay(ts), task.day)
})
}

Expand All @@ -50,14 +51,18 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F
return tasks
}

func TestTaskMergeIterator(t *testing.T) {
func TestTask_RequestIterator(t *testing.T) {
ts := mktime("2024-01-24 12:00")
day := truncateDay(ts)
tenant := "fake"
tokenizer := v1.NewNGramTokenizer(4, 0)

t.Run("empty requests result in empty iterator", func(t *testing.T) {
it := newTaskMergeIterator(day, tokenizer)
t.Run("empty request yields empty iterator", func(t *testing.T) {
swb := seriesWithBounds{
bounds: model.Interval{Start: 0, End: math.MaxInt64},
series: []*logproto.GroupedChunkRefs{},
}
task, _ := NewTask(tenant, swb, []syntax.LineFilter{})
it := task.RequestIter(tokenizer)
// nothing to iterate over
require.False(t, it.Next())
})
Expand Down Expand Up @@ -97,7 +102,14 @@ func TestTaskMergeIterator(t *testing.T) {
}

tasks := createTasksForRequests(t, tenant, r1, r2, r3)
it := newTaskMergeIterator(day, tokenizer, tasks...)

iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
for _, task := range tasks {
iters = append(iters, v1.NewPeekingIter(task.RequestIter(tokenizer)))
}

// merge the request iterators using the heap sort iterator
it := v1.NewHeapIterator[v1.Request](func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...)

// first item
require.True(t, it.Next())
Expand Down
16 changes: 10 additions & 6 deletions pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (w *worker) running(ctx context.Context) error {
blockRefs = append(blockRefs, b.blockRef)
}

err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks)
err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, blockRefs, tasksForBlocks)
if err != nil {
for _, t := range tasks {
t.ErrCh <- err
Expand All @@ -215,26 +215,30 @@ func (w *worker) stopping(err error) error {
return nil
}

func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day model.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
return w.processBlock(bq, day, b.tasks)
return w.processBlock(bq, b.tasks)
}
}
return nil
})
}

func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day model.Time, tasks []Task) error {
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, tasks []Task) error {
schema, err := blockQuerier.Schema()
if err != nil {
return err
}

tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
it := newTaskMergeIterator(day, tokenizer, tasks...)
fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})
iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
for _, task := range tasks {
it := v1.NewPeekingIter(task.RequestIter(tokenizer))
iters = append(iters, it)
}
fq := blockQuerier.Fuse(iters)

start := time.Now()
err = fq.Run()
Expand Down

0 comments on commit c01a823

Please sign in to comment.