
Commit

Merge branch 'main' into op-swift-tls
btaani authored Jan 18, 2024
2 parents aaaabd6 + 423f3e9 commit cf851de
Showing 37 changed files with 902 additions and 164 deletions.
2 changes: 1 addition & 1 deletion .drone/drone.jsonnet
@@ -640,7 +640,7 @@ local build_image_tag = '0.33.0';
'GIT_TARGET_BRANCH="$DRONE_TARGET_BRANCH"',
]) { depends_on: ['loki'], when: onPRs },
make('validate-example-configs', container=false) { depends_on: ['loki'] },
make('validate-dev-cluster-config', container=false) { depends_on: ['loki'] },
make('validate-dev-cluster-config', container=false) { depends_on: ['validate-example-configs'] },
make('check-example-config-doc', container=false) { depends_on: ['clone'] },
{
name: 'build-docs-website',
4 changes: 2 additions & 2 deletions .drone/drone.yml
@@ -306,7 +306,7 @@ steps:
- commands:
- make BUILD_IN_CONTAINER=false validate-dev-cluster-config
depends_on:
- loki
- validate-example-configs
environment: {}
image: grafana/loki-build-image:0.33.0
name: validate-dev-cluster-config
@@ -2113,6 +2113,6 @@ kind: secret
name: gpg_private_key
---
kind: signature
hmac: fe7669a21410ae5f2d1ad6b6205fdc582af874f65f7bd6a679731a88174e3a1c
hmac: 457592d17208477ceb480f81dbdb88f7b95a5ad015c88d9d6fed06c2422a52f9

...
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -48,7 +48,9 @@
* [11545](https://github.com/grafana/loki/pull/11545) **dannykopping** Force correct memcached timeout when fetching chunks.
* [11589](https://github.com/grafana/loki/pull/11589) **ashwanthgoli** Results Cache: Adds `query_length_served` cache stat to measure the length of the query served from cache.
* [11535](https://github.com/grafana/loki/pull/11535) **dannykopping** Query Frontend: Allow customisable splitting of queries which overlap the `query_ingester_within` window to reduce query pressure on ingesters.
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
* [11654](https://github.com/grafana/loki/pull/11654) **dannykopping** Cache: atomically check background cache size limit correctly.
* [11682](https://github.com/grafana/loki/pull/11682) **ashwanthgoli** Metadata cache: Adds `frontend.max-metadata-cache-freshness` to configure the time window for which metadata results are not cached. This helps avoid returning inaccurate results by not caching recent results.
* [11679](https://github.com/grafana/loki/pull/11679) **dannykopping** Cache: extending #11535 to align custom ingester query split with cache keys for correct caching of results.

##### Fixes
* [11074](https://github.com/grafana/loki/pull/11074) **hainenber** Fix panic in lambda-promtail due to mishandling of empty DROP_LABELS env var.
2 changes: 1 addition & 1 deletion MAINTAINERS.md
@@ -1,4 +1,4 @@
@slim-bean is the main/default maintainer.

Some parts of the codebase have other maintainers:
- `@grafana/docs-logs`, which includes [@osg-grafana](https://github.com/osg-grafana) ([Grafana Labs](https://grafana.com/)) and [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/))
- `@grafana/docs-logs`, which includes [@knylander-grafana](https://github.com/knylander-grafana) ([Grafana Labs](https://grafana.com/))
6 changes: 6 additions & 0 deletions docs/sources/configure/_index.md
@@ -2830,6 +2830,12 @@ The `limits_config` block configures global and per-tenant limits in Loki.
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness_per_query: <duration> | default = 10m]

# Do not cache metadata request if the end time is within the
# frontend.max-metadata-cache-freshness window. Set this to 0 to apply no such
# limits. Defaults to 24h.
# CLI flag: -frontend.max-metadata-cache-freshness
[max_metadata_cache_freshness: <duration> | default = 1d]

# Do not cache requests with an end time that falls within Now minus this
# duration. 0 disables this feature (default).
# CLI flag: -frontend.max-stats-cache-freshness
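
To illustrate the option documented above, a minimal (hypothetical) excerpt of a Loki limits configuration might look like the following; the 12h value is only an example, not a recommendation from this change.

```yaml
limits_config:
  # Serve queries whose end time falls within the last 10m from source rather than cache.
  max_cache_freshness_per_query: 10m
  # Do not cache metadata results (e.g. label and series requests) whose end time falls
  # within the last 12h; recent data is still changing, so caching it could return
  # inaccurate results. Lowered here from the 1d default purely for illustration.
  max_metadata_cache_freshness: 12h
```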
9 changes: 9 additions & 0 deletions docs/sources/setup/install/helm/reference.md
@@ -2707,6 +2707,15 @@ true
<td><pre lang="json">
{}
</pre>
</td>
</tr>
<tr>
<td>monitoring.rules.disabled</td>
<td>object</td>
<td>If you disable all the alerts and keep .monitoring.rules.alerting set to true, the chart will fail to render.</td>
<td><pre lang="json">
{}
</pre>
</td>
</tr>
<tr>
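
For the `monitoring.rules.disabled` value documented above, a hypothetical `values.yaml` override could look like the sketch below; the alert name is illustrative only, and the rule names actually shipped by the chart should be checked before use.

```yaml
monitoring:
  rules:
    # Keep the chart's alerting rules enabled overall ...
    alerting: true
    # ... but switch off individual alerts by name (the name below is made up).
    # Note the caveat from the reference table: disabling every alert while
    # alerting stays true makes the chart fail to render.
    disabled:
      LokiRequestErrors: true
```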
62 changes: 49 additions & 13 deletions integration/loki_micro_services_test.go
@@ -16,6 +16,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/proto"

"github.com/grafana/loki/integration/client"
@@ -1061,7 +1062,7 @@ func TestCategorizedLabels(t *testing.T) {

func TestBloomFiltersEndToEnd(t *testing.T) {
commonFlags := []string{
"-bloom-compactor.compaction-interval=2s",
"-bloom-compactor.compaction-interval=10s",
"-bloom-compactor.enable-compaction=true",
"-bloom-compactor.enabled=true",
"-bloom-gateway.enable-filtering=true",
@@ -1101,7 +1102,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
"-target=index-gateway",
)...,
)
_ = clu.AddComponent(
tBloomGateway = clu.AddComponent(
"bloom-gateway",
append(
commonFlags,
@@ -1136,7 +1137,7 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
"-tsdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)...,
)
_ = clu.AddComponent(
tBloomCompactor = clu.AddComponent(
"bloom-compactor",
append(
commonFlags,
@@ -1186,6 +1187,12 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
cliIndexGateway := client.New(tenantID, "", tIndexGateway.HTTPURL())
cliIndexGateway.Now = now

cliBloomGateway := client.New(tenantID, "", tBloomGateway.HTTPURL())
cliBloomGateway.Now = now

cliBloomCompactor := client.New(tenantID, "", tBloomCompactor.HTTPURL())
cliBloomCompactor.Now = now

lineTpl := `caller=loki_micro_services_test.go msg="push log line" id="%s"`
// ingest logs from 10 different pods
// each line contains a random, unique string
@@ -1206,7 +1213,14 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
require.NoError(t, tIngester.Restart())

// wait for compactor to compact index and for bloom compactor to build bloom filters
time.Sleep(10 * time.Second)
require.Eventually(t, func() bool {
// verify metrics that observe usage of block for filtering
metrics, err := cliBloomCompactor.Metrics()
require.NoError(t, err)
successfulRunCount := getMetricValue(t, "loki_bloomcompactor_runs_completed_total", metrics)
t.Log("successful bloom compactor runs", successfulRunCount)
return successfulRunCount == 1
}, 30*time.Second, time.Second)

// use bloom gateway to perform needle in the haystack queries
randIdx := rand.Intn(len(uniqueStrings))
@@ -1221,22 +1235,44 @@ func TestBloomFiltersEndToEnd(t *testing.T) {
expectedLine := fmt.Sprintf(lineTpl, uniqueStrings[randIdx])
require.Equal(t, expectedLine, resp.Data.Stream[0].Values[0][1])

// TODO(chaudum):
// verify that bloom blocks have actually been used for querying
// atm, we can only verify by logs, so we should add appropriate metrics for
// uploaded/downloaded blocks and metas
// verify metrics that observe usage of block for filtering
bloomGwMetrics, err := cliBloomGateway.Metrics()
require.NoError(t, err)

unfilteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_pre_filtering", bloomGwMetrics)
require.Equal(t, float64(10), unfilteredCount)

filteredCount := getMetricValue(t, "loki_bloom_gateway_chunkrefs_post_filtering", bloomGwMetrics)
require.Equal(t, float64(1), filteredCount)

mf, err := extractMetricFamily("loki_bloom_gateway_bloom_query_latency", bloomGwMetrics)
require.NoError(t, err)

count := getValueFromMetricFamilyWithFunc(mf, &dto.LabelPair{
Name: proto.String("status"),
Value: proto.String("success"),
}, func(m *dto.Metric) uint64 {
return m.Histogram.GetSampleCount()
})
require.Equal(t, uint64(1), count)
}

func getValueFromMF(mf *dto.MetricFamily, lbs []*dto.LabelPair) float64 {
return getValueFromMetricFamilyWithFunc(mf, lbs[0], func(m *dto.Metric) float64 { return m.Counter.GetValue() })
}

func getValueFromMetricFamilyWithFunc[R any](mf *dto.MetricFamily, lbs *dto.LabelPair, f func(*dto.Metric) R) R {
eq := func(e *dto.LabelPair) bool {
return e.GetName() == lbs.GetName() && e.GetValue() == lbs.GetValue()
}
var zero R
for _, m := range mf.Metric {
if !assert.ObjectsAreEqualValues(lbs, m.GetLabel()) {
if !slices.ContainsFunc(m.GetLabel(), eq) {
continue
}

return m.Counter.GetValue()
return f(m)
}

return 0
return zero
}

func assertCacheState(t *testing.T, metrics string, e *expectedCacheState) {
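
The refactored test helper above (`getValueFromMetricFamilyWithFunc`) generalises metric extraction over the returned value type. A self-contained sketch of the same pattern, using made-up metric data rather than the repository's code, could look like this:

```go
package main

import (
	"fmt"

	dto "github.com/prometheus/client_model/go"
	"golang.org/x/exp/slices"
	"google.golang.org/protobuf/proto"
)

// valueFrom applies f to the first metric in mf that carries the given label pair,
// mirroring the shape of getValueFromMetricFamilyWithFunc in the test above.
func valueFrom[R any](mf *dto.MetricFamily, lb *dto.LabelPair, f func(*dto.Metric) R) R {
	eq := func(e *dto.LabelPair) bool {
		return e.GetName() == lb.GetName() && e.GetValue() == lb.GetValue()
	}
	var zero R
	for _, m := range mf.Metric {
		if slices.ContainsFunc(m.GetLabel(), eq) {
			return f(m)
		}
	}
	return zero
}

func main() {
	// A hand-built histogram family standing in for a parsed /metrics payload.
	mf := &dto.MetricFamily{
		Name: proto.String("demo_query_latency"),
		Type: dto.MetricType_HISTOGRAM.Enum(),
		Metric: []*dto.Metric{{
			Label: []*dto.LabelPair{
				{Name: proto.String("status"), Value: proto.String("success")},
			},
			Histogram: &dto.Histogram{
				SampleCount: proto.Uint64(1),
				SampleSum:   proto.Float64(0.25),
			},
		}},
	}

	// Same call shape as the integration test: pick the success series and
	// extract its histogram sample count as a uint64.
	count := valueFrom(mf, &dto.LabelPair{
		Name:  proto.String("status"),
		Value: proto.String("success"),
	}, func(m *dto.Metric) uint64 { return m.Histogram.GetSampleCount() })

	fmt.Println(count) // 1
}
```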
18 changes: 13 additions & 5 deletions integration/parse_metrics.go
@@ -13,16 +13,24 @@ var (
ErrInvalidMetricType = fmt.Errorf("invalid metric type")
)

func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
func extractMetricFamily(name, metrics string) (*io_prometheus_client.MetricFamily, error) {
var parser expfmt.TextParser
mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
if err != nil {
return 0, nil, err
return nil, err
}

mf, ok := mfs[name]
if !ok {
return nil, ErrNoMetricFound
}
return mf, nil
}

mf, found := mfs[metricName]
if !found {
return 0, nil, ErrNoMetricFound
func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
mf, err := extractMetricFamily(metricName, metrics)
if err != nil {
return 0, nil, err
}

var val float64
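
For context on the extracted `extractMetricFamily` helper, this is roughly how the Prometheus text format is parsed into metric families; the payload below is made up for the sketch:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/prometheus/common/expfmt"
)

func main() {
	// A tiny, made-up /metrics payload in the Prometheus text exposition format.
	payload := `# TYPE demo_requests_total counter
demo_requests_total{status="success"} 3
demo_requests_total{status="failure"} 1
`

	var parser expfmt.TextParser
	families, err := parser.TextToMetricFamilies(strings.NewReader(payload))
	if err != nil {
		panic(err)
	}

	// A missing family shows up as absence from the map, which is where the
	// helper above returns ErrNoMetricFound.
	mf, ok := families["demo_requests_total"]
	if !ok {
		panic("no metric found")
	}

	for _, m := range mf.Metric {
		fmt.Println(m.GetLabel()[0].GetValue(), m.Counter.GetValue())
	}
}
```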
54 changes: 44 additions & 10 deletions pkg/bloomgateway/bloomgateway.go
@@ -80,8 +80,10 @@ var (
)

type metrics struct {
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
queueDuration prometheus.Histogram
inflightRequests prometheus.Summary
chunkRefsUnfiltered prometheus.Counter
chunkRefsFiltered prometheus.Counter
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -102,9 +104,29 @@ func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *
MaxAge: time.Minute,
AgeBuckets: 6,
}),
chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_pre_filtering",
Help: "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
}),
chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunkrefs_post_filtering",
Help: "Total amount of chunk refs post filtering.",
}),
}
}

func (m *metrics) addUnfilteredCount(n int) {
m.chunkRefsUnfiltered.Add(float64(n))
}

func (m *metrics) addFilteredCount(n int) {
m.chunkRefsFiltered.Add(float64(n))
}

// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
sync.RWMutex
@@ -284,8 +306,12 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
return nil, err
}

numChunksUnfiltered := len(req.Refs)

// Shortcut if request does not contain filters
if len(req.Filters) == 0 {
g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))
return &logproto.FilterChunkRefResponse{
ChunkRefs: req.Refs,
}, nil
@@ -313,6 +339,7 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
responses := responsesPool.Get(requestCount)
defer responsesPool.Put(responses)

outer:
for {
select {
case <-ctx.Done():
@@ -325,17 +352,24 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
level.Debug(g.logger).Log("msg", "got partial result", "task", task.ID, "tenant", tenantID, "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len(), "progress", fmt.Sprintf("%d/%d", len(responses), requestCount))
// wait for all parts of the full response
if len(responses) == requestCount {
for _, o := range responses {
if res.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
g.removeNotMatchingChunks(req, o)
}
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
break outer
}
}
}

for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
// we must not remove items from req.Refs as long as the worker may iterate over them
g.removeNotMatchingChunks(req, o)
}

g.metrics.addUnfilteredCount(numChunksUnfiltered)
g.metrics.addFilteredCount(len(req.Refs))

level.Debug(g.logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}

func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) {
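
The two new counters are registered under a namespace and subsystem rather than with hard-coded names. Assuming the gateway passes `loki` / `bloom_gateway` (which is what the metric names asserted in the integration test suggest), the fully qualified names compose as in this small sketch, which is not code from the repository:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// promauto applies the same namespace_subsystem_name composition to CounterOpts
	// that prometheus.BuildFQName performs here.
	fmt.Println(prometheus.BuildFQName("loki", "bloom_gateway", "chunkrefs_pre_filtering"))
	// loki_bloom_gateway_chunkrefs_pre_filtering
	fmt.Println(prometheus.BuildFQName("loki", "bloom_gateway", "chunkrefs_post_filtering"))
	// loki_bloom_gateway_chunkrefs_post_filtering
}
```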
29 changes: 20 additions & 9 deletions pkg/bloomgateway/worker.go
@@ -27,6 +27,7 @@ type workerMetrics struct {
dequeueErrors *prometheus.CounterVec
dequeueWaitTime *prometheus.SummaryVec
storeAccessLatency *prometheus.HistogramVec
bloomQueryLatency *prometheus.HistogramVec
}

func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -50,6 +51,13 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
Name: "dequeue_wait_time",
Help: "Time spent waiting for dequeuing tasks from queue",
}, labels),
bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "bloom_query_latency",
Help: "Latency in seconds of processing bloom blocks",
}, append(labels, "status")),
// TODO(chaudum): Move this metric into the bloomshipper
storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
@@ -213,29 +221,32 @@ func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant strin
return w.store.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, minFp, maxFp uint64) error {
for _, b := range boundedRefs {
if b.blockRef.MinFingerprint == minFp && b.blockRef.MaxFingerprint == maxFp {
processBlock(bq, day, b.tasks)
return nil
return w.processBlock(bq, day, b.tasks)
}
}
return nil
})
}

func processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) {
func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, day time.Time, tasks []Task) error {
schema, err := blockQuerier.Schema()
if err != nil {
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to get block schema")
}
return err
}

tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
it := newTaskMergeIterator(day, tokenizer, tasks...)
fq := blockQuerier.Fuse([]v1.PeekingIterator[v1.Request]{it})

start := time.Now()
err = fq.Run()
duration := time.Since(start).Seconds()

if err != nil {
for _, t := range tasks {
t.ErrCh <- errors.Wrap(err, "failed to run chunk check")
}
w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration)
return err
}

w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration)
return nil
}
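
The timing pattern introduced in `processBlock` — measure the run once, then record it under a success or failure label — can be lifted out into a standalone sketch; the registry, metric names, and worker id below are illustrative, not the worker's actual wiring:

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()
	latency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "demo",
		Subsystem: "worker",
		Name:      "bloom_query_latency",
		Help:      "Latency in seconds of processing bloom blocks",
	}, []string{"worker", "status"})
	reg.MustRegister(latency)

	run := func() error {
		time.Sleep(10 * time.Millisecond)
		return errors.New("simulated failure")
	}

	start := time.Now()
	err := run()
	duration := time.Since(start).Seconds()

	// One observation per run, labelled by outcome — the same split as the
	// success/failure branches in processBlock above.
	status := "success"
	if err != nil {
		status = "failure"
	}
	latency.WithLabelValues("worker-0", status).Observe(duration)
	fmt.Printf("recorded %s observation of %.3fs\n", status, duration)
}
```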
4 changes: 2 additions & 2 deletions pkg/logql/log/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ type LabelsBuilder struct {
// NewBaseLabelsBuilderWithGrouping creates a new base labels builder with grouping to compute results.
func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint, without, noLabels bool) *BaseLabelsBuilder {
if parserKeyHints == nil {
parserKeyHints = noParserHints
parserKeyHints = NoParserHints()
}

const labelsCapacity = 16
@@ -179,7 +179,7 @@ func NewBaseLabelsBuilderWithGrouping(groups []string, parserKeyHints ParserHint

// NewBaseLabelsBuilder creates a new base labels builder.
func NewBaseLabelsBuilder() *BaseLabelsBuilder {
return NewBaseLabelsBuilderWithGrouping(nil, noParserHints, false, false)
return NewBaseLabelsBuilderWithGrouping(nil, NoParserHints(), false, false)
}

// ForLabels creates a labels builder for a given labels set as base.