Skip to content

Commit

Permalink
chore(blooms): Remove ID field from task struct (#12851)
Browse files Browse the repository at this point in the history
We've seen a few cases where creating the ULID failed for unknown reasons, and the ID is not really used. It was only useful early on in the development for debugging.

Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored May 2, 2024
1 parent 7bbd8b5 commit 48bbf98
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ require (
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/ncw/swift v1.0.53
github.com/oklog/run v1.1.0
github.com/oklog/ulid v1.3.1
github.com/oklog/ulid v1.3.1 // indirect
github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e
github.com/opentracing-contrib/go-stdlib v1.0.0
github.com/opentracing/opentracing-go v1.2.0
Expand Down
7 changes: 1 addition & 6 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,10 +278,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task, err := NewTask(ctx, tenantID, seriesForDay, filters, blocks)
if err != nil {
return nil, err
}
task := newTask(ctx, tenantID, seriesForDay, filters, blocks)
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))
tasks = append(tasks, task)
Expand All @@ -298,7 +295,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
for _, task := range tasks {
task := task
task.enqueueTime = time.Now()
level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))

// TODO(owen-d): gracefully handle full queues
if err := g.queue.Enqueue(tenantID, nil, task, func() {
Expand Down Expand Up @@ -329,7 +325,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
stats.Status = "cancel"
return nil, errors.Wrap(ctx.Err(), "request failed")
case task := <-tasksCh:
level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err())
if task.Err() != nil {
stats.Status = labelFailure
return nil, errors.Wrap(task.Err(), "request failed")
Expand Down
31 changes: 6 additions & 25 deletions pkg/bloomgateway/multiplexing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package bloomgateway

import (
"context"
"math/rand"
"sync"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/common/model"

"github.com/grafana/loki/v3/pkg/logproto"
Expand All @@ -20,10 +18,6 @@ const (
Day = 24 * time.Hour
)

var (
entropy = rand.New(rand.NewSource(time.Now().UnixNano()))
)

type tokenSettings struct {
nGramLen int
}
Expand All @@ -45,10 +39,8 @@ func (e *wrappedError) Set(err error) {

// Task is the data structure that is enqueued to the internal queue and dequeued by query workers
type Task struct {
// ID is a lexcographically sortable unique identifier of the task
ID ulid.ULID
// Tenant is the tenant ID
Tenant string
// tenant is the tenant ID
tenant string

// channel to write partial responses to
resCh chan v1.Output
Expand Down Expand Up @@ -79,18 +71,9 @@ type Task struct {
enqueueTime time.Time
}

// NewTask returns a new Task that can be enqueued to the task queue.
// In addition, it returns a result and an error channel, as well
// as an error if the instantiation fails.
func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) (Task, error) {
key, err := ulid.New(ulid.Now(), entropy)
if err != nil {
return Task{}, err
}

task := Task{
ID: key,
Tenant: tenantID,
func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task {
return Task{
tenant: tenantID,
err: new(wrappedError),
resCh: make(chan v1.Output),
filters: filters,
Expand All @@ -101,7 +84,6 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filt
ctx: ctx,
done: make(chan struct{}),
}
return task, nil
}

// Bounds implements Bounded
Expand Down Expand Up @@ -130,9 +112,8 @@ func (t Task) CloseWithError(err error) {

// Copy returns a copy of the existing task but with a new slice of grouped chunk refs
func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task {
// do not copy ID to distinguish it as copied task
return Task{
Tenant: t.Tenant,
tenant: t.tenant,
err: t.err,
resCh: t.resCh,
filters: t.filters,
Expand Down
8 changes: 3 additions & 5 deletions pkg/bloomgateway/multiplexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ func TestTask(t *testing.T) {
},
}
swb := partitionRequest(req)[0]
task, err := NewTask(context.Background(), "tenant", swb, nil, nil)
require.NoError(t, err)
task := newTask(context.Background(), "tenant", swb, nil, nil)
from, through := task.Bounds()
require.Equal(t, ts.Add(-1*time.Hour), from)
require.Equal(t, ts, through)
Expand All @@ -45,8 +44,7 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F
tasks := make([]Task, 0, len(requests))
for _, r := range requests {
for _, swb := range partitionRequest(r) {
task, err := NewTask(context.Background(), tenant, swb, nil, nil)
require.NoError(t, err)
task := newTask(context.Background(), tenant, swb, nil, nil)
tasks = append(tasks, task)
}
}
Expand All @@ -63,7 +61,7 @@ func TestTask_RequestIterator(t *testing.T) {
interval: bloomshipper.Interval{Start: 0, End: math.MaxInt64},
series: []*logproto.GroupedChunkRefs{},
}
task, _ := NewTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil)
task := newTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil)
it := task.RequestIter(tokenizer)
// nothing to iterate over
require.False(t, it.Next())
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (p *processor) run(ctx context.Context, tasks []Task) error {
}

func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error {
tenant := tasks[0].Tenant
tenant := tasks[0].tenant
level.Info(p.logger).Log(
"msg", "process tasks with bounds",
"tenant", tenant,
Expand Down Expand Up @@ -157,7 +157,7 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie
for _, task := range tasks {
if sp := opentracing.SpanFromContext(task.ctx); sp != nil {
md, _ := blockQuerier.Metadata()
blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md)
blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md)
sp.LogKV("process block", blk.String(), "series", len(task.series))
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomgateway/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestProcessor(t *testing.T) {
}

t.Log("series", len(swb.series))
task, _ := NewTask(ctx, "fake", swb, filters, nil)
task := newTask(ctx, "fake", swb, filters, nil)
tasks := []Task{task}

results := atomic.NewInt64(0)
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestProcessor(t *testing.T) {
}

t.Log("series", len(swb.series))
task, _ := NewTask(ctx, "fake", swb, filters, blocks)
task := newTask(ctx, "fake", swb, filters, blocks)
tasks := []Task{task}

results := atomic.NewInt64(0)
Expand Down Expand Up @@ -227,7 +227,7 @@ func TestProcessor(t *testing.T) {
}

t.Log("series", len(swb.series))
task, _ := NewTask(ctx, "fake", swb, filters, nil)
task := newTask(ctx, "fake", swb, filters, nil)
tasks := []Task{task}

results := atomic.NewInt64(0)
Expand Down
1 change: 0 additions & 1 deletion pkg/bloomgateway/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ func (w *worker) running(_ context.Context) error {
w.queue.ReleaseRequests(items)
return errors.Errorf("failed to cast dequeued item to Task: %v", item)
}
level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
_ = w.pending.Dec()
w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))
Expand Down

0 comments on commit 48bbf98

Please sign in to comment.