From 5b8d0e666de2221f08a4537d7b5eda0be79f939b Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 6 Dec 2023 21:40:04 +0100 Subject: [PATCH] Remove querier wait time metric. (#11233) **What this PR does / why we need it**: We would like to know how long a querier worker is idle to understand if workstealing would have an impact. The original metric was too noisy and its cardinality was too high. Instead, we are going to log the wait time. **Checklist** - [ ] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [ ] Tests updated - [ ] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --------- Co-authored-by: Danny Kopping --- pkg/loki/modules.go | 1 + pkg/querier/worker/frontend_processor.go | 2 +- pkg/querier/worker/frontend_processor_test.go | 2 +- pkg/querier/worker/processor_manager.go | 5 ++++- pkg/querier/worker/scheduler_processor.go | 9 ++++++--- pkg/querier/worker/scheduler_processor_test.go | 6 +++--- pkg/querier/worker/worker.go | 2 +- pkg/querier/worker/worker_test.go | 2 +- pkg/querier/worker_service.go | 6 ++++-- pkg/queue/metrics.go | 14 +++----------- pkg/queue/queue.go | 2 -- 11 files changed, 25 insertions(+), 26 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 246c2ef78241..e7848ef701a2 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -530,6 +530,7 @@ func (t *Loki) initQuerier() (services.Service, error) { internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler) svc, err := querier.InitWorkerService( + logger, querierWorkerServiceConfig, prometheus.DefaultRegisterer, internalHandler, diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go index 3e77c3f0e91a..45c61862d059 100644 --- a/pkg/querier/worker/frontend_processor.go +++ b/pkg/querier/worker/frontend_processor.go @@ -58,7 +58,7 @@ func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.Clie } // runOne loops, trying to establish a stream to the frontend to begin request processing. -func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) { +func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, _ string) { client := frontendv1pb.NewFrontendClient(conn) backoff := backoff.New(ctx, processorBackoffConfig) diff --git a/pkg/querier/worker/frontend_processor_test.go b/pkg/querier/worker/frontend_processor_test.go index e446500dd804..cecdb7bfe27d 100644 --- a/pkg/querier/worker/frontend_processor_test.go +++ b/pkg/querier/worker/frontend_processor_test.go @@ -39,7 +39,7 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) { running.Store(true) defer running.Store(false) - mgr.processQueriesOnSingleStream(ctx, cc, "test:12345") + mgr.processQueriesOnSingleStream(ctx, cc, "test:12345", "") }() test.Poll(t, time.Second, true, func() interface{} { diff --git a/pkg/querier/worker/processor_manager.go b/pkg/querier/worker/processor_manager.go index 5d675c88a657..3a2c8c338865 100644 --- a/pkg/querier/worker/processor_manager.go +++ b/pkg/querier/worker/processor_manager.go @@ -2,6 +2,7 @@ package worker import ( "context" + "strconv" "sync" "time" @@ -64,7 +65,9 @@ func (pm *processorManager) concurrency(n int) { n = 0 } + workerID := 0 for len(pm.cancels) < n { + workerID++ ctx, cancel := context.WithCancel(pm.ctx) pm.cancels = append(pm.cancels, cancel) @@ -75,7 +78,7 @@ func (pm *processorManager) concurrency(n int) { pm.currentProcessors.Inc() defer pm.currentProcessors.Dec() - pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address) + pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address, strconv.Itoa(workerID)) }() } diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index 15e3985b60fb..16d0e59d1ed1 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -83,7 +83,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.Cli } } -func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address string) { +func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address, workerID string) { schedulerClient := sp.schedulerClientFactory(conn) // Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context". @@ -104,7 +104,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con continue } - if err := sp.querierLoop(c, address, inflightQuery); err != nil { + if err := sp.querierLoop(c, address, inflightQuery, workerID); err != nil { // Do not log an error if the query-scheduler is shutting down. if s, ok := status.FromError(err); !ok || !strings.Contains(s.Message(), schedulerpb.ErrSchedulerIsNotRunning.Error()) { level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address) @@ -119,17 +119,20 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con } // process loops processing requests on an established stream. -func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool) error { +func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool, workerID string) error { // Build a child context so we can cancel a query when the stream is closed. ctx, cancel := context.WithCancel(c.Context()) defer cancel() for { + start := time.Now() request, err := c.Recv() if err != nil { return err } + level.Debug(sp.log).Log("msg", "received query", "worker", workerID, "wait_time_sec", time.Since(start).Seconds()) + inflightQuery.Store(true) // Handle the request on a "background" goroutine, so we go back to diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go index b1971bdd7607..154ba1ae4fa7 100644 --- a/pkg/querier/worker/scheduler_processor_test.go +++ b/pkg/querier/worker/scheduler_processor_test.go @@ -41,7 +41,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) { requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil) - sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1") + sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1") // We expect at this point, the execution context has been canceled too. require.Error(t, loopClient.Context().Err()) @@ -91,7 +91,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) { }).Return(&queryrange.LokiResponse{}, nil) startTime := time.Now() - sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1") + sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1") assert.GreaterOrEqual(t, time.Since(startTime), time.Second) // We expect at this point, the execution context has been canceled too. @@ -122,7 +122,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) { requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil) - sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1") + sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1") // We expect no error in the log. assert.NotContains(t, logs.String(), "error") diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index a7bebfbfccf1..b2e50b205d14 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -70,7 +70,7 @@ type processor interface { // This method must react on context being finished, and stop when that happens. // // processorManager (not processor) is responsible for starting as many goroutines as needed for each connection. - processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) + processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, workerID string) // notifyShutdown notifies the remote query-frontend or query-scheduler that the querier is // shutting down. diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index 2f1ccb98d309..68791b214f17 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -88,7 +88,7 @@ func getConcurrentProcessors(w *querierWorker) int { type mockProcessor struct{} -func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _ string) { +func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _, _ string) { <-ctx.Done() } diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go index d0837e418065..5dba31f3eebc 100644 --- a/pkg/querier/worker_service.go +++ b/pkg/querier/worker_service.go @@ -3,6 +3,7 @@ package querier import ( "fmt" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" @@ -52,6 +53,7 @@ func (cfg WorkerServiceConfig) QuerierRunningStandalone() bool { // HTTP router for the Prometheus API routes. Then the external HTTP server will be passed // as a http.Handler to the frontend worker. func InitWorkerService( + logger log.Logger, cfg WorkerServiceConfig, reg prometheus.Registerer, handler queryrangebase.Handler, @@ -76,7 +78,7 @@ func InitWorkerService( *(cfg.QuerierWorkerConfig), cfg.SchedulerRing, handler, - util_log.Logger, + logger, reg, codec, ) @@ -102,7 +104,7 @@ func InitWorkerService( *(cfg.QuerierWorkerConfig), cfg.SchedulerRing, handler, - util_log.Logger, + logger, reg, codec, ) diff --git a/pkg/queue/metrics.go b/pkg/queue/metrics.go index 5d00edb1a3b1..769fb51c2370 100644 --- a/pkg/queue/metrics.go +++ b/pkg/queue/metrics.go @@ -6,10 +6,9 @@ import ( ) type Metrics struct { - queueLength *prometheus.GaugeVec // Per tenant - discardedRequests *prometheus.CounterVec // Per tenant - enqueueCount *prometheus.CounterVec // Per tenant and level - querierWaitTime *prometheus.HistogramVec // Per querier wait time + queueLength *prometheus.GaugeVec // Per tenant + discardedRequests *prometheus.CounterVec // Per tenant + enqueueCount *prometheus.CounterVec // Per tenant and level } func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics { @@ -32,13 +31,6 @@ func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem st Name: "enqueue_count", Help: "Total number of enqueued (sub-)queries.", }, []string{"user", "level"}), - querierWaitTime: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{ - Namespace: metricsNamespace, - Subsystem: subsystem, - Name: "querier_wait_seconds", - Help: "Time spend waiting for new requests.", - Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30, 60, 120, 240}, - }, []string{"querier"}), } } diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go index 4af4d2c903d7..aab4631e86e4 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/queue.go @@ -176,9 +176,7 @@ FindQueue: // We need to wait if there are no tenants, or no pending requests for given querier. for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped { querierWait = false - start := time.Now() q.cond.Wait(ctx) - q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds()) } if q.stopped {