From 23ac9c8f61821f9d51f42d0489a96700b5873b25 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Fri, 27 Oct 2023 13:53:27 +0200 Subject: [PATCH] Enable protobuf encoding if feature flag is set. --- pkg/lokifrontend/frontend/v2/frontend.go | 47 ++++++++++++------- .../frontend/v2/frontend_scheduler_worker.go | 7 +++ .../queryrange/queryrangebase/roundtrip.go | 5 -- pkg/querier/queryrange/roundtrip.go | 3 -- .../loki-boltdb-storage-s3/config/loki.yaml | 2 +- 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/pkg/lokifrontend/frontend/v2/frontend.go b/pkg/lokifrontend/frontend/v2/frontend.go index 7785c5b27758..ba42cabce87f 100644 --- a/pkg/lokifrontend/frontend/v2/frontend.go +++ b/pkg/lokifrontend/frontend/v2/frontend.go @@ -51,6 +51,9 @@ type Config struct { // If set, address is not computed from interfaces. Addr string `yaml:"address" doc:"hidden"` Port int `doc:"hidden"` + + // Defines the encoding for requests to and responses from the scheduduler and querier. + Encoding string `yaml:"encoding"` } func (cfg *Config) RegisterFlags(f *flag.FlagSet) { @@ -65,6 +68,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.Port, "frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", f) + + f.StringVar(&cfg.Encoding, "frontend.encoding", "json", "Defines the encoding for requests to and responses from the scheduduler and querier. Can be 'json' or 'protobuf' (defaults to 'json').") } // Frontend implements GrpcRoundTripper. It queues HTTP requests, @@ -276,25 +281,8 @@ func (f *Frontend) Do(ctx context.Context, req queryrangebase.Request) (queryran ctx, cancel := context.WithCancel(ctx) defer cancel() - // For backwards comaptibility we are sending both encodings - httpReq, err := f.codec.EncodeRequest(ctx, req) - if err != nil { - return nil, fmt.Errorf("connot convert request to HTTP request: %w", err) - } - - if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil { - return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) - } - httpgrpcReq, err := server.HTTPRequest(httpReq) - if err != nil { - return nil, fmt.Errorf("connot convert HTTP request to gRPC request: %w", err) - } - - // TODO: check feature flag to set freq.queryRequest = QueryRequestWrap(req) - freq := &frontendRequest{ queryID: f.lastQueryID.Inc(), - request: httpgrpcReq, tenantID: tenantID, actor: httpreq.ExtractActorPath(ctx), statsEnabled: stats.IsEnabled(ctx), @@ -307,6 +295,31 @@ func (f *Frontend) Do(ctx context.Context, req queryrangebase.Request) (queryran response: make(chan *frontendv2pb.QueryResultRequest, 1), } + if f.cfg.Encoding == "protobuf" { + // TODO: add metadata + freq.queryRequest, err = queryrange.QueryRequestWrap(req) + if err != nil { + return nil, fmt.Errorf("cannot wrap request: %w", err) + } + } else { + + // For backwards comaptibility we are sending both encodings + httpReq, err := f.codec.EncodeRequest(ctx, req) + if err != nil { + return nil, fmt.Errorf("cannot convert request to HTTP request: %w", err) + } + + if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil { + return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + } + + freq.request, err = server.HTTPRequest(httpReq) + if err != nil { + return nil, fmt.Errorf("cannot convert HTTP request to gRPC request: %w", err) + } + } + + cancelCh, err := f.enqueue(ctx, freq) defer f.requests.delete(freq.queryID) if err != nil { diff --git a/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go b/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go index 61a89269d78d..74f30a59b161 100644 --- a/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go @@ -294,6 +294,13 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro FrontendAddress: w.frontendAddr, StatsEnabled: req.statsEnabled, } + + if req.queryRequest != nil { + msg.Request = &schedulerpb.FrontendToScheduler_QueryRequest{ + QueryRequest: req.queryRequest, + } + } + err := loop.Send(msg) if err != nil { req.enqueue <- enqueueResult{status: failed} diff --git a/pkg/querier/queryrange/queryrangebase/roundtrip.go b/pkg/querier/queryrange/queryrangebase/roundtrip.go index a2dc31be0bbc..847d311323c1 100644 --- a/pkg/querier/queryrange/queryrangebase/roundtrip.go +++ b/pkg/querier/queryrange/queryrangebase/roundtrip.go @@ -38,9 +38,6 @@ type Config struct { CacheResults bool `yaml:"cache_results"` MaxRetries int `yaml:"max_retries"` ShardedQueries bool `yaml:"parallelise_shardable_queries"` - - // Required format for querier responses - RequiredQueryResponseFormat string `yaml:"required_query_response_format"` } // RegisterFlags adds the flags required to config this to the given FlagSet. @@ -50,8 +47,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.") f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.") - f.StringVar(&cfg.RequiredQueryResponseFormat, "frontend.required-query-response-format", "json", "The downstream querier is required to answer in the accepted format. Can be 'json' or 'protobuf'. Note: Both will still be routed over GRPC.") - cfg.ResultsCacheConfig.RegisterFlags(f) } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 91c098dd933d..3db5d15ee506 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -149,9 +149,6 @@ func NewMiddleware( } var codec base.Codec = DefaultCodec - if cfg.RequiredQueryResponseFormat == "protobuf" { - codec = &RequestProtobufCodec{} - } indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, statsCache, cacheGenNumLoader, retentionEnabled, metrics) diff --git a/tools/dev/loki-boltdb-storage-s3/config/loki.yaml b/tools/dev/loki-boltdb-storage-s3/config/loki.yaml index be676796d472..770b1563eec3 100644 --- a/tools/dev/loki-boltdb-storage-s3/config/loki.yaml +++ b/tools/dev/loki-boltdb-storage-s3/config/loki.yaml @@ -28,6 +28,7 @@ frontend: compress_responses: true log_queries_longer_than: 5s max_outstanding_per_tenant: 512 + encoding: protobuf frontend_worker: grpc_client_config: max_send_msg_size: 1.048576e+08 @@ -98,7 +99,6 @@ query_range: cache: memcached_client: addresses: memcached:11211 - required_query_response_format: protobuf schema_config: configs: - from: "2020-07-30"