Skip to content

Commit

Permalink
Handle both request type on the querier side
Browse files Browse the repository at this point in the history
  • Loading branch information
jeschkies committed Oct 27, 2023
1 parent 23ac9c8 commit 8307ecf
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 13 deletions.
57 changes: 48 additions & 9 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/loki/pkg/lokifrontend/frontend/v2/frontendv2pb"
"github.com/grafana/loki/pkg/querier/queryrange"
querier_stats "github.com/grafana/loki/pkg/querier/stats"
"github.com/grafana/loki/pkg/scheduler/schedulerpb"
httpgrpcutil "github.com/grafana/loki/pkg/util/httpgrpc"
Expand Down Expand Up @@ -153,7 +154,12 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
}
logger := util_log.WithContext(ctx, sp.log)

sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.GetHttpRequest())
switch r := request.Request.(type) {
case *schedulerpb.SchedulerToQuerier_HttpRequest:
sp.runHTTPRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, r.HttpRequest)
case *schedulerpb.SchedulerToQuerier_QueryRequest:
sp.runQueryRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, r.QueryRequest)
}
sp.metrics.inflightRequests.Dec()
// Report back to scheduler that processing of the query has finished.
if err := c.Send(&schedulerpb.QuerierToScheduler{}); err != nil {
Expand All @@ -163,7 +169,34 @@ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_Quer
}
}

func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
func (sp *schedulerProcessor) runQueryRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *queryrange.QueryRequest) {
var stats *querier_stats.Stats
if statsEnabled {
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
}

// TODO: handle errors
r, _ := queryrange.QueryRequestUnwrap(request)
resp, _ := sp.handler.Do(ctx, r)

response, _ := queryrange.QueryResponseWrap(resp)

logger = log.With(logger, "frontend", frontendAddress)

// TODO: Ensure responses that are too big are not retried.

result := &frontendv2pb.QueryResultRequest{
QueryID: queryID,
Response: &frontendv2pb.QueryResultRequest_QueryResponse{
QueryResponse: response,
},
Stats: stats,
}

sp.reply(ctx, logger, frontendAddress, result)
}

func (sp *schedulerProcessor) runHTTPRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
var stats *querier_stats.Stats
if statsEnabled {
stats, ctx = querier_stats.ContextWithEmptyStats(ctx)
Expand All @@ -184,20 +217,26 @@ func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger,
}
}

result := &frontendv2pb.QueryResultRequest{
QueryID: queryID,
Response: &frontendv2pb.QueryResultRequest_HttpResponse{
HttpResponse: response,
},
Stats: stats,
}

sp.reply(ctx, logger, frontendAddress, result)
}

func (sp *schedulerProcessor) reply(ctx context.Context, logger log.Logger, frontendAddress string, result *frontendv2pb.QueryResultRequest) {
runPoolWithBackoff(
ctx,
logger,
sp.frontendPool,
frontendAddress,
func(c client.PoolClient) error {
// Response is empty and uninteresting.
_, err := c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
QueryID: queryID,
Response: &frontendv2pb.QueryResultRequest_HttpResponse{
HttpResponse: response,
},
Stats: stats,
})
_, err := c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, result)
if err != nil {
level.Error(logger).Log("msg", "error notifying frontend about finished query", "err", err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker/scheduler_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {
QueryID: 1,
Request: &schedulerpb.SchedulerToQuerier_HttpRequest{
HttpRequest: &httpgrpc.HTTPRequest{
Method: "GET",
Url: `/loki/api/v1/query_range?query={foo="bar"}&step=10&limit=200&direction=FORWARD`,
Method: "GET",
Url: `/loki/api/v1/query_range?query={foo="bar"}&step=10&limit=200&direction=FORWARD`,
},
},
FrontendAddress: "127.0.0.2",
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func newExecutionContext(workerCtx context.Context, logger log.Logger) (execCtx
return
}

// handle converts the request and applies it to the handler.
func handle(ctx context.Context, request *httpgrpc.HTTPRequest, handler RequestHandler, codec GRPCCodec) *httpgrpc.HTTPResponse {
// handleHTTPRequest converts the request and applies it to the handler.
func handleHTTPRequest(ctx context.Context, request *httpgrpc.HTTPRequest, handler RequestHandler, codec GRPCCodec) *httpgrpc.HTTPResponse {
req, ctx, err := codec.DecodeHTTPGrpcRequest(ctx, request)
if err != nil {
response, ok := httpgrpc.HTTPResponseFromError(err)
Expand Down

0 comments on commit 8307ecf

Please sign in to comment.