Skip to content

Commit

Permalink
Enable protobuf encoding if feature flag is set.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies committed Oct 27, 2023
1 parent bb897a1 commit 23ac9c8
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 26 deletions.
47 changes: 30 additions & 17 deletions pkg/lokifrontend/frontend/v2/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type Config struct {
// If set, address is not computed from interfaces.
Addr string `yaml:"address" doc:"hidden"`
Port int `doc:"hidden"`

// Defines the encoding for requests to and responses from the scheduduler and querier.
Encoding string `yaml:"encoding"`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -65,6 +68,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.Port, "frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).")

cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", f)

f.StringVar(&cfg.Encoding, "frontend.encoding", "json", "Defines the encoding for requests to and responses from the scheduduler and querier. Can be 'json' or 'protobuf' (defaults to 'json').")
}

// Frontend implements GrpcRoundTripper. It queues HTTP requests,
Expand Down Expand Up @@ -276,25 +281,8 @@ func (f *Frontend) Do(ctx context.Context, req queryrangebase.Request) (queryran
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// For backwards comaptibility we are sending both encodings
httpReq, err := f.codec.EncodeRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("connot convert request to HTTP request: %w", err)
}

if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
httpgrpcReq, err := server.HTTPRequest(httpReq)
if err != nil {
return nil, fmt.Errorf("connot convert HTTP request to gRPC request: %w", err)
}

// TODO: check feature flag to set freq.queryRequest = QueryRequestWrap(req)

freq := &frontendRequest{
queryID: f.lastQueryID.Inc(),
request: httpgrpcReq,
tenantID: tenantID,
actor: httpreq.ExtractActorPath(ctx),
statsEnabled: stats.IsEnabled(ctx),
Expand All @@ -307,6 +295,31 @@ func (f *Frontend) Do(ctx context.Context, req queryrangebase.Request) (queryran
response: make(chan *frontendv2pb.QueryResultRequest, 1),
}

if f.cfg.Encoding == "protobuf" {
// TODO: add metadata
freq.queryRequest, err = queryrange.QueryRequestWrap(req)
if err != nil {
return nil, fmt.Errorf("cannot wrap request: %w", err)
}
} else {

// For backwards comaptibility we are sending both encodings
httpReq, err := f.codec.EncodeRequest(ctx, req)
if err != nil {
return nil, fmt.Errorf("cannot convert request to HTTP request: %w", err)
}

if err := user.InjectOrgIDIntoHTTPRequest(ctx, httpReq); err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

freq.request, err = server.HTTPRequest(httpReq)
if err != nil {
return nil, fmt.Errorf("cannot convert HTTP request to gRPC request: %w", err)
}
}


cancelCh, err := f.enqueue(ctx, freq)
defer f.requests.delete(freq.queryID)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions pkg/lokifrontend/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,13 @@ func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFro
FrontendAddress: w.frontendAddr,
StatsEnabled: req.statsEnabled,
}

if req.queryRequest != nil {
msg.Request = &schedulerpb.FrontendToScheduler_QueryRequest{
QueryRequest: req.queryRequest,
}
}

err := loop.Send(msg)
if err != nil {
req.enqueue <- enqueueResult{status: failed}
Expand Down
5 changes: 0 additions & 5 deletions pkg/querier/queryrange/queryrangebase/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ type Config struct {
CacheResults bool `yaml:"cache_results"`
MaxRetries int `yaml:"max_retries"`
ShardedQueries bool `yaml:"parallelise_shardable_queries"`

// Required format for querier responses
RequiredQueryResponseFormat string `yaml:"required_query_response_format"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet.
Expand All @@ -50,8 +47,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CacheResults, "querier.cache-results", false, "Cache query results.")
f.BoolVar(&cfg.ShardedQueries, "querier.parallelise-shardable-queries", true, "Perform query parallelisations based on storage sharding configuration and query ASTs. This feature is supported only by the chunks storage engine.")

f.StringVar(&cfg.RequiredQueryResponseFormat, "frontend.required-query-response-format", "json", "The downstream querier is required to answer in the accepted format. Can be 'json' or 'protobuf'. Note: Both will still be routed over GRPC.")

cfg.ResultsCacheConfig.RegisterFlags(f)
}

Expand Down
3 changes: 0 additions & 3 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ func NewMiddleware(
}

var codec base.Codec = DefaultCodec
if cfg.RequiredQueryResponseFormat == "protobuf" {
codec = &RequestProtobufCodec{}
}

indexStatsTripperware, err := NewIndexStatsTripperware(cfg, log, limits, schema, codec, statsCache,
cacheGenNumLoader, retentionEnabled, metrics)
Expand Down
2 changes: 1 addition & 1 deletion tools/dev/loki-boltdb-storage-s3/config/loki.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ frontend:
compress_responses: true
log_queries_longer_than: 5s
max_outstanding_per_tenant: 512
encoding: protobuf
frontend_worker:
grpc_client_config:
max_send_msg_size: 1.048576e+08
Expand Down Expand Up @@ -98,7 +99,6 @@ query_range:
cache:
memcached_client:
addresses: memcached:11211
required_query_response_format: protobuf
schema_config:
configs:
- from: "2020-07-30"
Expand Down

0 comments on commit 23ac9c8

Please sign in to comment.