diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 120e6da26f8b..a23cb91d200c 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -84,64 +84,47 @@ func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task { } } -// taskMergeIterator implements v1.Iterator -type taskMergeIterator struct { - curr v1.Request - heap *v1.HeapIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]] - tasks []Task - day model.Time - tokenizer *v1.NGramTokenizer - err error +func (t Task) RequestIter(tokenizer *v1.NGramTokenizer) v1.Iterator[v1.Request] { + return &requestIterator{ + series: v1.NewSliceIter(t.series), + searches: convertToSearches(t.filters, tokenizer), + channel: t.ResCh, + curr: v1.Request{}, + } } -func newTaskMergeIterator(day model.Time, tokenizer *v1.NGramTokenizer, tasks ...Task) v1.PeekingIterator[v1.Request] { - it := &taskMergeIterator{ - tasks: tasks, - curr: v1.Request{}, - day: day, - tokenizer: tokenizer, - } - it.init() - return v1.NewPeekingIter[v1.Request](it) +var _ v1.Iterator[v1.Request] = &requestIterator{} + +type requestIterator struct { + series v1.Iterator[*logproto.GroupedChunkRefs] + searches [][]byte + channel chan<- v1.Output + curr v1.Request } -func (it *taskMergeIterator) init() { - sequences := make([]v1.PeekingIterator[v1.IndexedValue[*logproto.GroupedChunkRefs]], 0, len(it.tasks)) - for i := range it.tasks { - iter := v1.NewSliceIterWithIndex(it.tasks[i].series, i) - sequences = append(sequences, iter) - } - it.heap = v1.NewHeapIterator( - func(i, j v1.IndexedValue[*logproto.GroupedChunkRefs]) bool { - return i.Value().Fingerprint < j.Value().Fingerprint - }, - sequences..., - ) - it.err = nil +// At implements v1.Iterator. +func (it *requestIterator) At() v1.Request { + + return it.curr +} + +// Err implements v1.Iterator. +func (it *requestIterator) Err() error { + return nil } -func (it *taskMergeIterator) Next() bool { - ok := it.heap.Next() +// Next implements v1.Iterator. +func (it *requestIterator) Next() bool { + ok := it.series.Next() if !ok { return false } - - group := it.heap.At() - task := it.tasks[group.Index()] - + group := it.series.At() it.curr = v1.Request{ - Fp: model.Fingerprint(group.Value().Fingerprint), - Chks: convertToChunkRefs(group.Value().Refs), - Searches: convertToSearches(task.filters, it.tokenizer), - Response: task.ResCh, + Fp: model.Fingerprint(group.Fingerprint), + Chks: convertToChunkRefs(group.Refs), + Searches: it.searches, + Response: it.channel, } return true } - -func (it *taskMergeIterator) At() v1.Request { - return it.curr -} - -func (it *taskMergeIterator) Err() error { - return it.err -} diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index 67277d60f232..2215b1ed1b5d 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -1,6 +1,7 @@ package bloomgateway import ( + "math" "testing" "time" @@ -8,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" v1 "github.com/grafana/loki/pkg/storage/bloom/v1" ) @@ -32,7 +34,6 @@ func TestTask(t *testing.T) { from, through := task.Bounds() require.Equal(t, ts.Add(-1*time.Hour), from) require.Equal(t, ts, through) - require.Equal(t, truncateDay(ts), task.day) }) } @@ -50,14 +51,18 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F return tasks } -func TestTaskMergeIterator(t *testing.T) { +func TestTask_RequestIterator(t *testing.T) { ts := mktime("2024-01-24 12:00") - day := truncateDay(ts) tenant := "fake" tokenizer := v1.NewNGramTokenizer(4, 0) - t.Run("empty requests result in empty iterator", func(t *testing.T) { - it := newTaskMergeIterator(day, tokenizer) + t.Run("empty request yields empty iterator", func(t *testing.T) { + swb := seriesWithBounds{ + bounds: model.Interval{Start: 0, End: math.MaxInt64}, + series: []*logproto.GroupedChunkRefs{}, + } + task, _ := NewTask(tenant, swb, []syntax.LineFilter{}) + it := task.RequestIter(tokenizer) // nothing to iterate over require.False(t, it.Next()) }) @@ -97,7 +102,14 @@ func TestTaskMergeIterator(t *testing.T) { } tasks := createTasksForRequests(t, tenant, r1, r2, r3) - it := newTaskMergeIterator(day, tokenizer, tasks...) + + iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks)) + for _, task := range tasks { + iters = append(iters, v1.NewPeekingIter(task.RequestIter(tokenizer))) + } + + // merge the request iterators using the heap sort iterator + it := v1.NewHeapIterator[v1.Request](func(r1, r2 v1.Request) bool { return r1.Fp < r2.Fp }, iters...) // first item require.True(t, it.Next()) diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index 69f7859e6481..8ee8594f2e27 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -192,7 +192,7 @@ func (w *worker) running(ctx context.Context) error { blockRefs = append(blockRefs, b.blockRef) } - err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, day, blockRefs, tasksForBlocks) + err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, blockRefs, tasksForBlocks) if err != nil { for _, t := range tasks { t.ErrCh <- err @@ -215,26 +215,30 @@ func (w *worker) stopping(err error) error { return nil } -func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day model.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { +func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error { for _, b := range boundedRefs { if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp { - return w.processBlock(bq, day, b.tasks) + return w.processBlock(bq, b.tasks) } } return nil }) } -func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day model.Time, tasks []Task) error { +func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, tasks []Task) error { schema, err := blockQuerier.Schema() if err != nil { return err } tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0) - it := newTaskMergeIterator(day, tokenizer, tasks...) - fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it}) + iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks)) + for _, task := range tasks { + it := v1.NewPeekingIter(task.RequestIter(tokenizer)) + iters = append(iters, it) + } + fq := blockQuerier.Fuse(iters) start := time.Now() err = fq.Run()