Skip to content

Commit

Permalink
Support protobuf QueryRequest in querier. (#10858)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
This is one step to support pure protobuf encoding without `httpgrpc`.
It is only on the scheduler and is fully backwards compatible.

**Which issue(s) this PR fixes**:
This is a sub change of #10688

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Signed-off-by: Kaviraj <[email protected]>
Co-authored-by: Kaviraj <[email protected]>
Co-authored-by: Danny Kopping <[email protected]>
  • Loading branch information
3 people authored Oct 18, 2023
1 parent 6b6e5b8 commit 9fcc42d
Show file tree
Hide file tree
Showing 41 changed files with 1,415 additions and 1,358 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ require (
github.com/d4l3k/messagediff v1.2.1
github.com/efficientgo/core v1.0.0-rc.2
github.com/fsnotify/fsnotify v1.6.0
github.com/gogo/googleapis v1.4.0
github.com/grafana/loki/pkg/push v0.0.0-20231017172654-cfc4f0e84adc
github.com/heroku/x v0.0.61
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
Expand Down Expand Up @@ -218,7 +219,6 @@ require (
github.com/go-playground/validator/v10 v10.11.2 // indirect
github.com/go-zookeeper/zk v1.0.3 // indirect
github.com/gofrs/flock v0.8.1 // indirect
github.com/gogo/googleapis v1.4.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/btree v1.1.2 // indirect
Expand Down
36 changes: 35 additions & 1 deletion integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ func (c *Client) LabelNames(ctx context.Context) ([]string, error) {
return values.Data, nil
}

// LabelValues return a LabelValues query
// LabelValues return a LabelValues query result
func (c *Client) LabelValues(ctx context.Context, labelName string) ([]string, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()
Expand Down Expand Up @@ -543,6 +543,40 @@ func (c *Client) LabelValues(ctx context.Context, labelName string) ([]string, e
return values.Data, nil
}

// Series return a series query result
func (c *Client) Series(ctx context.Context, matcher string) ([]map[string]string, error) {
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
defer cancelFunc()

v := url.Values{}
v.Set("match[]", matcher)

u, err := url.Parse(c.baseURL)
if err != nil {
panic(err)
}
u.Path = "/loki/api/v1/series"
u.RawQuery = v.Encode()

buf, statusCode, err := c.run(ctx, u.String())
if err != nil {
return nil, err
}

if statusCode/100 != 2 {
return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
}

var values struct {
Data []map[string]string `json:"data"`
}
if err := json.Unmarshal(buf, &values); err != nil {
return nil, err
}

return values.Data, nil
}

func (c *Client) request(ctx context.Context, method string, url string) (*http.Request, error) {
ctx = user.InjectOrgID(ctx, c.instanceID)
req, err := http.NewRequestWithContext(ctx, method, url, nil)
Expand Down
6 changes: 6 additions & 0 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ func TestMicroServicesIngestQuery(t *testing.T) {
assert.ElementsMatch(t, []string{"fake"}, resp)
})

t.Run("series", func(t *testing.T) {
resp, err := cliQueryFrontend.Series(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.ElementsMatch(t, []map[string]string{{"job": "fake"}}, resp)
})

t.Run("per-request-limits", func(t *testing.T) {
queryLimitsPolicy := client.InjectHeadersOption(map[string][]string{querylimits.HTTPHeaderQueryLimitsKey: {`{"maxQueryLength": "1m"}`}})
cliQueryFrontendLimited := client.New(tenantID, "", tQueryFrontend.HTTPURL(), queryLimitsPolicy)
Expand Down
24 changes: 23 additions & 1 deletion pkg/loghttp/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (

"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

const (
defaultQueryLimit = 100
defaultSince = 1 * time.Hour
defaultDirection = logproto.BACKWARD
)

func limit(r *http.Request) (uint32, error) {
Expand All @@ -39,7 +42,7 @@ func ts(r *http.Request) (time.Time, error) {
}

func direction(r *http.Request) (logproto.Direction, error) {
return parseDirection(r.Form.Get("direction"), logproto.BACKWARD)
return parseDirection(r.Form.Get("direction"), defaultDirection)
}

func shards(r *http.Request) []string {
Expand Down Expand Up @@ -178,3 +181,22 @@ func parseSecondsOrDuration(value string) (time.Duration, error) {
}
return 0, errors.Errorf("cannot parse %q to a valid duration", value)
}

// parseRegexQuery parses regex and query querystring from httpRequest and returns the combined LogQL query.
// This is used only to keep regexp query string support until it gets fully deprecated.
func parseRegexQuery(httpRequest *http.Request) (string, error) {
query := httpRequest.Form.Get("query")
regexp := httpRequest.Form.Get("regexp")
if regexp != "" {
expr, err := syntax.ParseLogSelector(query, true)
if err != nil {
return "", err
}
newExpr, err := syntax.AddFilterExpr(expr, labels.MatchRegexp, "", regexp)
if err != nil {
return "", err
}
query = newExpr.String()
}
return query, nil
}
61 changes: 61 additions & 0 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ import (
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"

"github.com/grafana/dskit/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
"github.com/grafana/loki/pkg/util"
)

var (
Expand Down Expand Up @@ -400,6 +405,24 @@ type RangeQuery struct {
Shards []string
}

func NewRangeQueryWithDefaults() *RangeQuery {
start, end, _ := determineBounds(time.Now(), "", "", "")
result := &RangeQuery{
Start: start,
End: end,
Limit: defaultQueryLimit,
Direction: defaultDirection,
Interval: 0,
}
result.UpdateStep()
return result
}

// UpdateStep will adjust the step given new start and end.
func (q *RangeQuery) UpdateStep() {
q.Step = time.Duration(defaultQueryRangeStep(q.Start, q.End)) * time.Second
}

// ParseRangeQuery parses a RangeQuery request from an http request.
func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
var result RangeQuery
Expand Down Expand Up @@ -451,6 +474,23 @@ func ParseRangeQuery(r *http.Request) (*RangeQuery, error) {
return nil, errNegativeInterval
}

if GetVersion(r.URL.Path) == VersionLegacy {
result.Query, err = parseRegexQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

expr, err := syntax.ParseExpr(result.Query)
if err != nil {
return nil, err
}

// short circuit metric queries
if _, ok := expr.(syntax.SampleExpr); ok {
return nil, httpgrpc.Errorf(http.StatusBadRequest, "legacy endpoints only support %s result type", logqlmodel.ValueTypeStreams)
}
}

return &result, nil
}

Expand All @@ -460,6 +500,27 @@ func ParseIndexStatsQuery(r *http.Request) (*RangeQuery, error) {
return ParseRangeQuery(r)
}

func NewVolumeRangeQueryWithDefaults(matchers string) *logproto.VolumeRequest {
start, end, _ := determineBounds(time.Now(), "", "", "")
step := (time.Duration(defaultQueryRangeStep(start, end)) * time.Second).Milliseconds()
from, through := util.RoundToMilliseconds(start, end)
return &logproto.VolumeRequest{
From: from,
Through: through,
Matchers: matchers,
Limit: seriesvolume.DefaultLimit,
Step: step,
TargetLabels: nil,
AggregateBy: seriesvolume.DefaultAggregateBy,
}
}

func NewVolumeInstantQueryWithDefaults(matchers string) *logproto.VolumeRequest {
r := NewVolumeRangeQueryWithDefaults(matchers)
r.Step = 0
return r
}

type VolumeInstantQuery struct {
Start time.Time
End time.Time
Expand Down
7 changes: 7 additions & 0 deletions pkg/loghttp/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

json "github.com/json-iterator/go"

"github.com/grafana/dskit/httpgrpc"

"github.com/grafana/loki/pkg/logproto"
)

Expand Down Expand Up @@ -69,6 +71,11 @@ func ParseTailQuery(r *http.Request) (*logproto.TailRequest, error) {
Query: query(r),
}

req.Query, err = parseRegexQuery(r)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

req.Limit, err = limit(r)
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/logproto/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,3 +319,7 @@ func (m *VolumeRequest) LogToSpan(sp opentracing.Span) {
otlog.String("end", timestamp.Time(m.GetEnd()).String()),
)
}

func (*VolumeResponse) GetHeaders() []*definitions.PrometheusResponseHeader {
return nil
}
2 changes: 2 additions & 0 deletions pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ type Loki struct {
deleteClientMetrics *deletion.DeleteRequestClientMetrics

HTTPAuthMiddleware middleware.Interface

Codec worker.GRPCCodec
}

// New makes a new Loki.
Expand Down
Loading

0 comments on commit 9fcc42d

Please sign in to comment.