feat: Enable warnings in Loki query responses
MasslessParticle committed Apr 1, 2024
1 parent 018856c commit f2b3963
Showing 29 changed files with 955 additions and 367 deletions.
3 changes: 3 additions & 0 deletions pkg/ingester/ingester.go
@@ -4,6 +4,7 @@ import (
"context"
"flag"
"fmt"
"github.com/grafana/loki/pkg/logqlmodel/metadata"
"math/rand"
"net/http"
"os"
@@ -874,6 +875,7 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())
_, ctx = metadata.NewContext(ctx)

if req.Plan == nil {
parsed, err := syntax.ParseLogSelector(req.Selector, true)
@@ -933,6 +935,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querie
func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
// initialize stats collection for ingester queries.
_, ctx := stats.NewContext(queryServer.Context())
_, ctx = metadata.NewContext(ctx)
sp := opentracing.SpanFromContext(ctx)

// If the plan is empty we want all series to be returned.
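
Taken together, these two hunks make the ingester install a warnings-aware metadata context next to the stats context at the start of every Query and QuerySample call. The sketch below is not part of this commit; it only illustrates the intended flow using the metadata package API added in this diff, and the evaluate helper plus its warning text are invented for illustration.

// Sketch only: how a per-request metadata context is expected to be used.
// NewContext, AddWarnings, and Warnings are the functions added in this
// commit; evaluate and the warning string are hypothetical.
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/logqlmodel/metadata"
)

// evaluate stands in for the query path running under the handler's context.
func evaluate(ctx context.Context) {
	// Deep inside evaluation, code can attach a warning without any new
	// function parameters; it only needs the request context.
	_ = metadata.AddWarnings(ctx, "example warning raised during evaluation")
}

func main() {
	// At the handler boundary (as in Ingester.Query above), wrap the
	// incoming context so warnings have somewhere to accumulate.
	md, ctx := metadata.NewContext(context.Background())

	evaluate(ctx)

	// The handler can later read back everything that was recorded and
	// attach it to the outgoing response.
	fmt.Println(md.Warnings())
}

Because the accumulator travels in the context, no function signature along the evaluation path has to change to support warnings.
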
9 changes: 9 additions & 0 deletions pkg/ingester/instance.go
@@ -3,6 +3,7 @@ package ingester
import (
"context"
"fmt"
"github.com/grafana/loki/pkg/logqlmodel/metadata"
"math"
"net/http"
"os"
@@ -953,6 +954,7 @@ type QuerierQueryServer interface {

func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
stats := stats.FromContext(ctx)
metadata := metadata.FromContext(ctx)

// send until the limit is reached.
for limit != 0 && !isDone(ctx) {
@@ -971,6 +973,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ

stats.AddIngesterBatch(int64(batchSize))
batch.Stats = stats.Ingester()
batch.Warnings = metadata.Warnings()

if isDone(ctx) {
break
@@ -985,6 +988,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
}

stats.Reset()
metadata.Reset()
}
return nil
}
@@ -993,6 +997,7 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
sp := opentracing.SpanFromContext(ctx)

stats := stats.FromContext(ctx)
metadata := metadata.FromContext(ctx)
for !isDone(ctx) {
batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
if err != nil {
@@ -1001,6 +1006,8 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer

stats.AddIngesterBatch(int64(size))
batch.Stats = stats.Ingester()
batch.Warnings = metadata.Warnings()

if isDone(ctx) {
break
}
@@ -1014,6 +1021,8 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
}

stats.Reset()
metadata.Reset()

if sp != nil {
sp.LogKV("event", "sent batch", "size", size)
}
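
The loop changes above follow a simple per-batch pattern: after filling a batch, attach the warnings accumulated so far, send it, then reset the metadata context (right beside the stats reset) so the next batch only carries warnings raised after the previous send. Here is a rough standalone sketch of that shape, not part of this commit; fakeBatch and the loop stand in for the logproto response and the gRPC send, and the warning text is invented.

// Rough sketch of the per-batch pattern in sendBatches above, using the
// metadata package from this commit; fakeBatch is a hypothetical stand-in
// for logproto.QueryResponse.
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/logqlmodel/metadata"
)

type fakeBatch struct {
	Entries  int
	Warnings []string
}

func main() {
	md, ctx := metadata.NewContext(context.Background())

	for i := 0; i < 2; i++ {
		if i == 0 {
			// Something during evaluation of this batch raised a warning.
			_ = metadata.AddWarnings(ctx, "example warning for batch 0")
		}

		b := fakeBatch{Entries: 100, Warnings: md.Warnings()} // attach what accumulated so far
		fmt.Printf("send batch %d: %+v\n", i, b)

		// As in sendBatches: reset stats and metadata so the next batch
		// only reports what happened after this send.
		md.Reset()
	}
}
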
2 changes: 2 additions & 0 deletions pkg/iter/entry_iterator.go
@@ -2,6 +2,7 @@ package iter

import (
"context"
"github.com/grafana/loki/pkg/logqlmodel/metadata"
"io"
"math"
"sync"
@@ -379,6 +380,7 @@ func (i *queryClientIterator) Next() bool {
return false
}
stats.JoinIngesters(ctx, batch.Stats)
_ = metadata.AddWarnings(ctx, batch.Warnings...)
i.curr = NewQueryResponseIterator(batch, i.direction)
}

3 changes: 3 additions & 0 deletions pkg/iter/sample_iterator.go
@@ -3,6 +3,7 @@ package iter
import (
"container/heap"
"context"
"github.com/grafana/loki/pkg/logqlmodel/metadata"
"io"
"sync"

@@ -490,6 +491,8 @@ func (i *sampleQueryClientIterator) Next() bool {
return false
}
stats.JoinIngesters(ctx, batch.Stats)
_ = metadata.AddWarnings(ctx, batch.Warnings...)

i.curr = NewSampleQueryResponseIterator(batch)
}
return true
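
On the querier side, queryClientIterator and sampleQueryClientIterator do the inverse: as each batch arrives from an ingester, its Warnings slice is folded back into the caller's metadata context, mirroring what stats.JoinIngesters does for statistics. A minimal sketch of that rejoin step follows; it is not Loki code, the received batches are hard-coded instead of coming off a gRPC stream, and the warning strings are invented.

// Sketch of the client-side rejoin done in the iterators above.
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/logqlmodel/metadata"
)

func main() {
	md, ctx := metadata.NewContext(context.Background())

	// Pretend these arrived on the ingester query stream, one Warnings
	// slice per batch; identical warnings across batches are expected.
	received := [][]string{
		{"this is a warning"},
		{"this is a warning", "another warning"},
	}

	for _, batchWarnings := range received {
		// Same call the iterators make; the error only fires when no
		// metadata context was installed, so it is safe to ignore here.
		_ = metadata.AddWarnings(ctx, batchWarnings...)
	}

	// The query engine later reads the merged, deduplicated set.
	fmt.Println(md.Warnings()) // [another warning this is a warning]
}
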
14 changes: 12 additions & 2 deletions pkg/loghttp/query.go
@@ -42,15 +42,25 @@ const (

// QueryResponse represents the http json response to a Loki range and instant query
type QueryResponse struct {
Status string `json:"status"`
Data QueryResponseData `json:"data"`
Status string `json:"status"`
Warnings []string `json:"warnings,omitempty"`
Data QueryResponseData `json:"data"`
}

func (q *QueryResponse) UnmarshalJSON(data []byte) error {
return jsonparser.ObjectEach(data, func(key, value []byte, dataType jsonparser.ValueType, offset int) error {
switch string(key) {
case "status":
q.Status = string(value)
case "warnings":
var warnings []string
if _, err := jsonparser.ArrayEach(value, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
warnings = append(warnings, string(value))
}); err != nil {
return err
}

q.Warnings = warnings
case "data":
var responseData QueryResponseData
if err := responseData.UnmarshalJSON(value); err != nil {
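
For HTTP clients, the net effect of this file is a new optional top-level warnings array next to status and data, omitted when empty. The snippet below only illustrates that wire shape; it is not the real loghttp code path (which parses with jsonparser, as shown above), and the mirror struct flattens data to raw JSON for brevity.

// Illustration of the JSON shape only; this mirror struct is not the real
// loghttp.QueryResponse, whose data field is a richer QueryResponseData.
package main

import (
	"encoding/json"
	"fmt"
)

type queryResponse struct {
	Status   string          `json:"status"`
	Warnings []string        `json:"warnings,omitempty"`
	Data     json.RawMessage `json:"data"`
}

func main() {
	data := json.RawMessage(`{"resultType":"streams","result":[]}`)

	withWarnings, _ := json.Marshal(queryResponse{
		Status:   "success",
		Warnings: []string{"example warning text"},
		Data:     data,
	})
	fmt.Println(string(withWarnings))
	// {"status":"success","warnings":["example warning text"],"data":{...}}

	// With no warnings the field disappears entirely, so existing clients
	// keep seeing the response shape they already parse.
	without, _ := json.Marshal(queryResponse{Status: "success", Data: data})
	fmt.Println(string(without))
	// {"status":"success","data":{...}}
}
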
449 changes: 290 additions & 159 deletions pkg/logproto/logproto.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/logproto/logproto.proto
@@ -102,6 +102,7 @@ message QueryResponse {
(gogoproto.nullable) = true
];
stats.Ingester stats = 2 [(gogoproto.nullable) = false];
repeated string warnings = 3;
}

message SampleQueryResponse {
@@ -110,6 +111,7 @@ message SampleQueryResponse {
(gogoproto.nullable) = true
];
stats.Ingester stats = 2 [(gogoproto.nullable) = false];
repeated string warnings = 3;
}

enum Direction {
20 changes: 17 additions & 3 deletions pkg/logql/accumulator.go
@@ -90,8 +90,9 @@ type AccumulatedStreams struct {
streams []*logproto.Stream
order logproto.Direction

stats stats.Result // for accumulating statistics from downstream requests
headers map[string][]string // for accumulating headers from downstream requests
stats stats.Result // for accumulating statistics from downstream requests
headers map[string][]string // for accumulating headers from downstream requests
warnings map[string]struct{} // for accumulating warnings from downstream requests
}

// NewStreamAccumulator returns an accumulator for limited log queries.
@@ -113,7 +114,8 @@ func NewStreamAccumulator(params Params) *AccumulatedStreams {
order: order,
limit: int(params.Limit()),

headers: make(map[string][]string),
headers: make(map[string][]string),
warnings: make(map[string]struct{}),
}
}

@@ -353,6 +355,14 @@ func (acc *AccumulatedStreams) Result() []logqlmodel.Result {
)
}

warnings := make([]string, 0, len(acc.warnings))
for w := range acc.warnings {
warnings = append(warnings, w)
}
sort.Strings(warnings)

res.Warnings = warnings

return []logqlmodel.Result{res}
}

@@ -367,6 +377,10 @@ func (acc *AccumulatedStreams) Accumulate(_ context.Context, x logqlmodel.Result
acc.stats.Merge(x.Statistics)
metadata.ExtendHeaders(acc.headers, x.Headers)

for _, w := range x.Warnings {
acc.warnings[w] = struct{}{}
}

switch got := x.Data.(type) {
case logqlmodel.Streams:
for i := range got {
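
Within the accumulator, warnings from every downstream result are folded into a map[string]struct{} during Accumulate and only flattened into a sorted slice in Result, so identical warnings from many shards collapse to one entry and the output order is deterministic. A compact sketch of that merge, not part of this commit, with shard results reduced to plain string slices and invented warning text:

// Minimal sketch of the set-then-sort merge used by AccumulatedStreams.
package main

import (
	"fmt"
	"sort"
)

func mergeWarnings(shards ...[]string) []string {
	set := map[string]struct{}{}
	for _, shard := range shards {
		for _, w := range shard {
			set[w] = struct{}{}
		}
	}
	out := make([]string, 0, len(set))
	for w := range set {
		out = append(out, w)
	}
	sort.Strings(out) // deterministic order regardless of shard arrival order
	return out
}

func main() {
	fmt.Println(mergeWarnings(
		[]string{"b: shard warning", "a: shared warning"},
		[]string{"a: shared warning"}, // duplicate collapses to one entry
	))
	// [a: shared warning b: shard warning]
}
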
4 changes: 4 additions & 0 deletions pkg/logql/downstream.go
@@ -353,6 +353,10 @@ func (ev DownstreamEvaluator) Downstream(ctx context.Context, queries []Downstre
}

for _, res := range results {
if err := metadata.AddWarnings(ctx, res.Warnings...); err != nil {
level.Warn(util_log.Logger).Log("msg", "unable to add headers to results context", "error", err)
}

if err := metadata.JoinHeaders(ctx, res.Headers); err != nil {
level.Warn(util_log.Logger).Log("msg", "unable to add headers to results context", "error", err)
break
4 changes: 4 additions & 0 deletions pkg/logql/engine.go
@@ -262,10 +262,14 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
RecordRangeAndInstantQueryMetrics(ctx, q.logger, q.params, strconv.Itoa(status), statResult, data)
}

warnings := metadataCtx.Warnings()

return logqlmodel.Result{
Data: data,
Statistics: statResult,
Headers: metadataCtx.Headers(),
Warnings: warnings,
}, err
}

1 change: 1 addition & 0 deletions pkg/logqlmodel/logqlmodel.go
@@ -20,6 +20,7 @@ type Result struct {
Data parser.Value
Statistics stats.Result
Headers []*definitions.PrometheusResponseHeader
Warnings []string
}

// Streams is promql.Value
49 changes: 45 additions & 4 deletions pkg/logqlmodel/metadata/context.go
@@ -27,14 +27,16 @@ var (

// Context is the metadata context. It is passed through the query path and accumulates metadata.
type Context struct {
mtx sync.Mutex
headers map[string][]string
mtx sync.Mutex
headers map[string][]string
warnings map[string]struct{}
}

// NewContext creates a new metadata context
func NewContext(ctx context.Context) (*Context, context.Context) {
contextData := &Context{
headers: map[string][]string{},
headers: map[string][]string{},
warnings: map[string]struct{}{},
}
ctx = context.WithValue(ctx, metadataKey, contextData)
return contextData, ctx
@@ -45,7 +47,8 @@ func FromContext(ctx context.Context) *Context {
v, ok := ctx.Value(metadataKey).(*Context)
if !ok {
return &Context{
headers: map[string][]string{},
headers: map[string][]string{},
warnings: map[string]struct{}{},
}
}
return v
@@ -72,6 +75,28 @@ func (c *Context) Headers() []*definitions.PrometheusResponseHeader {
return headers
}

func (c *Context) Warnings() []string {
c.mtx.Lock()
defer c.mtx.Unlock()

warnings := make([]string, 0, len(c.warnings))
for warning := range c.warnings {
warnings = append(warnings, warning)
}

sort.Strings(warnings)

return warnings
}

func (c *Context) Reset() {
c.mtx.Lock()
defer c.mtx.Unlock()

clear(c.headers)
clear(c.warnings)
}

// JoinHeaders merges a Headers with the embedded Headers in a context in a concurrency-safe manner.
// JoinHeaders will consolidate all distinct headers but will override same-named headers in an
// undefined way
@@ -94,3 +119,19 @@ func ExtendHeaders(dst map[string][]string, src []*definitions.PrometheusRespons
dst[header.Name] = header.Values
}
}

func AddWarnings(ctx context.Context, warnings ...string) error {
context, ok := ctx.Value(metadataKey).(*Context)
if !ok {
return ErrNoCtxData
}

context.mtx.Lock()
defer context.mtx.Unlock()

for _, w := range warnings {
context.warnings[w] = struct{}{}
}

return nil
}
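
One subtlety in this file: AddWarnings returns ErrNoCtxData when no metadata context has been installed, while FromContext silently hands back a detached, empty Context whose contents the original request never sees. The short illustration below exercises both behaviors plus the dedup-and-sort of Warnings; it is not part of the commit and the warning strings are invented.

// Illustration of the two lookup behaviors in metadata/context.go.
package main

import (
	"context"
	"fmt"

	"github.com/grafana/loki/pkg/logqlmodel/metadata"
)

func main() {
	// Without NewContext, AddWarnings refuses to record anything...
	err := metadata.AddWarnings(context.Background(), "lost warning")
	fmt.Println(err) // reports that no metadata context data is present

	// ...and FromContext returns a usable but detached Context, so nothing
	// written to it is visible through the original request context.
	detached := metadata.FromContext(context.Background())
	fmt.Println(len(detached.Warnings())) // 0

	// With NewContext installed, the same calls accumulate and dedupe.
	md, ctx := metadata.NewContext(context.Background())
	_ = metadata.AddWarnings(ctx, "w1", "w1", "w2")
	fmt.Println(md.Warnings()) // [w1 w2] (sorted, duplicates collapsed)
}
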