Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fallback on traditional query path for old data #13545

Closed
wants to merge 10 commits into from
35 changes: 12 additions & 23 deletions pkg/pattern/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,33 +307,21 @@ func (i *Ingester) QuerySample(
return err
}

level.Debug(i.logger).Log("msg", "QuerySample", "instanceID", instanceID, "expr", expr)
iterator, err := instance.QuerySample(ctx, expr, req) // this is returning a first value of 0,0
level.Debug(i.logger).Log("msg", "executing query sample in pattern ingester",
"instanceID", instanceID,
"expr", expr,
"start", req.Start,
"end", req.End,
"length", req.End.Sub(req.Start),
"step", req.Step,
)
iterator, err := instance.QuerySample(ctx, expr, req)
if err != nil {
return err
}

// TODO(twhitney): query store
// if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok {
// storeReq := logql.SelectSampleParams{SampleQueryRequest: &logproto.SampleQueryRequest{
// Start: start,
// End: end,
// Selector: req.Selector,
// Shards: req.Shards,
// Deletes: req.Deletes,
// Plan: req.Plan,
// }}
// storeItr, err := i.store.SelectSamples(ctx, storeReq)
// if err != nil {
// util.LogErrorWithContext(ctx, "closing iterator", it.Close)
// return err
// }

// it = iter.NewMergeSampleIterator(ctx, []iter.SampleIterator{it, storeItr})
// }

defer util.LogErrorWithContext(ctx, "closing iterator", iterator.Close)
return sendMetricSamples(ctx, iterator, stream, i.logger)
return sendMetricSamples(ctx, iterator, stream, i.logger, req)
}

