From 7bbd8b5087d637ac592403c5daafda35353fe13d Mon Sep 17 00:00:00 2001
From: Owen Diehl
Date: Wed, 1 May 2024 16:19:09 -0400
Subject: [PATCH 1/6] feat(blooms): ingester aware bounded impl (#12840)

---
 pkg/loghttp/query.go                          |   3 +
 pkg/storage/async_store.go                    | 105 ++++++++++++++++++
 pkg/storage/async_store_test.go               |  85 +++++++++++++-
 .../indexshipper/tsdb/sharding/power.go       |  27 +----
 pkg/validation/limits.go                      |   4 +
 pkg/validation/limits_test.go                 |   1 +
 6 files changed, 202 insertions(+), 23 deletions(-)

diff --git a/pkg/loghttp/query.go b/pkg/loghttp/query.go
index 8b135602b7f0..89ad4e00a79c 100644
--- a/pkg/loghttp/query.go
+++ b/pkg/loghttp/query.go
@@ -537,6 +537,9 @@ func ParseIndexShardsQuery(r *http.Request) (*RangeQuery, datasize.ByteSize, err
 		return nil, 0, err
 	}
 	targetBytes, err := parseBytes(r, "targetBytesPerShard", true)
+	if targetBytes <= 0 {
+		return nil, 0, errors.New("targetBytesPerShard must be a positive value")
+	}
 	return parsed, targetBytes, err
 }
 
diff --git a/pkg/storage/async_store.go b/pkg/storage/async_store.go
index 8d104d702b8b..ed3c9dab6b42 100644
--- a/pkg/storage/async_store.go
+++ b/pkg/storage/async_store.go
@@ -5,12 +5,15 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/c2h5oh/datasize"
 	"github.com/opentracing/opentracing-go"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/storage/stores"
 	"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
+	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/prometheus/common/model"
@@ -281,3 +284,105 @@ func filterDuplicateChunks(scfg config.SchemaConfig, storeChunks [][]chunk.Chunk
 
 	return filteredChunkIDs
 }
