diff --git a/.drone/drone.jsonnet b/.drone/drone.jsonnet index 6dcea160a78c..49f67f06861a 100644 --- a/.drone/drone.jsonnet +++ b/.drone/drone.jsonnet @@ -640,7 +640,7 @@ local build_image_tag = '0.33.0'; 'GIT_TARGET_BRANCH="$DRONE_TARGET_BRANCH"', ]) { depends_on: ['loki'], when: onPRs }, make('validate-example-configs', container=false) { depends_on: ['loki'] }, - make('validate-dev-cluster-config', container=false) { depends_on: ['loki'] }, + make('validate-dev-cluster-config', container=false) { depends_on: ['validate-example-configs'] }, make('check-example-config-doc', container=false) { depends_on: ['clone'] }, { name: 'build-docs-website', diff --git a/.drone/drone.yml b/.drone/drone.yml index d45f7898a085..7a62b621262a 100644 --- a/.drone/drone.yml +++ b/.drone/drone.yml @@ -306,7 +306,7 @@ steps: - commands: - make BUILD_IN_CONTAINER=false validate-dev-cluster-config depends_on: - - loki + - validate-example-configs environment: {} image: grafana/loki-build-image:0.33.0 name: validate-dev-cluster-config @@ -2113,6 +2113,6 @@ kind: secret name: gpg_private_key --- kind: signature -hmac: fe7669a21410ae5f2d1ad6b6205fdc582af874f65f7bd6a679731a88174e3a1c +hmac: 457592d17208477ceb480f81dbdb88f7b95a5ad015c88d9d6fed06c2422a52f9 ... diff --git a/CHANGELOG.md b/CHANGELOG.md index 84010533ed5e..3bcd6eedf541 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,7 +48,9 @@ * [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks. * [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache. * [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters. -* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly. +* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly. +* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results. +* [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results. ##### Fixes * [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var. diff --git a/MAINTAINERS.md b/MAINTAINERS.md index ba11c534b1b6..072cc863392a 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -1,4 +1,4 @@ @slim-bean is the main/default maintainer. 
Some parts of the codebase have other maintainers: -- `@grafana/docs-logs`, which includes [@osg-grafana](https://github.com/osg-grafana) ([Grafana Labs](https://grafana.com/)) and [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/)) +- `@grafana/docs-logs`, which includes [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/)) diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 51ecb12af62f..1e94843eacf8 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -frontend.max-cache-freshness [max_cache_freshness_per_query: | default = 10m] +# Do not cache metadata request if the end time is within the +# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such +# limits. Defaults to 24h. +# CLI flag: -frontend.max-metadata-cache-freshness +[max_metadata_cache_freshness: | default = 1d] + # Do not cache requests with an end time that falls within Now minus this # duration. 0 disables this feature (default). # CLI flag: -frontend.max-stats-cache-freshness diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md index 631aadacac02..ea1ee3c060dc 100644 --- a/docs/sources/setup/install/helm/reference.md +++ b/docs/sources/setup/install/helm/reference.md @@ -2707,6 +2707,15 @@ true
 {}
 
+		<tr>
+			<td>monitoring.rules.disabled</td>
+			<td>object</td>
+			<td>If you disable all the alerts and keep .monitoring.rules.alerting set to true, the chart will fail to render.</td>
+			<td><pre lang="json">
+{}
+</pre>
+</td>
+		</tr>
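Editor's note: the docs hunk above introduces `frontend.max-metadata-cache-freshness`. A minimal, hypothetical `limits_config` snippet showing how the new limit could be set; the 12h value is illustrative only, and the default stays 1d as documented above.

```yaml
limits_config:
  # Metadata (label/series) results whose end time falls within the last 12h
  # are not served from the results cache, to avoid caching incomplete recent data.
  # Set to 0 to disable the freshness check; the default is 1d.
  max_metadata_cache_freshness: 12h
```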
diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index 1f7dc836b5ff..0c05d13d8ef3 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "google.golang.org/protobuf/proto" "github.com/grafana/loki/integration/client" @@ -1061,7 +1062,7 @@ func TestCategorizedLabels(t *testing.T) { func TestBloomFiltersEndToEnd(t *testing.T) { commonFlags := []string{ - "-bloom-compactor.compaction-interval=2s", + "-bloom-compactor.compaction-interval=10s", "-bloom-compactor.enable-compaction=true", "-bloom-compactor.enabled=true", "-bloom-gateway.enable-filtering=true", @@ -1101,7 +1102,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) { "-target=index-gateway", )..., ) - _ = clu.AddComponent( + tBloomGateway = clu.AddComponent( "bloom-gateway", append( commonFlags, @@ -1136,7 +1137,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) { "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), )..., ) - _ = clu.AddComponent( + tBloomCompactor = clu.AddComponent( "bloom-compactor", append( commonFlags, @@ -1186,6 +1187,12 @@ func TestBloomFiltersEndToEnd(t *testing.T) { cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL()) cliIndexGateway.Now = now + cliBloomGateway := client.New(tenantID, "", tBloomGateway.HTTPURL()) + cliBloomGateway.Now = now + + cliBloomCompactor := client.New(tenantID, "", tBloomCompactor.HTTPURL()) + cliBloomCompactor.Now = now + lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"` // ingest logs from 10 different pods // each line contains a random, unique string @@ -1206,7 +1213,14 @@ func TestBloomFiltersEndToEnd(t *testing.T) { require.NoError(t, tIngester.Restart()) // wait for compactor to compact index and for bloom compactor to build bloom filters - time.Sleep(10 * time.Second) + require.Eventually(t, func() bool { + // verify metrics that observe usage of block for filtering + metrics, err := cliBloomCompactor.Metrics() + require.NoError(t, err) + successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics) + t.Log("successful bloom compactor runs", successfulRunCount) + return successfulRunCount == 1 + }, 30*time.Second, time.Second) // use bloom gateway to perform needle in the haystack queries randIdx := rand.Intn(len(uniqueStrings)) @@ -1221,22 +1235,44 @@ func TestBloomFiltersEndToEnd(t *testing.T) { expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx]) require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1]) - // TODO(chaudum): - // verify that bloom blocks have actually been used for querying - // atm, we can only verify by logs, so we should add appropriate metrics for - // uploaded/downloaded blocks and metas + // verify metrics that observe usage of block for filtering + bloomGwMetrics, err := cliBloomGateway.Metrics() + require.NoError(t, err) + + unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics) + require.Equal(t, float64(10), unfilteredCount) + + filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics) + require.Equal(t, float64(1), filteredCount) + + mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics) + require.NoError(t, err) + + count := getValueFromMetricFamilyWithFunc(mf, 
&dto.LabelPair{ + Name: proto.String("status"), + Value: proto.String("success"), + }, func(m *dto.Metric) uint64 { + return m.Histogram.GetSampleCount() + }) + require.Equal(t, uint64(1), count) } func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 { + return getValueFromMetricFamilyWithFunc(mf, lbs[0], func(m *dto.Metric) float64 { return m.Counter.GetValue() }) +} + +func getValueFromMetricFamilyWithFunc[R any](mf *dto.MetricFamily, lbs *dto.LabelPair, f func(*dto.Metric) R) R { + eq := func(e *dto.LabelPair) bool { + return e.GetName() == lbs.GetName() && e.GetValue() == lbs.GetValue() + } + var zero R for _, m := range mf.Metric { - if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) { + if !slices.ContainsFunc(m.GetLabel(), eq) { continue } - - return m.Counter.GetValue() + return f(m) } - - return 0 + return zero } func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) { diff --git a/integration/parse_metrics.go b/integration/parse_metrics.go index 46ea42497856..9f2bf5fc8fc2 100644 --- a/integration/parse_metrics.go +++ b/integration/parse_metrics.go @@ -13,16 +13,24 @@ var ( ErrInvalidMetricType = fmt.Errorf("invalid metric type") ) -func extractMetric(metricName, metrics string) (float64, map[string]string, error) { +func extractMetricFamily(name, metrics string) (*io_prometheus_client.MetricFamily, error) { var parser expfmt.TextParser mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics)) if err != nil { - return 0, nil, err + return nil, err + } + + mf, ok := mfs[name] + if !ok { + return nil, ErrNoMetricFound } + return mf, nil +} - mf, found := mfs[metricName] - if !found { - return 0, nil, ErrNoMetricFound +func extractMetric(metricName, metrics string) (float64, map[string]string, error) { + mf, err := extractMetricFamily(metricName, metrics) + if err != nil { + return 0, nil, err } var val float64 diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index b0c3251a0843..766c05bab457 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -80,8 +80,10 @@ var ( ) type metrics struct { - queueDuration prometheus.Histogram - inflightRequests prometheus.Summary + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + chunkRefsUnfiltered prometheus.Counter + chunkRefsFiltered prometheus.Counter } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -102,9 +104,29 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), + chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkrefs_pre_filtering", + Help: "Total amount of chunk refs pre filtering. 
Does not count chunk refs in failed requests.", + }), + chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkrefs_post_filtering", + Help: "Total amount of chunk refs post filtering.", + }), } } +func (m *metrics) addUnfilteredCount(n int) { + m.chunkRefsUnfiltered.Add(float64(n)) +} + +func (m *metrics) addFilteredCount(n int) { + m.chunkRefsFiltered.Add(float64(n)) +} + // SyncMap is a map structure which can be synchronized using the RWMutex type SyncMap[k comparable, v any] struct { sync.RWMutex @@ -284,8 +306,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, err } + numChunksUnfiltered := len(req.Refs) + // Shortcut if request does not contain filters if len(req.Filters) == 0 { + g.metrics.addUnfilteredCount(numChunksUnfiltered) + g.metrics.addFilteredCount(len(req.Refs)) return &logproto.FilterChunkRefResponse{ ChunkRefs: req.Refs, }, nil @@ -313,6 +339,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk responses := responsesPool.Get(requestCount) defer responsesPool.Put(responses) +outer: for { select { case <-ctx.Done(): @@ -325,17 +352,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) // wait for all parts of the full response if len(responses) == requestCount { - for _, o := range responses { - if res.Removals.Len() == 0 { - continue - } - // we must not remove items from req.Refs as long as the worker may iterater over them - g.removeNotMatchingChunks(req, o) - } - return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil + break outer } } } + + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + // we must not remove items from req.Refs as long as the worker may iterater over them + g.removeNotMatchingChunks(req, o) + } + + g.metrics.addUnfilteredCount(numChunksUnfiltered) + g.metrics.addFilteredCount(len(req.Refs)) + + level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs)) + return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) { diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index a8f9c56d50ba..ce5add3c63f3 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -27,6 +27,7 @@ type workerMetrics struct { dequeueErrors *prometheus.CounterVec dequeueWaitTime *prometheus.SummaryVec storeAccessLatency *prometheus.HistogramVec + bloomQueryLatency *prometheus.HistogramVec } func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics { @@ -50,6 +51,13 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str Name: "dequeue_wait_time", Help: "Time spent waiting for dequeuing tasks from queue", }, labels), + bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "bloom_query_latency", + Help: "Latency in seconds of processing bloom blocks", + }, append(labels, "status")), + // TODO(chaudum): Move this metric into the bloomshipper 
storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -213,29 +221,32 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error { for _, b := range boundedRefs { if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp { - processBlock(bq, day, b.tasks) - return nil + return w.processBlock(bq, day, b.tasks) } } return nil }) } -func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) { +func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error { schema, err := blockQuerier.Schema() if err != nil { - for _, t := range tasks { - t.ErrCh <- errors.Wrap(err, "failed to get block schema") - } + return err } tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0) it := newTaskMergeIterator(day, tokenizer, tasks...) fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it}) + + start := time.Now() err = fq.Run() + duration := time.Since(start).Seconds() + if err != nil { - for _, t := range tasks { - t.ErrCh <- errors.Wrap(err, "failed to run chunk check") - } + w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration) + return err } + + w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration) + return nil } diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 2144abdf5d7e..ea4570e652df 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -157,7 +157,7 @@ type LabelsBuilder struct { // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder { if parserKeyHints == nil { - parserKeyHints = noParserHints + parserKeyHints = NoParserHints() } const labelsCapacity = 16 @@ -179,7 +179,7 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint // NewBaseLabelsBuilder creates a new base labels builder. func NewBaseLabelsBuilder() *BaseLabelsBuilder { - return NewBaseLabelsBuilderWithGrouping(nil, noParserHints, false, false) + return NewBaseLabelsBuilderWithGrouping(nil, NoParserHints(), false, false) } // ForLabels creates a labels builder for a given labels set as base. diff --git a/pkg/logql/log/parser_hints.go b/pkg/logql/log/parser_hints.go index a8b1f73f3109..3fd4cff2b332 100644 --- a/pkg/logql/log/parser_hints.go +++ b/pkg/logql/log/parser_hints.go @@ -6,7 +6,9 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" ) -var noParserHints = &Hints{} +func NoParserHints() ParserHint { + return &Hints{} +} // ParserHint are hints given to LogQL parsers. // This is specially useful for parser that extract implicitly all possible label keys. 
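Editor's note: the parser_hints.go hunk above replaces the shared package-level `noParserHints` value with a `NoParserHints()` constructor, so each labels builder receives its own `Hints` instance. A hypothetical, self-contained sketch (not the Loki types) of why a constructor beats a shared value when the value is mutable:

```go
package main

import "fmt"

// hints stands in for log.Hints; the real type tracks parser hint state.
type hints struct{ requested []string }

// Old pattern: one shared instance that every caller ends up mutating.
var sharedNoHints = &hints{}

// New pattern (mirrors NoParserHints()): each caller gets an independent value.
func noHints() *hints { return &hints{} }

func main() {
	a, b := noHints(), noHints()
	a.requested = append(a.requested, "pod")
	// b is unaffected, unlike two callers sharing sharedNoHints.
	fmt.Println(len(a.requested), len(b.requested), len(sharedNoHints.requested)) // 1 0 0
}
```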
diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 246dbed499c9..bd57603ab808 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -28,7 +28,7 @@ func Test_jsonParser_Parse(t *testing.T) { "pod_uuid", "foo", "pod_deployment_ref", "foobar", ), - noParserHints, + NoParserHints(), }, { "numeric", @@ -37,7 +37,7 @@ func Test_jsonParser_Parse(t *testing.T) { labels.FromStrings("counter", "1", "price__net_", "5.56909", ), - noParserHints, + NoParserHints(), }, { "escaped", @@ -47,7 +47,7 @@ func Test_jsonParser_Parse(t *testing.T) { "price__net_", "5.56909", "foo", `foo\"bar`, ), - noParserHints, + NoParserHints(), }, { "utf8 error rune", @@ -57,21 +57,21 @@ func Test_jsonParser_Parse(t *testing.T) { "price__net_", "5.56909", "foo", "", ), - noParserHints, + NoParserHints(), }, { "skip arrays", []byte(`{"counter":1, "price": {"net_":["10","20"]}}`), labels.EmptyLabels(), labels.FromStrings("counter", "1"), - noParserHints, + NoParserHints(), }, { "bad key replaced", []byte(`{"cou-nter":1}`), labels.EmptyLabels(), labels.FromStrings("cou_nter", "1"), - noParserHints, + NoParserHints(), }, { "errors", @@ -80,7 +80,7 @@ func Test_jsonParser_Parse(t *testing.T) { labels.FromStrings("__error__", "JSONParserErr", "__error_details__", "Value looks like object, but can't find closing '}' symbol", ), - noParserHints, + NoParserHints(), }, { "errors hints", @@ -103,7 +103,7 @@ func Test_jsonParser_Parse(t *testing.T) { "next_err", "false", "pod_deployment_ref", "foobar", ), - noParserHints, + NoParserHints(), }, } for _, tt := range tests { @@ -255,7 +255,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("app", "foo"), - noParserHints, + NoParserHints(), }, { "alternate syntax", @@ -265,7 +265,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("test", "value"), - noParserHints, + NoParserHints(), }, { "multiple fields", @@ -278,7 +278,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("app", "foo", "namespace", "prod", ), - noParserHints, + NoParserHints(), }, { "utf8", @@ -288,7 +288,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("utf8", "value"), - noParserHints, + NoParserHints(), }, { "nested field", @@ -298,7 +298,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "nested field alternate syntax", @@ -308,7 +308,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "nested field alternate syntax 2", @@ -318,7 +318,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "nested field alternate syntax 3", @@ -328,7 +328,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "array element", @@ -338,7 +338,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("param", "1"), - noParserHints, + NoParserHints(), }, { "full array", @@ -348,7 +348,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("params", "[1,2,3]"), - noParserHints, + NoParserHints(), }, { "full object", @@ -358,7 +358,7 @@ func TestJSONExpressionParser(t 
*testing.T) { }, labels.EmptyLabels(), labels.FromStrings("deployment", `{"ref":"foobar", "params": [1,2,3]}`), - noParserHints, + NoParserHints(), }, { "expression matching nothing", @@ -368,7 +368,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("nope", ""), - noParserHints, + NoParserHints(), }, { "null field", @@ -379,7 +379,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.EmptyLabels(), labels.FromStrings("nf", ""), // null is coerced to an empty string - noParserHints, + NoParserHints(), }, { "boolean field", @@ -389,7 +389,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("bool", `false`), - noParserHints, + NoParserHints(), }, { "label override", @@ -401,7 +401,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("uuid", "bar", "uuid_extracted", "foo", ), - noParserHints, + NoParserHints(), }, { "non-matching expression", @@ -413,7 +413,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("uuid", "bar", "request_size", "", ), - noParserHints, + NoParserHints(), }, { "empty line", @@ -423,7 +423,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", ""), - noParserHints, + NoParserHints(), }, { "existing labels are not affected", @@ -435,7 +435,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("foo", "bar", "uuid", "", ), - noParserHints, + NoParserHints(), }, { "invalid JSON line", @@ -447,7 +447,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("foo", "bar", logqlmodel.ErrorLabel, errJSON, ), - noParserHints, + NoParserHints(), }, { "invalid JSON line with hints", @@ -470,7 +470,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar"), - noParserHints, + NoParserHints(), }, { "nested escaped object", @@ -482,7 +482,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("foo", "bar", "app", `{ "key": "value", "key2":"value2"}`, ), - noParserHints, + NoParserHints(), }, } for _, tt := range tests { @@ -746,7 +746,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", "logfmt syntax error at pos 8 : unexpected '='", ), - noParserHints, + NoParserHints(), }, { "not logfmt with hints", @@ -766,7 +766,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.EmptyLabels(), labels.FromStrings("buzz", "foo"), nil, - noParserHints, + NoParserHints(), }, { "key alone logfmt", @@ -775,7 +775,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar", "bar", "foo"), nil, - noParserHints, + NoParserHints(), }, { "quoted logfmt", @@ -785,7 +785,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo bar", ), nil, - noParserHints, + NoParserHints(), }, { "escaped control chars in logfmt", @@ -795,7 +795,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo\nbar\tbaz", ), nil, - noParserHints, + NoParserHints(), }, { "literal control chars in logfmt", @@ -805,7 +805,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo\nbar\tbaz", ), nil, - noParserHints, + NoParserHints(), }, { "escaped slash logfmt", @@ -815,7 +815,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", `foo ba\r baz`, ), nil, - noParserHints, + NoParserHints(), }, { "literal newline and escaped slash logfmt", @@ -825,7 +825,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo bar\nb\\az", ), nil, - noParserHints, + 
NoParserHints(), }, { "double property logfmt", @@ -836,7 +836,7 @@ func TestLogfmtParser_parse(t *testing.T) { "latency", "10ms", ), nil, - noParserHints, + NoParserHints(), }, { "duplicate from line property", @@ -846,7 +846,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "10ms", ), nil, - noParserHints, + NoParserHints(), }, { "duplicate property", @@ -857,7 +857,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "10ms", ), nil, - noParserHints, + NoParserHints(), }, { "invalid key names", @@ -869,7 +869,7 @@ func TestLogfmtParser_parse(t *testing.T) { "test_dash", "foo", ), nil, - noParserHints, + NoParserHints(), }, { "nil", @@ -877,7 +877,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar"), nil, - noParserHints, + NoParserHints(), }, { "empty key", @@ -892,7 +892,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", "logfmt syntax error at pos 15 : unexpected '='", ), - noParserHints, + NoParserHints(), }, { "error rune in key", @@ -906,7 +906,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", "logfmt syntax error at pos 20 : invalid key", ), - noParserHints, + NoParserHints(), }, { "double quote in key", @@ -920,7 +920,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", `logfmt syntax error at pos 17 : unexpected '"'`, ), - noParserHints, + NoParserHints(), }, { "= in value", @@ -933,7 +933,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", `logfmt syntax error at pos 7 : unexpected '='`, ), - noParserHints, + NoParserHints(), }, } @@ -1200,7 +1200,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`some message`), - noParserHints, + NoParserHints(), }, { "wrong json", @@ -1210,7 +1210,7 @@ func Test_unpackParser_Parse(t *testing.T) { "__error_details__", "expecting json object(6), but it is not", ), []byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`), - noParserHints, + NoParserHints(), }, { "empty line", @@ -1218,7 +1218,7 @@ func Test_unpackParser_Parse(t *testing.T) { labels.FromStrings("cluster", "us-central1"), labels.FromStrings("cluster", "us-central1"), []byte(``), - noParserHints, + NoParserHints(), }, { "wrong json with hints", @@ -1240,7 +1240,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`["foo","bar"]`), - noParserHints, + NoParserHints(), }, { "should rename", @@ -1254,7 +1254,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`some message`), - noParserHints, + NoParserHints(), }, { "should not change log and labels if no packed entry", @@ -1266,7 +1266,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`), - noParserHints, + NoParserHints(), }, { "non json with escaped quotes", @@ -1278,7 +1278,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`I0303 17:49:45.976518 1526 kubelet_getters.go:178] "Pod status updated" pod="openshift-etcd/etcd-ip-10-0-150-50.us-east-2.compute.internal" status=Running`), - noParserHints, + NoParserHints(), }, { "invalid key names", @@ -1289,7 +1289,7 @@ func Test_unpackParser_Parse(t *testing.T) { "test_dash", "foo", ), []byte(`some message`), - noParserHints, + NoParserHints(), }, } for _, tt 
:= range tests { diff --git a/pkg/logql/syntax/parser.go b/pkg/logql/syntax/parser.go index 710bf7132c4c..79213049f376 100644 --- a/pkg/logql/syntax/parser.go +++ b/pkg/logql/syntax/parser.go @@ -31,7 +31,13 @@ var parserPool = sync.Pool{ }, } -const maxInputSize = 5120 +// (E.Welch) We originally added this limit from fuzz testing and realizing there should be some maximum limit to an allowed query size. +// The original limit was 5120 based on some internet searching and a best estimate of what a reasonable limit would be. +// We have seen use cases with queries containing a lot of filter expressions or long expanded variable names where this limit was too small. +// Apparently the spec does not specify a limit, and more internet searching suggests almost all browsers will handle 100k+ length urls without issue +// Some limit here still seems prudent however, so the new limit is now 128k. +// Also note this is used to allocate the buffer for reading the query string, so there is some memory cost to making this larger. +const maxInputSize = 131072 func init() { // Improve the error messages coming out of yacc. diff --git a/pkg/querier/queryrange/index_stats_cache.go b/pkg/querier/queryrange/index_stats_cache.go index a985167456a7..d52f2e22323f 100644 --- a/pkg/querier/queryrange/index_stats_cache.go +++ b/pkg/querier/queryrange/index_stats_cache.go @@ -93,6 +93,7 @@ func NewIndexStatsCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -102,7 +103,7 @@ func NewIndexStatsCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( log, c, - IndexStatsSplitter{cacheKeyLimits{limits, transformer}}, + IndexStatsSplitter{cacheKeyLimits{limits, transformer, iqo}}, limits, merger, IndexStatsExtractor{}, diff --git a/pkg/querier/queryrange/index_stats_cache_test.go b/pkg/querier/queryrange/index_stats_cache_test.go index c8119c6b9fe2..1127b88576e1 100644 --- a/pkg/querier/queryrange/index_stats_cache_test.go +++ b/pkg/querier/queryrange/index_stats_cache_test.go @@ -37,6 +37,7 @@ func TestIndexStatsCache(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -180,6 +181,7 @@ func TestIndexStatsCache_RecentData(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, diff --git a/pkg/querier/queryrange/ingester_query_window.go b/pkg/querier/queryrange/ingester_query_window.go new file mode 100644 index 000000000000..7a161f40c007 --- /dev/null +++ b/pkg/querier/queryrange/ingester_query_window.go @@ -0,0 +1,33 @@ +package queryrange + +import ( + "time" + + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/validation" +) + +// SplitIntervalForTimeRange returns the correct split interval to use. It accounts for the given upperBound value being +// within the ingester query window, in which case it returns the ingester query split (unless it's not set, then the default +// split interval will be used). 
+func SplitIntervalForTimeRange(iqo util.IngesterQueryOptions, limits Limits, defaultSplitFn func(string) time.Duration, tenantIDs []string, ref, upperBound time.Time) time.Duration { + split := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, defaultSplitFn) + + if iqo == nil { + return split + } + + // if the query is within the ingester query window, choose the ingester split duration (if configured), otherwise + // revert to the default split duration + ingesterQueryWindowStart := ref.Add(-iqo.QueryIngestersWithin()) + + // query is (even partially) within the ingester query window + if upperBound.After(ingesterQueryWindowStart) { + ingesterSplit := validation.MaxDurationOrZeroPerTenant(tenantIDs, limits.IngesterQuerySplitDuration) + if !iqo.QueryStoreOnly() && ingesterSplit > 0 { + split = ingesterSplit + } + } + + return split +} diff --git a/pkg/querier/queryrange/labels_cache.go b/pkg/querier/queryrange/labels_cache.go index 1e0dd225fa7b..66c811490403 100644 --- a/pkg/querier/queryrange/labels_cache.go +++ b/pkg/querier/queryrange/labels_cache.go @@ -11,18 +11,21 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" + "github.com/grafana/loki/pkg/util" ) type cacheKeyLabels struct { Limits transformer UserIDTransformer + iqo util.IngesterQueryOptions } // GenerateCacheKey generates a cache key based on the userID, split duration and the interval of the request. // It also includes the label name and the provided query for label values request. func (i cacheKeyLabels) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { lr := r.(*LabelRequest) - split := i.MetadataQuerySplitDuration(userID) + + split := SplitIntervalForTimeRange(i.iqo, i.Limits, i.MetadataQuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { @@ -77,6 +80,7 @@ func NewLabelsCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -86,12 +90,14 @@ func NewLabelsCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( logger, c, - cacheKeyLabels{limits, transformer}, + cacheKeyLabels{limits, transformer, iqo}, limits, merger, labelsExtractor{}, cacheGenNumberLoader, - shouldCache, + func(ctx context.Context, r queryrangebase.Request) bool { + return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits) + }, parallelismForReq, retentionEnabled, metrics, diff --git a/pkg/querier/queryrange/labels_cache_test.go b/pkg/querier/queryrange/labels_cache_test.go index 73ab9ad8f4f8..4c645b8d19ce 100644 --- a/pkg/querier/queryrange/labels_cache_test.go +++ b/pkg/querier/queryrange/labels_cache_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "regexp" "testing" "time" @@ -69,6 +70,7 @@ func TestLabelsCache(t *testing.T) { cache.NewMockCache(), nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -249,3 +251,215 @@ func TestLabelsCache(t *testing.T) { }) } } + +func TestLabelCache_freshness(t *testing.T) { + testTime := time.Now().Add(-1 * time.Hour) + from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime) + start, end 
:= from.Time(), through.Time() + nonOverlappingStart, nonOverlappingEnd := from.Add(-24*time.Hour).Time(), through.Add(-24*time.Hour).Time() + + for _, tt := range []struct { + name string + req *LabelRequest + shouldCache bool + maxMetadataCacheFreshness time.Duration + }{ + { + name: "max metadata freshness not set", + req: &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &start, + End: &end, + }, + }, + shouldCache: true, + }, + { + name: "req overlaps with max cache freshness window", + req: &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &start, + End: &end, + }, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: false, + }, + { + name: "req does not overlap max cache freshness window", + req: &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &nonOverlappingStart, + End: &nonOverlappingEnd, + }, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + cacheMiddleware, err := NewLabelsCacheMiddleware( + log.NewNopLogger(), + fakeLimits{ + metadataSplitDuration: map[string]time.Duration{ + "fake": 24 * time.Hour, + }, + maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness, + }, + DefaultCodec, + cache.NewMockCache(), + nil, + nil, + nil, + func(_ context.Context, _ []string, _ queryrangebase.Request) int { + return 1 + }, + false, + nil, + nil, + ) + require.NoError(t, err) + + labelsResp := &LokiLabelNamesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: []string{"bar", "buzz"}, + Statistics: stats.Result{ + Summary: stats.Summary{ + Splits: 1, + }, + }, + } + + called := 0 + handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + called++ + + // should request the entire length with no partitioning as nothing is cached yet. + require.Equal(t, tt.req.GetStart(), r.GetStart()) + require.Equal(t, tt.req.GetEnd(), r.GetEnd()) + + return labelsResp, nil + })) + + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, tt.req) + require.NoError(t, err) + require.Equal(t, 1, called) // called actual handler, as not cached. 
+ require.Equal(t, labelsResp, got) + + called = 0 + got, err = handler.Do(ctx, tt.req) + require.NoError(t, err) + if !tt.shouldCache { + require.Equal(t, 1, called) + } else { + require.Equal(t, 0, called) + } + require.Equal(t, labelsResp, got) + }) + } +} + +func TestLabelQueryCacheKey(t *testing.T) { + const ( + defaultTenant = "a" + alternateTenant = "b" + defaultSplit = time.Hour + ingesterSplit = 90 * time.Minute + ingesterQueryWindow = defaultSplit * 3 + ) + + l := fakeLimits{ + metadataSplitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, + ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + } + + cases := []struct { + name, tenantID string + start, end time.Time + expectedSplit time.Duration + iqo util.IngesterQueryOptions + values bool + }{ + { + name: "outside ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-5 * time.Hour), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: ingesterSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window, but query store only", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: true, + }, + }, + { + name: "within ingester query window, but no ingester split duration configured", + tenantID: alternateTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + } + + for _, values := range []bool{true, false} { + for _, tc := range cases { + t.Run(fmt.Sprintf("%s (values: %v)", tc.name, values), func(t *testing.T) { + keyGen := cacheKeyLabels{l, nil, tc.iqo} + + r := &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &tc.start, + End: &tc.end, + }, + } + + const labelName = "foo" + const query = `{cluster="eu-west1"}` + + if values { + r.LabelRequest.Values = true + r.LabelRequest.Name = labelName + r.LabelRequest.Query = query + } + + // we use regex here because cache key always refers to the current time to get the ingester query window, + // and therefore we can't know the current interval apriori without duplicating the logic + var pattern *regexp.Regexp + if values { + pattern = regexp.MustCompile(fmt.Sprintf(`labelvalues:%s:%s:%s:(\d+):%d`, tc.tenantID, labelName, regexp.QuoteMeta(query), tc.expectedSplit)) + } else { + pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:(\d+):%d`, tc.tenantID, tc.expectedSplit)) + } + + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + }) + } + } +} diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 673c995a600b..79cc9ad16a36 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/pkg/storage/config" 
"github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" "github.com/grafana/loki/pkg/util/validation" @@ -102,10 +103,11 @@ type UserIDTransformer func(context.Context, string) string type cacheKeyLimits struct { Limits transformer UserIDTransformer + iqo util.IngesterQueryOptions } func (l cacheKeyLimits) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { - split := l.QuerySplitDuration(userID) + split := SplitIntervalForTimeRange(l.iqo, l.Limits, l.QuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { diff --git a/pkg/querier/queryrange/limits/definitions.go b/pkg/querier/queryrange/limits/definitions.go index 57b2e03c6697..e12255883bf4 100644 --- a/pkg/querier/queryrange/limits/definitions.go +++ b/pkg/querier/queryrange/limits/definitions.go @@ -30,5 +30,6 @@ type Limits interface { MaxQueryBytesRead(context.Context, string) int MaxQuerierBytesRead(context.Context, string) int MaxStatsCacheFreshness(context.Context, string) time.Duration + MaxMetadataCacheFreshness(context.Context, string) time.Duration VolumeEnabled(string) bool } diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 0de342e42644..a80cf96dde80 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sync" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/querier/plan" base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/math" @@ -48,10 +50,99 @@ func TestLimits(t *testing.T) { require.Equal( t, fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart().UnixMilli()/int64(time.Hour/time.Millisecond), int64(time.Hour)), - cacheKeyLimits{wrapped, nil}.GenerateCacheKey(context.Background(), "a", r), + cacheKeyLimits{wrapped, nil, nil}.GenerateCacheKey(context.Background(), "a", r), ) } +func TestMetricQueryCacheKey(t *testing.T) { + const ( + defaultTenant = "a" + alternateTenant = "b" + query = `sum(rate({foo="bar"}[1]))` + defaultSplit = time.Hour + ingesterSplit = 90 * time.Minute + ingesterQueryWindow = defaultSplit * 3 + ) + + var ( + step = (15 * time.Second).Milliseconds() + ) + + l := fakeLimits{ + splitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, + ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + } + + cases := []struct { + name, tenantID string + start, end time.Time + expectedSplit time.Duration + iqo util.IngesterQueryOptions + }{ + { + name: "outside ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-5 * time.Hour), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: ingesterSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: 
ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window, but query store only", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: true, + }, + }, + { + name: "within ingester query window, but no ingester split duration configured", + tenantID: alternateTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + keyGen := cacheKeyLimits{l, nil, tc.iqo} + + r := &LokiRequest{ + Query: query, + StartTs: tc.start, + EndTs: tc.end, + Step: step, + } + + // we use regex here because cache key always refers to the current time to get the ingester query window, + // and therefore we can't know the current interval apriori without duplicating the logic + pattern := regexp.MustCompile(fmt.Sprintf(`%s:%s:%d:(\d+):%d`, tc.tenantID, regexp.QuoteMeta(query), step, tc.expectedSplit)) + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + }) + } +} + func Test_seriesLimiter(t *testing.T) { cfg := testConfig cfg.CacheResults = false @@ -308,7 +399,7 @@ func Test_MaxQueryLookBack_Types(t *testing.T) { } func Test_GenerateCacheKey_NoDivideZero(t *testing.T) { - l := cacheKeyLimits{WithSplitByLimits(nil, 0), nil} + l := cacheKeyLimits{WithSplitByLimits(nil, 0), nil, nil} start := time.Now() r := &LokiRequest{ Query: "qry", diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 6d0d62af7a88..8223704eea02 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -177,38 +177,36 @@ func NewMiddleware( var codec base.Codec = DefaultCodec - split := newDefaultSplitter(limits, iqo) - - indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, split, statsCache, + indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, iqo, statsCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) if err != nil { return nil, nil, err } - metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, newMetricQuerySplitter(limits, iqo), resultsCache, + metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, iqo, resultsCache, cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } - limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, split) + limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, iqo) if err != nil { return nil, nil, err } // NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in // MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170 - logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, split, 
resultsCache, metrics, indexStatsTripperware, metricsNamespace) + logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, iqo, resultsCache, metrics, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } - seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, split, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace) + seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, iqo, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace) if err != nil { return nil, nil, err } - labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, split, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace) + labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, iqo, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace) if err != nil { return nil, nil, err } @@ -218,7 +216,7 @@ func NewMiddleware( return nil, nil, err } - seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, split, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) + seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, iqo, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) if err != nil { return nil, nil, err } @@ -409,7 +407,7 @@ func getOperation(path string) string { } // NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests. -func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { +func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -418,7 +416,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } if cfg.CacheResults { @@ -473,7 +471,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo } // NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression. 
-func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, split splitter) (base.Middleware, error) { +func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, iqo util.IngesterQueryOptions) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -482,7 +480,7 @@ func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), NewQuerierSizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), } @@ -501,7 +499,7 @@ func NewSeriesTripperware( metrics *Metrics, schema config.SchemaConfig, merger base.Merger, - split splitter, + iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, @@ -516,6 +514,7 @@ func NewSeriesTripperware( merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -542,7 +541,7 @@ func NewSeriesTripperware( StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } if cfg.CacheSeriesResults { @@ -584,7 +583,7 @@ func NewLabelsTripperware( log log.Logger, limits Limits, merger base.Merger, - split splitter, + iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, @@ -601,6 +600,7 @@ func NewLabelsTripperware( merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -627,7 +627,7 @@ func NewLabelsTripperware( StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } if cfg.CacheLabelResults { @@ -652,8 +652,8 @@ func NewLabelsTripperware( } // NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries -func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { - cacheKey := cacheKeyLimits{limits, 
cfg.Transformer} +func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { + cacheKey := cacheKeyLimits{limits, cfg.Transformer, iqo} var queryCacheMiddleware base.Middleware if cfg.CacheResults { var err error @@ -706,7 +706,7 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge queryRangeMiddleware, NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newMetricQuerySplitter(limits, iqo), metrics.SplitByMetrics), ) if cfg.CacheResults { @@ -804,7 +804,7 @@ func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log lo }), nil } -func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { +func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { // Parallelize the volume requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d). // Indices are sharded by 24 hours, so we split the volume request in 24h intervals. 
limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval) @@ -817,6 +817,7 @@ func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema conf merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -843,7 +844,7 @@ func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema conf cacheMiddleware, cfg, merger, - split, + newDefaultSplitter(limits, iqo), limits, log, metrics, @@ -912,7 +913,7 @@ func volumeFeatureFlagRoundTripper(nextTW base.Middleware, limits Limits) base.M }) } -func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { +func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval) var cacheMiddleware base.Middleware @@ -924,6 +925,7 @@ func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -950,7 +952,7 @@ func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema cacheMiddleware, cfg, merger, - split, + newDefaultSplitter(limits, iqo), limits, log, metrics, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index fe8799fffe79..c7c7cff4595a 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -1237,23 +1237,24 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) { } type fakeLimits struct { - maxQueryLength time.Duration - maxQueryParallelism int - tsdbMaxQueryParallelism int - maxQueryLookback time.Duration - maxEntriesLimitPerQuery int - maxSeries int - splitDuration map[string]time.Duration - metadataSplitDuration map[string]time.Duration - ingesterSplitDuration map[string]time.Duration - minShardingLookback time.Duration - queryTimeout time.Duration - requiredLabels []string - requiredNumberLabels int - maxQueryBytesRead int - maxQuerierBytesRead int - maxStatsCacheFreshness time.Duration - volumeEnabled bool + maxQueryLength time.Duration + maxQueryParallelism int + tsdbMaxQueryParallelism int + maxQueryLookback time.Duration + maxEntriesLimitPerQuery int + maxSeries int + splitDuration map[string]time.Duration + metadataSplitDuration map[string]time.Duration + ingesterSplitDuration map[string]time.Duration + minShardingLookback time.Duration + queryTimeout time.Duration + requiredLabels []string + requiredNumberLabels int + maxQueryBytesRead int + maxQuerierBytesRead int + maxStatsCacheFreshness time.Duration + maxMetadataCacheFreshness time.Duration + volumeEnabled bool } func (f fakeLimits) QuerySplitDuration(key string) time.Duration { @@ -1344,6 +1345,10 @@ func (f fakeLimits) MaxStatsCacheFreshness(_ context.Context, _ string) time.Dur return f.maxStatsCacheFreshness } +func (f fakeLimits) MaxMetadataCacheFreshness(_ context.Context, _ string) time.Duration { + return f.maxMetadataCacheFreshness +} + func (f fakeLimits) VolumeEnabled(_ string) bool 
{ return f.volumeEnabled } diff --git a/pkg/querier/queryrange/series_cache.go b/pkg/querier/queryrange/series_cache.go index 9ad67f70acf5..bbbf96e2dd70 100644 --- a/pkg/querier/queryrange/series_cache.go +++ b/pkg/querier/queryrange/series_cache.go @@ -9,21 +9,29 @@ import ( "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + + "github.com/grafana/dskit/tenant" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/validation" ) type cacheKeySeries struct { Limits transformer UserIDTransformer + iqo util.IngesterQueryOptions } // GenerateCacheKey generates a cache key based on the userID, matchers, split duration and the interval of the request. func (i cacheKeySeries) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { sr := r.(*LokiSeriesRequest) - split := i.MetadataQuerySplitDuration(userID) + + split := SplitIntervalForTimeRange(i.iqo, i.Limits, i.MetadataQuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { @@ -34,11 +42,12 @@ func (i cacheKeySeries) GenerateCacheKey(ctx context.Context, userID string, r r userID = i.transformer(ctx, userID) } - matchers := sr.GetMatch() - sort.Strings(matchers) - matcherStr := strings.Join(matchers, ",") + return fmt.Sprintf("series:%s:%s:%d:%d", userID, i.joinMatchers(sr.GetMatch()), currentInterval, split) +} - return fmt.Sprintf("series:%s:%s:%d:%d", userID, matcherStr, currentInterval, split) +func (i cacheKeySeries) joinMatchers(matchers []string) string { + sort.Strings(matchers) + return strings.Join(matchers, ",") } type seriesExtractor struct{} @@ -78,6 +87,7 @@ func NewSeriesCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -87,14 +97,33 @@ func NewSeriesCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( logger, c, - cacheKeySeries{limits, transformer}, + cacheKeySeries{limits, transformer, iqo}, limits, merger, seriesExtractor{}, cacheGenNumberLoader, - shouldCache, + func(ctx context.Context, r queryrangebase.Request) bool { + return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits) + }, parallelismForReq, retentionEnabled, metrics, ) } + +func shouldCacheMetadataReq(ctx context.Context, logger log.Logger, shouldCache queryrangebase.ShouldCacheFn, req queryrangebase.Request, l Limits) bool { + if shouldCache != nil && !shouldCache(ctx, req) { + return false + } + + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. 
won't cache", "err", err) + return false + } + + cacheFreshnessCapture := func(id string) time.Duration { return l.MaxMetadataCacheFreshness(ctx, id) } + maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) + + return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(model.Now().Add(-maxCacheFreshness)) +} diff --git a/pkg/querier/queryrange/series_cache_test.go b/pkg/querier/queryrange/series_cache_test.go index abe992001217..d73efa9deea8 100644 --- a/pkg/querier/queryrange/series_cache_test.go +++ b/pkg/querier/queryrange/series_cache_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "regexp" "testing" "time" @@ -77,6 +78,7 @@ func TestSeriesCache(t *testing.T) { cache.NewMockCache(), nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -312,3 +314,203 @@ func TestSeriesCache(t *testing.T) { } }) } + +func TestSeriesCache_freshness(t *testing.T) { + testTime := time.Now().Add(-1 * time.Hour) + from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime) + + for _, tt := range []struct { + name string + req *LokiSeriesRequest + shouldCache bool + maxMetadataCacheFreshness time.Duration + }{ + { + name: "max metadata freshness not set", + req: &LokiSeriesRequest{ + StartTs: from.Time(), + EndTs: through.Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + }, + shouldCache: true, + }, + { + name: "req overlaps with max cache freshness window", + req: &LokiSeriesRequest{ + StartTs: from.Time(), + EndTs: through.Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: false, + }, + { + name: "req does not overlap max cache freshness window", + req: &LokiSeriesRequest{ + StartTs: from.Add(-24 * time.Hour).Time(), + EndTs: through.Add(-24 * time.Hour).Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + cacheMiddleware, err := NewSeriesCacheMiddleware( + log.NewNopLogger(), + fakeLimits{ + metadataSplitDuration: map[string]time.Duration{ + "fake": 24 * time.Hour, + }, + maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness, + }, + DefaultCodec, + cache.NewMockCache(), + nil, + nil, + nil, + func(_ context.Context, _ []string, _ queryrangebase.Request) int { + return 1 + }, + false, + nil, + nil, + ) + require.NoError(t, err) + + seriesResp := &LokiSeriesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: []logproto.SeriesIdentifier{ + { + Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, + }, + }, + Statistics: stats.Result{ + Summary: stats.Summary{ + Splits: 1, + }, + }, + } + + called := 0 + handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + called++ + + // should request the entire length with no partitioning as nothing is cached yet. + require.Equal(t, tt.req.GetStart(), r.GetStart()) + require.Equal(t, tt.req.GetEnd(), r.GetEnd()) + + return seriesResp, nil + })) + + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, tt.req) + require.NoError(t, err) + require.Equal(t, 1, called) // called actual handler, as not cached. 
+ require.Equal(t, seriesResp, got) + + called = 0 + got, err = handler.Do(ctx, tt.req) + require.NoError(t, err) + if !tt.shouldCache { + require.Equal(t, 1, called) + } else { + require.Equal(t, 0, called) + } + require.Equal(t, seriesResp, got) + }) + } +} + +func TestSeriesQueryCacheKey(t *testing.T) { + const ( + defaultTenant = "a" + alternateTenant = "b" + defaultSplit = time.Hour + ingesterSplit = 90 * time.Minute + ingesterQueryWindow = defaultSplit * 3 + ) + + l := fakeLimits{ + metadataSplitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, + ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + } + + cases := []struct { + name, tenantID string + start, end time.Time + expectedSplit time.Duration + iqo util.IngesterQueryOptions + values bool + }{ + { + name: "outside ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-5 * time.Hour), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: ingesterSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window, but query store only", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: true, + }, + }, + { + name: "within ingester query window, but no ingester split duration configured", + tenantID: alternateTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + matchers := []string{`{namespace="prod"}`, `{service="foo"}`} + + keyGen := cacheKeySeries{l, nil, tc.iqo} + + r := &LokiSeriesRequest{ + StartTs: tc.start, + EndTs: tc.end, + Match: matchers, + Path: seriesAPIPath, + } + + // we use regex here because cache key always refers to the current time to get the ingester query window, + // and therefore we can't know the current interval apriori without duplicating the logic + pattern := regexp.MustCompile(fmt.Sprintf(`series:%s:%s:(\d+):%d`, tc.tenantID, regexp.QuoteMeta(keyGen.joinMatchers(matchers)), tc.expectedSplit)) + + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + }) + } +} diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index b332fe5e612e..ef05aa969ec1 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -186,7 +186,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que case *LokiSeriesRequest, *LabelRequest: interval = validation.MaxDurationOrZeroPerTenant(tenantIDs, h.limits.MetadataQuerySplitDuration) default: - interval = validation.MaxDurationOrZeroPerTenant(tenantIDs, h.limits.QuerySplitDuration) + interval = validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, h.limits.QuerySplitDuration) } // skip split by if 
unset diff --git a/pkg/querier/queryrange/splitters.go b/pkg/querier/queryrange/splitters.go index 79e3d5352e06..0aaecf35cb96 100644 --- a/pkg/querier/queryrange/splitters.go +++ b/pkg/querier/queryrange/splitters.go @@ -98,7 +98,7 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer start, end, needsIngesterSplits := ingesterQueryBounds(execTime, s.iqo, req) - if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { + if ingesterQueryInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { // perform splitting using special interval (`split_ingester_queries_by_interval`) util.ForInterval(ingesterQueryInterval, start, end, endTimeInclusive, factory) @@ -212,7 +212,7 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu start, end, needsIngesterSplits = ingesterQueryBounds(execTime, s.iqo, lokiReq) start, end = s.alignStartEnd(r.GetStep(), start, end) - if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { + if ingesterQueryInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { // perform splitting using special interval (`split_ingester_queries_by_interval`) s.buildMetricSplits(lokiReq.GetStep(), ingesterQueryInterval, start, end, factory) diff --git a/pkg/querier/queryrange/volume_cache.go b/pkg/querier/queryrange/volume_cache.go index 954c642ffef8..147d61912db9 100644 --- a/pkg/querier/queryrange/volume_cache.go +++ b/pkg/querier/queryrange/volume_cache.go @@ -101,6 +101,7 @@ func NewVolumeCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -110,7 +111,7 @@ func NewVolumeCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( log, c, - VolumeSplitter{cacheKeyLimits{limits, transformer}}, + VolumeSplitter{cacheKeyLimits{limits, transformer, iqo}}, limits, merger, VolumeExtractor{}, diff --git a/pkg/querier/queryrange/volume_cache_test.go b/pkg/querier/queryrange/volume_cache_test.go index 904e0fc7c3a9..038d8fa925f5 100644 --- a/pkg/querier/queryrange/volume_cache_test.go +++ b/pkg/querier/queryrange/volume_cache_test.go @@ -39,6 +39,7 @@ func TestVolumeCache(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -304,6 +305,7 @@ func TestVolumeCache_RecentData(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index 527aea84bcd1..d05d71837404 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -50,7 +50,7 @@ type ResultsCache struct { next Handler cache cache.Cache limits Limits - splitter KeyGenerator + keyGen KeyGenerator cacheGenNumberLoader CacheGenNumberLoader retentionEnabled bool extractor Extractor @@ -86,7 +86,7 @@ func NewResultsCache( next: next, cache: c, limits: limits, - splitter: keyGen, + keyGen: 
keyGen, cacheGenNumberLoader: cacheGenNumberLoader, retentionEnabled: retentionEnabled, extractor: extractor, @@ -115,7 +115,7 @@ func (s ResultsCache) Do(ctx context.Context, r Request) (Response, error) { } var ( - key = s.splitter.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r) + key = s.keyGen.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r) extents []Extent response Response ) diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 45dd34f201e8..ac25798c33e3 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -97,6 +97,7 @@ type Limits struct { MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` + MaxMetadataCacheFreshness model.Duration `yaml:"max_metadata_cache_freshness" json:"max_metadata_cache_freshness"` MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"` MaxQueriersPerTenant uint `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` MaxQueryCapacity float64 `yaml:"max_query_capacity" json:"max_query_capacity"` @@ -277,6 +278,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxCacheFreshness.Set("10m") f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") + _ = l.MaxMetadataCacheFreshness.Set("24h") + f.Var(&l.MaxMetadataCacheFreshness, "frontend.max-metadata-cache-freshness", "Do not cache metadata request if the end time is within the frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such limits. Defaults to 24h.") + _ = l.MaxStatsCacheFreshness.Set("10m") f.Var(&l.MaxStatsCacheFreshness, "frontend.max-stats-cache-freshness", "Do not cache requests with an end time that falls within Now minus this duration. 0 disables this feature (default).") @@ -298,6 +302,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.QuerySplitDuration.Set("1h") f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel. The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.") + // with metadata caching, it is not possible to extract a subset of labels/series from a cached extent because unlike samples they are not associated with a timestamp. + // as a result, we could return inaccurate results. example: returning results from an entire 1h extent for a 5m query + // Setting max_metadata_cache_freshness to 24h should help us avoid caching recent data and preserve the correctness. + // For the portion of the request beyond the freshness window, granularity of the cached metadata results is determined by split_metadata_queries_by_interval. _ = l.MetadataQuerySplitDuration.Set("24h") f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time.
This also determines how cache keys are chosen when label/series result caching is enabled.") @@ -624,6 +632,10 @@ func (o *Overrides) MaxCacheFreshness(_ context.Context, userID string) time.Dur return time.Duration(o.getOverridesForUser(userID).MaxCacheFreshness) } +func (o *Overrides) MaxMetadataCacheFreshness(_ context.Context, userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).MaxMetadataCacheFreshness) +} + func (o *Overrides) MaxStatsCacheFreshness(_ context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxStatsCacheFreshness) } diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 272aa6942885..ce6852d3f3ea 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,11 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) + +## 5.41.7 + +- [FEATURE] Add support to disable specific alert rules + ## 5.41.6 - [BUGFIX] Added missing namespace to query-scheduler-discovery service when deploying loki in a specific namespace. diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index cb43a70c965b..d8f4486b7de6 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.3 -version: 5.41.6 +version: 5.41.7 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index 6b4ec081e9bb..dc016ef13c25 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.41.6](https://img.shields.io/badge/Version-5.41.6-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square) +![Version: 5.41.7](https://img.shields.io/badge/Version-5.41.7-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/src/alerts.yaml.tpl b/production/helm/loki/src/alerts.yaml.tpl index 2171c94848e5..144e263f7061 100644 --- a/production/helm/loki/src/alerts.yaml.tpl +++ b/production/helm/loki/src/alerts.yaml.tpl @@ -2,6 +2,7 @@ groups: - name: "loki_alerts" rules: +{{- if not (.Values.monitoring.rules.disabled.LokiRequestErrors | default false) }} - alert: "LokiRequestErrors" annotations: message: | @@ -17,6 +18,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiRequestPanics | default false) }} - alert: "LokiRequestPanics" annotations: message: | @@ -28,6 +31,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} 
+{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiRequestLatency | default false) }} - alert: "LokiRequestLatency" annotations: message: | @@ -40,6 +45,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiTooManyCompactorsRunning | default false) }} - alert: "LokiTooManyCompactorsRunning" annotations: message: | @@ -52,6 +59,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiCanaryLatency | default false) }} - name: "loki_canaries_alerts" rules: - alert: "LokiCanaryLatency" @@ -66,3 +75,4 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index b8c09ee76465..a7f4ea8f464d 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -583,6 +583,12 @@ monitoring: enabled: true # -- Include alerting rules alerting: true + # -- Specify which individual alerts should be disabled + # -- To disable all alerts at once, set the .monitoring.rules.alerting value to false rather than disabling each alert individually. + # -- If you disable all the alerts and keep .monitoring.rules.alerting set to true, the chart will fail to render. + disabled: {} + # LokiRequestErrors: true + # LokiRequestPanics: true # -- Alternative namespace to create PrometheusRule resources in namespace: null # -- Additional annotations for the rules PrometheusRule resource
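For reference, a minimal sketch of how the new monitoring.rules.disabled chart option could be set in a values file. The alert names are the ones guarded in alerts.yaml.tpl above; any alert that is omitted from (or set to false in) the map keeps rendering as before:

  monitoring:
    rules:
      enabled: true
      alerting: true
      # disable only these two alerts; all other rules continue to render
      disabled:
        LokiRequestErrors: true
        LokiCanaryLatency: true

And, assuming the new per-tenant limit is exposed under Loki's limits_config block like the other frontend.* freshness limits, a hedged example of overriding it (the 12h value is purely illustrative; the default registered in limits.go above is 24h):

  limits_config:
    # metadata (label/series) requests whose end time falls within the last 12h
    # bypass the results cache; older requests are cached at the granularity of
    # split_metadata_queries_by_interval
    max_metadata_cache_freshness: 12h

With the 24h default, for example, a series request ending 30 hours ago is eligible for result caching, while one ending 2 hours ago is always passed to the downstream handler by shouldCacheMetadataReq.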