func sendPatternSample(ctx context.Context, it pattern_iter.Iterator, stream logproto.Pattern_QueryServer) error {
Expand All @@ -357,9 +345,10 @@ func sendMetricSamples(
it loki_iter.SampleIterator,
stream logproto.Pattern_QuerySampleServer,
logger log.Logger,
req *logproto.QuerySamplesRequest,
) error {
for ctx.Err() == nil {
batch, err := pattern_iter.ReadMetricsBatch(it, readBatchSize, logger)
batch, err := pattern_iter.ReadMetrics(it, logger, req)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pattern/ingester_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (q *IngesterQuerier) Samples(
}

// NOTE: batching was removed along with ReadMetricsBatch; ReadMetrics now drains the iterator fully.
resp, err := pattern_iter.ReadMetricsBatch(pattern_iter.NewSumMergeSampleIterator(iterators), math.MaxInt32, q.logger)
resp, err := pattern_iter.ReadMetrics(pattern_iter.NewSumMergeSampleIterator(iterators), q.logger, req)
if err != nil {
return nil, err
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/pattern/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,22 @@ func (i *instance) QuerySample(
}

spanLogger := spanlogger.FromContext(ctx)
var withDebug log.Logger
if spanLogger != nil {
level.Debug(spanLogger).Log(
"msg", "summing results of querying streams",
"num_iters", len(iters),
"iters", fmt.Sprintf("%v", iters),
)
withDebug = level.Debug(spanLogger)
} else {
level.Debug(i.logger).Log(
"msg", "summing results of querying streams",
"num_iters", len(iters),
"iters", fmt.Sprintf("%v", iters),
)
withDebug = level.Debug(i.logger)
}

withDebug.Log(
"msg", "finished querying samples in pattern ingester",
"num_iters", len(iters),
"start", from,
"end", through,
"length", through.Sub(from),
"expr", expr,
)

return pattern_iter.NewSumMergeSampleIterator(iters), nil
}

Expand Down
37 changes: 29 additions & 8 deletions pkg/pattern/iter/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@ func ReadAll(it Iterator) (*logproto.QueryPatternsResponse, error) {
return ReadBatch(it, math.MaxInt32)
}

func ReadMetricsBatch(it iter.SampleIterator, batchSize int, logger log.Logger) (*logproto.QuerySamplesResponse, error) {
var (
series = map[uint64]logproto.Series{}
respSize int
)
func ReadMetrics(it iter.SampleIterator, logger log.Logger, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error) {
series := map[uint64]logproto.Series{}

for ; respSize < batchSize && it.Next(); respSize++ {
var mint, maxt int64
for it.Next() && it.Err() == nil {
hash := it.StreamHash()
s, ok := series[hash]
if !ok {
Expand All @@ -56,7 +54,16 @@ func ReadMetricsBatch(it iter.SampleIterator, batchSize int, logger log.Logger)
series[hash] = s
}

s.Samples = append(s.Samples, it.At())
curSample := it.At()
if mint == 0 || curSample.Timestamp < mint {
mint = curSample.Timestamp
}

if maxt == 0 || curSample.Timestamp > maxt {
maxt = curSample.Timestamp
}

s.Samples = append(s.Samples, curSample)
series[hash] = s
}

Expand All @@ -68,10 +75,24 @@ func ReadMetricsBatch(it iter.SampleIterator, batchSize int, logger log.Logger)
level.Debug(logger).Log("msg", "appending series", "s", fmt.Sprintf("%v", s))
result.Series = append(result.Series, s)
}

if req != nil {
level.Debug(logger).Log("msg", "finished reading metrics",
"num_series", len(result.Series),
"start", req.Start,
"end", req.End,
"query", req.Query,
"length", req.End.Sub(req.Start),
"mint", mint,
"maxt", maxt,
"delta_seconds", (maxt-mint)/1e9,
)
}

return &result, it.Err()
}

// ReadAllSamples reads all samples from the given iterator. It is only used in tests.
func ReadAllSamples(it iter.SampleIterator) (*logproto.QuerySamplesResponse, error) {
return ReadMetricsBatch(it, math.MaxInt32, log.NewNopLogger())
return ReadMetrics(it, log.NewNopLogger(), nil)
}
35 changes: 4 additions & 31 deletions pkg/pattern/iter/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,45 +91,20 @@ func TestReadMetricsBatch(t *testing.T) {
name string
pattern string
seriesIter []loki_iter.SampleIterator
batchSize int
expected *logproto.QuerySamplesResponse
}{
{
name: "ReadBatch empty iterator",
name: "ReadMetrics empty iterator",
seriesIter: singleSeriesIterator(logproto.Series{
Labels: "",
Samples: []logproto.Sample{},
}),
batchSize: 2,
expected: &logproto.QuerySamplesResponse{
Series: []logproto.Series{},
},
},
{
name: "ReadBatch less than batchSize",
seriesIter: singleSeriesIterator(logproto.Series{
Labels: `{foo="bar"}`,
Samples: []logproto.Sample{
{Timestamp: 10, Value: 2},
{Timestamp: 20, Value: 4},
{Timestamp: 30, Value: 6},
},
}),
batchSize: 2,
expected: &logproto.QuerySamplesResponse{
Series: []logproto.Series{
{
Labels: `{foo="bar"}`,
Samples: []logproto.Sample{
{Timestamp: 10, Value: 2},
{Timestamp: 20, Value: 4},
},
},
},
},
},
{
name: "ReadBatch more than batchSize",
name: "ReadMetrics reads all",
seriesIter: singleSeriesIterator(logproto.Series{
Labels: `{foo="bar"}`,
Samples: []logproto.Sample{
Expand All @@ -138,7 +113,6 @@ func TestReadMetricsBatch(t *testing.T) {
{Timestamp: 30, Value: 6},
},
}),
batchSize: 4,
expected: &logproto.QuerySamplesResponse{
Series: []logproto.Series{
{
Expand All @@ -153,7 +127,7 @@ func TestReadMetricsBatch(t *testing.T) {
},
},
{
name: "ReadBatch multiple series",
name: "ReadMetrics multiple series",
seriesIter: []loki_iter.SampleIterator{
loki_iter.NewSeriesIterator(logproto.Series{
Labels: `{foo="bar"}`,
Expand Down Expand Up @@ -181,7 +155,6 @@ func TestReadMetricsBatch(t *testing.T) {
StreamHash: labels.StableHash([]labels.Label{{Name: "foo", Value: "bar"}}),
}),
},
batchSize: 8,
expected: &logproto.QuerySamplesResponse{
Series: []logproto.Series{
{
Expand Down Expand Up @@ -210,7 +183,7 @@ func TestReadMetricsBatch(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
it := NewSumMergeSampleIterator(tt.seriesIter)
got, err := ReadMetricsBatch(it, tt.batchSize, log.NewNopLogger())
got, err := ReadAllSamples(it)
require.NoError(t, err)
sort.Slice(tt.expected.Series, func(i, j int) bool {
return tt.expected.Series[i].Labels < tt.expected.Series[j].Labels
Expand Down
10 changes: 9 additions & 1 deletion pkg/pattern/metric/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ func (c *Chunks) Iterator(
return iter.NewSeriesIterator(series), nil
}

samples := make([]logproto.Sample, 0, maximumSteps)
// samples := make([]logproto.Sample, 0, maximumSteps)
samples := []logproto.Sample{}
for _, chunk := range c.chunks {
ss, err := chunk.ForTypeAndRange(typ, from, through)
if err != nil {
Expand Down Expand Up @@ -221,13 +222,17 @@ type (
// Chunk accumulates metric samples together with the minimum and maximum
// sample timestamps observed so far (nanosecond Unix times — see Bounds,
// which decodes them with time.Unix(0, ...)).
//
// Because it embeds a sync.Mutex, a Chunk must not be copied after first
// use; always pass *Chunk.
type Chunk struct {
	Samples    Samples    // appended to by AddSample
	mint, maxt int64      // smallest / largest sample timestamp seen, in nanoseconds
	lock       sync.Mutex // guards Samples, mint and maxt in AddSample/ForTypeAndRange
}

// Bounds returns the time range covered by the chunk's samples, converting
// the stored nanosecond timestamps to time.Time.
//
// NOTE(review): mint/maxt are read here without holding c.lock, while
// AddSample and ForTypeAndRange mutate/read them under the lock — a
// concurrent AddSample could race with this read. Locking here would fix it,
// but confirm no caller already holds c.lock first (Go mutexes are not
// reentrant).
func (c *Chunk) Bounds() (fromT, toT time.Time) {
	return time.Unix(0, c.mint), time.Unix(0, c.maxt)
}

func (c *Chunk) AddSample(s Sample) {
c.lock.Lock()
defer c.lock.Unlock()

c.Samples = append(c.Samples, s)
ts := int64(s.Timestamp)

Expand Down Expand Up @@ -264,6 +269,9 @@ func (c *Chunk) ForTypeAndRange(
typ Type,
start, end model.Time,
) ([]logproto.Sample, error) {
c.lock.Lock()
defer c.lock.Unlock()

if typ == Unsupported {
return nil, fmt.Errorf("unsupported metric type")
}
Expand Down
30 changes: 27 additions & 3 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type SingleTenantQuerier struct {
store Store
limits Limits
ingesterQuerier *IngesterQuerier
patternQuerier PatterQuerier
patternQuerier PatternQuerier
deleteGetter deleteGetter
metrics *Metrics
logger log.Logger
Expand Down Expand Up @@ -1038,12 +1038,12 @@ func countLabelsAndCardinality(storeLabelsMap map[string][]string, ingesterLabel
return detectedLabels
}

type PatterQuerier interface {
type PatternQuerier interface {
Patterns(ctx context.Context, req *logproto.QueryPatternsRequest) (*logproto.QueryPatternsResponse, error)
Samples(ctx context.Context, req *logproto.QuerySamplesRequest) (*logproto.QuerySamplesResponse, error)
}

// WithPatternQuerier wires a PatternQuerier into the querier. Until this is
// called, SelectMetricSamples reports http.StatusNotFound because
// q.patternQuerier is nil.
func (q *SingleTenantQuerier) WithPatternQuerier(pq PatternQuerier) {
	q.patternQuerier = pq
}

Expand All @@ -1063,11 +1063,35 @@ func (q *SingleTenantQuerier) SelectMetricSamples(ctx context.Context, req *logp
if q.patternQuerier == nil {
return nil, httpgrpc.Errorf(http.StatusNotFound, "")
}

res, err := q.patternQuerier.Samples(ctx, req)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

mint := int64(0)
maxt := int64(0)
for _, s := range res.Series {
for _, sample := range s.Samples {
if mint == 0 || sample.Timestamp < mint {
mint = sample.Timestamp
}
if maxt == 0 || sample.Timestamp > maxt {
maxt = sample.Timestamp
}
}
}
level.Debug(q.logger).Log("msg", "selecting metric samples",
"start", req.Start,
"end", req.End,
"query", req.Query,
"step", req.Step,
"length", req.End.Sub(req.Start),
"series", len(res.Series),
"mint", mint,
"maxt", maxt,
)

return res, err
}

Expand Down
38 changes: 38 additions & 0 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,41 @@ func (tl mockTenantLimits) TenantLimits(userID string) *validation.Limits {
func (tl mockTenantLimits) AllByUserID() map[string]*validation.Limits {
return tl
}

// patternQuerierMock is a test double for the querier's PatternQuerier
// interface, built on util.ExtendedMock (testify-style: expectations are
// registered with On(...) and recorded via Called — see Patterns/Samples).
type patternQuerierMock struct {
	util.ExtendedMock
}

// newPatternQuerierMock returns an empty mock; callers register expected
// calls before handing it to the code under test.
func newPatternQuerierMock() *patternQuerierMock {
	return &patternQuerierMock{}
}

// Patterns records the call on the mock and returns the canned response and
// error configured for it. A nil configured response yields (nil, err) so the
// type assertion is never attempted on nil.
func (p *patternQuerierMock) Patterns(
	ctx context.Context,
	req *logproto.QueryPatternsRequest,
) (*logproto.QueryPatternsResponse, error) {
	args := p.Called(ctx, req)
	if raw := args.Get(0); raw != nil {
		return raw.(*logproto.QueryPatternsResponse), args.Error(1)
	}
	return nil, args.Error(1)
}

// Samples records the call on the mock and returns the canned response and
// error configured for it. A nil configured response yields (nil, err) so the
// type assertion is never attempted on nil.
func (p *patternQuerierMock) Samples(
	ctx context.Context,
	req *logproto.QuerySamplesRequest,
) (*logproto.QuerySamplesResponse, error) {
	args := p.Called(ctx, req)
	if raw := args.Get(0); raw != nil {
		return raw.(*logproto.QuerySamplesResponse), args.Error(1)
	}
	return nil, args.Error(1)
}
Loading
Loading