+
+func (a *AsyncStore) GetShards(
+	ctx context.Context,
+	userID string,
+	from, through model.Time,
+	targetBytesPerShard uint64,
+	predicate chunk.Predicate,
+) (*logproto.ShardsResponse, error) {
+	logger := log.With(
+		util_log.WithContext(ctx, util_log.Logger),
+		"component", "asyncStore",
+	)
+
+	if !a.shouldQueryIngesters(through, model.Now()) {
+		return a.Store.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
+	}
+
+	var (
+		shardResp *logproto.ShardsResponse
+		statsResp *stats.Stats
+	)
+
+	jobs := []func() error{
+		func() error {
+			var err error
+			shardResp, err = a.Store.GetShards(ctx, userID, from, through, targetBytesPerShard, predicate)
+			return err
+		},
+		// We can't dedupe shards by their contents, so we complement the
+		// store's response with the ingester's stats and merge the two below.
+		func() error {
+			var err error
+			statsResp, err = a.ingesterQuerier.Stats(ctx, userID, from, through, predicate.Matchers...)
+ return err + }, + } + + if err := concurrency.ForEachJob( + ctx, + len(jobs), + len(jobs), + func(ctx context.Context, i int) error { + return jobs[i]() + }, + ); err != nil { + return nil, err + } + + return mergeShardsFromIngestersAndStore(logger, shardResp, statsResp, targetBytesPerShard), nil +} + +func mergeShardsFromIngestersAndStore( + logger log.Logger, + storeResp *logproto.ShardsResponse, + statsResp *logproto.IndexStatsResponse, + targetBytesPerShard uint64, +) *logproto.ShardsResponse { + var storeBytes uint64 + for _, shard := range storeResp.Shards { + storeBytes += shard.Stats.Bytes + } + totalBytes := storeBytes + statsResp.Bytes + + defer func() { + level.Debug(logger).Log( + "msg", "resolved shards ", + "ingester_bytes", datasize.ByteSize(statsResp.Bytes).HumanReadable(), + "store_bytes", datasize.ByteSize(storeBytes).HumanReadable(), + "total_bytes", datasize.ByteSize(totalBytes).HumanReadable(), + "target_bytes", datasize.ByteSize(targetBytesPerShard).HumanReadable(), + "store_shards", len(storeResp.Shards), + ) + }() + + // edge case to avoid divide by zero later + if totalBytes == 0 { + return &logproto.ShardsResponse{ + Shards: sharding.LinearShards(0, 0), + } + } + + // If the ingesters don't have enough data to meaningfuly + // change the number of shards, use the store response. + if pct := float64(statsResp.Bytes) / float64(totalBytes); pct < 0.25 { + return storeResp + } + + shards := sharding.LinearShards(int(totalBytes/targetBytesPerShard), totalBytes) + + // increment the total chunks by the number seen from ingesters + // NB(owen-d): this isn't perfect as it mixes signals a bit by joining + // store chunks which _could_ possibly be filtered with ingester chunks which can't, + // but it's still directionally helpful + updatedStats := storeResp.Statistics + updatedStats.Index.TotalChunks += int64(statsResp.Chunks) + return &logproto.ShardsResponse{ + Shards: shards, + Statistics: updatedStats, + // explicitly nil chunkgroups when we've changed the shards+included chunkrefs from ingesters + ChunkGroups: nil, + } +} diff --git a/pkg/storage/async_store_test.go b/pkg/storage/async_store_test.go index a85b33ecccef..9cf80868c861 100644 --- a/pkg/storage/async_store_test.go +++ b/pkg/storage/async_store_test.go @@ -5,7 +5,10 @@ import ( "testing" "time" + "github.com/go-kit/log" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -15,6 +18,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/chunk" "github.com/grafana/loki/v3/pkg/storage/chunk/fetcher" "github.com/grafana/loki/v3/pkg/storage/config" + "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/v3/pkg/util" ) @@ -29,8 +33,8 @@ func newStoreMock() *storeMock { return &storeMock{} } -func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, predicate chunk.Predicate, storeChunksOverride *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { - args := s.Called(ctx, userID, from, through, predicate, storeChunksOverride) +func (s *storeMock) GetChunks(ctx context.Context, userID string, from, through model.Time, predicate chunk.Predicate, overrides *logproto.ChunkRefGroup) ([][]chunk.Chunk, []*fetcher.Fetcher, error) { + args := s.Called(ctx, userID, from, through, predicate, overrides) return args.Get(0).([][]chunk.Chunk), args.Get(1).([]*fetcher.Fetcher), args.Error(2) } @@ 
-360,3 +364,80 @@ func convertChunksToChunkIDs(s config.SchemaConfig, chunks []chunk.Chunk) []stri return chunkIDs } + +func TestMergeShardsFromIngestersAndStore(t *testing.T) { + mkStats := func(bytes, chks uint64) logproto.IndexStatsResponse { + return logproto.IndexStatsResponse{ + Bytes: bytes, + Chunks: chks, + } + } + + // creates n shards with bytesPerShard * n bytes and chks chunks + mkShards := func(n int, bytesPerShard uint64, chks int64) logproto.ShardsResponse { + return logproto.ShardsResponse{ + Shards: sharding.LinearShards(n, bytesPerShard*uint64(n)), + Statistics: stats.Result{ + Index: stats.Index{ + TotalChunks: chks, + }, + }, + } + } + + targetBytesPerShard := 10 + + for _, tc := range []struct { + desc string + ingester logproto.IndexStatsResponse + store logproto.ShardsResponse + exp logproto.ShardsResponse + }{ + { + desc: "zero bytes returns one full shard", + ingester: mkStats(0, 0), + store: mkShards(0, 0, 0), + exp: mkShards(1, 0, 0), + }, + { + desc: "zero ingester bytes honors store", + ingester: mkStats(0, 0), + store: mkShards(10, uint64(targetBytesPerShard), 10), + exp: mkShards(10, uint64(targetBytesPerShard), 10), + }, + { + desc: "zero store bytes honors ingester", + ingester: mkStats(uint64(targetBytesPerShard*10), 10), + store: mkShards(0, 0, 0), + exp: mkShards(10, uint64(targetBytesPerShard), 10), + }, + { + desc: "ingester bytes below threshold ignored", + ingester: mkStats(uint64(targetBytesPerShard*2), 10), // 2 shards worth from ingesters + store: mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store + exp: mkShards(10, uint64(targetBytesPerShard), 10), // use the store's resp + }, + { + desc: "ingester bytes above threshold recreate shards", + ingester: mkStats(uint64(targetBytesPerShard*4), 10), // 4 shards worth from ingesters + store: mkShards(10, uint64(targetBytesPerShard), 10), // 10 shards worth from store + exp: mkShards(14, uint64(targetBytesPerShard), 20), // regenerate 14 shards + }, + } { + + t.Run(tc.desc, func(t *testing.T) { + got := mergeShardsFromIngestersAndStore( + log.NewNopLogger(), + &tc.store, + &tc.ingester, + uint64(targetBytesPerShard), + ) + require.Equal(t, tc.exp.Statistics, got.Statistics) + require.Equal(t, tc.exp.ChunkGroups, got.ChunkGroups) + require.Equal(t, tc.exp.Statistics.Index.TotalChunks, got.Statistics.Index.TotalChunks) + for i, shard := range tc.exp.Shards { + require.Equal(t, shard, got.Shards[i], "shard %d", i) + } + }) + } +} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go b/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go index 257c198ee2d7..219563e0e535 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/sharding/power.go @@ -22,21 +22,7 @@ type PowerOfTwoSharding struct { func (p PowerOfTwoSharding) ShardsFor(bytes uint64, maxBytesPerShard uint64) []logproto.Shard { factor := GuessShardFactor(bytes, maxBytesPerShard, p.MaxShards) - - if factor < 2 { - return []logproto.Shard{{ - Bounds: logproto.FPBounds{ - Min: 0, - Max: math.MaxUint64, - }, - Stats: &stats.Stats{ - Bytes: bytes, - }, - }} - } - return LinearShards(factor, bytes) - } // LinearShards is a sharding implementation that splits the data into @@ -71,14 +57,13 @@ func LinearShards(n int, bytes uint64) []logproto.Shard { Bytes: bytesPerShard, }, } - - // The last shard should have the remainder of the bytes - // and the max bound should be math.MaxUint64 - // NB(owen-d): this can only happen when 
maxShards is used - // and the maxShards isn't a factor of 2 - shards[len(shards)-1].Stats.Bytes += bytes % uint64(n) - shards[len(shards)-1].Bounds.Max = math.MaxUint64 } + // The last shard should have the remainder of the bytes + // and the max bound should be math.MaxUint64 + // NB(owen-d): this can only happen when maxShards is used + // and the maxShards isn't a factor of 2 + shards[len(shards)-1].Stats.Bytes += bytes % uint64(n) + shards[len(shards)-1].Bounds.Max = math.MaxUint64 return shards diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 015919783164..036f5660c092 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -480,6 +480,10 @@ func (l *Limits) Validate() error { return err } + if l.TSDBMaxBytesPerShard <= 0 { + return errors.New("querier.tsdb-max-bytes-per-shard must be greater than 0") + } + return nil } diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 598a6f9033cd..2d4457c2a119 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -345,6 +345,7 @@ func TestLimitsValidation(t *testing.T) { desc := fmt.Sprintf("%s/%s", tc.limits.DeletionMode, tc.limits.BloomBlockEncoding) t.Run(desc, func(t *testing.T) { tc.limits.TSDBShardingStrategy = logql.PowerOfTwoVersion.String() // hacky but needed for test + tc.limits.TSDBMaxBytesPerShard = DefaultTSDBMaxBytesPerShard if tc.expected == nil { require.NoError(t, tc.limits.Validate()) } else { From 48bbf983d0beb79c82a5d90b7de8abcfb73f1645 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 2 May 2024 09:01:58 +0200 Subject: [PATCH 2/6] chore(blooms): Remove ID field from task struct (#12851) We've seen a few cases where creating the ULID failed for unknown reasons, and the ID is not really used. It was only useful early on in the development for debugging. Signed-off-by: Christian Haudum --- go.mod | 2 +- pkg/bloomgateway/bloomgateway.go | 7 +----- pkg/bloomgateway/multiplexing.go | 31 ++++++--------------------- pkg/bloomgateway/multiplexing_test.go | 8 +++---- pkg/bloomgateway/processor.go | 4 ++-- pkg/bloomgateway/processor_test.go | 6 +++--- pkg/bloomgateway/worker.go | 1 - 7 files changed, 16 insertions(+), 43 deletions(-) diff --git a/go.mod b/go.mod index 67c1302ab124..06969fdeed2e 100644 --- a/go.mod +++ b/go.mod @@ -75,7 +75,7 @@ require ( github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f github.com/ncw/swift v1.0.53 github.com/oklog/run v1.1.0 - github.com/oklog/ulid v1.3.1 + github.com/oklog/ulid v1.3.1 // indirect github.com/opentracing-contrib/go-grpc v0.0.0-20210225150812-73cb765af46e github.com/opentracing-contrib/go-stdlib v1.0.0 github.com/opentracing/opentracing-go v1.2.0 diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index 487b87bac352..b4ccd0bee075 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -278,10 +278,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk tasks := make([]Task, 0, len(seriesByDay)) responses := make([][]v1.Output, 0, len(seriesByDay)) for _, seriesForDay := range seriesByDay { - task, err := NewTask(ctx, tenantID, seriesForDay, filters, blocks) - if err != nil { - return nil, err - } + task := newTask(ctx, tenantID, seriesForDay, filters, blocks) // TODO(owen-d): include capacity in constructor? 
task.responses = responsesPool.Get(len(seriesForDay.series)) tasks = append(tasks, task) @@ -298,7 +295,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk for _, task := range tasks { task := task task.enqueueTime = time.Now() - level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series)) // TODO(owen-d): gracefully handle full queues if err := g.queue.Enqueue(tenantID, nil, task, func() { @@ -329,7 +325,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk stats.Status = "cancel" return nil, errors.Wrap(ctx.Err(), "request failed") case task := <-tasksCh: - level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err()) if task.Err() != nil { stats.Status = labelFailure return nil, errors.Wrap(task.Err(), "request failed") diff --git a/pkg/bloomgateway/multiplexing.go b/pkg/bloomgateway/multiplexing.go index 84516788d001..b3d80d4a0fb0 100644 --- a/pkg/bloomgateway/multiplexing.go +++ b/pkg/bloomgateway/multiplexing.go @@ -2,11 +2,9 @@ package bloomgateway import ( "context" - "math/rand" "sync" "time" - "github.com/oklog/ulid" "github.com/prometheus/common/model" "github.com/grafana/loki/v3/pkg/logproto" @@ -20,10 +18,6 @@ const ( Day = 24 * time.Hour ) -var ( - entropy = rand.New(rand.NewSource(time.Now().UnixNano())) -) - type tokenSettings struct { nGramLen int } @@ -45,10 +39,8 @@ func (e *wrappedError) Set(err error) { // Task is the data structure that is enqueued to the internal queue and dequeued by query workers type Task struct { - // ID is a lexcographically sortable unique identifier of the task - ID ulid.ULID - // Tenant is the tenant ID - Tenant string + // tenant is the tenant ID + tenant string // channel to write partial responses to resCh chan v1.Output @@ -79,18 +71,9 @@ type Task struct { enqueueTime time.Time } -// NewTask returns a new Task that can be enqueued to the task queue. -// In addition, it returns a result and an error channel, as well -// as an error if the instantiation fails. 
-func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) (Task, error) { - key, err := ulid.New(ulid.Now(), entropy) - if err != nil { - return Task{}, err - } - - task := Task{ - ID: key, - Tenant: tenantID, +func newTask(ctx context.Context, tenantID string, refs seriesWithInterval, filters []syntax.LineFilterExpr, blocks []bloomshipper.BlockRef) Task { + return Task{ + tenant: tenantID, err: new(wrappedError), resCh: make(chan v1.Output), filters: filters, @@ -101,7 +84,6 @@ func NewTask(ctx context.Context, tenantID string, refs seriesWithInterval, filt ctx: ctx, done: make(chan struct{}), } - return task, nil } // Bounds implements Bounded @@ -130,9 +112,8 @@ func (t Task) CloseWithError(err error) { // Copy returns a copy of the existing task but with a new slice of grouped chunk refs func (t Task) Copy(series []*logproto.GroupedChunkRefs) Task { - // do not copy ID to distinguish it as copied task return Task{ - Tenant: t.Tenant, + tenant: t.tenant, err: t.err, resCh: t.resCh, filters: t.filters, diff --git a/pkg/bloomgateway/multiplexing_test.go b/pkg/bloomgateway/multiplexing_test.go index a64ac6cbe404..5f71d3c9623d 100644 --- a/pkg/bloomgateway/multiplexing_test.go +++ b/pkg/bloomgateway/multiplexing_test.go @@ -31,8 +31,7 @@ func TestTask(t *testing.T) { }, } swb := partitionRequest(req)[0] - task, err := NewTask(context.Background(), "tenant", swb, nil, nil) - require.NoError(t, err) + task := newTask(context.Background(), "tenant", swb, nil, nil) from, through := task.Bounds() require.Equal(t, ts.Add(-1*time.Hour), from) require.Equal(t, ts, through) @@ -45,8 +44,7 @@ func createTasksForRequests(t *testing.T, tenant string, requests ...*logproto.F tasks := make([]Task, 0, len(requests)) for _, r := range requests { for _, swb := range partitionRequest(r) { - task, err := NewTask(context.Background(), tenant, swb, nil, nil) - require.NoError(t, err) + task := newTask(context.Background(), tenant, swb, nil, nil) tasks = append(tasks, task) } } @@ -63,7 +61,7 @@ func TestTask_RequestIterator(t *testing.T) { interval: bloomshipper.Interval{Start: 0, End: math.MaxInt64}, series: []*logproto.GroupedChunkRefs{}, } - task, _ := NewTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil) + task := newTask(context.Background(), tenant, swb, []syntax.LineFilterExpr{}, nil) it := task.RequestIter(tokenizer) // nothing to iterate over require.False(t, it.Next()) diff --git a/pkg/bloomgateway/processor.go b/pkg/bloomgateway/processor.go index d94b305a9b26..947296d5712c 100644 --- a/pkg/bloomgateway/processor.go +++ b/pkg/bloomgateway/processor.go @@ -40,7 +40,7 @@ func (p *processor) run(ctx context.Context, tasks []Task) error { } func (p *processor) runWithBounds(ctx context.Context, tasks []Task, bounds v1.MultiFingerprintBounds) error { - tenant := tasks[0].Tenant + tenant := tasks[0].tenant level.Info(p.logger).Log( "msg", "process tasks with bounds", "tenant", tenant, @@ -157,7 +157,7 @@ func (p *processor) processBlock(_ context.Context, blockQuerier *v1.BlockQuerie for _, task := range tasks { if sp := opentracing.SpanFromContext(task.ctx); sp != nil { md, _ := blockQuerier.Metadata() - blk := bloomshipper.BlockRefFrom(task.Tenant, task.table.String(), md) + blk := bloomshipper.BlockRefFrom(task.tenant, task.table.String(), md) sp.LogKV("process block", blk.String(), "series", len(task.series)) } diff --git a/pkg/bloomgateway/processor_test.go b/pkg/bloomgateway/processor_test.go 
index 04856225e18f..b86dbf8006b7 100644
--- a/pkg/bloomgateway/processor_test.go
+++ b/pkg/bloomgateway/processor_test.go
@@ -126,7 +126,7 @@ func TestProcessor(t *testing.T) {
 		}
 
 		t.Log("series", len(swb.series))
-		task, _ := NewTask(ctx, "fake", swb, filters, nil)
+		task := newTask(ctx, "fake", swb, filters, nil)
 		tasks := []Task{task}
 
 		results := atomic.NewInt64(0)
@@ -178,7 +178,7 @@ func TestProcessor(t *testing.T) {
 		}
 
 		t.Log("series", len(swb.series))
-		task, _ := NewTask(ctx, "fake", swb, filters, blocks)
+		task := newTask(ctx, "fake", swb, filters, blocks)
 		tasks := []Task{task}
 
 		results := atomic.NewInt64(0)
@@ -227,7 +227,7 @@ func TestProcessor(t *testing.T) {
 		}
 
 		t.Log("series", len(swb.series))
-		task, _ := NewTask(ctx, "fake", swb, filters, nil)
+		task := newTask(ctx, "fake", swb, filters, nil)
 		tasks := []Task{task}
 
 		results := atomic.NewInt64(0)
diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go
index fab243f29613..6b234db27189 100644
--- a/pkg/bloomgateway/worker.go
+++ b/pkg/bloomgateway/worker.go
@@ -100,7 +100,6 @@ func (w *worker) running(_ context.Context) error {
 				w.queue.ReleaseRequests(items)
 				return errors.Errorf("failed to cast dequeued item to Task: %v", item)
 			}
-			level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
 			_ = w.pending.Dec()
 			w.metrics.queueDuration.WithLabelValues(w.id).Observe(time.Since(task.enqueueTime).Seconds())
 			FromContext(task.ctx).AddQueueTime(time.Since(task.enqueueTime))

From ed84b238ad7a18c769ade8753c824d416a4cca74 Mon Sep 17 00:00:00 2001
From: Pangidoan Butar <38452094+doanbutar@users.noreply.github.com>
Date: Thu, 2 May 2024 16:37:47 +0800
Subject: [PATCH 3/6] docs: Update template_functions.md (#12841)

---
 docs/sources/query/template_functions.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/docs/sources/query/template_functions.md b/docs/sources/query/template_functions.md
index 0effe4c01ac7..9ba67620a488 100644
--- a/docs/sources/query/template_functions.md
+++ b/docs/sources/query/template_functions.md
@@ -303,12 +303,12 @@ Example:
 
 Use this function to test to see if one string is contained inside of another.
 
-Signature: `contains(s string, src string) bool`
+Signature: `contains(src string, s string) bool`
 
 Examples:
 
 ```template
-{{ if contains .err "ErrTimeout" }} timeout {{end}}
+{{ if contains "ErrTimeout" .err }} timeout {{end}}
 {{ if contains "he" "hello" }} yes {{end}}
 ```
 
@@ -316,13 +316,13 @@ Examples:
 
 Use this function to test to see if one string has exact matching inside of another.
 
-Signature: `eq(s string, src string) bool`
+Signature: `eq(src string, s string) bool`
 
 Examples:
 
 ```template
-{{ if eq .err "ErrTimeout" }} timeout {{end}}
-{{ if eq "he" "hello" }} yes {{end}}
+{{ if eq "ErrTimeout" .err }} timeout {{end}}
+{{ if eq "hello" "hello" }} yes {{end}}
 ```
 
 ## hasPrefix and hasSuffix

From 37c88220b3a7f8268c48f1bf37f4eb11cdba1b5f Mon Sep 17 00:00:00 2001
From: Christian Haudum
Date: Thu, 2 May 2024 10:44:43 +0200
Subject: [PATCH 4/6] fix(blooms): Handle not found metas gracefully (#12853)

There is a time window between listing metas and fetching them from object
storage which could lead to a race condition where the meta is not found in
object storage, because it was deleted and superseded by a newer meta.
This can happen when querying recent bloom data that is still subject to
updates, and results in an error like this:

```
rpc error: code = Unknown desc = failed to get meta file bloom/tsdb_index_19843/XXXX/metas/18fbdc8500000000-1921d15dffffffff-270affee.json: storage: object doesn't exist (Trace ID: 4fe28d32cfa3e3df9495c3a5d4a683fb)
```

Signed-off-by: Christian Haudum
---
 .../stores/shipper/bloomshipper/client.go      | 19 +++++++++++++------
 .../shipper/bloomshipper/client_test.go        | 19 ++++++++++++++-----
 2 files changed, 27 insertions(+), 11 deletions(-)

diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go
index c637b983cb95..1c2314691cae 100644
--- a/pkg/storage/stores/shipper/bloomshipper/client.go
+++ b/pkg/storage/stores/shipper/bloomshipper/client.go
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/concurrency"
 	"github.com/pkg/errors"
 	"github.com/prometheus/common/model"
@@ -388,7 +389,11 @@ func (b *BloomClient) GetMetas(ctx context.Context, refs []MetaRef) ([]Meta, err
 	err := concurrency.ForEachJob(ctx, len(refs), b.concurrency, func(ctx context.Context, idx int) error {
 		meta, err := b.GetMeta(ctx, refs[idx])
 		if err != nil {
-			return err
+			key := b.KeyResolver.Meta(refs[idx]).Addr()
+			if !b.IsObjectNotFoundErr(err) {
+				return fmt.Errorf("failed to get meta file %s: %w", key, err)
+			}
+			level.Error(b.logger).Log("msg", "failed to get meta file", "ref", key, "err", err)
 		}
 		results[idx] = meta
 		return nil
@@ -396,20 +401,22 @@ func (b *BloomClient) GetMetas(ctx context.Context, refs []MetaRef) ([]Meta, err
 	return results, err
 }
 
+// GetMeta fetches the meta file for a given MetaRef from object storage and
+// decodes the JSON data into a Meta.
+// If the meta file is not found in storage or decoding fails, the empty Meta
+// is returned along with the error.
func (b *BloomClient) GetMeta(ctx context.Context, ref MetaRef) (Meta, error) { - meta := Meta{ - MetaRef: ref, - } + meta := Meta{MetaRef: ref} key := b.KeyResolver.Meta(ref).Addr() reader, _, err := b.client.GetObject(ctx, key) if err != nil { - return Meta{}, fmt.Errorf("failed to get meta file%s: %w", key, err) + return meta, err } defer reader.Close() err = json.NewDecoder(reader).Decode(&meta) if err != nil { - return Meta{}, fmt.Errorf("failed to decode meta file %s: %w", key, err) + return meta, errors.Wrap(err, "failed to decode JSON") } return meta, nil } diff --git a/pkg/storage/stores/shipper/bloomshipper/client_test.go b/pkg/storage/stores/shipper/bloomshipper/client_test.go index cd77339c0932..ec5e7015e00b 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/client_test.go @@ -107,11 +107,20 @@ func TestBloomClient_GetMetas(t *testing.T) { require.Equal(t, metas, []Meta{m1, m2}) }) - t.Run("does not exist", func(t *testing.T) { - metas, err := c.GetMetas(ctx, []MetaRef{{}}) - require.Error(t, err) - require.True(t, c.client.IsObjectNotFoundErr(err)) - require.Equal(t, metas, []Meta{{}}) + t.Run("does not exist - yields empty meta", func(t *testing.T) { + ref := MetaRef{ + Ref: Ref{ + TenantID: "tenant", + TableName: "table", + Bounds: v1.FingerprintBounds{}, + StartTimestamp: 1000, + EndTimestamp: 2000, + Checksum: 1234, + }, + } + metas, err := c.GetMetas(ctx, []MetaRef{ref}) + require.NoError(t, err) + require.Equal(t, metas, []Meta{{MetaRef: ref}}) }) } From 599a3002d2878eafd2a64dc270f662f9756cae5b Mon Sep 17 00:00:00 2001 From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Date: Thu, 2 May 2024 09:58:46 +0100 Subject: [PATCH 5/6] docs: update the lokitool docs (#12805) Signed-off-by: Michel Hollands Co-authored-by: J Stickler --- docs/sources/alert/_index.md | 75 +++++++----------------------------- pkg/tool/printer/printer.go | 2 +- 2 files changed, 15 insertions(+), 62 deletions(-) diff --git a/docs/sources/alert/_index.md b/docs/sources/alert/_index.md index ead7c325e73c..1d99e56d1a56 100644 --- a/docs/sources/alert/_index.md +++ b/docs/sources/alert/_index.md @@ -66,13 +66,13 @@ groups: annotations: summary: High request latency - name: credentials_leak - rules: + rules: - alert: http-credentials-leaked - annotations: + annotations: message: "{{ $labels.job }} is leaking http basic auth credentials." expr: 'sum by (cluster, job, pod) (count_over_time({namespace="prod"} |~ "http(s?)://(\\w+):(\\w+)@" [5m]) > 0)' for: 10m - labels: + labels: severity: critical ``` @@ -160,7 +160,7 @@ Here is an example of a remote-write configuration for sending data to a local P ```yaml ruler: ... other settings ... - + remote_write: enabled: true client: @@ -186,13 +186,13 @@ We don't always control the source code of applications we run. Load balancers a Sometimes you want to know whether _any_ instance of something has occurred. Alerting based on logs can be a great way to handle this, such as finding examples of leaked authentication credentials: ```yaml - name: credentials_leak - rules: + rules: - alert: http-credentials-leaked - annotations: + annotations: message: "{{ $labels.job }} is leaking http basic auth credentials." 
expr: 'sum by (cluster, job, pod) (count_over_time({namespace="prod"} |~ "http(s?)://(\\w+):(\\w+)@" [5m]) > 0)' for: 10m - labels: + labels: severity: critical ``` @@ -208,76 +208,29 @@ As an example, we can use LogQL v2 to help Loki to monitor _itself_, alerting us ## Interacting with the Ruler -### Cortextool -Because the rule files are identical to Prometheus rule files, we can interact with the Loki Ruler via [`cortextool`](https://github.com/grafana/cortex-tools#rules). The CLI is in early development, but it works with both Loki and Cortex. Pass the `--backend=loki` option when using it with Loki. - -{{% admonition type="note" %}} -Not all commands in cortextool currently support Loki. -{{% /admonition %}} +### Lokitool +Because the rule files are identical to Prometheus rule files, we can interact with the Loki Ruler via `lokitool`. {{% admonition type="note" %}} -cortextool was intended to run against multi-tenant Loki, commands need an `--id=` flag set to the Loki instance ID or set the environment variable `CORTEX_TENANT_ID`. If Loki is running in single tenant mode, the required ID is `fake`. +lokitool is intended to run against multi-tenant Loki. The commands need an `--id=` flag set to the Loki instance ID or set the environment variable `LOKI_TENANT_ID`. If Loki is running in single tenant mode, the required ID is `fake`. {{% /admonition %}} An example workflow is included below: ```sh # lint the rules.yaml file ensuring it's valid and reformatting it if necessary -cortextool rules lint --backend=loki ./output/rules.yaml +lokitool rules lint ./output/rules.yaml # diff rules against the currently managed ruleset in Loki -cortextool rules diff --rule-dirs=./output --backend=loki +lokitool rules diff --rule-dirs=./output # ensure the remote ruleset matches your local ruleset, creating/updating/deleting remote rules which differ from your local specification. -cortextool rules sync --rule-dirs=./output --backend=loki +lokitool rules sync --rule-dirs=./output # print the remote ruleset -cortextool rules print --backend=loki +lokitool rules print ``` -### Cortextool Github Actions -There is also a [github action](https://github.com/grafana/cortex-rules-action) available for `cortex-tool`, so you can add it into your CI/CD pipelines! 
- -For instance, you can sync rules on master builds via -```yaml -name: sync-cortex-rules-and-alerts -on: - push: - branches: - - master -env: - CORTEX_ADDRESS: '' - CORTEX_TENANT_ID: '' - CORTEX_API_KEY: ${{ secrets.API_KEY }} - RULES_DIR: 'output/' -jobs: - sync-loki-alerts: - runs-on: ubuntu-18.04 - steps: - - name: Lint Rules - uses: grafana/cortex-rules-action@v0.4.0 - env: - ACTION: 'lint' - with: - args: --backend=loki - - name: Diff rules - uses: grafana/cortex-rules-action@v0.4.0 - env: - ACTION: 'diff' - with: - args: --backend=loki - - name: Sync rules - if: ${{ !contains(steps.diff-rules.outputs.detailed, 'no changes detected') }} - uses: grafana/cortex-rules-action@v0.4.0 - env: - ACTION: 'sync' - with: - args: --backend=loki - - name: Print rules - uses: grafana/cortex-rules-action@v0.4.0 - env: - ACTION: 'print' -``` ### Terraform With the [Terraform provider for Loki](https://registry.terraform.io/providers/fgouteroux/loki/latest), you can manage alerts and recording rules in Terraform HCL format: diff --git a/pkg/tool/printer/printer.go b/pkg/tool/printer/printer.go index 084d483a07a4..1a696a8ffdf9 100644 --- a/pkg/tool/printer/printer.go +++ b/pkg/tool/printer/printer.go @@ -17,7 +17,7 @@ import ( "github.com/grafana/loki/v3/pkg/tool/rules/rwrulefmt" ) -// Printer is used for printing formatted output from the cortextool +// Printer is used for printing formatted output from the lokitool type Printer struct { disableColor bool colorizer colorstring.Colorize From fd2301fd62b18eb345bc43868b40343efc1a1f10 Mon Sep 17 00:00:00 2001 From: benclive Date: Thu, 2 May 2024 12:51:12 +0100 Subject: [PATCH 6/6] fix: Ensure Drain patterns are valid for LogQL pattern match filter (#12815) --- pkg/pattern/drain/drain.go | 1 - pkg/pattern/drain/drain_test.go | 118 ++++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/pkg/pattern/drain/drain.go b/pkg/pattern/drain/drain.go index 20d9dadb6c8d..ade8fca366b8 100644 --- a/pkg/pattern/drain/drain.go +++ b/pkg/pattern/drain/drain.go @@ -268,7 +268,6 @@ func (d *Drain) Match(content string) *LogCluster { } func (d *Drain) getContentAsTokens(content string) []string { - content = strings.TrimSpace(content) for _, extraDelimiter := range d.config.ExtraDelimiters { content = strings.Replace(content, extraDelimiter, " ", -1) } diff --git a/pkg/pattern/drain/drain_test.go b/pkg/pattern/drain/drain_test.go index beb09742af93..ef7754c4ed57 100644 --- a/pkg/pattern/drain/drain_test.go +++ b/pkg/pattern/drain/drain_test.go @@ -6,9 +6,12 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logql/log/pattern" ) func TestDrain_TrainExtractsPatterns(t *testing.T) { + t.Parallel() tests := []struct { name string drain *Drain @@ -116,3 +119,118 @@ func TestDrain_TrainExtractsPatterns(t *testing.T) { }) } } + +func TestDrain_TrainGeneratesMatchablePatterns(t *testing.T) { + t.Parallel() + tests := []struct { + name string + drain *Drain + inputLines []string + }{ + { + name: "should match each line against a pattern", + drain: New(DefaultConfig()), + inputLines: []string{ + `test test test`, + `test test test`, + `test test test`, + `test test test`, + }, + }, + { + name: "should also match newlines", + drain: New(DefaultConfig()), + inputLines: []string{ + "test test test\n", + "test test test\n", + "test test test\n", + "test test test\n", + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for _, line := range tt.inputLines { + 
tt.drain.Train(line, 0) + } + t.Log("Learned clusters", tt.drain.Clusters()) + + for _, line := range tt.inputLines { + match := tt.drain.Match(line) + require.NotNil(t, match, "Line should match a cluster") + } + }) + } + +} + +func TestDrain_TrainGeneratesPatternsMatchableByLokiPatternFilter(t *testing.T) { + t.Parallel() + tests := []struct { + name string + drain *Drain + inputLines []string + }{ + { + name: "should extract patterns that all lines match", + drain: New(DefaultConfig()), + inputLines: []string{ + `test 1 test`, + `test 2 test`, + `test 3 test`, + `test 4 test`, + }, + }, + { + name: "should extract patterns that match if line ends with newlines", + drain: New(DefaultConfig()), + inputLines: []string{ + "test 1 test\n", + "test 2 test\n", + "test 3 test\n", + "test 4 test\n", + }, + }, + { + name: "should extract patterns that match if line ends with empty space", + drain: New(DefaultConfig()), + inputLines: []string{ + "test 1 test ", + "test 2 test ", + "test 3 test ", + "test 4 test ", + }, + }, + { + name: "should extract patterns that match if line starts with empty space", + drain: New(DefaultConfig()), + inputLines: []string{ + " test 1 test", + " test 2 test", + " test 3 test", + " test 4 test", + }, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + for _, line := range tt.inputLines { + tt.drain.Train(line, 0) + } + require.Equal(t, 1, len(tt.drain.Clusters())) + cluster := tt.drain.Clusters()[0] + t.Log("Extracted cluster: ", cluster) + + matcher, err := pattern.ParseLineFilter([]byte(cluster.String())) + require.NoError(t, err) + + for _, line := range tt.inputLines { + passes := matcher.Test([]byte(line)) + require.Truef(t, passes, "Line %q should match extracted pattern", line) + } + }) + } + +}