Query Frontend: Job weights (grafana#4076)
The query frontend treats all jobs as the same size when it farms them out to the queriers. This can cause querier instability because some jobs actually require quite a bit more resources to execute. By assigning weights to jobs we can reduce the amount of work each querier is asked to take on at once, which will hopefully:

- reduce querier OOMs/timeouts/retries
- reduce querier latency
- increase total throughput
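
The mechanism can be pictured as a querier working against a weight budget rather than a job count. The sketch below is purely illustrative and not Tempo code; the names (`weightedJob`, `dequeueUpTo`, `maxInflightWeight`) and the budget value are invented for this example.

```go
package main

import "fmt"

// weightedJob is a hypothetical stand-in for a frontend job carrying a weight.
type weightedJob struct {
	name   string
	weight int // heavier jobs (e.g. trace-by-id or complex TraceQL) get a larger weight
}

// dequeueUpTo hands out jobs until the weight budget is exhausted, so a few
// heavy jobs fill the slots that many light jobs would otherwise occupy.
func dequeueUpTo(queue []weightedJob, maxInflightWeight int) []weightedJob {
	picked := []weightedJob{}
	used := 0
	for _, j := range queue {
		if used+j.weight > maxInflightWeight {
			break
		}
		picked = append(picked, j)
		used += j.weight
	}
	return picked
}

func main() {
	queue := []weightedJob{
		{"tag lookup", 1},
		{"trace by id", 2},
		{"complex traceql", 2},
		{"tag lookup", 1},
	}
	// With a budget of 5 only the first three jobs are handed out at once.
	fmt.Println(dequeueUpTo(queue, 5))
}
```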
Other changes

- Removed the roundtripper httpgrpc bridge and pushed the concept of pipeline.Request all the way down into the cortex frontend code. This can be a nice perf improvement because translating http -> httpgrpc is costly, and we now push it to the last possible moment. Currently, for some queries, we translate thousands of jobs and then throw them away.
- Removed the redundant parseQuery and createFetchSpansRequest to consolidate on the Compile function in pkg/traceql.
- Check for a context error before going through the retry logic in retryWare. This makes retry metrics more accurate in the event of many cancelled jobs.
joe-elliott authored Oct 11, 2024
1 parent eca7f9c commit 5aef523
Showing 26 changed files with 581 additions and 247 deletions.
32 changes: 20 additions & 12 deletions modules/frontend/config.go
@@ -2,28 +2,27 @@ package frontend

import (
"flag"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/tempo/modules/frontend/transport"
"github.com/grafana/tempo/modules/frontend/pipeline"
v1 "github.com/grafana/tempo/modules/frontend/v1"
"github.com/grafana/tempo/pkg/usagestats"
)

var statVersion = usagestats.NewString("frontend_version")

type Config struct {
Config v1.Config `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
Search SearchConfig `yaml:"search"`
TraceByID TraceByIDConfig `yaml:"trace_by_id"`
Metrics MetricsConfig `yaml:"metrics"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
ResponseConsumers int `yaml:"response_consumers"`

Config v1.Config `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
Search SearchConfig `yaml:"search"`
TraceByID TraceByIDConfig `yaml:"trace_by_id"`
Metrics MetricsConfig `yaml:"metrics"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
ResponseConsumers int `yaml:"response_consumers"`
Weights pipeline.WeightsConfig `yaml:"weights"`
// the maximum time limit that tempo will work on an api request. this includes both
// grpc and http requests and applies to all "api" frontend query endpoints such as
// traceql, tag search, tag value search, trace by id and all streaming gRPC endpoints.
@@ -32,6 +31,9 @@ type Config struct {

// A list of regexes for black listing requests, these will apply for every request regardless the endpoint
URLDenyList []string `yaml:"url_deny_list,omitempty"`

RequestWithWeights bool `yaml:"request_with_weights,omitempty"`
RetryWithWeights bool `yaml:"retry_with_weights,omitempty"`
}

type SearchConfig struct {
@@ -95,6 +97,12 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
},
SLO: slo,
}
cfg.Weights = pipeline.WeightsConfig{
RequestWithWeights: true,
RetryWithWeights: true,
MaxRegexConditions: 1,
MaxTraceQLConditions: 4,
}

// enable multi tenant queries by default
cfg.MultiTenantQueriesEnabled = true
@@ -107,12 +115,12 @@ type CortexNoQuerierLimits struct{}
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (pipeline.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, log, reg)
if err != nil {
return nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil
return fr, fr, nil
}
11 changes: 8 additions & 3 deletions modules/frontend/frontend.go
@@ -59,7 +59,7 @@ type QueryFrontend struct
var tracer = otel.Tracer("modules/frontend")

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempodb.Reader, cacheProvider cache.Provider, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader tempodb.Reader, cacheProvider cache.Provider, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.TraceByID.QueryShards < minQueryShards || cfg.TraceByID.QueryShards > maxQueryShards {
@@ -90,8 +90,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
return nil, fmt.Errorf("frontend metrics interval should be greater than 0")
}

retryWare := pipeline.NewRetryWare(cfg.MaxRetries, registerer)

retryWare := pipeline.NewRetryWare(cfg.MaxRetries, cfg.Weights.RetryWithWeights, registerer)
cacheWare := pipeline.NewCachingWare(cacheProvider, cache.RoleFrontendSearch, logger)
statusCodeWare := pipeline.NewStatusCodeAdjustWare()
traceIDStatusCodeWare := pipeline.NewStatusCodeAdjustWareWithAllowedCode(http.StatusNotFound)
@@ -101,6 +100,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
tracePipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.TraceByID, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTraceIDSharder(&cfg.TraceByID, logger),
},
@@ -111,6 +111,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLSearch, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncSearchSharder(reader, o, cfg.Search.Sharder, logger),
},
@@ -120,6 +121,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
searchTagsPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagsRequest, logger),
},
@@ -129,6 +131,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
searchTagValuesPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequest, logger),
},
@@ -140,6 +143,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantUnsupportedMiddleware(cfg, logger),
},
[]pipeline.Middleware{statusCodeWare, retryWare},
Expand All @@ -150,6 +154,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, logger),
},
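
The retry middleware itself is not included in this excerpt, but it now receives cfg.Weights.RetryWithWeights, and the new pipeline package exports IncrementRetriedRequestWeight. The sketch below is an assumption about how those two fit together: it mirrors the exported helper with a local interface so it runs standalone, and the function name `retryWithWeights` is invented here.

```go
package main

import "fmt"

// weightedRequest mirrors the WeightRequest interface added in
// async_weight_middleware.go (SetWeight / Weight).
type weightedRequest interface {
	SetWeight(int)
	Weight() int
}

type fakeRequest struct{ weight int }

func (f *fakeRequest) SetWeight(w int) { f.weight = w }
func (f *fakeRequest) Weight() int     { return f.weight }

// retryWithWeights is a hypothetical sketch of how retryWare could use the new
// helper: every failed attempt bumps the job's weight, so a request that keeps
// failing (often because it is expensive) is accounted for as a heavier job.
func retryWithWeights(req weightedRequest, maxRetries int, withWeights bool, do func() error) error {
	var err error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = do(); err == nil {
			return nil
		}
		if withWeights {
			req.SetWeight(req.Weight() + 1) // same effect as pipeline.IncrementRetriedRequestWeight
		}
	}
	return err
}

func main() {
	req := &fakeRequest{weight: 2}
	attempts := 0
	_ = retryWithWeights(req, 2, true, func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("transient failure")
		}
		return nil
	})
	fmt.Println(req.Weight()) // 4: the original weight plus one per failed attempt
}
```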
26 changes: 12 additions & 14 deletions modules/frontend/metrics_query_range_sharder.go
@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math"
"net/http"
"time"

"github.com/go-kit/log" //nolint:all deprecated
@@ -69,7 +68,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewBadRequest(err), nil
}

expr, _, _, _, err := traceql.NewEngine().Compile(req.Query)
expr, _, _, _, err := traceql.Compile(req.Query)
if err != nil {
return pipeline.NewBadRequest(err), nil
}
@@ -89,7 +88,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
// Note: this is checked after alignment for consistency.
maxDuration := s.maxDuration(tenantID)
if maxDuration != 0 && time.Duration(req.End-req.Start)*time.Nanosecond > maxDuration {
err = fmt.Errorf(fmt.Sprintf("range specified by start and end (%s) exceeds %s. received start=%d end=%d", time.Duration(req.End-req.Start), maxDuration, req.Start, req.End))
err = fmt.Errorf("range specified by start and end (%s) exceeds %s. received start=%d end=%d", time.Duration(req.End-req.Start), maxDuration, req.Start, req.End)
return pipeline.NewBadRequest(err), nil
}

@@ -99,14 +98,14 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
cutoff = time.Now().Add(-s.cfg.QueryBackendAfter)
)

generatorReq := s.generatorRequest(*req, r, tenantID, cutoff)
generatorReq := s.generatorRequest(ctx, tenantID, pipelineRequest, *req, cutoff)
reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
reqCh <- pipeline.NewHTTPRequest(generatorReq)
reqCh <- generatorReq
}

totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, r, *req, cutoff, targetBytesPerRequest, reqCh)
totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, pipelineRequest, *req, cutoff, targetBytesPerRequest, reqCh)

span.SetAttributes(attribute.Int64("totalJobs", int64(totalJobs)))
span.SetAttributes(attribute.Int64("totalBlocks", int64(totalBlocks)))
@@ -158,7 +157,7 @@ func (s *queryRangeSharder) exemplarsPerShard(total uint32) uint32 {
return uint32(math.Ceil(float64(s.cfg.MaxExemplars)*1.2)) / total
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
@@ -204,7 +203,7 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
return
}

func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- pipeline.Request) {
func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- pipeline.Request) {
defer close(reqCh)

queryHash := hashForQueryRangeRequest(&searchReq)
@@ -230,7 +229,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)
subR := parent.HTTPRequest().Clone(ctx)

dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
@@ -268,7 +267,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON)

prepareRequestForQueriers(subR, tenantID)
pipelineR := pipeline.NewHTTPRequest(subR)
pipelineR := parent.CloneFromHTTPRequest(subR)

// TODO: Handle sampling rate
key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch))
@@ -292,9 +291,8 @@ func max(a, b uint32) uint32 {
return b
}

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, cutoff time.Time) *http.Request {
func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) *pipeline.HTTPRequest {
traceql.TrimToAfter(&searchReq, cutoff)

// if start == end then we don't need to query it
if searchReq.Start == searchReq.End {
return nil
@@ -303,12 +301,12 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest
searchReq.QueryMode = querier.QueryModeRecent
searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this

subR := parent.Clone(parent.Context())
subR := parent.HTTPRequest().Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators

prepareRequestForQueriers(subR, tenantID)

return subR
return parent.CloneFromHTTPRequest(subR)
}

// maxDuration returns the max search duration allowed for this tenant.
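
A notable detail in this file: sharded sub-requests are now built with parent.CloneFromHTTPRequest(subR) instead of pipeline.NewHTTPRequest(subR), which suggests that properties of the parent pipeline request, such as the weight set by the new middleware, carry over to every sub-request. The sketch below only illustrates that idea with an invented type; Tempo's actual pipeline.HTTPRequest implementation is not part of this diff.

```go
package main

import (
	"fmt"
	"net/http"
)

// httpPipelineRequest is a hypothetical stand-in for a pipeline request that
// wraps an *http.Request and carries a weight.
type httpPipelineRequest struct {
	req    *http.Request
	weight int
}

func (p *httpPipelineRequest) SetWeight(w int) { p.weight = w }
func (p *httpPipelineRequest) Weight() int     { return p.weight }

// cloneFromHTTPRequest keeps the parent's weight while swapping in the sharded
// *http.Request, mirroring what parent.CloneFromHTTPRequest(subR) appears to do.
func (p *httpPipelineRequest) cloneFromHTTPRequest(r *http.Request) *httpPipelineRequest {
	return &httpPipelineRequest{req: r, weight: p.weight}
}

func main() {
	parentHTTP, _ := http.NewRequest(http.MethodGet, "/api/metrics/query_range", nil)
	parent := &httpPipelineRequest{req: parentHTTP, weight: 2} // set earlier by the weight middleware

	subHTTP := parentHTTP.Clone(parentHTTP.Context())
	sub := parent.cloneFromHTTPRequest(subHTTP)

	fmt.Println(sub.Weight()) // 2: the sub-request inherits the parent's weight
}
```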
125 changes: 125 additions & 0 deletions modules/frontend/pipeline/async_weight_middleware.go
@@ -0,0 +1,125 @@
package pipeline

import (
"github.com/grafana/tempo/modules/frontend/combiner"
"github.com/grafana/tempo/pkg/traceql"
)

type RequestType int

type WeightRequest interface {
SetWeight(int)
Weight() int
}

type WeightsConfig struct {
RequestWithWeights bool `yaml:"request_with_weights,omitempty"`
RetryWithWeights bool `yaml:"retry_with_weights,omitempty"`
MaxTraceQLConditions int `yaml:"max_traceql_conditions,omitempty"`
MaxRegexConditions int `yaml:"max_regex_conditions,omitempty"`
}

type Weights struct {
DefaultWeight int
TraceQLSearchWeight int
TraceByIDWeight int
MaxTraceQLConditions int
MaxRegexConditions int
}

const (
Default RequestType = iota
TraceByID
TraceQLSearch
TraceQLMetrics
)

type weightRequestWare struct {
requestType RequestType
enabled bool
next AsyncRoundTripper[combiner.PipelineResponse]

weights Weights
}

// IncrementRetriedRequestWeight increments the weight of a retried request.
func IncrementRetriedRequestWeight(r WeightRequest) {
r.SetWeight(r.Weight() + 1)
}

// NewWeightRequestWare returns a new weight request middleware.
func NewWeightRequestWare(rt RequestType, cfg WeightsConfig) AsyncMiddleware[combiner.PipelineResponse] {
weights := Weights{
DefaultWeight: 1,
TraceQLSearchWeight: 1,
TraceByIDWeight: 2,
MaxTraceQLConditions: cfg.MaxTraceQLConditions,
MaxRegexConditions: cfg.MaxRegexConditions,
}
return AsyncMiddlewareFunc[combiner.PipelineResponse](func(next AsyncRoundTripper[combiner.PipelineResponse]) AsyncRoundTripper[combiner.PipelineResponse] {
return &weightRequestWare{
requestType: rt,
enabled: cfg.RequestWithWeights,
weights: weights,
next: next,
}
})
}

func (c weightRequestWare) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) {
c.setWeight(req)
return c.next.RoundTrip(req)
}

func (c weightRequestWare) setWeight(req Request) {
if !c.enabled {
req.SetWeight(c.weights.DefaultWeight)
return
}
switch c.requestType {
case TraceByID:
req.SetWeight(c.weights.TraceByIDWeight)
case TraceQLSearch, TraceQLMetrics:
c.setTraceQLWeight(req)
default:
req.SetWeight(c.weights.DefaultWeight)
}
}

func (c weightRequestWare) setTraceQLWeight(req Request) {
var traceQLQuery string
query := req.HTTPRequest().URL.Query()
if query.Has("q") {
traceQLQuery = query.Get("q")
}
if query.Has("query") {
traceQLQuery = query.Get("query")
}

req.SetWeight(c.weights.TraceQLSearchWeight)

if traceQLQuery == "" {
return
}

_, _, _, spanRequest, err := traceql.Compile(traceQLQuery)
if err != nil || spanRequest == nil {
return
}

conditions := 0
regexConditions := 0

for _, c := range spanRequest.Conditions {
if c.Op != traceql.OpNone {
conditions++
}
if c.Op == traceql.OpRegex || c.Op == traceql.OpNotRegex {
regexConditions++
}
}
complexQuery := regexConditions >= c.weights.MaxRegexConditions || conditions >= c.weights.MaxTraceQLConditions
if complexQuery {
req.SetWeight(c.weights.TraceQLSearchWeight + 1)
}
}
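
Taken together, the middleware assigns weight 2 to trace-by-id requests, weight 1 to everything else, and bumps a TraceQL search or metrics query to 2 when it looks expensive: at least MaxRegexConditions regex conditions or at least MaxTraceQLConditions conditions overall (defaults 1 and 4 in config.go). The standalone sketch below re-implements that heuristic so it can be run against a query string from inside the Tempo repo; the helper name `traceQLWeight` is invented, and the expected outputs assume traceql.Compile reports conditions for these queries the way the middleware expects.

```go
package main

import (
	"fmt"

	"github.com/grafana/tempo/pkg/traceql"
)

// traceQLWeight mirrors the weighting heuristic of the middleware above:
// start from a base weight of 1 and add 1 when the query is "complex".
func traceQLWeight(query string, maxConditions, maxRegex int) int {
	const baseWeight = 1

	_, _, _, spanRequest, err := traceql.Compile(query)
	if err != nil || spanRequest == nil {
		return baseWeight
	}

	conditions, regexConditions := 0, 0
	for _, c := range spanRequest.Conditions {
		if c.Op != traceql.OpNone {
			conditions++
		}
		if c.Op == traceql.OpRegex || c.Op == traceql.OpNotRegex {
			regexConditions++
		}
	}

	if regexConditions >= maxRegex || conditions >= maxConditions {
		return baseWeight + 1
	}
	return baseWeight
}

func main() {
	fmt.Println(traceQLWeight(`{ span.foo = "bar" }`, 4, 1))   // expected 1: a single cheap condition
	fmt.Println(traceQLWeight(`{ span.foo =~ "ba.*" }`, 4, 1)) // expected 2: regex conditions are treated as expensive
}
```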