Remove querier wait time metric. #11233

Merged (9 commits) on Dec 6, 2023
1 change: 1 addition & 0 deletions pkg/loki/modules.go
@@ -529,6 +529,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler)

svc, err := querier.InitWorkerService(
+ logger,
querierWorkerServiceConfig,
prometheus.DefaultRegisterer,
internalHandler,
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor.go
@@ -58,7 +58,7 @@ func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn) {
}

// runOne loops, trying to establish a stream to the frontend to begin request processing.
- func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
+ func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, _ string) {
Contributor: Is the `_` param intentional here?

Author: Yes, processQueriesOnSingleStream implements the processor interface, and we don't use the worker ID in the case of the frontend processor.
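To illustrate the reply, here is a minimal sketch (simplified from the processor interface in pkg/querier/worker/worker.go later in this diff, with notifyShutdown omitted) of how the blank identifier satisfies the interface while signaling that the parameter is unused:

```go
package worker

import (
	"context"

	"google.golang.org/grpc"
)

// Simplified processor interface; the real one also declares notifyShutdown.
type processor interface {
	processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, workerId string)
}

type frontendProcessor struct{}

// The trailing `_` accepts the workerId argument the interface requires,
// but makes it explicit that the frontend path never reads it.
func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, _ string) {
	// ... establish a stream to the frontend and process requests ...
}

// Compile-time check that frontendProcessor satisfies processor.
var _ processor = &frontendProcessor{}
```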

client := frontendv1pb.NewFrontendClient(conn)

backoff := backoff.New(ctx, processorBackoffConfig)
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor_test.go
@@ -39,7 +39,7 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) {
running.Store(true)
defer running.Store(false)

- mgr.processQueriesOnSingleStream(ctx, cc, "test:12345")
+ mgr.processQueriesOnSingleStream(ctx, cc, "test:12345", "1")
Contributor: Is "1" necessary?

Author: You are right. Since it's ignored, we don't need it here.

}()

test.Poll(t, time.Second, true, func() interface{} {
5 changes: 4 additions & 1 deletion pkg/querier/worker/processor_manager.go
@@ -2,6 +2,7 @@ package worker

import (
"context"
+ "strconv"
"sync"
"time"

@@ -64,7 +65,9 @@ func (pm *processorManager) concurrency(n int) {
n = 0
}

+ workerId := 0
Contributor: I am guessing the workerId does not matter much here, but there is a chance of re-using workerIds if the concurrency value changes.

Author: Good question. I wonder what happens then. Are we restarting the querier?

Contributor: When more schedulers get added, each worker gets a smaller concurrency value. That's not something that is often done in practice, so it should be alright, I think.
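As a minimal sketch of the reuse the reviewer describes (hypothetical names, condensed from the concurrency method above): the counter restarts at zero on every call, so growing the pool in a later call hands out IDs that live workers may still hold. Since the ID only feeds the debug log added later in this diff, a duplicate is cosmetic.

```go
package main

import "fmt"

// Hypothetical, condensed model of processorManager.concurrency:
// workerID is local to each call, so numbering restarts on every resize.
func main() {
	var running []int // stands in for pm.cancels; one entry per live worker

	concurrency := func(n int) {
		workerID := 0
		for len(running) < n {
			workerID++
			running = append(running, workerID)
			fmt.Println("started worker", workerID)
		}
		for len(running) > n {
			fmt.Println("stopped worker", running[0])
			running = running[1:]
		}
	}

	concurrency(2) // started worker 1, started worker 2
	concurrency(3) // started worker 1, although a live worker already has ID 1
}
```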

for len(pm.cancels) < n {
+ workerId++
ctx, cancel := context.WithCancel(pm.ctx)
pm.cancels = append(pm.cancels, cancel)

@@ -75,7 +78,7 @@
pm.currentProcessors.Inc()
defer pm.currentProcessors.Dec()

- pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address)
+ pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address, strconv.Itoa(workerId))
}()
}

9 changes: 6 additions & 3 deletions pkg/querier/worker/scheduler_processor.go
@@ -83,7 +83,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.ClientConn) {
}
}

- func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address string) {
+ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address, workerId string) {
schedulerClient := sp.schedulerClientFactory(conn)

// Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context".
@@ -104,7 +104,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address, workerId string) {
continue
}

- if err := sp.querierLoop(c, address, inflightQuery); err != nil {
+ if err := sp.querierLoop(c, address, inflightQuery, workerId); err != nil {
// Do not log an error if the query-scheduler is shutting down.
if s, ok := status.FromError(err); !ok || !strings.Contains(s.Message(), schedulerpb.ErrSchedulerIsNotRunning.Error()) {
level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
@@ -119,17 +119,20 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address, workerId string) {
}

// process loops processing requests on an established stream.
- func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool) error {
+ func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool, workerId string) error {
// Build a child context so we can cancel a query when the stream is closed.
ctx, cancel := context.WithCancel(c.Context())
defer cancel()

for {
+ start := time.Now()
request, err := c.Recv()
if err != nil {
return err
}

+ level.Debug(sp.log).Log("msg", "received query", "worker", workerId, "wait-time", time.Since(start))

inflightQuery.Store(true)

// Handle the request on a "background" goroutine, so we go back to
6 changes: 3 additions & 3 deletions pkg/querier/worker/scheduler_processor_test.go
@@ -41,7 +41,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {

requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil)

- sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
+ sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")

// We expect at this point, the execution context has been canceled too.
require.Error(t, loopClient.Context().Err())
@@ -91,7 +91,7 @@
}).Return(&queryrange.LokiResponse{}, nil)

startTime := time.Now()
- sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
+ sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")
assert.GreaterOrEqual(t, time.Since(startTime), time.Second)

// We expect at this point, the execution context has been canceled too.
@@ -122,7 +122,7 @@

requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil)

- sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
+ sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")

// We expect no error in the log.
assert.NotContains(t, logs.String(), "error")
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker.go
@@ -71,7 +71,7 @@ type processor interface {
// This method must react on context being finished, and stop when that happens.
//
// processorManager (not processor) is responsible for starting as many goroutines as needed for each connection.
- processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string)
+ processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, workerId string)

// notifyShutdown notifies the remote query-frontend or query-scheduler that the querier is
// shutting down.
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker_test.go
@@ -88,7 +88,7 @@ func getConcurrentProcessors(w *querierWorker) int {

type mockProcessor struct{}

- func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _ string) {
+ func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _, _ string) {
<-ctx.Done()
}

6 changes: 4 additions & 2 deletions pkg/querier/worker_service.go
@@ -3,6 +3,7 @@ package querier
import (
"fmt"

+ "github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
@@ -52,6 +53,7 @@ func (cfg WorkerServiceConfig) QuerierRunningStandalone() bool {
// HTTP router for the Prometheus API routes. Then the external HTTP server will be passed
// as a http.Handler to the frontend worker.
func InitWorkerService(
+ logger log.Logger,
cfg WorkerServiceConfig,
reg prometheus.Registerer,
handler queryrangebase.Handler,
@@ -76,7 +78,7 @@
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
handler,
- util_log.Logger,
+ logger,
reg,
codec,
)
@@ -102,7 +104,7 @@
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
handler,
- util_log.Logger,
+ logger,
reg,
codec,
)
14 changes: 3 additions & 11 deletions pkg/queue/metrics.go
@@ -6,10 +6,9 @@ import (
)

type Metrics struct {
- queueLength *prometheus.GaugeVec // Per tenant
- discardedRequests *prometheus.CounterVec // Per tenant
- enqueueCount *prometheus.CounterVec // Per tenant and level
- querierWaitTime *prometheus.HistogramVec // Per querier wait time
+ queueLength *prometheus.GaugeVec // Per tenant
+ discardedRequests *prometheus.CounterVec // Per tenant
+ enqueueCount *prometheus.CounterVec // Per tenant and level
}

func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics {
@@ -32,13 +31,6 @@ func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics {
Name: "enqueue_count",
Help: "Total number of enqueued (sub-)queries.",
}, []string{"user", "level"}),
- querierWaitTime: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
- Namespace: metricsNamespace,
- Subsystem: subsystem,
- Name: "querier_wait_seconds",
- Help: "Time spend waiting for new requests.",
- Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30, 60, 120, 240},
- }, []string{"querier"}),
}
}

2 changes: 0 additions & 2 deletions pkg/queue/queue.go
@@ -138,9 +138,7 @@ FindQueue:
// We need to wait if there are no tenants, or no pending requests for given querier.
for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
querierWait = false
- start := time.Now()
q.cond.Wait(ctx)
- q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds())
}

if q.stopped {