Skip to content

Commit

Permalink
fix: make detected fields work for both json and proto (#12682)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Apr 19, 2024
1 parent ee0020c commit f68d1f7
Show file tree
Hide file tree
Showing 9 changed files with 468 additions and 196 deletions.
10 changes: 5 additions & 5 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (c *Client) PushOTLPLogLine(line string, timestamp time.Time, logAttributes
return c.pushOTLPLogLine(line, timestamp, logAttributes)
}

func formatTS(ts time.Time) string {
func FormatTS(ts time.Time) string {
return strconv.FormatInt(ts.UnixNano(), 10)
}

Expand All @@ -130,7 +130,7 @@ func (c *Client) pushLogLine(line string, timestamp time.Time, structuredMetadat
},
Values: [][]any{
{
formatTS(timestamp),
FormatTS(timestamp),
line,
structuredMetadata,
},
Expand Down Expand Up @@ -509,7 +509,7 @@ func (c *Client) RunQuery(ctx context.Context, query string, extraHeaders ...Hea

v := url.Values{}
v.Set("query", query)
v.Set("time", formatTS(c.Now.Add(time.Second)))
v.Set("time", FormatTS(c.Now.Add(time.Second)))

u, err := url.Parse(c.baseURL)
if err != nil {
Expand Down Expand Up @@ -568,8 +568,8 @@ func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
func (c *Client) rangeQueryURL(query string, start, end time.Time) string {
v := url.Values{}
v.Set("query", query)
v.Set("start", formatTS(start))
v.Set("end", formatTS(end))
v.Set("start", FormatTS(start))
v.Set("end", FormatTS(end))

u, err := url.Parse(c.baseURL)
if err != nil {
Expand Down
223 changes: 223 additions & 0 deletions integration/explore_logs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
//go:build integration

package integration

import (
"context"
"encoding/json"
"io"
"net/url"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/v3/integration/client"
"github.com/grafana/loki/v3/integration/cluster"
)

type DetectedField struct {
Label string `json:"label"`
Type string `json:"type"`
Cardinality uint64 `json:"cardinality"`
}

type DetectedFields []DetectedField
type DetectedFieldResponse struct {
Fields DetectedFields `json:"fields"`
}

func Test_ExploreLogsApis(t *testing.T) {
clu := cluster.New(nil, cluster.SchemaWithTSDBAndTSDB, func(c *cluster.Cluster) {
c.SetSchemaVer("v13")
})
defer func() {
assert.NoError(t, clu.Cleanup())
}()

// run initially the compactor, indexgateway, and distributor.
var (
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-compactor.compaction-interval=1s",
"-compactor.retention-delete-delay=1s",
// By default, a minute is added to the delete request start time. This compensates for that.
"-compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
)
)
require.NoError(t, clu.Run())

// then, run only the ingester and query scheduler.
var (
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())

// the run querier.
var (
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())

// finally, run the query-frontend.
var (
tQueryFrontend = clu.AddComponent(
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
"-querier.per-request-limits-enabled=true",
"-frontend.encoding=protobuf",
"-querier.shard-aggregations=quantile_over_time",
"-frontend.tail-proxy-url="+tQuerier.HTTPURL(),
)
)
require.NoError(t, clu.Run())

tenantID := randStringRunes()

now := time.Now()
cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
cliDistributor.Now = now
cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
cliQueryFrontend.Now = now

t.Run("/detected_fields", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=blue", now.Add(-45*time.Minute), nil, map[string]string{"job": "fake"}))

require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now.Add(-5*time.Second), nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=purple", now.Add(-5*time.Second), nil, map[string]string{"job": "fake"}))

require.NoError(t, cliDistributor.PushLogLine("foo=bar color=green", now, nil, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("foo=bar color=red", now, nil, map[string]string{"job": "fake"}))

// validate logs are there
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)

var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"foo=bar color=red", "foo=bar color=blue", "foo=bar color=red", "foo=bar color=purple", "foo=bar color=green", "foo=bar color=red"}, lines)

t.Run("non-split queries", func(t *testing.T) {
start := cliQueryFrontend.Now.Add(-1 * time.Minute)
end := cliQueryFrontend.Now.Add(time.Minute)

v := url.Values{}
v.Set("query", `{job="fake"}`)
v.Set("start", client.FormatTS(start))
v.Set("end", client.FormatTS(end))

u := url.URL{}
u.Path = "/loki/api/v1/detected_fields"
u.RawQuery = v.Encode()
dfResp, err := cliQueryFrontend.Get(u.String())
require.NoError(t, err)
defer dfResp.Body.Close()

buf, err := io.ReadAll(dfResp.Body)
require.NoError(t, err)

var detectedFieldResponse DetectedFieldResponse
err = json.Unmarshal(buf, &detectedFieldResponse)
require.NoError(t, err)

require.Equal(t, 2, len(detectedFieldResponse.Fields))

var fooField, colorField DetectedField
for _, field := range detectedFieldResponse.Fields {
if field.Label == "foo" {
fooField = field
}

if field.Label == "color" {
colorField = field
}
}

require.Equal(t, "string", fooField.Type)
require.Equal(t, "string", colorField.Type)
require.Equal(t, uint64(1), fooField.Cardinality)
require.Equal(t, uint64(3), colorField.Cardinality)
})

t.Run("split queries", func(t *testing.T) {
start := cliQueryFrontend.Now.Add(-24 * time.Hour)
end := cliQueryFrontend.Now.Add(time.Minute)

v := url.Values{}
v.Set("query", `{job="fake"}`)
v.Set("start", client.FormatTS(start))
v.Set("end", client.FormatTS(end))

u := url.URL{}
u.Path = "/loki/api/v1/detected_fields"
u.RawQuery = v.Encode()
dfResp, err := cliQueryFrontend.Get(u.String())
require.NoError(t, err)
defer dfResp.Body.Close()

buf, err := io.ReadAll(dfResp.Body)
require.NoError(t, err)

var detectedFieldResponse DetectedFieldResponse
err = json.Unmarshal(buf, &detectedFieldResponse)
require.NoError(t, err)

require.Equal(t, 2, len(detectedFieldResponse.Fields))

var fooField, colorField DetectedField
for _, field := range detectedFieldResponse.Fields {
if field.Label == "foo" {
fooField = field
}

if field.Label == "color" {
colorField = field
}
}

require.Equal(t, "string", fooField.Type)
require.Equal(t, "string", colorField.Type)
require.Equal(t, uint64(1), fooField.Cardinality)
require.Equal(t, uint64(4), colorField.Cardinality)
})
})
}
Loading

0 comments on commit f68d1f7

Please sign in to comment.