Skip to content

Commit

Permalink
Remove querier wait time metric. (#11233)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
We would like to know how long a querier worker is idle to understand if
workstealing would have an impact. The original metric was too noisy and
its cardinality was too high. Instead, we are going to log the wait
time.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

---------

Co-authored-by: Danny Kopping <[email protected]>
  • Loading branch information
jeschkies and dannykopping authored Dec 6, 2023
1 parent 0945b18 commit 5b8d0e6
Show file tree
Hide file tree
Showing 11 changed files with 25 additions and 26 deletions.
1 change: 1 addition & 0 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler)

svc, err := querier.InitWorkerService(
logger,
querierWorkerServiceConfig,
prometheus.DefaultRegisterer,
internalHandler,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.Clie
}

// runOne loops, trying to establish a stream to the frontend to begin request processing.
func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, _ string) {
client := frontendv1pb.NewFrontendClient(conn)

backoff := backoff.New(ctx, processorBackoffConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/frontend_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) {
running.Store(true)
defer running.Store(false)

mgr.processQueriesOnSingleStream(ctx, cc, "test:12345")
mgr.processQueriesOnSingleStream(ctx, cc, "test:12345", "")
}()

test.Poll(t, time.Second, true, func() interface{} {
Expand Down
5 changes: 4 additions & 1 deletion pkg/querier/worker/processor_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package worker

import (
"context"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -64,7 +65,9 @@ func (pm *processorManager) concurrency(n int) {
n = 0
}

workerID := 0
for len(pm.cancels) < n {
workerID++
ctx, cancel := context.WithCancel(pm.ctx)
pm.cancels = append(pm.cancels, cancel)

Expand All @@ -75,7 +78,7 @@ func (pm *processorManager) concurrency(n int) {
pm.currentProcessors.Inc()
defer pm.currentProcessors.Dec()

pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address)
pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address, strconv.Itoa(workerID))
}()
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.Cli
}
}

func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address string) {
func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address, workerID string) {
schedulerClient := sp.schedulerClientFactory(conn)

// Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context".
Expand All @@ -104,7 +104,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
continue
}

if err := sp.querierLoop(c, address, inflightQuery); err != nil {
if err := sp.querierLoop(c, address, inflightQuery, workerID); err != nil {
// Do not log an error if the query-scheduler is shutting down.
if s, ok := status.FromError(err); !ok || !strings.Contains(s.Message(), schedulerpb.ErrSchedulerIsNotRunning.Error()) {
level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
Expand All @@ -119,17 +119,20 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
}

// process loops processing requests on an established stream.
func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool) error {
func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool, workerID string) error {
// Build a child context so we can cancel a query when the stream is closed.
ctx, cancel := context.WithCancel(c.Context())
defer cancel()

for {
start := time.Now()
request, err := c.Recv()
if err != nil {
return err
}

level.Debug(sp.log).Log("msg", "received query", "worker", workerID, "wait_time_sec", time.Since(start).Seconds())

inflightQuery.Store(true)

// Handle the request on a "background" goroutine, so we go back to
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/worker/scheduler_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {

requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil)

sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")

// We expect at this point, the execution context has been canceled too.
require.Error(t, loopClient.Context().Err())
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {
}).Return(&queryrange.LokiResponse{}, nil)

startTime := time.Now()
sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")
assert.GreaterOrEqual(t, time.Since(startTime), time.Second)

// We expect at this point, the execution context has been canceled too.
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {

requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil)

sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")

// We expect no error in the log.
assert.NotContains(t, logs.String(), "error")
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type processor interface {
// This method must react on context being finished, and stop when that happens.
//
// processorManager (not processor) is responsible for starting as many goroutines as needed for each connection.
processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string)
processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, workerID string)

// notifyShutdown notifies the remote query-frontend or query-scheduler that the querier is
// shutting down.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func getConcurrentProcessors(w *querierWorker) int {

type mockProcessor struct{}

func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _ string) {
func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _, _ string) {
<-ctx.Done()
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/worker_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
Expand Down Expand Up @@ -52,6 +53,7 @@ func (cfg WorkerServiceConfig) QuerierRunningStandalone() bool {
// HTTP router for the Prometheus API routes. Then the external HTTP server will be passed
// as a http.Handler to the frontend worker.
func InitWorkerService(
logger log.Logger,
cfg WorkerServiceConfig,
reg prometheus.Registerer,
handler queryrangebase.Handler,
Expand All @@ -76,7 +78,7 @@ func InitWorkerService(
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
handler,
util_log.Logger,
logger,
reg,
codec,
)
Expand All @@ -102,7 +104,7 @@ func InitWorkerService(
*(cfg.QuerierWorkerConfig),
cfg.SchedulerRing,
handler,
util_log.Logger,
logger,
reg,
codec,
)
Expand Down
14 changes: 3 additions & 11 deletions pkg/queue/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
)

type Metrics struct {
queueLength *prometheus.GaugeVec // Per tenant
discardedRequests *prometheus.CounterVec // Per tenant
enqueueCount *prometheus.CounterVec // Per tenant and level
querierWaitTime *prometheus.HistogramVec // Per querier wait time
queueLength *prometheus.GaugeVec // Per tenant
discardedRequests *prometheus.CounterVec // Per tenant
enqueueCount *prometheus.CounterVec // Per tenant and level
}

func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics {
Expand All @@ -32,13 +31,6 @@ func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem st
Name: "enqueue_count",
Help: "Total number of enqueued (sub-)queries.",
}, []string{"user", "level"}),
querierWaitTime: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: metricsNamespace,
Subsystem: subsystem,
Name: "querier_wait_seconds",
Help: "Time spend waiting for new requests.",
Buckets: []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30, 60, 120, 240},
}, []string{"querier"}),
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,7 @@ FindQueue:
// We need to wait if there are no tenants, or no pending requests for given querier.
for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
querierWait = false
start := time.Now()
q.cond.Wait(ctx)
q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds())
}

if q.stopped {
Expand Down

0 comments on commit 5b8d0e6

Please sign in to comment.