diff --git a/.drone/drone.jsonnet b/.drone/drone.jsonnet index 6dcea160a78c..49f67f06861a 100644 --- a/.drone/drone.jsonnet +++ b/.drone/drone.jsonnet @@ -640,7 +640,7 @@ local build_image_tag = '0.33.0'; 'GIT_TARGET_BRANCH="$DRONE_TARGET_BRANCH"', ]) { depends_on: ['loki'], when: onPRs }, make('validate-example-configs', container=false) { depends_on: ['loki'] }, - make('validate-dev-cluster-config', container=false) { depends_on: ['loki'] }, + make('validate-dev-cluster-config', container=false) { depends_on: ['validate-example-configs'] }, make('check-example-config-doc', container=false) { depends_on: ['clone'] }, { name: 'build-docs-website', diff --git a/.drone/drone.yml b/.drone/drone.yml index d45f7898a085..7a62b621262a 100644 --- a/.drone/drone.yml +++ b/.drone/drone.yml @@ -306,7 +306,7 @@ steps: - commands: - make BUILD_IN_CONTAINER=false validate-dev-cluster-config depends_on: - - loki + - validate-example-configs environment: {} image: grafana/loki-build-image:0.33.0 name: validate-dev-cluster-config @@ -2113,6 +2113,6 @@ kind: secret name: gpg_private_key --- kind: signature -hmac: fe7669a21410ae5f2d1ad6b6205fdc582af874f65f7bd6a679731a88174e3a1c +hmac: 457592d17208477ceb480f81dbdb88f7b95a5ad015c88d9d6fed06c2422a52f9 ... diff --git a/CHANGELOG.md b/CHANGELOG.md index 94a36e1f401e..04b692efcb00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,7 +48,9 @@ * [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks. * [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache. * [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters. -* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly. +* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly. +* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results. +* [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results. * [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs. ##### Fixes @@ -56,6 +58,7 @@ * [11195](https://github.com/grafana/loki/pull/11195) **canuteson** Generate tsdb_shipper storage_config even if using_boltdb_shipper is false * [9831](https://github.com/grafana/loki/pull/9831) **sijmenhuizenga**: Fix Promtail excludepath not evaluated on newly added files. * [11551](https://github.com/grafana/loki/pull/11551) **dannykopping** Do not reflect label names in request metrics' "route" label. +* [11563](https://github.com/grafana/loki/pull/11563) **ptqa** Fix duplicate logs from docker containers. 
* [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there are a large amount of rules. * [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations. * [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction. diff --git a/MAINTAINERS.md b/MAINTAINERS.md index ba11c534b1b6..072cc863392a 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -1,4 +1,4 @@ @slim-bean is the main/default maintainer. Some parts of the codebase have other maintainers: -- `@grafana/docs-logs`, which includes [@osg-grafana](https://github.com/osg-grafana) ([Grafana Labs](https://grafana.com/)) and [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/)) +- `@grafana/docs-logs`, which includes [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/)) diff --git a/clients/cmd/fluent-bit/config.go b/clients/cmd/fluent-bit/config.go index 768ba845a330..469e18d495d7 100644 --- a/clients/cmd/fluent-bit/config.go +++ b/clients/cmd/fluent-bit/config.go @@ -2,7 +2,6 @@ package main import ( "encoding/json" - "errors" "fmt" "os" "strconv" @@ -67,7 +66,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { } err := clientURL.Set(url) if err != nil { - return nil, errors.New("failed to parse client URL") + return nil, fmt.Errorf("failed to parse client URL: %w", err) } res.clientConfig.URL = clientURL @@ -83,7 +82,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { } else { batchWaitValue, err := time.ParseDuration(batchWait) if err != nil { - return nil, fmt.Errorf("failed to parse BatchWait: %s", batchWait) + return nil, fmt.Errorf("failed to parse BatchWait %s: %w", batchWait, err) } res.clientConfig.BatchWait = batchWaitValue } @@ -93,7 +92,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { if batchSize != "" { batchSizeValue, err := strconv.Atoi(batchSize) if err != nil { - return nil, fmt.Errorf("failed to parse BatchSize: %s", batchSize) + return nil, fmt.Errorf("failed to parse BatchSize %s: %w", batchSize, err) } res.clientConfig.BatchSize = batchSizeValue } @@ -102,7 +101,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { if timeout != "" { timeoutValue, err := time.ParseDuration(timeout) if err != nil { - return nil, fmt.Errorf("failed to parse Timeout: %s", timeout) + return nil, fmt.Errorf("failed to parse Timeout %s: %w", timeout, err) } res.clientConfig.Timeout = timeoutValue } @@ -111,7 +110,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { if minBackoff != "" { minBackoffValue, err := time.ParseDuration(minBackoff) if err != nil { - return nil, fmt.Errorf("failed to parse MinBackoff: %s", minBackoff) + return nil, fmt.Errorf("failed to parse MinBackoff %s: %w", minBackoff, err) } res.clientConfig.BackoffConfig.MinBackoff = minBackoffValue } @@ -120,7 +119,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { if maxBackoff != "" { maxBackoffValue, err := time.ParseDuration(maxBackoff) if err != nil { - return nil, fmt.Errorf("failed to parse MaxBackoff: %s", maxBackoff) + return nil, fmt.Errorf("failed to parse MaxBackoff %s: %w", maxBackoff, err) } res.clientConfig.BackoffConfig.MaxBackoff = maxBackoffValue } @@ -129,7 +128,7 @@ func 
parseConfig(cfg ConfigGetter) (*config, error) { if maxRetries != "" { maxRetriesValue, err := strconv.Atoi(maxRetries) if err != nil { - return nil, fmt.Errorf("failed to parse MaxRetries: %s", maxRetries) + return nil, fmt.Errorf("failed to parse MaxRetries %s: %w", maxRetries, err) } res.clientConfig.BackoffConfig.MaxRetries = maxRetriesValue } @@ -154,7 +153,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { } var level log.Level if err := level.Set(logLevel); err != nil { - return nil, fmt.Errorf("invalid log level: %v", logLevel) + return nil, fmt.Errorf("invalid log level %v: %w", logLevel, err) } res.logLevel = level @@ -238,7 +237,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) { if queueSegmentSize != "" { res.bufferConfig.dqueConfig.queueSegmentSize, err = strconv.Atoi(queueSegmentSize) if err != nil { - return nil, fmt.Errorf("impossible to convert string to integer DqueSegmentSize: %v", queueSegmentSize) + return nil, fmt.Errorf("impossible to convert string to integer DqueSegmentSize %v: %w", queueSegmentSize, err) } } diff --git a/clients/pkg/promtail/targets/docker/target.go b/clients/pkg/promtail/targets/docker/target.go index 39b91ce21a5b..bb26391ab199 100644 --- a/clients/pkg/promtail/targets/docker/target.go +++ b/clients/pkg/promtail/targets/docker/target.go @@ -222,6 +222,7 @@ func (t *Target) process(r io.Reader, logStream string) { } t.metrics.dockerEntries.Inc() t.positions.Put(positions.CursorKey(t.containerName), ts.Unix()) + t.since = ts.Unix() } } diff --git a/clients/pkg/promtail/targets/docker/target_test.go b/clients/pkg/promtail/targets/docker/target_test.go index d2d2e58b3caa..e9bbf15b55bb 100644 --- a/clients/pkg/promtail/targets/docker/target_test.go +++ b/clients/pkg/promtail/targets/docker/target_test.go @@ -27,7 +27,13 @@ func Test_DockerTarget(t *testing.T) { h := func(w http.ResponseWriter, r *http.Request) { switch path := r.URL.Path; { case strings.HasSuffix(path, "/logs"): - dat, err := os.ReadFile("testdata/flog.log") + var filePath string + if strings.Contains(r.URL.RawQuery, "since=0") { + filePath = "testdata/flog.log" + } else { + filePath = "testdata/flog_after_restart.log" + } + dat, err := os.ReadFile(filePath) require.NoError(t, err) _, err = w.Write(dat) require.NoError(t, err) @@ -59,7 +65,7 @@ func Test_DockerTarget(t *testing.T) { }) require.NoError(t, err) - _, err = NewTarget( + target, err := NewTarget( NewMetrics(prometheus.NewRegistry()), logger, entryHandler, @@ -92,4 +98,28 @@ func Test_DockerTarget(t *testing.T) { actualLines = append(actualLines, entry.Line) } require.ElementsMatch(t, actualLines, expectedLines) + + // restart target to simulate container restart + target.startIfNotRunning() + entryHandler.Clear() + require.Eventually(t, func() bool { + return len(entryHandler.Received()) >= 5 + }, 5*time.Second, 100*time.Millisecond) + + receivedAfterRestart := entryHandler.Received() + sort.Slice(receivedAfterRestart, func(i, j int) bool { + return receivedAfterRestart[i].Timestamp.Before(receivedAfterRestart[j].Timestamp) + }) + actualLinesAfterRestart := make([]string, 0, 5) + for _, entry := range receivedAfterRestart[:5] { + actualLinesAfterRestart = append(actualLinesAfterRestart, entry.Line) + } + expectedLinesAfterRestart := []string{ + "243.115.12.215 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /morph/exploit/granular HTTP/1.0\" 500 26468", + "221.41.123.237 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /user-centric/whiteboard HTTP/2.0\" 205 22487", + "89.111.144.144 - - [09/Dec/2023:09:16:57 +0000] 
\"DELETE /open-source/e-commerce HTTP/1.0\" 401 11092", + "62.180.191.187 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /cultivate/integrate/technologies HTTP/2.0\" 302 12979", + "156.249.2.192 - - [09/Dec/2023:09:16:57 +0000] \"POST /revolutionize/mesh/metrics HTTP/2.0\" 401 5297", + } + require.ElementsMatch(t, actualLinesAfterRestart, expectedLinesAfterRestart) } diff --git a/clients/pkg/promtail/targets/docker/testdata/flog_after_restart.log b/clients/pkg/promtail/targets/docker/testdata/flog_after_restart.log new file mode 100644 index 000000000000..59afb576805e Binary files /dev/null and b/clients/pkg/promtail/targets/docker/testdata/flog_after_restart.log differ diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index f371c28170b0..edb394733a3c 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2835,6 +2835,12 @@ The `limits_config` block configures global and per-tenant limits in Loki. # CLI flag: -frontend.max-cache-freshness [max_cache_freshness_per_query: | default = 10m] +# Do not cache metadata request if the end time is within the +# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such +# limits. Defaults to 24h. +# CLI flag: -frontend.max-metadata-cache-freshness +[max_metadata_cache_freshness: | default = 1d] + # Do not cache requests with an end time that falls within Now minus this # duration. 0 disables this feature (default). # CLI flag: -frontend.max-stats-cache-freshness diff --git a/docs/sources/setup/install/helm/reference.md b/docs/sources/setup/install/helm/reference.md index 631aadacac02..ea1ee3c060dc 100644 --- a/docs/sources/setup/install/helm/reference.md +++ b/docs/sources/setup/install/helm/reference.md @@ -2707,6 +2707,15 @@ true
 {}
 
+monitoring.rules.disabled
+object
+If you disable all the alerts and keep .monitoring.rules.alerting set to true, the chart will fail to render.
+{}
+
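The `frontend.max-metadata-cache-freshness` limit documented in the configuration change above keeps very recent metadata results out of the cache: a result is cacheable only when its end time falls outside the freshness window, and a value of 0 disables the check. Below is a minimal, hypothetical sketch of that rule; `shouldCacheMetadata` is an illustrative helper, not the actual queryrange middleware.

```go
package main

import (
	"fmt"
	"time"
)

// shouldCacheMetadata mirrors the documented rule: a metadata result is
// cached only when its end time falls outside the configured freshness
// window. A zero window disables the limit.
func shouldCacheMetadata(end, now time.Time, maxMetadataFreshness time.Duration) bool {
	if maxMetadataFreshness == 0 {
		return true
	}
	return end.Before(now.Add(-maxMetadataFreshness))
}

func main() {
	now := time.Now()
	window := 24 * time.Hour // the documented default of 1d

	// A query ending one hour ago is too fresh to cache.
	fmt.Println(shouldCacheMetadata(now.Add(-1*time.Hour), now, window)) // false

	// A query ending three days ago is safe to cache.
	fmt.Println(shouldCacheMetadata(now.Add(-72*time.Hour), now, window)) // true
}
```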
diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index 1f7dc836b5ff..0c05d13d8ef3 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "google.golang.org/protobuf/proto" "github.com/grafana/loki/integration/client" @@ -1061,7 +1062,7 @@ func TestCategorizedLabels(t *testing.T) { func TestBloomFiltersEndToEnd(t *testing.T) { commonFlags := []string{ - "-bloom-compactor.compaction-interval=2s", + "-bloom-compactor.compaction-interval=10s", "-bloom-compactor.enable-compaction=true", "-bloom-compactor.enabled=true", "-bloom-gateway.enable-filtering=true", @@ -1101,7 +1102,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) { "-target=index-gateway", )..., ) - _ = clu.AddComponent( + tBloomGateway = clu.AddComponent( "bloom-gateway", append( commonFlags, @@ -1136,7 +1137,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) { "-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(), )..., ) - _ = clu.AddComponent( + tBloomCompactor = clu.AddComponent( "bloom-compactor", append( commonFlags, @@ -1186,6 +1187,12 @@ func TestBloomFiltersEndToEnd(t *testing.T) { cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL()) cliIndexGateway.Now = now + cliBloomGateway := client.New(tenantID, "", tBloomGateway.HTTPURL()) + cliBloomGateway.Now = now + + cliBloomCompactor := client.New(tenantID, "", tBloomCompactor.HTTPURL()) + cliBloomCompactor.Now = now + lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"` // ingest logs from 10 different pods // each line contains a random, unique string @@ -1206,7 +1213,14 @@ func TestBloomFiltersEndToEnd(t *testing.T) { require.NoError(t, tIngester.Restart()) // wait for compactor to compact index and for bloom compactor to build bloom filters - time.Sleep(10 * time.Second) + require.Eventually(t, func() bool { + // verify metrics that observe usage of block for filtering + metrics, err := cliBloomCompactor.Metrics() + require.NoError(t, err) + successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics) + t.Log("successful bloom compactor runs", successfulRunCount) + return successfulRunCount == 1 + }, 30*time.Second, time.Second) // use bloom gateway to perform needle in the haystack queries randIdx := rand.Intn(len(uniqueStrings)) @@ -1221,22 +1235,44 @@ func TestBloomFiltersEndToEnd(t *testing.T) { expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx]) require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1]) - // TODO(chaudum): - // verify that bloom blocks have actually been used for querying - // atm, we can only verify by logs, so we should add appropriate metrics for - // uploaded/downloaded blocks and metas + // verify metrics that observe usage of block for filtering + bloomGwMetrics, err := cliBloomGateway.Metrics() + require.NoError(t, err) + + unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics) + require.Equal(t, float64(10), unfilteredCount) + + filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics) + require.Equal(t, float64(1), filteredCount) + + mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics) + require.NoError(t, err) + + count := getValueFromMetricFamilyWithFunc(mf, 
&dto.LabelPair{ + Name: proto.String("status"), + Value: proto.String("success"), + }, func(m *dto.Metric) uint64 { + return m.Histogram.GetSampleCount() + }) + require.Equal(t, uint64(1), count) } func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 { + return getValueFromMetricFamilyWithFunc(mf, lbs[0], func(m *dto.Metric) float64 { return m.Counter.GetValue() }) +} + +func getValueFromMetricFamilyWithFunc[R any](mf *dto.MetricFamily, lbs *dto.LabelPair, f func(*dto.Metric) R) R { + eq := func(e *dto.LabelPair) bool { + return e.GetName() == lbs.GetName() && e.GetValue() == lbs.GetValue() + } + var zero R for _, m := range mf.Metric { - if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) { + if !slices.ContainsFunc(m.GetLabel(), eq) { continue } - - return m.Counter.GetValue() + return f(m) } - - return 0 + return zero } func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) { diff --git a/integration/multi_tenant_queries_test.go b/integration/multi_tenant_queries_test.go index 468cc98de18a..76fbd63f13bd 100644 --- a/integration/multi_tenant_queries_test.go +++ b/integration/multi_tenant_queries_test.go @@ -13,6 +13,7 @@ import ( ) func TestMultiTenantQuery(t *testing.T) { + t.Skip("This test is flaky on CI but it's hardly reproducible locally.") clu := cluster.New(nil) defer func() { assert.NoError(t, clu.Cleanup()) @@ -36,24 +37,25 @@ func TestMultiTenantQuery(t *testing.T) { require.NoError(t, cliTenant2.PushLogLine("lineB", cliTenant2.Now.Add(-45*time.Minute), nil, map[string]string{"job": "fake2"})) // check that tenant1 only have access to log line A. - matchLines(t, cliTenant1, `{job="fake2"}`, []string{}) - matchLines(t, cliTenant1, `{job=~"fake.*"}`, []string{"lineA"}) - matchLines(t, cliTenant1, `{job="fake1"}`, []string{"lineA"}) + require.ElementsMatch(t, query(t, cliTenant1, `{job="fake2"}`), []string{}) + require.ElementsMatch(t, query(t, cliTenant1, `{job=~"fake.*"}`), []string{"lineA"}) + require.ElementsMatch(t, query(t, cliTenant1, `{job="fake1"}`), []string{"lineA"}) // check that tenant2 only have access to log line B. - matchLines(t, cliTenant2, `{job="fake1"}`, []string{}) - matchLines(t, cliTenant2, `{job=~"fake.*"}`, []string{"lineB"}) - matchLines(t, cliTenant2, `{job="fake2"}`, []string{"lineB"}) + require.ElementsMatch(t, query(t, cliTenant2, `{job="fake1"}`), []string{}) + require.ElementsMatch(t, query(t, cliTenant2, `{job=~"fake.*"}`), []string{"lineB"}) + require.ElementsMatch(t, query(t, cliTenant2, `{job="fake2"}`), []string{"lineB"}) // check that multitenant has access to all log lines on same query. 
- matchLines(t, cliMultitenant, `{job=~"fake.*"}`, []string{"lineA", "lineB"}) - matchLines(t, cliMultitenant, `{job="fake1"}`, []string{"lineA"}) - matchLines(t, cliMultitenant, `{job="fake2"}`, []string{"lineB"}) - matchLines(t, cliMultitenant, `{job="fake3"}`, []string{}) + require.ElementsMatch(t, query(t, cliMultitenant, `{job=~"fake.*"}`), []string{"lineA", "lineB"}) + require.ElementsMatch(t, query(t, cliMultitenant, `{job="fake1"}`), []string{"lineA"}) + require.ElementsMatch(t, query(t, cliMultitenant, `{job="fake2"}`), []string{"lineB"}) + require.ElementsMatch(t, query(t, cliMultitenant, `{job="fake3"}`), []string{}) } -func matchLines(t *testing.T, client *client.Client, labels string, expectedLines []string) { - resp, err := client.RunRangeQuery(context.Background(), labels) +func query(t *testing.T, client *client.Client, labels string) []string { + t.Helper() + resp, err := client.RunRangeQueryWithStartEnd(context.Background(), labels, client.Now.Add(-1*time.Hour), client.Now) require.NoError(t, err) var lines []string @@ -62,5 +64,5 @@ func matchLines(t *testing.T, client *client.Client, labels string, expectedLine lines = append(lines, val[1]) } } - require.ElementsMatch(t, expectedLines, lines) + return lines } diff --git a/integration/parse_metrics.go b/integration/parse_metrics.go index 46ea42497856..9f2bf5fc8fc2 100644 --- a/integration/parse_metrics.go +++ b/integration/parse_metrics.go @@ -13,16 +13,24 @@ var ( ErrInvalidMetricType = fmt.Errorf("invalid metric type") ) -func extractMetric(metricName, metrics string) (float64, map[string]string, error) { +func extractMetricFamily(name, metrics string) (*io_prometheus_client.MetricFamily, error) { var parser expfmt.TextParser mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics)) if err != nil { - return 0, nil, err + return nil, err + } + + mf, ok := mfs[name] + if !ok { + return nil, ErrNoMetricFound } + return mf, nil +} - mf, found := mfs[metricName] - if !found { - return 0, nil, ErrNoMetricFound +func extractMetric(metricName, metrics string) (float64, map[string]string, error) { + mf, err := extractMetricFamily(metricName, metrics) + if err != nil { + return 0, nil, err } var val float64 diff --git a/operator/apis/config/v1/projectconfig_types.go b/operator/apis/config/v1/projectconfig_types.go index b6a80175266b..488f7b2cb64f 100644 --- a/operator/apis/config/v1/projectconfig_types.go +++ b/operator/apis/config/v1/projectconfig_types.go @@ -2,7 +2,7 @@ package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - cfg "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" //nolint:staticcheck + configv1alpha1 "k8s.io/component-base/config/v1alpha1" ) // BuiltInCertManagement is the configuration for the built-in facility to generate and rotate @@ -142,6 +142,67 @@ const ( TLSProfileModernType TLSProfileType = "Modern" ) +// ControllerManagerConfigurationSpec defines the desired state of GenericControllerManagerConfiguration. 
+type ControllerManagerConfigurationSpec struct { + // LeaderElection is the LeaderElection config to be used when configuring + // the manager.Manager leader election + // +optional + LeaderElection *configv1alpha1.LeaderElectionConfiguration `json:"leaderElection,omitempty"` + + // Metrics contains the controller metrics configuration + // +optional + Metrics ControllerMetrics `json:"metrics,omitempty"` + + // Health contains the controller health configuration + // +optional + Health ControllerHealth `json:"health,omitempty"` + + // Webhook contains the controllers webhook configuration + // +optional + Webhook ControllerWebhook `json:"webhook,omitempty"` +} + +// ControllerMetrics defines the metrics configs. +type ControllerMetrics struct { + // BindAddress is the TCP address that the controller should bind to + // for serving prometheus metrics. + // It can be set to "0" to disable the metrics serving. + // +optional + BindAddress string `json:"bindAddress,omitempty"` +} + +// ControllerHealth defines the health configs. +type ControllerHealth struct { + // HealthProbeBindAddress is the TCP address that the controller should bind to + // for serving health probes + // It can be set to "0" or "" to disable serving the health probe. + // +optional + HealthProbeBindAddress string `json:"healthProbeBindAddress,omitempty"` +} + +// ControllerWebhook defines the webhook server for the controller. +type ControllerWebhook struct { + // Port is the port that the webhook server serves at. + // It is used to set webhook.Server.Port. + // +optional + Port *int `json:"port,omitempty"` +} + +//+kubebuilder:object:root=true + +// ControllerManagerConfiguration is the Schema for the GenericControllerManagerConfigurations API. +type ControllerManagerConfiguration struct { + metav1.TypeMeta `json:",inline"` + + // ControllerManagerConfiguration returns the contfigurations for controllers + ControllerManagerConfigurationSpec `json:",inline"` +} + +// Complete returns the configuration for controller-runtime. +func (c *ControllerManagerConfigurationSpec) Complete() (ControllerManagerConfigurationSpec, error) { + return *c, nil +} + //+kubebuilder:object:root=true // ProjectConfig is the Schema for the projectconfigs API @@ -149,7 +210,7 @@ type ProjectConfig struct { metav1.TypeMeta `json:",inline"` // ControllerManagerConfigurationSpec returns the contfigurations for controllers - cfg.ControllerManagerConfigurationSpec `json:",inline"` + ControllerManagerConfigurationSpec `json:",inline"` Gates FeatureGates `json:"featureGates,omitempty"` } diff --git a/operator/apis/config/v1/zz_generated.deepcopy.go b/operator/apis/config/v1/zz_generated.deepcopy.go index ef20274e286e..f047818445aa 100644 --- a/operator/apis/config/v1/zz_generated.deepcopy.go +++ b/operator/apis/config/v1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1 import ( runtime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/component-base/config/v1alpha1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -23,6 +24,104 @@ func (in *BuiltInCertManagement) DeepCopy() *BuiltInCertManagement { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerHealth) DeepCopyInto(out *ControllerHealth) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerHealth. 
+func (in *ControllerHealth) DeepCopy() *ControllerHealth { + if in == nil { + return nil + } + out := new(ControllerHealth) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerManagerConfiguration) DeepCopyInto(out *ControllerManagerConfiguration) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ControllerManagerConfigurationSpec.DeepCopyInto(&out.ControllerManagerConfigurationSpec) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerManagerConfiguration. +func (in *ControllerManagerConfiguration) DeepCopy() *ControllerManagerConfiguration { + if in == nil { + return nil + } + out := new(ControllerManagerConfiguration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ControllerManagerConfiguration) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerManagerConfigurationSpec) DeepCopyInto(out *ControllerManagerConfigurationSpec) { + *out = *in + if in.LeaderElection != nil { + in, out := &in.LeaderElection, &out.LeaderElection + *out = new(v1alpha1.LeaderElectionConfiguration) + (*in).DeepCopyInto(*out) + } + out.Metrics = in.Metrics + out.Health = in.Health + in.Webhook.DeepCopyInto(&out.Webhook) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerManagerConfigurationSpec. +func (in *ControllerManagerConfigurationSpec) DeepCopy() *ControllerManagerConfigurationSpec { + if in == nil { + return nil + } + out := new(ControllerManagerConfigurationSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerMetrics) DeepCopyInto(out *ControllerMetrics) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerMetrics. +func (in *ControllerMetrics) DeepCopy() *ControllerMetrics { + if in == nil { + return nil + } + out := new(ControllerMetrics) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerWebhook) DeepCopyInto(out *ControllerWebhook) { + *out = *in + if in.Port != nil { + in, out := &in.Port, &out.Port + *out = new(int) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerWebhook. +func (in *ControllerWebhook) DeepCopy() *ControllerWebhook { + if in == nil { + return nil + } + out := new(ControllerWebhook) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *FeatureGates) DeepCopyInto(out *FeatureGates) { *out = *in diff --git a/operator/config/docs/config.json b/operator/config/docs/config.json index 3d912e857d2b..fb1b7d8a11b2 100644 --- a/operator/config/docs/config.json +++ b/operator/config/docs/config.json @@ -4,7 +4,13 @@ ], "hideTypePatterns": [ "ParseError$", - "List$" + "List$", + "ControllerHealth$", + "ControllerManagerConfiguration$", + "ControllerManagerConfigurationSpec$", + "ControllerMetrics$", + "ControllerWebhook$", + "ProjectConfig$" ], "externalPackages": [ { @@ -38,22 +44,6 @@ { "typeMatchPrefix": "^k8s\\.io/component-base/config/v1alpha1\\.LeaderElectionConfiguration$", "docsURLTemplate": "https://pkg.go.dev/k8s.io/component-base/config#LeaderElectionConfiguration" - }, - { - "typeMatchPrefix": "^sigs\\.k8s\\.io/controller-runtime/pkg/config/v1alpha1\\.ControllerConfigurationSpec$", - "docsURLTemplate": "https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/config/v1alpha1#ControllerConfigurationSpec" - }, - { - "typeMatchPrefix": "^sigs\\.k8s\\.io/controller-runtime/pkg/config/v1alpha1\\.ControllerMetrics$", - "docsURLTemplate": "https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/config/v1alpha1#ControllerMetrics" - }, - { - "typeMatchPrefix": "^sigs\\.k8s\\.io/controller-runtime/pkg/config/v1alpha1\\.ControllerHealth$", - "docsURLTemplate": "https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/config/v1alpha1#ControllerHealth" - }, - { - "typeMatchPrefix": "^sigs\\.k8s\\.io/controller-runtime/pkg/config/v1alpha1\\.ControllerWebhook$", - "docsURLTemplate": "https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/config/v1alpha1#ControllerWebhook" } ], "typeDisplayNamePrefixOverrides": { @@ -66,4 +56,4 @@ "github.com/grafana/loki/operator/apis/loki/config/v1": "Feature Gates" }, "markdownDisabled": false -} \ No newline at end of file +} diff --git a/operator/docs/operator/feature-gates.md b/operator/docs/operator/feature-gates.md index 7475dfa8a1cc..1d5c046be775 100644 --- a/operator/docs/operator/feature-gates.md +++ b/operator/docs/operator/feature-gates.md @@ -99,9 +99,6 @@ The refresh is applied to all LokiStack certificates at once.

 ## FeatureGates { #config-loki-grafana-com-v1-FeatureGates }
 
-
-(Appears on:ProjectConfig)
-

FeatureGates is the supported set of all operator feature gates.

@@ -415,156 +412,6 @@ bool
-## ProjectConfig { #config-loki-grafana-com-v1-ProjectConfig }
-
-ProjectConfig is the Schema for the projectconfigs API
-
-| Field | Description |
-| ----- | ----------- |
-| `syncPeriod` (Kubernetes meta/v1.Duration, Optional) | SyncPeriod determines the minimum frequency at which watched resources are reconciled. A lower period will correct entropy more quickly, but reduce responsiveness to change if there are many watched resources. Change this value only if you know what you are doing. Defaults to 10 hours if unset. there will a 10 percent jitter between the SyncPeriod of all controllers so that all controllers will not send list requests simultaneously. |
-| `leaderElection` (Kubernetes v1alpha1.LeaderElectionConfiguration, Optional) | LeaderElection is the LeaderElection config to be used when configuring the manager.Manager leader election |
-| `cacheNamespace` (string, Optional) | CacheNamespace if specified restricts the manager’s cache to watch objects in the desired namespace. Defaults to all namespaces. Note: If a namespace is specified, controllers can still Watch for a cluster-scoped resource (e.g Node). For namespaced resources the cache will only hold objects from the desired namespace. |
-| `gracefulShutDown` (Kubernetes meta/v1.Duration) | GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. To disable graceful shutdown, set to time.Duration(0). To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1). The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. |
-| `controller` (K8S Controller-runtime v1alpha1.ControllerConfigurationSpec, Optional) | Controller contains global configuration options for controllers registered within this manager. |
-| `metrics` (K8S Controller-runtime v1alpha1.ControllerMetrics, Optional) | Metrics contains the controller metrics configuration |
-| `health` (K8S Controller-runtime v1alpha1.ControllerHealth, Optional) | Health contains the controller health configuration |
-| `webhook` (K8S Controller-runtime v1alpha1.ControllerWebhook, Optional) | Webhook contains the controllers webhook configuration |
-| `featureGates` (FeatureGates) | |
-
 
 ## TLSProfileType { #config-loki-grafana-com-v1-TLSProfileType }
 
 (string alias)
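The operator changes above replace the deprecated `sigs.k8s.io/controller-runtime` v1alpha1 config types with operator-local equivalents while keeping the same inline field layout, so an existing ProjectConfig file should still decode (the loader added later in this patch does the real decoding via `runtime.DecodeInto`). The following is a rough, stand-in sketch of the expected file shape using mirror structs rather than the operator's actual types; it assumes the `sigs.k8s.io/yaml` package that the operator already depends on, and the field names follow the spec shown in the diff.

```go
package main

import (
	"fmt"

	"sigs.k8s.io/yaml"
)

// Mirror structs for illustration only; the json tags follow the relocated
// ControllerManagerConfigurationSpec fields shown in the diff above.
type controllerMetrics struct {
	BindAddress string `json:"bindAddress,omitempty"`
}

type controllerHealth struct {
	HealthProbeBindAddress string `json:"healthProbeBindAddress,omitempty"`
}

type controllerWebhook struct {
	Port *int `json:"port,omitempty"`
}

type projectConfig struct {
	Metrics controllerMetrics `json:"metrics,omitempty"`
	Health  controllerHealth  `json:"health,omitempty"`
	Webhook controllerWebhook `json:"webhook,omitempty"`
}

func main() {
	raw := []byte(`
metrics:
  bindAddress: "127.0.0.1:8080"
health:
  healthProbeBindAddress: ":8081"
webhook:
  port: 9443
`)

	var cfg projectConfig
	// sigs.k8s.io/yaml converts YAML to JSON before unmarshalling, so the
	// json tags above drive the decoding.
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.Metrics.BindAddress, cfg.Health.HealthProbeBindAddress, *cfg.Webhook.Port)
}
```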
diff --git a/operator/go.mod b/operator/go.mod index 4ffc3899d11c..0ee7c037f16c 100644 --- a/operator/go.mod +++ b/operator/go.mod @@ -23,6 +23,7 @@ require ( k8s.io/apimachinery v0.27.7 k8s.io/apiserver v0.27.7 k8s.io/client-go v0.27.7 + k8s.io/component-base v0.27.7 k8s.io/utils v0.0.0-20230505201702-9f6742963106 sigs.k8s.io/controller-runtime v0.15.3 sigs.k8s.io/yaml v1.3.0 @@ -150,7 +151,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.27.7 // indirect - k8s.io/component-base v0.27.7 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/operator/internal/config/loader.go b/operator/internal/config/loader.go new file mode 100644 index 000000000000..b5af090ddb88 --- /dev/null +++ b/operator/internal/config/loader.go @@ -0,0 +1,30 @@ +package config + +import ( + "errors" + "fmt" + "os" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + + configv1 "github.com/grafana/loki/operator/apis/config/v1" +) + +var errConfigFileLoading = errors.New("could not read file at path") + +func loadConfigFile(scheme *runtime.Scheme, configFile string) (*configv1.ProjectConfig, error) { + content, err := os.ReadFile(configFile) + if err != nil { + return nil, fmt.Errorf("%w %s", errConfigFileLoading, configFile) + } + + codecs := serializer.NewCodecFactory(scheme) + + outConfig := &configv1.ProjectConfig{} + if err = runtime.DecodeInto(codecs.UniversalDecoder(), content, outConfig); err != nil { + return nil, fmt.Errorf("could not decode file into runtime.Object: %w", err) + } + + return outConfig, nil +} diff --git a/operator/internal/config/options.go b/operator/internal/config/options.go new file mode 100644 index 000000000000..8aeba0eb53b3 --- /dev/null +++ b/operator/internal/config/options.go @@ -0,0 +1,94 @@ +package config + +import ( + "fmt" + "reflect" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/webhook" + + configv1 "github.com/grafana/loki/operator/apis/config/v1" +) + +// LoadConfig initializes the controller configuration, optionally overriding the defaults +// from a provided configuration file. 
+func LoadConfig(scheme *runtime.Scheme, configFile string) (*configv1.ProjectConfig, ctrl.Options, error) { + options := ctrl.Options{Scheme: scheme} + if configFile == "" { + return &configv1.ProjectConfig{}, options, nil + } + + ctrlCfg, err := loadConfigFile(scheme, configFile) + if err != nil { + return nil, options, fmt.Errorf("failed to parse controller manager config file: %w", err) + } + + options = mergeOptionsFromFile(options, ctrlCfg) + return ctrlCfg, options, nil +} + +func mergeOptionsFromFile(o manager.Options, cfg *configv1.ProjectConfig) manager.Options { + o = setLeaderElectionConfig(o, cfg.ControllerManagerConfigurationSpec) + + if o.MetricsBindAddress == "" && cfg.Metrics.BindAddress != "" { + o.MetricsBindAddress = cfg.Metrics.BindAddress + } + + if o.HealthProbeBindAddress == "" && cfg.Health.HealthProbeBindAddress != "" { + o.HealthProbeBindAddress = cfg.Health.HealthProbeBindAddress + } + + //nolint:staticcheck + if o.Port == 0 && cfg.Webhook.Port != nil { + o.Port = *cfg.Webhook.Port + } + + //nolint:staticcheck + if o.WebhookServer == nil { + o.WebhookServer = webhook.NewServer(webhook.Options{ + Port: o.Port, + }) + } + + return o +} + +func setLeaderElectionConfig(o manager.Options, obj configv1.ControllerManagerConfigurationSpec) manager.Options { + if obj.LeaderElection == nil { + // The source does not have any configuration; noop + return o + } + + if !o.LeaderElection && obj.LeaderElection.LeaderElect != nil { + o.LeaderElection = *obj.LeaderElection.LeaderElect + } + + if o.LeaderElectionResourceLock == "" && obj.LeaderElection.ResourceLock != "" { + o.LeaderElectionResourceLock = obj.LeaderElection.ResourceLock + } + + if o.LeaderElectionNamespace == "" && obj.LeaderElection.ResourceNamespace != "" { + o.LeaderElectionNamespace = obj.LeaderElection.ResourceNamespace + } + + if o.LeaderElectionID == "" && obj.LeaderElection.ResourceName != "" { + o.LeaderElectionID = obj.LeaderElection.ResourceName + } + + if o.LeaseDuration == nil && !reflect.DeepEqual(obj.LeaderElection.LeaseDuration, metav1.Duration{}) { + o.LeaseDuration = &obj.LeaderElection.LeaseDuration.Duration + } + + if o.RenewDeadline == nil && !reflect.DeepEqual(obj.LeaderElection.RenewDeadline, metav1.Duration{}) { + o.RenewDeadline = &obj.LeaderElection.RenewDeadline.Duration + } + + if o.RetryPeriod == nil && !reflect.DeepEqual(obj.LeaderElection.RetryPeriod, metav1.Duration{}) { + o.RetryPeriod = &obj.LeaderElection.RetryPeriod.Duration + } + + return o +} diff --git a/operator/main.go b/operator/main.go index 6b101175407e..ffa16608707c 100644 --- a/operator/main.go +++ b/operator/main.go @@ -21,6 +21,7 @@ import ( lokiv1 "github.com/grafana/loki/operator/apis/loki/v1" lokiv1beta1 "github.com/grafana/loki/operator/apis/loki/v1beta1" lokictrl "github.com/grafana/loki/operator/controllers/loki" + "github.com/grafana/loki/operator/internal/config" "github.com/grafana/loki/operator/internal/metrics" "github.com/grafana/loki/operator/internal/operator" "github.com/grafana/loki/operator/internal/validation" @@ -59,14 +60,10 @@ func main() { var err error - ctrlCfg := ctrlconfigv1.ProjectConfig{} - options := ctrl.Options{Scheme: scheme} - if configFile != "" { - options, err = options.AndFrom(ctrl.ConfigFile().AtPath(configFile).OfKind(&ctrlCfg)) //nolint:staticcheck - if err != nil { - logger.Error(err, "failed to parse controller manager config file") - os.Exit(1) - } + ctrlCfg, options, err := config.LoadConfig(scheme, configFile) + if err != nil { + logger.Error(err, "failed to load operator 
configuration") + os.Exit(1) } if ctrlCfg.Gates.LokiStackAlerts && !ctrlCfg.Gates.ServiceMonitors { diff --git a/pkg/bloomgateway/bloomgateway.go b/pkg/bloomgateway/bloomgateway.go index b0c3251a0843..5c2cc9dad003 100644 --- a/pkg/bloomgateway/bloomgateway.go +++ b/pkg/bloomgateway/bloomgateway.go @@ -27,8 +27,6 @@ of line filter expressions. | bloomgateway.Worker | - bloomshipper.Store - | bloomshipper.Shipper | bloomshipper.BloomFileClient @@ -80,8 +78,10 @@ var ( ) type metrics struct { - queueDuration prometheus.Histogram - inflightRequests prometheus.Summary + queueDuration prometheus.Histogram + inflightRequests prometheus.Summary + chunkRefsUnfiltered prometheus.Counter + chunkRefsFiltered prometheus.Counter } func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics { @@ -102,9 +102,29 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) * MaxAge: time.Minute, AgeBuckets: 6, }), + chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkrefs_pre_filtering", + Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.", + }), + chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "chunkrefs_post_filtering", + Help: "Total amount of chunk refs post filtering.", + }), } } +func (m *metrics) addUnfilteredCount(n int) { + m.chunkRefsUnfiltered.Add(float64(n)) +} + +func (m *metrics) addFilteredCount(n int) { + m.chunkRefsFiltered.Add(float64(n)) +} + // SyncMap is a map structure which can be synchronized using the RWMutex type SyncMap[k comparable, v any] struct { sync.RWMutex @@ -149,9 +169,9 @@ type Gateway struct { workerMetrics *workerMetrics queueMetrics *queue.Metrics - queue *queue.RequestQueue - activeUsers *util.ActiveUsersCleanupService - bloomStore bloomshipper.Store + queue *queue.RequestQueue + activeUsers *util.ActiveUsersCleanupService + bloomShipper bloomshipper.Interface sharding ShardingStrategy @@ -200,13 +220,8 @@ func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, o return nil, err } - bloomStore, err := bloomshipper.NewBloomStore(bloomShipper) - if err != nil { - return nil, err - } - // We need to keep a reference to be able to call Stop() on shutdown of the gateway. - g.bloomStore = bloomStore + g.bloomShipper = bloomShipper if err := g.initServices(); err != nil { return nil, err @@ -221,7 +236,7 @@ func (g *Gateway) initServices() error { svcs := []services.Service{g.queue, g.activeUsers} for i := 0; i < g.cfg.WorkerConcurrency; i++ { id := fmt.Sprintf("bloom-query-worker-%d", i) - w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics) + w := newWorker(id, g.workerConfig, g.queue, g.bloomShipper, g.pendingTasks, g.logger, g.workerMetrics) svcs = append(svcs, w) } g.serviceMngr, err = services.NewManager(svcs...) 
@@ -273,7 +288,7 @@ func (g *Gateway) running(ctx context.Context) error { } func (g *Gateway) stopping(_ error) error { - g.bloomStore.Stop() + g.bloomShipper.Stop() return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr) } @@ -284,8 +299,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk return nil, err } + numChunksUnfiltered := len(req.Refs) + // Shortcut if request does not contain filters if len(req.Filters) == 0 { + g.metrics.addUnfilteredCount(numChunksUnfiltered) + g.metrics.addFilteredCount(len(req.Refs)) return &logproto.FilterChunkRefResponse{ ChunkRefs: req.Refs, }, nil @@ -313,6 +332,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk responses := responsesPool.Get(requestCount) defer responsesPool.Put(responses) +outer: for { select { case <-ctx.Done(): @@ -325,17 +345,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount)) // wait for all parts of the full response if len(responses) == requestCount { - for _, o := range responses { - if res.Removals.Len() == 0 { - continue - } - // we must not remove items from req.Refs as long as the worker may iterater over them - g.removeNotMatchingChunks(req, o) - } - return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil + break outer } } } + + for _, o := range responses { + if o.Removals.Len() == 0 { + continue + } + // we must not remove items from req.Refs as long as the worker may iterater over them + g.removeNotMatchingChunks(req, o) + } + + g.metrics.addUnfilteredCount(numChunksUnfiltered) + g.metrics.addFilteredCount(len(req.Refs)) + + level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs)) + return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil } func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) { diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 183a2aad2190..1e85e7d2089c 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -277,7 +277,7 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { // replace store implementation and re-initialize workers and sub-services bqs, data := createBlockQueriers(t, 5, now.Add(-8*time.Hour), now, 0, 1024) - gw.bloomStore = newMockBloomStore(bqs) + gw.bloomShipper = newMockBloomStore(bqs) err = gw.initServices() require.NoError(t, err) @@ -331,7 +331,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { ctx := user.InjectOrgID(context.Background(), tenantID) res, err := gw.FilterChunkRefs(ctx, req) require.NoError(t, err) - expectedResponse := &logproto.FilterChunkRefResponse{ ChunkRefs: inputChunkRefs[:1], } @@ -373,15 +372,10 @@ type mockBloomStore struct { bqs []bloomshipper.BlockQuerierWithFingerprintRange } -var _ bloomshipper.Store = &mockBloomStore{} - -// GetBlockQueriersForBlockRefs implements bloomshipper.Store. 
-func (s *mockBloomStore) GetBlockQueriersForBlockRefs(_ context.Context, _ string, _ []bloomshipper.BlockRef) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) { - return s.bqs, nil -} +var _ bloomshipper.Interface = &mockBloomStore{} -// GetBlockRefs implements bloomshipper.Store. -func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ time.Time) ([]bloomshipper.BlockRef, error) { +// GetBlockRefs implements bloomshipper.Interface +func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ model.Time) ([]bloomshipper.BlockRef, error) { blocks := make([]bloomshipper.BlockRef, 0, len(s.bqs)) for i := range s.bqs { blocks = append(blocks, bloomshipper.BlockRef{ @@ -395,15 +389,11 @@ func (s *mockBloomStore) GetBlockRefs(_ context.Context, tenant string, _, _ tim return blocks, nil } -// GetBlockQueriers implements bloomshipper.Store. -func (s *mockBloomStore) GetBlockQueriers(_ context.Context, _ string, _, _ time.Time, _ []uint64) ([]bloomshipper.BlockQuerierWithFingerprintRange, error) { - return s.bqs, nil -} - +// Stop implements bloomshipper.Interface func (s *mockBloomStore) Stop() {} -// ForEach implements bloomshipper.Store. -func (s *mockBloomStore) ForEach(_ context.Context, _ string, _ []bloomshipper.BlockRef, callback bloomshipper.ForEachBlockCallback) error { +// Fetch implements bloomshipper.Interface +func (s *mockBloomStore) Fetch(_ context.Context, _ string, _ []bloomshipper.BlockRef, callback bloomshipper.ForEachBlockCallback) error { shuffled := make([]bloomshipper.BlockQuerierWithFingerprintRange, len(s.bqs)) _ = copy(shuffled, s.bqs) diff --git a/pkg/bloomgateway/worker.go b/pkg/bloomgateway/worker.go index a8f9c56d50ba..73100025a743 100644 --- a/pkg/bloomgateway/worker.go +++ b/pkg/bloomgateway/worker.go @@ -27,6 +27,7 @@ type workerMetrics struct { dequeueErrors *prometheus.CounterVec dequeueWaitTime *prometheus.SummaryVec storeAccessLatency *prometheus.HistogramVec + bloomQueryLatency *prometheus.HistogramVec } func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics { @@ -50,6 +51,13 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str Name: "dequeue_wait_time", Help: "Time spent waiting for dequeuing tasks from queue", }, labels), + bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "bloom_query_latency", + Help: "Latency in seconds of processing bloom blocks", + }, append(labels, "status")), + // TODO(chaudum): Move this metric into the bloomshipper storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, Subsystem: subsystem, @@ -70,18 +78,18 @@ type worker struct { id string cfg workerConfig queue *queue.RequestQueue - store bloomshipper.Store + shipper bloomshipper.Interface tasks *pendingTasks logger log.Logger metrics *workerMetrics } -func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store bloomshipper.Store, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker { +func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, tasks *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker { w := &worker{ id: id, cfg: cfg, queue: queue, - store: store, + shipper: shipper, tasks: tasks, logger: log.With(logger, "worker", id), metrics: metrics, @@ -154,7 +162,7 @@ func (w *worker) running(ctx context.Context) 
error { level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks)) storeFetchStart := time.Now() - blockRefs, err := w.store.GetBlockRefs(taskCtx, tasks[0].Tenant, day, day.Add(Day).Add(-1*time.Nanosecond)) + blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, toModelTime(day), toModelTime(day.Add(Day).Add(-1*time.Nanosecond))) w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds()) if err != nil { for _, t := range tasks { @@ -210,32 +218,39 @@ func (w *worker) stopping(err error) error { } func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, day time.Time, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error { - return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error { + return w.shipper.Fetch(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error { for _, b := range boundedRefs { if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp { - processBlock(bq, day, b.tasks) - return nil + return w.processBlock(bq, day, b.tasks) } } return nil }) } -func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) { +func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error { schema, err := blockQuerier.Schema() if err != nil { - for _, t := range tasks { - t.ErrCh <- errors.Wrap(err, "failed to get block schema") - } + return err } tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0) it := newTaskMergeIterator(day, tokenizer, tasks...) fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it}) + + start := time.Now() err = fq.Run() + duration := time.Since(start).Seconds() + if err != nil { - for _, t := range tasks { - t.ErrCh <- errors.Wrap(err, "failed to run chunk check") - } + w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration) + return err } + + w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration) + return nil +} + +func toModelTime(t time.Time) model.Time { + return model.TimeFromUnixNano(t.UnixNano()) } diff --git a/pkg/logql/log/labels.go b/pkg/logql/log/labels.go index 2144abdf5d7e..ea4570e652df 100644 --- a/pkg/logql/log/labels.go +++ b/pkg/logql/log/labels.go @@ -157,7 +157,7 @@ type LabelsBuilder struct { // NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results. func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder { if parserKeyHints == nil { - parserKeyHints = noParserHints + parserKeyHints = NoParserHints() } const labelsCapacity = 16 @@ -179,7 +179,7 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint // NewBaseLabelsBuilder creates a new base labels builder. func NewBaseLabelsBuilder() *BaseLabelsBuilder { - return NewBaseLabelsBuilderWithGrouping(nil, noParserHints, false, false) + return NewBaseLabelsBuilderWithGrouping(nil, NoParserHints(), false, false) } // ForLabels creates a labels builder for a given labels set as base. 
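The labels-builder change above swaps the shared package-level `noParserHints` value for the `NoParserHints()` constructor introduced in the next file. Assuming the `Hints` type accumulates per-query state, returning a fresh value on every call keeps concurrent queries from mutating one shared instance. The sketch below illustrates the pattern with stand-in types, not the real `Hints` implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// hints is an illustrative stand-in for a parser-hint type that collects
// per-query state (the real Hints type lives in pkg/logql/log).
type hints struct {
	extracted map[string]struct{}
}

// noHints mirrors the NoParserHints() pattern: every caller gets its own
// instance, so one query's mutations never leak into another's.
func noHints() *hints {
	return &hints{extracted: map[string]struct{}{}}
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			h := noHints() // per-goroutine state, nothing shared
			h.extracted[fmt.Sprintf("label-%d", i)] = struct{}{}
		}(i)
	}
	wg.Wait()
	fmt.Println("each query used an isolated hints value")
}
```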
diff --git a/pkg/logql/log/parser_hints.go b/pkg/logql/log/parser_hints.go index a8b1f73f3109..3fd4cff2b332 100644 --- a/pkg/logql/log/parser_hints.go +++ b/pkg/logql/log/parser_hints.go @@ -6,7 +6,9 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" ) -var noParserHints = &Hints{} +func NoParserHints() ParserHint { + return &Hints{} +} // ParserHint are hints given to LogQL parsers. // This is specially useful for parser that extract implicitly all possible label keys. diff --git a/pkg/logql/log/parser_test.go b/pkg/logql/log/parser_test.go index 246dbed499c9..bd57603ab808 100644 --- a/pkg/logql/log/parser_test.go +++ b/pkg/logql/log/parser_test.go @@ -28,7 +28,7 @@ func Test_jsonParser_Parse(t *testing.T) { "pod_uuid", "foo", "pod_deployment_ref", "foobar", ), - noParserHints, + NoParserHints(), }, { "numeric", @@ -37,7 +37,7 @@ func Test_jsonParser_Parse(t *testing.T) { labels.FromStrings("counter", "1", "price__net_", "5.56909", ), - noParserHints, + NoParserHints(), }, { "escaped", @@ -47,7 +47,7 @@ func Test_jsonParser_Parse(t *testing.T) { "price__net_", "5.56909", "foo", `foo\"bar`, ), - noParserHints, + NoParserHints(), }, { "utf8 error rune", @@ -57,21 +57,21 @@ func Test_jsonParser_Parse(t *testing.T) { "price__net_", "5.56909", "foo", "", ), - noParserHints, + NoParserHints(), }, { "skip arrays", []byte(`{"counter":1, "price": {"net_":["10","20"]}}`), labels.EmptyLabels(), labels.FromStrings("counter", "1"), - noParserHints, + NoParserHints(), }, { "bad key replaced", []byte(`{"cou-nter":1}`), labels.EmptyLabels(), labels.FromStrings("cou_nter", "1"), - noParserHints, + NoParserHints(), }, { "errors", @@ -80,7 +80,7 @@ func Test_jsonParser_Parse(t *testing.T) { labels.FromStrings("__error__", "JSONParserErr", "__error_details__", "Value looks like object, but can't find closing '}' symbol", ), - noParserHints, + NoParserHints(), }, { "errors hints", @@ -103,7 +103,7 @@ func Test_jsonParser_Parse(t *testing.T) { "next_err", "false", "pod_deployment_ref", "foobar", ), - noParserHints, + NoParserHints(), }, } for _, tt := range tests { @@ -255,7 +255,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("app", "foo"), - noParserHints, + NoParserHints(), }, { "alternate syntax", @@ -265,7 +265,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("test", "value"), - noParserHints, + NoParserHints(), }, { "multiple fields", @@ -278,7 +278,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("app", "foo", "namespace", "prod", ), - noParserHints, + NoParserHints(), }, { "utf8", @@ -288,7 +288,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("utf8", "value"), - noParserHints, + NoParserHints(), }, { "nested field", @@ -298,7 +298,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "nested field alternate syntax", @@ -308,7 +308,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "nested field alternate syntax 2", @@ -318,7 +318,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - noParserHints, + NoParserHints(), }, { "nested field alternate syntax 3", @@ -328,7 +328,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", "foo"), - 
noParserHints, + NoParserHints(), }, { "array element", @@ -338,7 +338,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("param", "1"), - noParserHints, + NoParserHints(), }, { "full array", @@ -348,7 +348,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("params", "[1,2,3]"), - noParserHints, + NoParserHints(), }, { "full object", @@ -358,7 +358,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("deployment", `{"ref":"foobar", "params": [1,2,3]}`), - noParserHints, + NoParserHints(), }, { "expression matching nothing", @@ -368,7 +368,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("nope", ""), - noParserHints, + NoParserHints(), }, { "null field", @@ -379,7 +379,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.EmptyLabels(), labels.FromStrings("nf", ""), // null is coerced to an empty string - noParserHints, + NoParserHints(), }, { "boolean field", @@ -389,7 +389,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("bool", `false`), - noParserHints, + NoParserHints(), }, { "label override", @@ -401,7 +401,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("uuid", "bar", "uuid_extracted", "foo", ), - noParserHints, + NoParserHints(), }, { "non-matching expression", @@ -413,7 +413,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("uuid", "bar", "request_size", "", ), - noParserHints, + NoParserHints(), }, { "empty line", @@ -423,7 +423,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.EmptyLabels(), labels.FromStrings("uuid", ""), - noParserHints, + NoParserHints(), }, { "existing labels are not affected", @@ -435,7 +435,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("foo", "bar", "uuid", "", ), - noParserHints, + NoParserHints(), }, { "invalid JSON line", @@ -447,7 +447,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("foo", "bar", logqlmodel.ErrorLabel, errJSON, ), - noParserHints, + NoParserHints(), }, { "invalid JSON line with hints", @@ -470,7 +470,7 @@ func TestJSONExpressionParser(t *testing.T) { }, labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar"), - noParserHints, + NoParserHints(), }, { "nested escaped object", @@ -482,7 +482,7 @@ func TestJSONExpressionParser(t *testing.T) { labels.FromStrings("foo", "bar", "app", `{ "key": "value", "key2":"value2"}`, ), - noParserHints, + NoParserHints(), }, } for _, tt := range tests { @@ -746,7 +746,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", "logfmt syntax error at pos 8 : unexpected '='", ), - noParserHints, + NoParserHints(), }, { "not logfmt with hints", @@ -766,7 +766,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.EmptyLabels(), labels.FromStrings("buzz", "foo"), nil, - noParserHints, + NoParserHints(), }, { "key alone logfmt", @@ -775,7 +775,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar", "bar", "foo"), nil, - noParserHints, + NoParserHints(), }, { "quoted logfmt", @@ -785,7 +785,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo bar", ), nil, - noParserHints, + NoParserHints(), }, { "escaped control chars in logfmt", @@ -795,7 +795,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo\nbar\tbaz", ), nil, - noParserHints, + NoParserHints(), }, { "literal control chars in logfmt", 
@@ -805,7 +805,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo\nbar\tbaz", ), nil, - noParserHints, + NoParserHints(), }, { "escaped slash logfmt", @@ -815,7 +815,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", `foo ba\r baz`, ), nil, - noParserHints, + NoParserHints(), }, { "literal newline and escaped slash logfmt", @@ -825,7 +825,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "foo bar\nb\\az", ), nil, - noParserHints, + NoParserHints(), }, { "double property logfmt", @@ -836,7 +836,7 @@ func TestLogfmtParser_parse(t *testing.T) { "latency", "10ms", ), nil, - noParserHints, + NoParserHints(), }, { "duplicate from line property", @@ -846,7 +846,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "10ms", ), nil, - noParserHints, + NoParserHints(), }, { "duplicate property", @@ -857,7 +857,7 @@ func TestLogfmtParser_parse(t *testing.T) { "foobar", "10ms", ), nil, - noParserHints, + NoParserHints(), }, { "invalid key names", @@ -869,7 +869,7 @@ func TestLogfmtParser_parse(t *testing.T) { "test_dash", "foo", ), nil, - noParserHints, + NoParserHints(), }, { "nil", @@ -877,7 +877,7 @@ func TestLogfmtParser_parse(t *testing.T) { labels.FromStrings("foo", "bar"), labels.FromStrings("foo", "bar"), nil, - noParserHints, + NoParserHints(), }, { "empty key", @@ -892,7 +892,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", "logfmt syntax error at pos 15 : unexpected '='", ), - noParserHints, + NoParserHints(), }, { "error rune in key", @@ -906,7 +906,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", "logfmt syntax error at pos 20 : invalid key", ), - noParserHints, + NoParserHints(), }, { "double quote in key", @@ -920,7 +920,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", `logfmt syntax error at pos 17 : unexpected '"'`, ), - noParserHints, + NoParserHints(), }, { "= in value", @@ -933,7 +933,7 @@ func TestLogfmtParser_parse(t *testing.T) { "__error__", "LogfmtParserErr", "__error_details__", `logfmt syntax error at pos 7 : unexpected '='`, ), - noParserHints, + NoParserHints(), }, } @@ -1200,7 +1200,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`some message`), - noParserHints, + NoParserHints(), }, { "wrong json", @@ -1210,7 +1210,7 @@ func Test_unpackParser_Parse(t *testing.T) { "__error_details__", "expecting json object(6), but it is not", ), []byte(`"app":"foo","namespace":"prod","_entry":"some message","pod":{"uid":"1"}`), - noParserHints, + NoParserHints(), }, { "empty line", @@ -1218,7 +1218,7 @@ func Test_unpackParser_Parse(t *testing.T) { labels.FromStrings("cluster", "us-central1"), labels.FromStrings("cluster", "us-central1"), []byte(``), - noParserHints, + NoParserHints(), }, { "wrong json with hints", @@ -1240,7 +1240,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`["foo","bar"]`), - noParserHints, + NoParserHints(), }, { "should rename", @@ -1254,7 +1254,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`some message`), - noParserHints, + NoParserHints(), }, { "should not change log and labels if no packed entry", @@ -1266,7 +1266,7 @@ func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`{"bar":1,"app":"foo","namespace":"prod","pod":{"uid":"1"}}`), - noParserHints, + NoParserHints(), }, { "non json with escaped quotes", @@ -1278,7 +1278,7 @@ 
func Test_unpackParser_Parse(t *testing.T) { "cluster", "us-central1", ), []byte(`I0303 17:49:45.976518 1526 kubelet_getters.go:178] "Pod status updated" pod="openshift-etcd/etcd-ip-10-0-150-50.us-east-2.compute.internal" status=Running`), - noParserHints, + NoParserHints(), }, { "invalid key names", @@ -1289,7 +1289,7 @@ func Test_unpackParser_Parse(t *testing.T) { "test_dash", "foo", ), []byte(`some message`), - noParserHints, + NoParserHints(), }, } for _, tt := range tests { diff --git a/pkg/logql/syntax/parser.go b/pkg/logql/syntax/parser.go index 710bf7132c4c..79213049f376 100644 --- a/pkg/logql/syntax/parser.go +++ b/pkg/logql/syntax/parser.go @@ -31,7 +31,13 @@ var parserPool = sync.Pool{ }, } -const maxInputSize = 5120 +// (E.Welch) We originally added this limit from fuzz testing and realizing there should be some maximum limit to an allowed query size. +// The original limit was 5120 based on some internet searching and a best estimate of what a reasonable limit would be. +// We have seen use cases with queries containing a lot of filter expressions or long expanded variable names where this limit was too small. +// Apparently the spec does not specify a limit, and more internet searching suggests almost all browsers will handle 100k+ length urls without issue +// Some limit here still seems prudent however, so the new limit is now 128k. +// Also note this is used to allocate the buffer for reading the query string, so there is some memory cost to making this larger. +const maxInputSize = 131072 func init() { // Improve the error messages coming out of yacc. diff --git a/pkg/querier/queryrange/index_stats_cache.go b/pkg/querier/queryrange/index_stats_cache.go index a985167456a7..d52f2e22323f 100644 --- a/pkg/querier/queryrange/index_stats_cache.go +++ b/pkg/querier/queryrange/index_stats_cache.go @@ -93,6 +93,7 @@ func NewIndexStatsCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -102,7 +103,7 @@ func NewIndexStatsCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( log, c, - IndexStatsSplitter{cacheKeyLimits{limits, transformer}}, + IndexStatsSplitter{cacheKeyLimits{limits, transformer, iqo}}, limits, merger, IndexStatsExtractor{}, diff --git a/pkg/querier/queryrange/index_stats_cache_test.go b/pkg/querier/queryrange/index_stats_cache_test.go index c8119c6b9fe2..1127b88576e1 100644 --- a/pkg/querier/queryrange/index_stats_cache_test.go +++ b/pkg/querier/queryrange/index_stats_cache_test.go @@ -37,6 +37,7 @@ func TestIndexStatsCache(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -180,6 +181,7 @@ func TestIndexStatsCache_RecentData(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, diff --git a/pkg/querier/queryrange/ingester_query_window.go b/pkg/querier/queryrange/ingester_query_window.go new file mode 100644 index 000000000000..7a161f40c007 --- /dev/null +++ b/pkg/querier/queryrange/ingester_query_window.go @@ -0,0 +1,33 @@ +package queryrange + +import ( + "time" + + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/validation" +) + +// SplitIntervalForTimeRange returns the correct split interval to use. 
It accounts for the given upperBound value being +// within the ingester query window, in which case it returns the ingester query split (unless it's not set, then the default +// split interval will be used). +func SplitIntervalForTimeRange(iqo util.IngesterQueryOptions, limits Limits, defaultSplitFn func(string) time.Duration, tenantIDs []string, ref, upperBound time.Time) time.Duration { + split := validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, defaultSplitFn) + + if iqo == nil { + return split + } + + // if the query is within the ingester query window, choose the ingester split duration (if configured), otherwise + // revert to the default split duration + ingesterQueryWindowStart := ref.Add(-iqo.QueryIngestersWithin()) + + // query is (even partially) within the ingester query window + if upperBound.After(ingesterQueryWindowStart) { + ingesterSplit := validation.MaxDurationOrZeroPerTenant(tenantIDs, limits.IngesterQuerySplitDuration) + if !iqo.QueryStoreOnly() && ingesterSplit > 0 { + split = ingesterSplit + } + } + + return split +} diff --git a/pkg/querier/queryrange/labels_cache.go b/pkg/querier/queryrange/labels_cache.go index 1e0dd225fa7b..66c811490403 100644 --- a/pkg/querier/queryrange/labels_cache.go +++ b/pkg/querier/queryrange/labels_cache.go @@ -11,18 +11,21 @@ import ( "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" + "github.com/grafana/loki/pkg/util" ) type cacheKeyLabels struct { Limits transformer UserIDTransformer + iqo util.IngesterQueryOptions } // GenerateCacheKey generates a cache key based on the userID, split duration and the interval of the request. // It also includes the label name and the provided query for label values request. 
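The new SplitIntervalForTimeRange helper above is what the metadata and metric cache-key generators now share: if any part of the request falls inside the ingester query window and the querier is not store-only, the per-tenant ingester split wins over the default split. A minimal standalone sketch of that selection rule, using simplified stand-in types rather than the real util.IngesterQueryOptions and Limits interfaces (the real code also resolves the defaults per tenant via the validation helpers):

package main

import (
	"fmt"
	"time"
)

// ingesterOpts is a stand-in for util.IngesterQueryOptions.
type ingesterOpts struct {
	queryIngestersWithin time.Duration
	queryStoreOnly       bool
}

// pickSplit mirrors the selection rule for a single tenant: prefer the ingester split
// when the request's upper bound is inside the ingester query window.
func pickSplit(opts *ingesterOpts, defaultSplit, ingesterSplit time.Duration, ref, upperBound time.Time) time.Duration {
	split := defaultSplit
	if opts == nil {
		return split
	}
	windowStart := ref.Add(-opts.queryIngestersWithin)
	if upperBound.After(windowStart) && !opts.queryStoreOnly && ingesterSplit > 0 {
		split = ingesterSplit
	}
	return split
}

func main() {
	now := time.Now()
	opts := &ingesterOpts{queryIngestersWithin: 3 * time.Hour}
	// ends 1h ago: inside the window, so the 90m ingester split applies
	fmt.Println(pickSplit(opts, time.Hour, 90*time.Minute, now, now.Add(-1*time.Hour)))
	// ends 5h ago: outside the window, so the default 1h split applies
	fmt.Println(pickSplit(opts, time.Hour, 90*time.Minute, now, now.Add(-5*time.Hour)))
}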
func (i cacheKeyLabels) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { lr := r.(*LabelRequest) - split := i.MetadataQuerySplitDuration(userID) + + split := SplitIntervalForTimeRange(i.iqo, i.Limits, i.MetadataQuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { @@ -77,6 +80,7 @@ func NewLabelsCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -86,12 +90,14 @@ func NewLabelsCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( logger, c, - cacheKeyLabels{limits, transformer}, + cacheKeyLabels{limits, transformer, iqo}, limits, merger, labelsExtractor{}, cacheGenNumberLoader, - shouldCache, + func(ctx context.Context, r queryrangebase.Request) bool { + return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits) + }, parallelismForReq, retentionEnabled, metrics, diff --git a/pkg/querier/queryrange/labels_cache_test.go b/pkg/querier/queryrange/labels_cache_test.go index 73ab9ad8f4f8..4c645b8d19ce 100644 --- a/pkg/querier/queryrange/labels_cache_test.go +++ b/pkg/querier/queryrange/labels_cache_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "regexp" "testing" "time" @@ -69,6 +70,7 @@ func TestLabelsCache(t *testing.T) { cache.NewMockCache(), nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -249,3 +251,215 @@ func TestLabelsCache(t *testing.T) { }) } } + +func TestLabelCache_freshness(t *testing.T) { + testTime := time.Now().Add(-1 * time.Hour) + from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime) + start, end := from.Time(), through.Time() + nonOverlappingStart, nonOverlappingEnd := from.Add(-24*time.Hour).Time(), through.Add(-24*time.Hour).Time() + + for _, tt := range []struct { + name string + req *LabelRequest + shouldCache bool + maxMetadataCacheFreshness time.Duration + }{ + { + name: "max metadata freshness not set", + req: &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &start, + End: &end, + }, + }, + shouldCache: true, + }, + { + name: "req overlaps with max cache freshness window", + req: &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &start, + End: &end, + }, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: false, + }, + { + name: "req does not overlap max cache freshness window", + req: &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &nonOverlappingStart, + End: &nonOverlappingEnd, + }, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + cacheMiddleware, err := NewLabelsCacheMiddleware( + log.NewNopLogger(), + fakeLimits{ + metadataSplitDuration: map[string]time.Duration{ + "fake": 24 * time.Hour, + }, + maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness, + }, + DefaultCodec, + cache.NewMockCache(), + nil, + nil, + nil, + func(_ context.Context, _ []string, _ queryrangebase.Request) int { + return 1 + }, + false, + nil, + nil, + ) + require.NoError(t, err) + + labelsResp := &LokiLabelNamesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: []string{"bar", "buzz"}, + Statistics: stats.Result{ + Summary: stats.Summary{ + Splits: 1, + 
}, + }, + } + + called := 0 + handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + called++ + + // should request the entire length with no partitioning as nothing is cached yet. + require.Equal(t, tt.req.GetStart(), r.GetStart()) + require.Equal(t, tt.req.GetEnd(), r.GetEnd()) + + return labelsResp, nil + })) + + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, tt.req) + require.NoError(t, err) + require.Equal(t, 1, called) // called actual handler, as not cached. + require.Equal(t, labelsResp, got) + + called = 0 + got, err = handler.Do(ctx, tt.req) + require.NoError(t, err) + if !tt.shouldCache { + require.Equal(t, 1, called) + } else { + require.Equal(t, 0, called) + } + require.Equal(t, labelsResp, got) + }) + } +} + +func TestLabelQueryCacheKey(t *testing.T) { + const ( + defaultTenant = "a" + alternateTenant = "b" + defaultSplit = time.Hour + ingesterSplit = 90 * time.Minute + ingesterQueryWindow = defaultSplit * 3 + ) + + l := fakeLimits{ + metadataSplitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, + ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + } + + cases := []struct { + name, tenantID string + start, end time.Time + expectedSplit time.Duration + iqo util.IngesterQueryOptions + values bool + }{ + { + name: "outside ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-5 * time.Hour), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: ingesterSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window, but query store only", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: true, + }, + }, + { + name: "within ingester query window, but no ingester split duration configured", + tenantID: alternateTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + } + + for _, values := range []bool{true, false} { + for _, tc := range cases { + t.Run(fmt.Sprintf("%s (values: %v)", tc.name, values), func(t *testing.T) { + keyGen := cacheKeyLabels{l, nil, tc.iqo} + + r := &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &tc.start, + End: &tc.end, + }, + } + + const labelName = "foo" + const query = `{cluster="eu-west1"}` + + if values { + r.LabelRequest.Values = true + r.LabelRequest.Name = labelName + r.LabelRequest.Query = query + } + + // we use regex here because cache key always refers to the current time to get the ingester query window, + // and therefore we can't know the current interval apriori without duplicating the logic + var pattern *regexp.Regexp + if values { + pattern = regexp.MustCompile(fmt.Sprintf(`labelvalues:%s:%s:%s:(\d+):%d`, tc.tenantID, labelName, regexp.QuoteMeta(query), 
tc.expectedSplit)) + } else { + pattern = regexp.MustCompile(fmt.Sprintf(`labels:%s:(\d+):%d`, tc.tenantID, tc.expectedSplit)) + } + + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + }) + } + } +} diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 673c995a600b..79cc9ad16a36 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/pkg/storage/config" "github.com/grafana/loki/pkg/storage/stores/index/stats" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" "github.com/grafana/loki/pkg/util/validation" @@ -102,10 +103,11 @@ type UserIDTransformer func(context.Context, string) string type cacheKeyLimits struct { Limits transformer UserIDTransformer + iqo util.IngesterQueryOptions } func (l cacheKeyLimits) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { - split := l.QuerySplitDuration(userID) + split := SplitIntervalForTimeRange(l.iqo, l.Limits, l.QuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { diff --git a/pkg/querier/queryrange/limits/definitions.go b/pkg/querier/queryrange/limits/definitions.go index 57b2e03c6697..e12255883bf4 100644 --- a/pkg/querier/queryrange/limits/definitions.go +++ b/pkg/querier/queryrange/limits/definitions.go @@ -30,5 +30,6 @@ type Limits interface { MaxQueryBytesRead(context.Context, string) int MaxQuerierBytesRead(context.Context, string) int MaxStatsCacheFreshness(context.Context, string) time.Duration + MaxMetadataCacheFreshness(context.Context, string) time.Duration VolumeEnabled(string) bool } diff --git a/pkg/querier/queryrange/limits_test.go b/pkg/querier/queryrange/limits_test.go index 0de342e42644..a80cf96dde80 100644 --- a/pkg/querier/queryrange/limits_test.go +++ b/pkg/querier/queryrange/limits_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "regexp" "sync" "testing" "time" @@ -22,6 +23,7 @@ import ( "github.com/grafana/loki/pkg/querier/plan" base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/math" @@ -48,10 +50,99 @@ func TestLimits(t *testing.T) { require.Equal( t, fmt.Sprintf("%s:%s:%d:%d:%d", "a", r.GetQuery(), r.GetStep(), r.GetStart().UnixMilli()/int64(time.Hour/time.Millisecond), int64(time.Hour)), - cacheKeyLimits{wrapped, nil}.GenerateCacheKey(context.Background(), "a", r), + cacheKeyLimits{wrapped, nil, nil}.GenerateCacheKey(context.Background(), "a", r), ) } +func TestMetricQueryCacheKey(t *testing.T) { + const ( + defaultTenant = "a" + alternateTenant = "b" + query = `sum(rate({foo="bar"}[1]))` + defaultSplit = time.Hour + ingesterSplit = 90 * time.Minute + ingesterQueryWindow = defaultSplit * 3 + ) + + var ( + step = (15 * time.Second).Milliseconds() + ) + + l := fakeLimits{ + splitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, + ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + } + + cases := []struct { + name, tenantID string + start, end time.Time + expectedSplit 
time.Duration + iqo util.IngesterQueryOptions + }{ + { + name: "outside ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-5 * time.Hour), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: ingesterSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window, but query store only", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: true, + }, + }, + { + name: "within ingester query window, but no ingester split duration configured", + tenantID: alternateTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + keyGen := cacheKeyLimits{l, nil, tc.iqo} + + r := &LokiRequest{ + Query: query, + StartTs: tc.start, + EndTs: tc.end, + Step: step, + } + + // we use regex here because cache key always refers to the current time to get the ingester query window, + // and therefore we can't know the current interval apriori without duplicating the logic + pattern := regexp.MustCompile(fmt.Sprintf(`%s:%s:%d:(\d+):%d`, tc.tenantID, regexp.QuoteMeta(query), step, tc.expectedSplit)) + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + }) + } +} + func Test_seriesLimiter(t *testing.T) { cfg := testConfig cfg.CacheResults = false @@ -308,7 +399,7 @@ func Test_MaxQueryLookBack_Types(t *testing.T) { } func Test_GenerateCacheKey_NoDivideZero(t *testing.T) { - l := cacheKeyLimits{WithSplitByLimits(nil, 0), nil} + l := cacheKeyLimits{WithSplitByLimits(nil, 0), nil, nil} start := time.Now() r := &LokiRequest{ Query: "qry", diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 6d0d62af7a88..8223704eea02 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -177,38 +177,36 @@ func NewMiddleware( var codec base.Codec = DefaultCodec - split := newDefaultSplitter(limits, iqo) - - indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, split, statsCache, + indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, iqo, statsCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) if err != nil { return nil, nil, err } - metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, newMetricQuerySplitter(limits, iqo), resultsCache, + metricsTripperware, err := NewMetricTripperware(cfg, engineOpts, log, limits, schema, codec, iqo, resultsCache, cacheGenNumLoader, retentionEnabled, PrometheusExtractor{}, metrics, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } - limitedTripperware, err := NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, split) + limitedTripperware, err := 
NewLimitedTripperware(cfg, engineOpts, log, limits, schema, metrics, indexStatsTripperware, codec, iqo) if err != nil { return nil, nil, err } // NOTE: When we would start caching response from non-metric queries we would have to consider cache gen headers as well in // MergeResponse implementation for Loki codecs same as it is done in Cortex at https://github.com/cortexproject/cortex/blob/21bad57b346c730d684d6d0205efef133422ab28/pkg/querier/queryrange/query_range.go#L170 - logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, split, resultsCache, metrics, indexStatsTripperware, metricsNamespace) + logFilterTripperware, err := NewLogFilterTripperware(cfg, engineOpts, log, limits, schema, codec, iqo, resultsCache, metrics, indexStatsTripperware, metricsNamespace) if err != nil { return nil, nil, err } - seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, split, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace) + seriesTripperware, err := NewSeriesTripperware(cfg, log, limits, metrics, schema, codec, iqo, seriesCache, cacheGenNumLoader, retentionEnabled, metricsNamespace) if err != nil { return nil, nil, err } - labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, split, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace) + labelsTripperware, err := NewLabelsTripperware(cfg, log, limits, codec, iqo, labelsCache, cacheGenNumLoader, retentionEnabled, metrics, schema, metricsNamespace) if err != nil { return nil, nil, err } @@ -218,7 +216,7 @@ func NewMiddleware( return nil, nil, err } - seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, split, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) + seriesVolumeTripperware, err := NewVolumeTripperware(cfg, log, limits, schema, codec, iqo, volumeCache, cacheGenNumLoader, retentionEnabled, metrics, metricsNamespace) if err != nil { return nil, nil, err } @@ -409,7 +407,7 @@ func getOperation(path string) string { } // NewLogFilterTripperware creates a new frontend tripperware responsible for handling log requests. 
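The cache-key tests above match the interval component with a (\d+) regex because the chosen split now depends on time.Now() (the ingester query window moves), so the exact bucket cannot be known up front. A condensed sketch of how a request is bucketed into a key, with simplified names rather than the exact cacheKeyLabels/cacheKeyLimits code:

package main

import (
	"fmt"
	"time"
)

// labelsCacheKey divides the request end time by the chosen split to get the
// "current interval" bucket and embeds both in the key, matching the
// labels:<tenant>:<interval>:<split> shape asserted by the tests above.
func labelsCacheKey(userID string, end time.Time, split time.Duration) string {
	var currentInterval int64
	if denominator := int64(split / time.Millisecond); denominator > 0 {
		currentInterval = end.UnixMilli() / denominator
	}
	return fmt.Sprintf("labels:%s:%d:%d", userID, currentInterval, split)
}

func main() {
	end := time.Date(2024, 1, 15, 10, 30, 0, 0, time.UTC)
	// the same request lands in different buckets under a 1h vs a 90m split
	fmt.Println(labelsCacheKey("fake", end, time.Hour))
	fmt.Println(labelsCacheKey("fake", end, 90*time.Minute))
}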
-func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { +func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -418,7 +416,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } if cfg.CacheResults { @@ -473,7 +471,7 @@ func NewLogFilterTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Lo } // NewLimitedTripperware creates a new frontend tripperware responsible for handling log requests which are label matcher only, no filter expression. -func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, split splitter) (base.Middleware, error) { +func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, metrics *Metrics, indexStatsTripperware base.Middleware, merger base.Merger, iqo util.IngesterQueryOptions) (base.Middleware, error) { return base.MiddlewareFunc(func(next base.Handler) base.Handler { statsHandler := indexStatsTripperware.Wrap(next) @@ -482,7 +480,7 @@ func NewLimitedTripperware(_ Config, engineOpts logql.EngineOpts, log log.Logger NewLimitsMiddleware(limits), NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, WithMaxParallelism(limits, limitedQuerySplits), merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), NewQuerierSizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), } @@ -501,7 +499,7 @@ func NewSeriesTripperware( metrics *Metrics, schema config.SchemaConfig, merger base.Merger, - split splitter, + iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, @@ -516,6 +514,7 @@ func NewSeriesTripperware( merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -542,7 +541,7 @@ func NewSeriesTripperware( StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, 
merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } if cfg.CacheSeriesResults { @@ -584,7 +583,7 @@ func NewLabelsTripperware( log log.Logger, limits Limits, merger base.Merger, - split splitter, + iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, @@ -601,6 +600,7 @@ func NewLabelsTripperware( merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -627,7 +627,7 @@ func NewLabelsTripperware( StatsCollectorMiddleware(), NewLimitsMiddleware(limits), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newDefaultSplitter(limits, iqo), metrics.SplitByMetrics), } if cfg.CacheLabelResults { @@ -652,8 +652,8 @@ func NewLabelsTripperware( } // NewMetricTripperware creates a new frontend tripperware responsible for handling metric queries -func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { - cacheKey := cacheKeyLimits{limits, cfg.Transformer} +func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, extractor base.Extractor, metrics *Metrics, indexStatsTripperware base.Middleware, metricsNamespace string) (base.Middleware, error) { + cacheKey := cacheKeyLimits{limits, cfg.Transformer, iqo} var queryCacheMiddleware base.Middleware if cfg.CacheResults { var err error @@ -706,7 +706,7 @@ func NewMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log log.Logge queryRangeMiddleware, NewQuerySizeLimiterMiddleware(schema.Configs, engineOpts, log, limits, statsHandler), base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), - SplitByIntervalMiddleware(schema.Configs, limits, merger, split, metrics.SplitByMetrics), + SplitByIntervalMiddleware(schema.Configs, limits, merger, newMetricQuerySplitter(limits, iqo), metrics.SplitByMetrics), ) if cfg.CacheResults { @@ -804,7 +804,7 @@ func NewInstantMetricTripperware(cfg Config, engineOpts logql.EngineOpts, log lo }), nil } -func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { +func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { // Parallelize the volume requests, so it doesn't send a huge request to a single index-gw (i.e. {app=~".+"} for 30d). // Indices are sharded by 24 hours, so we split the volume request in 24h intervals. 
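The comment above refers to the WithSplitByLimits call that follows: it pins the split interval to 24 hours regardless of tenant configuration, because index files are sharded by day. In effect a multi-day volume or index-stats request fans out into day-sized sub-requests; an illustrative sketch of that splitting (not the actual ForInterval/splitter implementation):

package main

import (
	"fmt"
	"time"
)

// splitByInterval cuts [start, end) into interval-sized sub-ranges, clamping the last one.
func splitByInterval(start, end time.Time, interval time.Duration) [][2]time.Time {
	var out [][2]time.Time
	for s := start; s.Before(end); s = s.Add(interval) {
		e := s.Add(interval)
		if e.After(end) {
			e = end
		}
		out = append(out, [2]time.Time{s, e})
	}
	return out
}

func main() {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	// a 3-day volume request becomes three day-aligned sub-requests
	for _, r := range splitByInterval(start, start.Add(72*time.Hour), 24*time.Hour) {
		fmt.Println(r[0].Format(time.RFC3339), "->", r[1].Format(time.RFC3339))
	}
}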
limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval) @@ -817,6 +817,7 @@ func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema conf merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -843,7 +844,7 @@ func NewVolumeTripperware(cfg Config, log log.Logger, limits Limits, schema conf cacheMiddleware, cfg, merger, - split, + newDefaultSplitter(limits, iqo), limits, log, metrics, @@ -912,7 +913,7 @@ func volumeFeatureFlagRoundTripper(nextTW base.Middleware, limits Limits) base.M }) } -func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, split splitter, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { +func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema config.SchemaConfig, merger base.Merger, iqo util.IngesterQueryOptions, c cache.Cache, cacheGenNumLoader base.CacheGenNumberLoader, retentionEnabled bool, metrics *Metrics, metricsNamespace string) (base.Middleware, error) { limits = WithSplitByLimits(limits, indexStatsQuerySplitInterval) var cacheMiddleware base.Middleware @@ -924,6 +925,7 @@ func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema merger, c, cacheGenNumLoader, + iqo, func(_ context.Context, r base.Request) bool { return !r.GetCachingOptions().Disabled }, @@ -950,7 +952,7 @@ func NewIndexStatsTripperware(cfg Config, log log.Logger, limits Limits, schema cacheMiddleware, cfg, merger, - split, + newDefaultSplitter(limits, iqo), limits, log, metrics, diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index fe8799fffe79..c7c7cff4595a 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -1237,23 +1237,24 @@ func TestMetricsTripperware_SplitShardStats(t *testing.T) { } type fakeLimits struct { - maxQueryLength time.Duration - maxQueryParallelism int - tsdbMaxQueryParallelism int - maxQueryLookback time.Duration - maxEntriesLimitPerQuery int - maxSeries int - splitDuration map[string]time.Duration - metadataSplitDuration map[string]time.Duration - ingesterSplitDuration map[string]time.Duration - minShardingLookback time.Duration - queryTimeout time.Duration - requiredLabels []string - requiredNumberLabels int - maxQueryBytesRead int - maxQuerierBytesRead int - maxStatsCacheFreshness time.Duration - volumeEnabled bool + maxQueryLength time.Duration + maxQueryParallelism int + tsdbMaxQueryParallelism int + maxQueryLookback time.Duration + maxEntriesLimitPerQuery int + maxSeries int + splitDuration map[string]time.Duration + metadataSplitDuration map[string]time.Duration + ingesterSplitDuration map[string]time.Duration + minShardingLookback time.Duration + queryTimeout time.Duration + requiredLabels []string + requiredNumberLabels int + maxQueryBytesRead int + maxQuerierBytesRead int + maxStatsCacheFreshness time.Duration + maxMetadataCacheFreshness time.Duration + volumeEnabled bool } func (f fakeLimits) QuerySplitDuration(key string) time.Duration { @@ -1344,6 +1345,10 @@ func (f fakeLimits) MaxStatsCacheFreshness(_ context.Context, _ string) time.Dur return f.maxStatsCacheFreshness } +func (f fakeLimits) MaxMetadataCacheFreshness(_ context.Context, _ string) time.Duration { + return f.maxMetadataCacheFreshness +} + func (f fakeLimits) VolumeEnabled(_ string) bool 
{ return f.volumeEnabled } diff --git a/pkg/querier/queryrange/series_cache.go b/pkg/querier/queryrange/series_cache.go index 9ad67f70acf5..bbbf96e2dd70 100644 --- a/pkg/querier/queryrange/series_cache.go +++ b/pkg/querier/queryrange/series_cache.go @@ -9,21 +9,29 @@ import ( "time" "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/common/model" + + "github.com/grafana/dskit/tenant" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/chunk/cache/resultscache" + "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/validation" ) type cacheKeySeries struct { Limits transformer UserIDTransformer + iqo util.IngesterQueryOptions } // GenerateCacheKey generates a cache key based on the userID, matchers, split duration and the interval of the request. func (i cacheKeySeries) GenerateCacheKey(ctx context.Context, userID string, r resultscache.Request) string { sr := r.(*LokiSeriesRequest) - split := i.MetadataQuerySplitDuration(userID) + + split := SplitIntervalForTimeRange(i.iqo, i.Limits, i.MetadataQuerySplitDuration, []string{userID}, time.Now().UTC(), r.GetEnd().UTC()) var currentInterval int64 if denominator := int64(split / time.Millisecond); denominator > 0 { @@ -34,11 +42,12 @@ func (i cacheKeySeries) GenerateCacheKey(ctx context.Context, userID string, r r userID = i.transformer(ctx, userID) } - matchers := sr.GetMatch() - sort.Strings(matchers) - matcherStr := strings.Join(matchers, ",") + return fmt.Sprintf("series:%s:%s:%d:%d", userID, i.joinMatchers(sr.GetMatch()), currentInterval, split) +} - return fmt.Sprintf("series:%s:%s:%d:%d", userID, matcherStr, currentInterval, split) +func (i cacheKeySeries) joinMatchers(matchers []string) string { + sort.Strings(matchers) + return strings.Join(matchers, ",") } type seriesExtractor struct{} @@ -78,6 +87,7 @@ func NewSeriesCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -87,14 +97,33 @@ func NewSeriesCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( logger, c, - cacheKeySeries{limits, transformer}, + cacheKeySeries{limits, transformer, iqo}, limits, merger, seriesExtractor{}, cacheGenNumberLoader, - shouldCache, + func(ctx context.Context, r queryrangebase.Request) bool { + return shouldCacheMetadataReq(ctx, logger, shouldCache, r, limits) + }, parallelismForReq, retentionEnabled, metrics, ) } + +func shouldCacheMetadataReq(ctx context.Context, logger log.Logger, shouldCache queryrangebase.ShouldCacheFn, req queryrangebase.Request, l Limits) bool { + if shouldCache != nil && !shouldCache(ctx, req) { + return false + } + + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + level.Error(logger).Log("msg", "failed to determine if metadata request should be cached. 
won't cache", "err", err) + return false + } + + cacheFreshnessCapture := func(id string) time.Duration { return l.MaxMetadataCacheFreshness(ctx, id) } + maxCacheFreshness := validation.MaxDurationPerTenant(tenantIDs, cacheFreshnessCapture) + + return maxCacheFreshness == 0 || model.Time(req.GetEnd().UnixMilli()).Before(model.Now().Add(-maxCacheFreshness)) +} diff --git a/pkg/querier/queryrange/series_cache_test.go b/pkg/querier/queryrange/series_cache_test.go index abe992001217..d73efa9deea8 100644 --- a/pkg/querier/queryrange/series_cache_test.go +++ b/pkg/querier/queryrange/series_cache_test.go @@ -3,6 +3,7 @@ package queryrange import ( "context" "fmt" + "regexp" "testing" "time" @@ -77,6 +78,7 @@ func TestSeriesCache(t *testing.T) { cache.NewMockCache(), nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -312,3 +314,203 @@ func TestSeriesCache(t *testing.T) { } }) } + +func TestSeriesCache_freshness(t *testing.T) { + testTime := time.Now().Add(-1 * time.Hour) + from, through := util.RoundToMilliseconds(testTime.Add(-1*time.Hour), testTime) + + for _, tt := range []struct { + name string + req *LokiSeriesRequest + shouldCache bool + maxMetadataCacheFreshness time.Duration + }{ + { + name: "max metadata freshness not set", + req: &LokiSeriesRequest{ + StartTs: from.Time(), + EndTs: through.Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + }, + shouldCache: true, + }, + { + name: "req overlaps with max cache freshness window", + req: &LokiSeriesRequest{ + StartTs: from.Time(), + EndTs: through.Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: false, + }, + { + name: "req does not overlap max cache freshness window", + req: &LokiSeriesRequest{ + StartTs: from.Add(-24 * time.Hour).Time(), + EndTs: through.Add(-24 * time.Hour).Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + }, + maxMetadataCacheFreshness: 24 * time.Hour, + shouldCache: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + cacheMiddleware, err := NewSeriesCacheMiddleware( + log.NewNopLogger(), + fakeLimits{ + metadataSplitDuration: map[string]time.Duration{ + "fake": 24 * time.Hour, + }, + maxMetadataCacheFreshness: tt.maxMetadataCacheFreshness, + }, + DefaultCodec, + cache.NewMockCache(), + nil, + nil, + nil, + func(_ context.Context, _ []string, _ queryrangebase.Request) int { + return 1 + }, + false, + nil, + nil, + ) + require.NoError(t, err) + + seriesResp := &LokiSeriesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: []logproto.SeriesIdentifier{ + { + Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, + }, + }, + Statistics: stats.Result{ + Summary: stats.Summary{ + Splits: 1, + }, + }, + } + + called := 0 + handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + called++ + + // should request the entire length with no partitioning as nothing is cached yet. + require.Equal(t, tt.req.GetStart(), r.GetStart()) + require.Equal(t, tt.req.GetEnd(), r.GetEnd()) + + return seriesResp, nil + })) + + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, tt.req) + require.NoError(t, err) + require.Equal(t, 1, called) // called actual handler, as not cached. 
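shouldCacheMetadataReq above is the piece that enforces the new frontend.max-metadata-cache-freshness limit for both the series and labels caches: a metadata request is cached only when its end time is older than now minus the configured freshness window, or when the limit is 0 (disabled). A simplified sketch of that decision, using plain time.Time instead of model.Time and skipping the per-tenant limit resolution; the _freshness test cases around this point exercise exactly these branches:

package main

import (
	"fmt"
	"time"
)

// shouldCacheMetadata reports whether a metadata request ending at `end` may be cached
// under a given max freshness window; 0 disables the check.
func shouldCacheMetadata(end time.Time, maxFreshness time.Duration) bool {
	if maxFreshness == 0 {
		return true // limit disabled
	}
	return end.Before(time.Now().Add(-maxFreshness))
}

func main() {
	fmt.Println(shouldCacheMetadata(time.Now().Add(-1*time.Hour), 24*time.Hour))  // false: too recent
	fmt.Println(shouldCacheMetadata(time.Now().Add(-48*time.Hour), 24*time.Hour)) // true: old enough
	fmt.Println(shouldCacheMetadata(time.Now().Add(-1*time.Hour), 0))             // true: limit disabled
}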
+ require.Equal(t, seriesResp, got) + + called = 0 + got, err = handler.Do(ctx, tt.req) + require.NoError(t, err) + if !tt.shouldCache { + require.Equal(t, 1, called) + } else { + require.Equal(t, 0, called) + } + require.Equal(t, seriesResp, got) + }) + } +} + +func TestSeriesQueryCacheKey(t *testing.T) { + const ( + defaultTenant = "a" + alternateTenant = "b" + defaultSplit = time.Hour + ingesterSplit = 90 * time.Minute + ingesterQueryWindow = defaultSplit * 3 + ) + + l := fakeLimits{ + metadataSplitDuration: map[string]time.Duration{defaultTenant: defaultSplit, alternateTenant: defaultSplit}, + ingesterSplitDuration: map[string]time.Duration{defaultTenant: ingesterSplit}, + } + + cases := []struct { + name, tenantID string + start, end time.Time + expectedSplit time.Duration + iqo util.IngesterQueryOptions + values bool + }{ + { + name: "outside ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-5 * time.Hour), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: ingesterSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + { + name: "within ingester query window, but query store only", + tenantID: defaultTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: true, + }, + }, + { + name: "within ingester query window, but no ingester split duration configured", + tenantID: alternateTenant, + start: time.Now().Add(-6 * time.Hour), + end: time.Now().Add(-ingesterQueryWindow / 2), + expectedSplit: defaultSplit, + iqo: ingesterQueryOpts{ + queryIngestersWithin: ingesterQueryWindow, + queryStoreOnly: false, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + matchers := []string{`{namespace="prod"}`, `{service="foo"}`} + + keyGen := cacheKeySeries{l, nil, tc.iqo} + + r := &LokiSeriesRequest{ + StartTs: tc.start, + EndTs: tc.end, + Match: matchers, + Path: seriesAPIPath, + } + + // we use regex here because cache key always refers to the current time to get the ingester query window, + // and therefore we can't know the current interval apriori without duplicating the logic + pattern := regexp.MustCompile(fmt.Sprintf(`series:%s:%s:(\d+):%d`, tc.tenantID, regexp.QuoteMeta(keyGen.joinMatchers(matchers)), tc.expectedSplit)) + + require.Regexp(t, pattern, keyGen.GenerateCacheKey(context.Background(), tc.tenantID, r)) + }) + } +} diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index b332fe5e612e..ef05aa969ec1 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -186,7 +186,7 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (que case *LokiSeriesRequest, *LabelRequest: interval = validation.MaxDurationOrZeroPerTenant(tenantIDs, h.limits.MetadataQuerySplitDuration) default: - interval = validation.MaxDurationOrZeroPerTenant(tenantIDs, h.limits.QuerySplitDuration) + interval = validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, h.limits.QuerySplitDuration) } // skip split by if 
unset diff --git a/pkg/querier/queryrange/splitters.go b/pkg/querier/queryrange/splitters.go index 79e3d5352e06..0aaecf35cb96 100644 --- a/pkg/querier/queryrange/splitters.go +++ b/pkg/querier/queryrange/splitters.go @@ -98,7 +98,7 @@ func (s *defaultSplitter) split(execTime time.Time, tenantIDs []string, req quer start, end, needsIngesterSplits := ingesterQueryBounds(execTime, s.iqo, req) - if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { + if ingesterQueryInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { // perform splitting using special interval (`split_ingester_queries_by_interval`) util.ForInterval(ingesterQueryInterval, start, end, endTimeInclusive, factory) @@ -212,7 +212,7 @@ func (s *metricQuerySplitter) split(execTime time.Time, tenantIDs []string, r qu start, end, needsIngesterSplits = ingesterQueryBounds(execTime, s.iqo, lokiReq) start, end = s.alignStartEnd(r.GetStep(), start, end) - if ingesterQueryInterval := validation.MaxDurationPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { + if ingesterQueryInterval := validation.MaxDurationOrZeroPerTenant(tenantIDs, s.limits.IngesterQuerySplitDuration); ingesterQueryInterval != 0 && needsIngesterSplits { // perform splitting using special interval (`split_ingester_queries_by_interval`) s.buildMetricSplits(lokiReq.GetStep(), ingesterQueryInterval, start, end, factory) diff --git a/pkg/querier/queryrange/volume_cache.go b/pkg/querier/queryrange/volume_cache.go index 954c642ffef8..147d61912db9 100644 --- a/pkg/querier/queryrange/volume_cache.go +++ b/pkg/querier/queryrange/volume_cache.go @@ -101,6 +101,7 @@ func NewVolumeCacheMiddleware( merger queryrangebase.Merger, c cache.Cache, cacheGenNumberLoader queryrangebase.CacheGenNumberLoader, + iqo util.IngesterQueryOptions, shouldCache queryrangebase.ShouldCacheFn, parallelismForReq queryrangebase.ParallelismForReqFn, retentionEnabled bool, @@ -110,7 +111,7 @@ func NewVolumeCacheMiddleware( return queryrangebase.NewResultsCacheMiddleware( log, c, - VolumeSplitter{cacheKeyLimits{limits, transformer}}, + VolumeSplitter{cacheKeyLimits{limits, transformer, iqo}}, limits, merger, VolumeExtractor{}, diff --git a/pkg/querier/queryrange/volume_cache_test.go b/pkg/querier/queryrange/volume_cache_test.go index 904e0fc7c3a9..038d8fa925f5 100644 --- a/pkg/querier/queryrange/volume_cache_test.go +++ b/pkg/querier/queryrange/volume_cache_test.go @@ -39,6 +39,7 @@ func TestVolumeCache(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, @@ -304,6 +305,7 @@ func TestVolumeCache_RecentData(t *testing.T) { c, nil, nil, + nil, func(_ context.Context, _ []string, _ queryrangebase.Request) int { return 1 }, diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 622e076f97b0..d7cf63e91654 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -5,6 +5,7 @@ import ( "errors" "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/chunkenc" @@ -193,3 +194,50 @@ func TestMergeBuilder(t *testing.T) { querier, ) } + +func TestBlockReset(t *testing.T) { + numSeries := 100 + numKeysPerSeries := 10000 + data, _ := mkBasicSeriesWithBlooms(numSeries, 
numKeysPerSeries, 1, 0xffff, 0, 10000) + + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + + schema := Schema{ + version: DefaultSchemaVersion, + encoding: chunkenc.EncSnappy, + nGramLength: 10, + nGramSkip: 2, + } + + builder, err := NewBlockBuilder( + BlockOptions{ + schema: schema, + SeriesPageSize: 100, + BloomPageSize: 10 << 10, + }, + writer, + ) + + require.Nil(t, err) + itr := NewSliceIter[SeriesWithBloom](data) + _, err = builder.BuildFrom(itr) + require.Nil(t, err) + block := NewBlock(reader) + querier := NewBlockQuerier(block) + + rounds := make([][]model.Fingerprint, 2) + + for i := 0; i < len(rounds); i++ { + for querier.Next() { + rounds[i] = append(rounds[i], querier.At().Series.Fingerprint) + } + + err = querier.Seek(0) // reset at end + require.Nil(t, err) + } + + require.Equal(t, rounds[0], rounds[1]) +} diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index fb74e6a1638d..1ccc372248a8 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -297,8 +297,9 @@ func (d *SeriesPageDecoder) Next() bool { } func (d *SeriesPageDecoder) Seek(fp model.Fingerprint) { - if fp > d.header.ThroughFp || fp < d.header.FromFp { - // shortcut: we know the fingerprint is not in this page + if fp > d.header.ThroughFp { + // shortcut: we know the fingerprint is too large so nothing in this page + // will match the seek call, which returns the first found fingerprint >= fp. // so masquerade the index as if we've already iterated through d.i = d.header.NumSeries } diff --git a/pkg/storage/bloom/v1/index_querier.go b/pkg/storage/bloom/v1/index_querier.go index cbd8da7579c3..2d653e35bad9 100644 --- a/pkg/storage/bloom/v1/index_querier.go +++ b/pkg/storage/bloom/v1/index_querier.go @@ -55,7 +55,7 @@ func (it *LazySeriesIter) Seek(fp model.Fingerprint) error { page := it.b.index.pageHeaders[desiredPage] switch { - case desiredPage == len(it.b.index.pageHeaders), page.FromFp > fp: + case desiredPage == len(it.b.index.pageHeaders): // no overlap exists, either because no page was found with a throughFP >= fp // or because the first page that was found has a fromFP > fp, // meaning successive pages would also have a fromFP > fp diff --git a/pkg/storage/chunk/cache/resultscache/cache.go b/pkg/storage/chunk/cache/resultscache/cache.go index 527aea84bcd1..d05d71837404 100644 --- a/pkg/storage/chunk/cache/resultscache/cache.go +++ b/pkg/storage/chunk/cache/resultscache/cache.go @@ -50,7 +50,7 @@ type ResultsCache struct { next Handler cache cache.Cache limits Limits - splitter KeyGenerator + keyGen KeyGenerator cacheGenNumberLoader CacheGenNumberLoader retentionEnabled bool extractor Extractor @@ -86,7 +86,7 @@ func NewResultsCache( next: next, cache: c, limits: limits, - splitter: keyGen, + keyGen: keyGen, cacheGenNumberLoader: cacheGenNumberLoader, retentionEnabled: retentionEnabled, extractor: extractor, @@ -115,7 +115,7 @@ func (s ResultsCache) Do(ctx context.Context, r Request) (Response, error) { } var ( - key = s.splitter.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r) + key = s.keyGen.GenerateCacheKey(ctx, tenant.JoinTenantIDs(tenantIDs), r) extents []Extent response Response ) diff --git a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go index a28c76c12f78..ffe715c857ec 100644 --- 
a/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go +++ b/pkg/storage/stores/shipper/bloomshipper/block_downloader_test.go @@ -74,7 +74,7 @@ func Test_blockDownloader_downloadBlocks(t *testing.T) { func Test_blockDownloader_downloadBlock(t *testing.T) { tests := map[string]struct { cacheEnabled bool - expectedTotalGetBlocksCalls int + expectedTotalGetBlocksCalls int32 }{ "cache disabled": { cacheEnabled: false, @@ -129,7 +129,7 @@ func Test_blockDownloader_downloadBlock(t *testing.T) { case <-done: } require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded") - require.Equal(t, 20, blockClient.getBlockCalls) + require.Equal(t, int32(20), blockClient.getBlockCalls.Load()) blocksCh, errorsCh = downloader.downloadBlocks(context.Background(), "fake", blockReferences) downloadedBlocks = make(map[string]any, len(blockReferences)) @@ -150,7 +150,7 @@ func Test_blockDownloader_downloadBlock(t *testing.T) { case <-done: } require.Len(t, downloadedBlocks, 20, "all 20 block must be downloaded") - require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls) + require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls.Load()) }) } } @@ -158,7 +158,7 @@ func Test_blockDownloader_downloadBlock(t *testing.T) { func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) { tests := map[string]struct { cacheEnabled bool - expectedTotalGetBlocksCalls int + expectedTotalGetBlocksCalls int32 }{ "requests to blockClient must be deduplicated by blockPath if cache is enabled": { cacheEnabled: true, @@ -195,7 +195,7 @@ func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) { t.Cleanup(downloader.stop) require.NoError(t, err) - blocksDownloadedCount := atomic.Uint32{} + var blocksDownloadedCount atomic.Uint32 mutex := sync.Mutex{} multiError := util.MultiError{} waitGroup := sync.WaitGroup{} @@ -225,7 +225,7 @@ func Test_blockDownloader_downloadBlock_deduplication(t *testing.T) { require.NoError(t, multiError.Err()) require.Equal(t, uint32(10), blocksDownloadedCount.Load()) - require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls) + require.Equal(t, testData.expectedTotalGetBlocksCalls, blockClient.getBlockCalls.Load()) }) } } @@ -340,11 +340,11 @@ type blockSupplier func() LazyBlock type mockBlockClient struct { responseDelay time.Duration mockData map[string]blockSupplier - getBlockCalls int + getBlockCalls atomic.Int32 } func (m *mockBlockClient) GetBlock(_ context.Context, reference BlockRef) (LazyBlock, error) { - m.getBlockCalls++ + m.getBlockCalls.Inc() time.Sleep(m.responseDelay) supplier, exists := m.mockData[reference.BlockPath] if exists { diff --git a/pkg/storage/stores/shipper/bloomshipper/shipper.go b/pkg/storage/stores/shipper/bloomshipper/shipper.go index d9d96fcc7783..36bfba913c98 100644 --- a/pkg/storage/stores/shipper/bloomshipper/shipper.go +++ b/pkg/storage/stores/shipper/bloomshipper/shipper.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/common/model" "golang.org/x/exp/slices" + v1 "github.com/grafana/loki/pkg/storage/bloom/v1" "github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper/config" ) @@ -24,6 +25,19 @@ func (r fpRange) maxFp() uint64 { return r[1] } +type BlockQuerierWithFingerprintRange struct { + *v1.BlockQuerier + MinFp, MaxFp model.Fingerprint +} + +type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error + +type Interface interface { + GetBlockRefs(ctx context.Context, tenant string, from, through model.Time) ([]BlockRef, error) 
+ Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error + Stop() +} + type Shipper struct { client Client config config.Config diff --git a/pkg/storage/stores/shipper/bloomshipper/store.go b/pkg/storage/stores/shipper/bloomshipper/store.go deleted file mode 100644 index 40c23658e9a1..000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/store.go +++ /dev/null @@ -1,61 +0,0 @@ -package bloomshipper - -import ( - "context" - "time" - - "github.com/prometheus/common/model" - - v1 "github.com/grafana/loki/pkg/storage/bloom/v1" -) - -type ForEachBlockCallback func(bq *v1.BlockQuerier, minFp, maxFp uint64) error - -type ReadShipper interface { - GetBlockRefs(ctx context.Context, tenant string, from, through model.Time) ([]BlockRef, error) - Fetch(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error -} - -type Interface interface { - ReadShipper - Stop() -} - -type BlockQuerierWithFingerprintRange struct { - *v1.BlockQuerier - MinFp, MaxFp model.Fingerprint -} - -type Store interface { - GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) - ForEach(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error - Stop() -} - -type BloomStore struct { - shipper Interface -} - -func NewBloomStore(shipper Interface) (*BloomStore, error) { - return &BloomStore{ - shipper: shipper, - }, nil -} - -func (bs *BloomStore) Stop() { - bs.shipper.Stop() -} - -// GetBlockRefs implements Store -func (bs *BloomStore) GetBlockRefs(ctx context.Context, tenant string, from, through time.Time) ([]BlockRef, error) { - return bs.shipper.GetBlockRefs(ctx, tenant, toModelTime(from), toModelTime(through)) -} - -// ForEach implements Store -func (bs *BloomStore) ForEach(ctx context.Context, tenant string, blocks []BlockRef, callback ForEachBlockCallback) error { - return bs.shipper.Fetch(ctx, tenant, blocks, callback) -} - -func toModelTime(t time.Time) model.Time { - return model.TimeFromUnixNano(t.UnixNano()) -} diff --git a/pkg/storage/stores/shipper/bloomshipper/store_test.go b/pkg/storage/stores/shipper/bloomshipper/store_test.go deleted file mode 100644 index ec48f7caa040..000000000000 --- a/pkg/storage/stores/shipper/bloomshipper/store_test.go +++ /dev/null @@ -1,11 +0,0 @@ -package bloomshipper - -import ( - "testing" -) - -func TestBloomShipper(_ *testing.T) { -} - -func TestBloomStore(_ *testing.T) { -} diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go index d9f6382c2d2b..bae41255554d 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager.go @@ -144,66 +144,75 @@ func NewHeadManager(name string, logger log.Logger, dir string, metrics *Metrics return m } -func (m *HeadManager) loop() { - defer m.wg.Done() +func (m *HeadManager) buildPrev() error { + if m.prev == nil { + return nil + } - buildPrev := func() error { - if m.prev == nil { - return nil - } + if err := m.buildTSDBFromHead(m.prevHeads); err != nil { + return err + } - if err := m.buildTSDBFromHead(m.prevHeads); err != nil { - return err - } + // Now that the tsdbManager has the updated TSDBs, we can remove our references + m.mtx.Lock() + defer m.mtx.Unlock() + // We nil-out the previous wal to signal that we've built the TSDBs from it successfully. 
+ // We don't nil-out the heads because we need to keep them around + // in order to serve queries for the recently rotated out period until + // the index-gws|queriers have time to download the new TSDBs + m.prev = nil - // Now that the tsdbManager has the updated TSDBs, we can remove our references - m.mtx.Lock() - defer m.mtx.Unlock() - m.prevHeads = nil - m.prev = nil + return nil +} - return nil +// tick handles one iteration for `loop()`. It builds new heads, +// cleans up previous heads, and performs rotations. +func (m *HeadManager) tick(now time.Time) { + // retry tsdb build failures from previous run + if err := m.buildPrev(); err != nil { + level.Error(m.log).Log( + "msg", "failed building tsdb head", + "period", m.period.PeriodFor(m.prev.initialized), + "err", err, + ) + // rotating head without building prev would result in loss of index for that period (until restart) + return + } + + if activePeriod := m.period.PeriodFor(m.activeHeads.start); m.period.PeriodFor(now) > activePeriod { + if err := m.Rotate(now); err != nil { + m.metrics.headRotations.WithLabelValues(statusFailure).Inc() + level.Error(m.log).Log( + "msg", "failed rotating tsdb head", + "period", activePeriod, + "err", err, + ) + return + } + m.metrics.headRotations.WithLabelValues(statusSuccess).Inc() } + // build tsdb from rotated-out period + if err := m.buildPrev(); err != nil { + level.Error(m.log).Log( + "msg", "failed building tsdb head", + "period", m.period.PeriodFor(m.prev.initialized), + "err", err, + ) + } +} + +func (m *HeadManager) loop() { + defer m.wg.Done() + ticker := time.NewTicker(defaultRotationCheckPeriod) defer ticker.Stop() for { select { case <-ticker.C: - // retry tsdb build failures from previous run - if err := buildPrev(); err != nil { - level.Error(m.log).Log( - "msg", "failed building tsdb head", - "period", m.period.PeriodFor(m.prev.initialized), - "err", err, - ) - // rotating head without building prev would result in loss of index for that period (until restart) - continue - } - now := time.Now() - if activePeriod := m.period.PeriodFor(m.activeHeads.start); m.period.PeriodFor(now) > activePeriod { - if err := m.Rotate(now); err != nil { - m.metrics.headRotations.WithLabelValues(statusFailure).Inc() - level.Error(m.log).Log( - "msg", "failed rotating tsdb head", - "period", activePeriod, - "err", err, - ) - continue - } - m.metrics.headRotations.WithLabelValues(statusSuccess).Inc() - } - - // build tsdb from rotated-out period - if err := buildPrev(); err != nil { - level.Error(m.log).Log( - "msg", "failed building tsdb head", - "period", m.period.PeriodFor(m.prev.initialized), - "err", err, - ) - } + m.tick(now) case <-m.cancel: return } diff --git a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go index f8a1b38a1fd7..c58e55645717 100644 --- a/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go +++ b/pkg/storage/stores/shipper/indexshipper/tsdb/head_manager_test.go @@ -43,7 +43,7 @@ func newNoopTSDBManager(name, dir string) noopTSDBManager { } func (m noopTSDBManager) BuildFromHead(_ *tenantHeads) error { - panic("BuildFromHead not implemented") + return nil } func (m noopTSDBManager) BuildFromWALs(_ time.Time, wals []WALIdentifier, _ bool) error { @@ -266,6 +266,61 @@ func Test_HeadManager_RecoverHead(t *testing.T) { } +// test head still serves data for the most recently rotated period.
+func Test_HeadManager_QueryAfterRotate(t *testing.T) { + now := time.Now() + dir := t.TempDir() + cases := []struct { + Labels labels.Labels + Fingerprint uint64 + Chunks []index.ChunkMeta + User string + }{ + { + User: "tenant1", + Labels: mustParseLabels(`{foo="bar", bazz="buzz"}`), + Fingerprint: mustParseLabels(`{foo="bar", bazz="buzz"}`).Hash(), + Chunks: []index.ChunkMeta{ + { + MinTime: 1, + MaxTime: 10, + Checksum: 3, + }, + }, + }, + } + + storeName := "store_2010-10-10" + mgr := NewHeadManager(storeName, log.NewNopLogger(), dir, NewMetrics(nil), newNoopTSDBManager(storeName, dir)) + // This bit is normally handled by the Start() fn, but we're testing a smaller surface area + // so ensure our dirs exist + for _, d := range managerRequiredDirs(storeName, dir) { + require.Nil(t, util.EnsureDirectory(d)) + } + require.Nil(t, mgr.Rotate(now)) // initialize head (usually done by Start()) + + // add data for each tenant + for _, tc := range cases { + require.Nil(t, mgr.Append(tc.User, tc.Labels, tc.Labels.Hash(), tc.Chunks)) + } + + nextPeriod := time.Now().Add(time.Duration(mgr.period)) + mgr.tick(nextPeriod) // synthetic tick to rotate head + + for _, c := range cases { + refs, err := mgr.GetChunkRefs( + context.Background(), + c.User, + 0, math.MaxInt64, + nil, nil, + labels.MustNewMatcher(labels.MatchRegexp, "foo", ".+"), + ) + require.Nil(t, err) + require.Equal(t, chunkMetasToChunkRefs(c.User, c.Fingerprint, c.Chunks), refs) + } + +} + // test mgr recover from multiple wals across multiple periods func Test_HeadManager_Lifecycle(t *testing.T) { dir := t.TempDir() diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 45dd34f201e8..ac25798c33e3 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -97,6 +97,7 @@ type Limits struct { MaxConcurrentTailRequests int `yaml:"max_concurrent_tail_requests" json:"max_concurrent_tail_requests"` MaxEntriesLimitPerQuery int `yaml:"max_entries_limit_per_query" json:"max_entries_limit_per_query"` MaxCacheFreshness model.Duration `yaml:"max_cache_freshness_per_query" json:"max_cache_freshness_per_query"` + MaxMetadataCacheFreshness model.Duration `yaml:"max_metadata_cache_freshness" json:"max_metadata_cache_freshness"` MaxStatsCacheFreshness model.Duration `yaml:"max_stats_cache_freshness" json:"max_stats_cache_freshness"` MaxQueriersPerTenant uint `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"` MaxQueryCapacity float64 `yaml:"max_query_capacity" json:"max_query_capacity"` @@ -277,6 +278,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.MaxCacheFreshness.Set("10m") f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.") + _ = l.MaxMetadataCacheFreshness.Set("24h") + f.Var(&l.MaxMetadataCacheFreshness, "frontend.max-metadata-cache-freshness", "Do not cache metadata requests if the end time is within the frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such limit. Defaults to 24h.") + _ = l.MaxStatsCacheFreshness.Set("10m") f.Var(&l.MaxStatsCacheFreshness, "frontend.max-stats-cache-freshness", "Do not cache requests with an end time that falls within Now minus this duration. 0 disables this feature (default).") @@ -298,6 +302,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { _ = l.QuerySplitDuration.Set("1h") f.Var(&l.QuerySplitDuration, "querier.split-queries-by-interval", "Split queries by a time interval and execute in parallel.
The value 0 disables splitting by time. This also determines how cache keys are chosen when result caching is enabled.") + // With metadata caching, it is not possible to extract a subset of labels/series from a cached extent because, unlike samples, they are not associated with a timestamp. + // As a result, we could return inaccurate results; for example, returning results from an entire 1h extent for a 5m query. + // Setting max_metadata_cache_freshness to 24h should help us avoid caching recent data and preserve correctness. + // For the portion of the request beyond the freshness window, the granularity of the cached metadata results is determined by split_metadata_queries_by_interval. _ = l.MetadataQuerySplitDuration.Set("24h") f.Var(&l.MetadataQuerySplitDuration, "querier.split-metadata-queries-by-interval", "Split metadata queries by a time interval and execute in parallel. The value 0 disables splitting metadata queries by time. This also determines how cache keys are chosen when label/series result caching is enabled.") @@ -624,6 +632,10 @@ func (o *Overrides) MaxCacheFreshness(_ context.Context, userID string) time.Dur return time.Duration(o.getOverridesForUser(userID).MaxCacheFreshness) } +func (o *Overrides) MaxMetadataCacheFreshness(_ context.Context, userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).MaxMetadataCacheFreshness) +} + func (o *Overrides) MaxStatsCacheFreshness(_ context.Context, userID string) time.Duration { return time.Duration(o.getOverridesForUser(userID).MaxStatsCacheFreshness) } diff --git a/production/helm/loki/CHANGELOG.md b/production/helm/loki/CHANGELOG.md index 272aa6942885..ce6852d3f3ea 100644 --- a/production/helm/loki/CHANGELOG.md +++ b/production/helm/loki/CHANGELOG.md @@ -13,6 +13,11 @@ Entries should include a reference to the pull request that introduced the chang [//]: # ( : do not remove this line. This locator is used by the CI pipeline to automatically create a changelog entry for each new Loki release. Add other chart versions and respective changelog entries bellow this line.) + +## 5.41.7 + +- [FEATURE] Add support for disabling specific alert rules + ## 5.41.6 - [BUGFIX] Added missing namespace to query-scheduler-discovery service when deploying loki in a specific namespace.
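For readers wiring up the new metadata-cache freshness limit introduced in the limits.go hunk above, a minimal per-tenant sketch is shown below. It assumes the usual Loki limits_config block and uses the yaml key names implied by the struct tags and flag descriptions in this diff; it is illustrative only and not part of the change:

limits_config:
  # Keep the most recent 24h of label/series results out of the results cache,
  # so still-changing metadata is always computed fresh.
  max_metadata_cache_freshness: 24h
  # The portion of a request older than the freshness window is split
  # (and cached) at this granularity.
  split_metadata_queries_by_interval: 24h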
diff --git a/production/helm/loki/Chart.yaml b/production/helm/loki/Chart.yaml index cb43a70c965b..d8f4486b7de6 100644 --- a/production/helm/loki/Chart.yaml +++ b/production/helm/loki/Chart.yaml @@ -3,7 +3,7 @@ name: loki description: Helm chart for Grafana Loki in simple, scalable mode type: application appVersion: 2.9.3 -version: 5.41.6 +version: 5.41.7 home: https://grafana.github.io/helm-charts sources: - https://github.com/grafana/loki diff --git a/production/helm/loki/README.md b/production/helm/loki/README.md index 6b4ec081e9bb..dc016ef13c25 100644 --- a/production/helm/loki/README.md +++ b/production/helm/loki/README.md @@ -1,6 +1,6 @@ # loki -![Version: 5.41.6](https://img.shields.io/badge/Version-5.41.6-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square) +![Version: 5.41.7](https://img.shields.io/badge/Version-5.41.7-informational?style=flat-square) ![Type: application](https://img.shields.io/badge/Type-application-informational?style=flat-square) ![AppVersion: 2.9.3](https://img.shields.io/badge/AppVersion-2.9.3-informational?style=flat-square) Helm chart for Grafana Loki in simple, scalable mode diff --git a/production/helm/loki/src/alerts.yaml.tpl b/production/helm/loki/src/alerts.yaml.tpl index 2171c94848e5..144e263f7061 100644 --- a/production/helm/loki/src/alerts.yaml.tpl +++ b/production/helm/loki/src/alerts.yaml.tpl @@ -2,6 +2,7 @@ groups: - name: "loki_alerts" rules: +{{- if not (.Values.monitoring.rules.disabled.LokiRequestErrors | default false) }} - alert: "LokiRequestErrors" annotations: message: | @@ -17,6 +18,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiRequestPanics | default false) }} - alert: "LokiRequestPanics" annotations: message: | @@ -28,6 +31,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiRequestLatency | default false) }} - alert: "LokiRequestLatency" annotations: message: | @@ -40,6 +45,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiTooManyCompactorsRunning | default false) }} - alert: "LokiTooManyCompactorsRunning" annotations: message: | @@ -52,6 +59,8 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} +{{- if not (.Values.monitoring.rules.disabled.LokiCanaryLatency | default false) }} - name: "loki_canaries_alerts" rules: - alert: "LokiCanaryLatency" @@ -66,3 +75,4 @@ groups: {{- if .Values.monitoring.rules.additionalRuleLabels }} {{ toYaml .Values.monitoring.rules.additionalRuleLabels | indent 10 }} {{- end }} +{{- end }} diff --git a/production/helm/loki/values.yaml b/production/helm/loki/values.yaml index b8c09ee76465..a7f4ea8f464d 100644 --- a/production/helm/loki/values.yaml +++ b/production/helm/loki/values.yaml @@ -583,6 +583,12 @@ monitoring: enabled: true # -- Include alerting rules alerting: true + # -- Specify which individual alerts should be disabled + # -- 
To disable all alerts at once, set the .monitoring.rules.alerting value to false rather than turning off each alert one by one. + # -- If you disable all the alerts and keep .monitoring.rules.alerting set to true, the chart will fail to render. + disabled: {} + # LokiRequestErrors: true + # LokiRequestPanics: true # -- Alternative namespace to create PrometheusRule resources in namespace: null # -- Additional annotations for the rules PrometheusRule resource diff --git a/production/ksonnet/loki/gateway.libsonnet b/production/ksonnet/loki/gateway.libsonnet index b5b1eb2401ca..d8c923a3bd03 100644 --- a/production/ksonnet/loki/gateway.libsonnet +++ b/production/ksonnet/loki/gateway.libsonnet @@ -48,7 +48,7 @@ local k = import 'ksonnet-util/kausal.libsonnet'; server { listen 80; - auth_basic “Prometheus”; + auth_basic "Prometheus"; auth_basic_user_file /etc/nginx/secrets/.htpasswd; proxy_set_header X-Scope-OrgID %(gateway_tenant_id)s;
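To illustrate the new chart option from the values.yaml and alerts.yaml.tpl hunks above, a values override like the following would disable individual alert rules while leaving the rest enabled; this is a sketch using only the keys and rule names defined by the alerts template in this diff:

monitoring:
  rules:
    enabled: true
    alerting: true
    # Only the rules listed here are skipped; any rule not listed stays enabled.
    disabled:
      LokiRequestErrors: true
      LokiRequestPanics: true

When every alert should be turned off, setting monitoring.rules.alerting to false remains the simpler switch, as noted in the values.yaml comments.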