Commit

Merge branch 'main' into jm-chore-fronted-log-arbitrary-headers
Danny Kopping authored Jan 22, 2024
2 parents 19039b8 + b581db0 commit 8161ef0
Showing 61 changed files with 1,469 additions and 538 deletions.
2 changes: 1 addition & 1 deletion .drone/drone.jsonnet
@@ -640,7 +640,7 @@ local build_image_tag = '0.33.0';
'GIT_TARGET_BRANCH="$DRONE_TARGET_BRANCH"',
]) { depends_on: ['loki'], when: onPRs },
make('validate-example-configs', container=false) { depends_on: ['loki'] },
make('validate-dev-cluster-config', container=false) { depends_on: ['loki'] },
make('validate-dev-cluster-config', container=false) { depends_on: ['validate-example-configs'] },
make('check-example-config-doc', container=false) { depends_on: ['clone'] },
{
name: 'build-docs-website',
4 changes: 2 additions & 2 deletions .drone/drone.yml
@@ -306,7 +306,7 @@ steps:
- commands:
- make BUILD_IN_CONTAINER=false validate-dev-cluster-config
depends_on:
- loki
- validate-example-configs
environment: {}
image: grafana/loki-build-image:0.33.0
name: validate-dev-cluster-config
@@ -2113,6 +2113,6 @@ kind: secret
name: gpg_private_key
---
kind: signature
hmac: fe7669a21410ae5f2d1ad6b6205fdc582af874f65f7bd6a679731a88174e3a1c
hmac: 457592d17208477ceb480f81dbdb88f7b95a5ad015c88d9d6fed06c2422a52f9

...
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -48,14 +48,17 @@
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
* [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
* [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters.
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results.
* [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results.
* [11499](https://github.com/grafana/loki/pull/11284) **jmichalek132** Config: Adds `frontend.log-query-request-headers` to enable logging of request headers in query logs.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
* [11195](https://github.com/grafana/loki/pull/11195) **canuteson** Generate tsdb_shipper storage_config even if using_boltdb_shipper is false
* [9831](https://github.com/grafana/loki/pull/9831) **sijmenhuizenga**: Fix Promtail excludepath not evaluated on newly added files.
* [11551](https://github.com/grafana/loki/pull/11551) **dannykopping** Do not reflect label names in request metrics' "route" label.
* [11563](https://github.com/grafana/loki/pull/11563) **ptqa** Fix duplicate logs from docker containers.
* [11601](https://github.com/grafana/loki/pull/11601) **dannykopping** Ruler: Fixed a panic that can be caused by concurrent read-write access of tenant configs when there is a large number of rules.
* [11606](https://github.com/grafana/loki/pull/11606) **dannykopping** Fixed regression adding newlines to HTTP error response bodies which may break client integrations.
* [11657](https://github.com/grafana/loki/pull/11657) **ashwanthgoli** Log results cache: compose empty response based on the request being served to avoid returning incorrect limit or direction.
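The feature entry above for PR 11499 (`frontend.log-query-request-headers`) is the change this branch carries. As a rough illustration of the idea only — not Loki's actual implementation, and with hypothetical header names — a minimal Go sketch of middleware that logs an allow-list of request headers could look like this:

```go
package main

import (
	"log"
	"net/http"
)

// logHeaders returns middleware that logs the configured request headers for
// each query request. The header list stands in for a setting such as
// frontend.log-query-request-headers; all names and wiring here are illustrative.
func logHeaders(headers []string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		for _, h := range headers {
			if v := r.Header.Get(h); v != "" {
				log.Printf("query request header %s=%s path=%s", h, v, r.URL.Path)
			}
		}
		next.ServeHTTP(w, r)
	})
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/loki/api/v1/query_range", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	// Log X-Query-Tags and User-Agent for every query request.
	log.Fatal(http.ListenAndServe(":8080", logHeaders([]string{"X-Query-Tags", "User-Agent"}, mux)))
}
```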
2 changes: 1 addition & 1 deletion MAINTAINERS.md
@@ -1,4 +1,4 @@
@slim-bean is the main/default maintainer.

Some parts of the codebase have other maintainers:
- `@grafana/docs-logs`, which includes [@osg-grafana](https://github.com/osg-grafana) ([Grafana Labs](https://grafana.com/)) and [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/))
- `@grafana/docs-logs`, which includes [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/))
19 changes: 9 additions & 10 deletions clients/cmd/fluent-bit/config.go
@@ -2,7 +2,6 @@ package main

import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
@@ -67,7 +66,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
}
err := clientURL.Set(url)
if err != nil {
return nil, errors.New("failed to parse client URL")
return nil, fmt.Errorf("failed to parse client URL: %w", err)
}
res.clientConfig.URL = clientURL

@@ -83,7 +82,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
} else {
batchWaitValue, err := time.ParseDuration(batchWait)
if err != nil {
return nil, fmt.Errorf("failed to parse BatchWait: %s", batchWait)
return nil, fmt.Errorf("failed to parse BatchWait %s: %w", batchWait, err)
}
res.clientConfig.BatchWait = batchWaitValue
}
@@ -93,7 +92,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if batchSize != "" {
batchSizeValue, err := strconv.Atoi(batchSize)
if err != nil {
return nil, fmt.Errorf("failed to parse BatchSize: %s", batchSize)
return nil, fmt.Errorf("failed to parse BatchSize %s: %w", batchSize, err)
}
res.clientConfig.BatchSize = batchSizeValue
}
@@ -102,7 +101,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if timeout != "" {
timeoutValue, err := time.ParseDuration(timeout)
if err != nil {
return nil, fmt.Errorf("failed to parse Timeout: %s", timeout)
return nil, fmt.Errorf("failed to parse Timeout %s: %w", timeout, err)
}
res.clientConfig.Timeout = timeoutValue
}
@@ -111,7 +110,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if minBackoff != "" {
minBackoffValue, err := time.ParseDuration(minBackoff)
if err != nil {
return nil, fmt.Errorf("failed to parse MinBackoff: %s", minBackoff)
return nil, fmt.Errorf("failed to parse MinBackoff %s: %w", minBackoff, err)
}
res.clientConfig.BackoffConfig.MinBackoff = minBackoffValue
}
@@ -120,7 +119,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if maxBackoff != "" {
maxBackoffValue, err := time.ParseDuration(maxBackoff)
if err != nil {
return nil, fmt.Errorf("failed to parse MaxBackoff: %s", maxBackoff)
return nil, fmt.Errorf("failed to parse MaxBackoff %s: %w", maxBackoff, err)
}
res.clientConfig.BackoffConfig.MaxBackoff = maxBackoffValue
}
@@ -129,7 +128,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if maxRetries != "" {
maxRetriesValue, err := strconv.Atoi(maxRetries)
if err != nil {
return nil, fmt.Errorf("failed to parse MaxRetries: %s", maxRetries)
return nil, fmt.Errorf("failed to parse MaxRetries %s: %w", maxRetries, err)
}
res.clientConfig.BackoffConfig.MaxRetries = maxRetriesValue
}
@@ -154,7 +153,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
}
var level log.Level
if err := level.Set(logLevel); err != nil {
return nil, fmt.Errorf("invalid log level: %v", logLevel)
return nil, fmt.Errorf("invalid log level %v: %w", logLevel, err)
}
res.logLevel = level

@@ -238,7 +237,7 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
if queueSegmentSize != "" {
res.bufferConfig.dqueConfig.queueSegmentSize, err = strconv.Atoi(queueSegmentSize)
if err != nil {
return nil, fmt.Errorf("impossible to convert string to integer DqueSegmentSize: %v", queueSegmentSize)
return nil, fmt.Errorf("impossible to convert string to integer DqueSegmentSize %v: %w", queueSegmentSize, err)
}
}

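The fluent-bit config changes above swap `errors.New`/`%s` for `%w` so the underlying parse error stays in the error chain. A small standalone Go example of why that matters (my own illustration, not code from this commit):

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

func parseBatchSize(raw string) (int, error) {
	v, err := strconv.Atoi(raw)
	if err != nil {
		// %w keeps the original *strconv.NumError in the chain instead of flattening it to text.
		return 0, fmt.Errorf("failed to parse BatchSize %s: %w", raw, err)
	}
	return v, nil
}

func main() {
	_, err := parseBatchSize("10MB")
	fmt.Println(err) // failed to parse BatchSize 10MB: strconv.Atoi: parsing "10MB": invalid syntax

	// Because the error was wrapped with %w, callers can still inspect the cause.
	var numErr *strconv.NumError
	fmt.Println(errors.As(err, &numErr))           // true
	fmt.Println(errors.Is(err, strconv.ErrSyntax)) // true
}
```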
1 change: 1 addition & 0 deletions clients/pkg/promtail/targets/docker/target.go
@@ -222,6 +222,7 @@ func (t *Target) process(r io.Reader, logStream string) {
}
t.metrics.dockerEntries.Inc()
t.positions.Put(positions.CursorKey(t.containerName), ts.Unix())
t.since = ts.Unix()
}
}

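The one-line addition to the Docker target records the timestamp of the last processed entry so a restarted target resumes from where it left off instead of replaying the whole log stream. A simplified, self-contained sketch of that idea (my own model; the real `Target` talks to the Docker logs API rather than a slice):

```go
package main

import (
	"fmt"
	"time"
)

// tailer models the relevant bit of the Promtail Docker target: it remembers
// the timestamp of the last processed entry and resumes from there on restart.
type tailer struct {
	since int64 // unix seconds of the last entry we handled
}

// fetch stands in for the Docker logs API: it returns entries newer than since.
func fetch(entries []int64, since int64) []int64 {
	var out []int64
	for _, ts := range entries {
		if ts > since {
			out = append(out, ts)
		}
	}
	return out
}

func (t *tailer) process(entries []int64) {
	for _, ts := range fetch(entries, t.since) {
		fmt.Println("forwarding entry at", time.Unix(ts, 0).UTC())
		// Advancing since here is what the one-line change above adds:
		// without it, a restarted target re-reads every entry from the start.
		t.since = ts
	}
}

func main() {
	logStream := []int64{100, 200, 300}
	t := &tailer{}
	t.process(logStream)

	// Simulate a container/target restart: only entries newer than 300 are read.
	logStream = append(logStream, 400)
	t.process(logStream)
}
```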
34 changes: 32 additions & 2 deletions clients/pkg/promtail/targets/docker/target_test.go
@@ -27,7 +27,13 @@ func Test_DockerTarget(t *testing.T) {
h := func(w http.ResponseWriter, r *http.Request) {
switch path := r.URL.Path; {
case strings.HasSuffix(path, "/logs"):
dat, err := os.ReadFile("testdata/flog.log")
var filePath string
if strings.Contains(r.URL.RawQuery, "since=0") {
filePath = "testdata/flog.log"
} else {
filePath = "testdata/flog_after_restart.log"
}
dat, err := os.ReadFile(filePath)
require.NoError(t, err)
_, err = w.Write(dat)
require.NoError(t, err)
@@ -59,7 +65,7 @@
})
require.NoError(t, err)

_, err = NewTarget(
target, err := NewTarget(
NewMetrics(prometheus.NewRegistry()),
logger,
entryHandler,
@@ -92,4 +98,28 @@ func Test_DockerTarget(t *testing.T) {
actualLines = append(actualLines, entry.Line)
}
require.ElementsMatch(t, actualLines, expectedLines)

// restart target to simulate container restart
target.startIfNotRunning()
entryHandler.Clear()
require.Eventually(t, func() bool {
return len(entryHandler.Received()) >= 5
}, 5*time.Second, 100*time.Millisecond)

receivedAfterRestart := entryHandler.Received()
sort.Slice(receivedAfterRestart, func(i, j int) bool {
return receivedAfterRestart[i].Timestamp.Before(receivedAfterRestart[j].Timestamp)
})
actualLinesAfterRestart := make([]string, 0, 5)
for _, entry := range receivedAfterRestart[:5] {
actualLinesAfterRestart = append(actualLinesAfterRestart, entry.Line)
}
expectedLinesAfterRestart := []string{
"243.115.12.215 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /morph/exploit/granular HTTP/1.0\" 500 26468",
"221.41.123.237 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /user-centric/whiteboard HTTP/2.0\" 205 22487",
"89.111.144.144 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /open-source/e-commerce HTTP/1.0\" 401 11092",
"62.180.191.187 - - [09/Dec/2023:09:16:57 +0000] \"DELETE /cultivate/integrate/technologies HTTP/2.0\" 302 12979",
"156.249.2.192 - - [09/Dec/2023:09:16:57 +0000] \"POST /revolutionize/mesh/metrics HTTP/2.0\" 401 5297",
}
require.ElementsMatch(t, actualLinesAfterRestart, expectedLinesAfterRestart)
}
Binary file not shown.
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
@@ -2835,6 +2835,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness_per_query: <duration> | default = 10m]

# Do not cache metadata request if the end time is within the
# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such
# limits. Defaults to 24h.
# CLI flag: -frontend.max-metadata-cache-freshness
[max_metadata_cache_freshness: <duration> | default = 1d]

# Do not cache requests with an end time that falls within Now minus this
# duration. 0 disables this feature (default).
# CLI flag: -frontend.max-stats-cache-freshness
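The new `frontend.max-metadata-cache-freshness` setting documented above keeps very recent, still-changing results out of the cache. A hedged Go sketch of the decision it describes (conceptual only, not Loki's implementation):

```go
package main

import (
	"fmt"
	"time"
)

// cacheable reports whether a metadata request may be cached under a
// max-freshness window: requests whose end time falls within the last
// maxFreshness are considered too recent to cache safely.
func cacheable(end time.Time, maxFreshness time.Duration, now time.Time) bool {
	if maxFreshness == 0 {
		return true // 0 disables the limit
	}
	return end.Before(now.Add(-maxFreshness))
}

func main() {
	now := time.Now()
	fmt.Println(cacheable(now.Add(-time.Hour), 24*time.Hour, now))    // false: end time too recent
	fmt.Println(cacheable(now.Add(-48*time.Hour), 24*time.Hour, now)) // true: older than the window
	fmt.Println(cacheable(now.Add(-time.Hour), 0, now))               // true: feature disabled
}
```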
9 changes: 9 additions & 0 deletions docs/sources/setup/install/helm/reference.md
@@ -2707,6 +2707,15 @@ true
<td><pre lang="json">
{}
</pre>
</td>
</tr>
<tr>
<td>monitoring.rules.disabled</td>
<td>object</td>
<td>If you disable all the alerts and keep .monitoring.rules.alerting set to true, the chart will fail to render.</td>
<td><pre lang="json">
{}
</pre>
</td>
</tr>
<tr>
62 changes: 49 additions & 13 deletions integration/loki_micro_services_test.go
@@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"

"github.com/grafana/loki/integration/client"
@@ -1061,7 +1062,7 @@ func TestCategorizedLabels(t *testing.T) {

func TestBloomFiltersEndToEnd(t *testing.T) {
commonFlags := []string{
"-bloom-compactor.compaction-interval=2s",
"-bloom-compactor.compaction-interval=10s",
"-bloom-compactor.enable-compaction=true",
"-bloom-compactor.enabled=true",
"-bloom-gateway.enable-filtering=true",
@@ -1101,7 +1102,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
"-target=index-gateway",
)...,
)
_ = clu.AddComponent(
tBloomGateway = clu.AddComponent(
"bloom-gateway",
append(
commonFlags,
@@ -1136,7 +1137,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
tBloomCompactor = clu.AddComponent(
"bloom-compactor",
append(
commonFlags,
@@ -1186,6 +1187,12 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now

cliBloomGateway := client.New(tenantID, "", tBloomGateway.HTTPURL())
cliBloomGateway.Now = now

cliBloomCompactor := client.New(tenantID, "", tBloomCompactor.HTTPURL())
cliBloomCompactor.Now = now

lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
// ingest logs from 10 different pods
// each line contains a random, unique string
@@ -1206,7 +1213,14 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
require.NoError(t, tIngester.Restart())

// wait for compactor to compact index and for bloom compactor to build bloom filters
time.Sleep(10 * time.Second)
require.Eventually(t, func() bool {
// verify metrics that observe usage of block for filtering
metrics, err := cliBloomCompactor.Metrics()
require.NoError(t, err)
successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics)
t.Log("successful bloom compactor runs", successfulRunCount)
return successfulRunCount == 1
}, 30*time.Second, time.Second)

// use bloom gateway to perform needle in the haystack queries
randIdx := rand.Intn(len(uniqueStrings))
@@ -1221,22 +1235,44 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])

// TODO(chaudum):
// verify that bloom blocks have actually been used for querying
// atm, we can only verify by logs, so we should add appropriate metrics for
// uploaded/downloaded blocks and metas
// verify metrics that observe usage of block for filtering
bloomGwMetrics, err := cliBloomGateway.Metrics()
require.NoError(t, err)

unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics)
require.Equal(t, float64(10), unfilteredCount)

filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics)
require.Equal(t, float64(1), filteredCount)

mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
require.NoError(t, err)

count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
Name: proto.String("status"),
Value: proto.String("success"),
}, func(m *dto.Metric) uint64 {
return m.Histogram.GetSampleCount()
})
require.Equal(t, uint64(1), count)
}

func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
return getValueFromMetricFamilyWithFunc(mf, lbs[0], func(m *dto.Metric) float64 { return m.Counter.GetValue() })
}

func getValueFromMetricFamilyWithFunc[R any](mf *dto.MetricFamily, lbs *dto.LabelPair, f func(*dto.Metric) R) R {
eq := func(e *dto.LabelPair) bool {
return e.GetName() == lbs.GetName() && e.GetValue() == lbs.GetValue()
}
var zero R
for _, m := range mf.Metric {
if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
if !slices.ContainsFunc(m.GetLabel(), eq) {
continue
}

return m.Counter.GetValue()
return f(m)
}

return 0
return zero
}

func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {
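The test refactor above replaces an exact label-set comparison with `slices.ContainsFunc` and a generic extractor, so the same helper can read either a counter value or a histogram sample count. A self-contained sketch of that pattern using stand-in types (not the `dto` protobuf types from the diff):

```go
package main

import (
	"fmt"

	"golang.org/x/exp/slices"
)

// labelPair and metric are simplified stand-ins for the prometheus
// client_model types used in the test; the generic helper is the point here.
type labelPair struct{ name, value string }

type metric struct {
	labels  []labelPair
	counter float64
	samples uint64
}

// valueWithFunc returns f(m) for the first metric whose label set contains
// want, mirroring getValueFromMetricFamilyWithFunc in the diff above.
func valueWithFunc[R any](metrics []metric, want labelPair, f func(metric) R) R {
	var zero R
	for _, m := range metrics {
		if !slices.ContainsFunc(m.labels, func(l labelPair) bool { return l == want }) {
			continue
		}
		return f(m)
	}
	return zero
}

func main() {
	ms := []metric{
		{labels: []labelPair{{"status", "success"}}, counter: 1, samples: 1},
		{labels: []labelPair{{"status", "failure"}}, counter: 3, samples: 3},
	}
	// The same helper extracts a counter value or a histogram sample count.
	fmt.Println(valueWithFunc(ms, labelPair{"status", "success"}, func(m metric) float64 { return m.counter }))
	fmt.Println(valueWithFunc(ms, labelPair{"status", "failure"}, func(m metric) uint64 { return m.samples }))
}
```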