diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go
index 246c2ef782417..e7848ef701a25 100644
--- a/pkg/loki/modules.go
+++ b/pkg/loki/modules.go
@@ -530,6 +530,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
 	internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler)
 
 	svc, err := querier.InitWorkerService(
+		logger,
 		querierWorkerServiceConfig,
 		prometheus.DefaultRegisterer,
 		internalHandler,
diff --git a/pkg/querier/worker/frontend_processor.go b/pkg/querier/worker/frontend_processor.go
index 3e77c3f0e91a8..45c61862d0598 100644
--- a/pkg/querier/worker/frontend_processor.go
+++ b/pkg/querier/worker/frontend_processor.go
@@ -58,7 +58,7 @@ func (fp *frontendProcessor) notifyShutdown(ctx context.Context, conn *grpc.Clie
 }
 
-// runOne loops, trying to establish a stream to the frontend to begin request processing.
-func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
+// processQueriesOnSingleStream loops, trying to establish a stream to the frontend to begin request processing.
+func (fp *frontendProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, _ string) {
 	client := frontendv1pb.NewFrontendClient(conn)
 
 	backoff := backoff.New(ctx, processorBackoffConfig)
diff --git a/pkg/querier/worker/frontend_processor_test.go b/pkg/querier/worker/frontend_processor_test.go
index e446500dd804b..cecdb7bfe27d3 100644
--- a/pkg/querier/worker/frontend_processor_test.go
+++ b/pkg/querier/worker/frontend_processor_test.go
@@ -39,7 +39,7 @@ func TestRecvFailDoesntCancelProcess(t *testing.T) {
 		running.Store(true)
 		defer running.Store(false)
 
-		mgr.processQueriesOnSingleStream(ctx, cc, "test:12345")
+		mgr.processQueriesOnSingleStream(ctx, cc, "test:12345", "")
 	}()
 
 	test.Poll(t, time.Second, true, func() interface{} {
diff --git a/pkg/querier/worker/processor_manager.go b/pkg/querier/worker/processor_manager.go
index 5d675c88a6576..3a2c8c338865d 100644
--- a/pkg/querier/worker/processor_manager.go
+++ b/pkg/querier/worker/processor_manager.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"context"
+	"strconv"
 	"sync"
 	"time"
 
@@ -64,7 +65,10 @@ func (pm *processorManager) concurrency(n int) {
 		n = 0
 	}
 
+	workerID := 0
 	for len(pm.cancels) < n {
+		workerID++
+		id := strconv.Itoa(workerID) // copy per iteration: the goroutine below must not capture the shared counter
 		ctx, cancel := context.WithCancel(pm.ctx)
 		pm.cancels = append(pm.cancels, cancel)
 
@@ -75,7 +79,7 @@ func (pm *processorManager) concurrency(n int) {
 			pm.currentProcessors.Inc()
 			defer pm.currentProcessors.Dec()
 
-			pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address)
+			pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address, id)
 		}()
 	}
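Note on the per-iteration `id` copy above: the goroutine started by `concurrency` outlives its loop iteration, so it must not read the shared `workerID` counter directly — the loop keeps incrementing the variable while goroutines run, which is a data race and can hand several workers the same (final) value. A standalone sketch of the capture pattern; toy code, not Loki's:

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	workerID := 0
	for i := 0; i < 4; i++ {
		workerID++
		// id is declared inside the loop body, so every goroutine captures
		// its own copy. Capturing workerID itself would race with the
		// increment above and could give several goroutines the same value.
		id := strconv.Itoa(workerID)

		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("worker", id, "running")
		}()
	}
	wg.Wait()
}
```

Go 1.22's per-iteration loop variables only cover variables declared by the `for` statement itself; `workerID` lives outside the loop, so the explicit copy is still required.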
diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go
index 15e3985b60fbd..16d0e59d1ed14 100644
--- a/pkg/querier/worker/scheduler_processor.go
+++ b/pkg/querier/worker/scheduler_processor.go
@@ -83,7 +83,7 @@ func (sp *schedulerProcessor) notifyShutdown(ctx context.Context, conn *grpc.Cli
 	}
 }
 
-func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address string) {
+func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Context, conn *grpc.ClientConn, address, workerID string) {
 	schedulerClient := sp.schedulerClientFactory(conn)
 
 	// Run the querier loop (and so all the queries) in a dedicated context that we call the "execution context".
@@ -104,7 +104,7 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
 			continue
 		}
 
-		if err := sp.querierLoop(c, address, inflightQuery); err != nil {
+		if err := sp.querierLoop(c, address, inflightQuery, workerID); err != nil {
 			// Do not log an error if the query-scheduler is shutting down.
 			if s, ok := status.FromError(err); !ok || !strings.Contains(s.Message(), schedulerpb.ErrSchedulerIsNotRunning.Error()) {
 				level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
@@ -119,17 +119,20 @@ func (sp *schedulerProcessor) processQueriesOnSingleStream(workerCtx context.Con
 }
 
-// process loops processing requests on an established stream.
-func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool) error {
+// querierLoop loops processing requests on an established stream.
+func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string, inflightQuery *atomic.Bool, workerID string) error {
 	// Build a child context so we can cancel a query when the stream is closed.
 	ctx, cancel := context.WithCancel(c.Context())
 	defer cancel()
 
 	for {
+		start := time.Now()
 		request, err := c.Recv()
 		if err != nil {
 			return err
 		}
 
+		level.Debug(sp.log).Log("msg", "received query", "worker", workerID, "wait_time_sec", time.Since(start).Seconds())
+
 		inflightQuery.Store(true)
 
 		// Handle the request on a "background" goroutine, so we go back to
diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go
index b1971bdd76077..154ba1ae4fa73 100644
--- a/pkg/querier/worker/scheduler_processor_test.go
+++ b/pkg/querier/worker/scheduler_processor_test.go
@@ -41,7 +41,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {
 
 		requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil)
 
-		sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
+		sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")
 
 		// We expect at this point, the execution context has been canceled too.
 		require.Error(t, loopClient.Context().Err())
@@ -91,7 +91,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {
 		}).Return(&queryrange.LokiResponse{}, nil)
 
 		startTime := time.Now()
-		sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
+		sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")
 		assert.GreaterOrEqual(t, time.Since(startTime), time.Second)
 
 		// We expect at this point, the execution context has been canceled too.
@@ -122,7 +122,7 @@ func TestSchedulerProcessor_processQueriesOnSingleStream(t *testing.T) {
 
 		requestHandler.On("Do", mock.Anything, mock.Anything).Return(&queryrange.LokiResponse{}, nil)
 
-		sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1")
+		sp.processQueriesOnSingleStream(workerCtx, nil, "127.0.0.1", "1")
 
 		// We expect no error in the log.
 		assert.NotContains(t, logs.String(), "error")
diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go
index a7bebfbfccf14..b2e50b205d143 100644
--- a/pkg/querier/worker/worker.go
+++ b/pkg/querier/worker/worker.go
@@ -70,7 +70,7 @@ type processor interface {
 	// This method must react on context being finished, and stop when that happens.
 	//
 	// processorManager (not processor) is responsible for starting as many goroutines as needed for each connection.
-	processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string)
+	processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address, workerID string)
 
 	// notifyShutdown notifies the remote query-frontend or query-scheduler that the querier is
 	// shutting down.
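The debug line added to `querierLoop` records how long the worker sat blocked in `Recv` before the scheduler dispatched a query; together with the worker ID, it replaces the per-querier `querier_wait_seconds` histogram removed further down. A self-contained sketch of the timing-around-a-blocking-receive pattern with go-kit log — `recv` here is a hypothetical stand-in for the gRPC stream, not Loki's API:

```go
package main

import (
	"os"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

// recv stands in for the blocking stream.Recv() call in the querier loop.
func recv() string {
	time.Sleep(150 * time.Millisecond) // pretend the scheduler is idle for a while
	return "request"
}

func main() {
	// Install a level filter that lets debug records through.
	logger := level.NewFilter(log.NewLogfmtLogger(os.Stderr), level.AllowDebug())

	for i := 0; i < 3; i++ {
		start := time.Now()
		req := recv() // time spent here is idle time, not query execution time
		level.Debug(logger).Log("msg", "received query", "worker", "1",
			"wait_time_sec", time.Since(start).Seconds(), "req", req)
	}
}
```

Logging at debug level keeps the signal available when needed, without paying the histogram's per-querier label cardinality on every scrape.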
diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go
index 2f1ccb98d3097..68791b214f178 100644
--- a/pkg/querier/worker/worker_test.go
+++ b/pkg/querier/worker/worker_test.go
@@ -88,7 +88,7 @@ func getConcurrentProcessors(w *querierWorker) int {
 
 type mockProcessor struct{}
 
-func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _ string) {
+func (m mockProcessor) processQueriesOnSingleStream(ctx context.Context, _ *grpc.ClientConn, _, _ string) {
 	<-ctx.Done()
 }
 
diff --git a/pkg/querier/worker_service.go b/pkg/querier/worker_service.go
index d0837e4180652..5dba31f3eebc4 100644
--- a/pkg/querier/worker_service.go
+++ b/pkg/querier/worker_service.go
@@ -3,6 +3,7 @@ package querier
 
 import (
 	"fmt"
 
+	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/grafana/dskit/ring"
 	"github.com/grafana/dskit/services"
@@ -52,6 +53,7 @@ func (cfg WorkerServiceConfig) QuerierRunningStandalone() bool {
 // HTTP router for the Prometheus API routes. Then the external HTTP server will be passed
 // as a http.Handler to the frontend worker.
 func InitWorkerService(
+	logger log.Logger,
 	cfg WorkerServiceConfig,
 	reg prometheus.Registerer,
 	handler queryrangebase.Handler,
@@ -76,7 +78,7 @@ func InitWorkerService(
 			*(cfg.QuerierWorkerConfig),
 			cfg.SchedulerRing,
 			handler,
-			util_log.Logger,
+			logger,
 			reg,
 			codec,
 		)
@@ -102,7 +104,7 @@ func InitWorkerService(
 			*(cfg.QuerierWorkerConfig),
 			cfg.SchedulerRing,
 			handler,
-			util_log.Logger,
+			logger,
 			reg,
 			codec,
 		)
diff --git a/pkg/queue/metrics.go b/pkg/queue/metrics.go
index 5d00edb1a3b16..769fb51c23708 100644
--- a/pkg/queue/metrics.go
+++ b/pkg/queue/metrics.go
@@ -6,10 +6,9 @@ import (
 )
 
 type Metrics struct {
-	queueLength       *prometheus.GaugeVec     // Per tenant
-	discardedRequests *prometheus.CounterVec   // Per tenant
-	enqueueCount      *prometheus.CounterVec   // Per tenant and level
-	querierWaitTime   *prometheus.HistogramVec // Per querier wait time
+	queueLength       *prometheus.GaugeVec   // Per tenant
+	discardedRequests *prometheus.CounterVec // Per tenant
+	enqueueCount      *prometheus.CounterVec // Per tenant and level
 }
 
 func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem string) *Metrics {
@@ -32,13 +31,6 @@ func NewMetrics(registerer prometheus.Registerer, metricsNamespace, subsystem st
 			Name: "enqueue_count",
 			Help: "Total number of enqueued (sub-)queries.",
 		}, []string{"user", "level"}),
-		querierWaitTime: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
-			Namespace: metricsNamespace,
-			Subsystem: subsystem,
-			Name:      "querier_wait_seconds",
-			Help:      "Time spend waiting for new requests.",
-			Buckets:   []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, 30, 60, 120, 240},
-		}, []string{"querier"}),
 	}
 }
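`InitWorkerService` now receives its logger from the caller instead of reading the package-global `util_log.Logger`, so call sites such as `initQuerier` can attach their own context and tests can swap in a quiet logger. A minimal sketch of the injection shape — `initWorker` is a toy stand-in, not the real signature:

```go
package main

import (
	"os"

	"github.com/go-kit/log"
)

// initWorker is a toy stand-in for InitWorkerService: the caller owns the
// logger and passes it down, instead of the function reaching for a
// package-level global such as util_log.Logger.
func initWorker(logger log.Logger) {
	logger = log.With(logger, "component", "querier-worker")
	logger.Log("msg", "worker initialized") // errors ignored for brevity
}

func main() {
	logger := log.With(log.NewLogfmtLogger(os.Stderr), "caller", log.DefaultCaller)
	initWorker(logger)
}
```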
diff --git a/pkg/queue/queue.go b/pkg/queue/queue.go
index 4af4d2c903d75..aab4631e86e4f 100644
--- a/pkg/queue/queue.go
+++ b/pkg/queue/queue.go
@@ -176,9 +176,7 @@ FindQueue:
 	// We need to wait if there are no tenants, or no pending requests for given querier.
 	for (q.queues.hasNoTenantQueues() || querierWait) && ctx.Err() == nil && !q.stopped {
 		querierWait = false
-		start := time.Now()
 		q.cond.Wait(ctx)
-		q.metrics.querierWaitTime.WithLabelValues(consumerID).Observe(time.Since(start).Seconds())
 	}
 
 	if q.stopped {
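With the histogram gone, the loop above simply blocks in `q.cond.Wait(ctx)` until a tenant queue has work, the context ends, or the queue stops; idle time is now surfaced by the querier-side debug log instead of being observed here. For readers unfamiliar with a context-aware condition wait, a minimal sketch follows — an illustration only, not the implementation behind `q.cond`:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// cond is a minimal context-aware condition variable: Wait returns either
// when Broadcast is called or when ctx is done.
type cond struct {
	mu sync.Mutex
	ch chan struct{}
}

func newCond() *cond { return &cond{ch: make(chan struct{})} }

func (c *cond) Wait(ctx context.Context) {
	c.mu.Lock()
	ch := c.ch // snapshot the current generation's channel
	c.mu.Unlock()
	select {
	case <-ch: // woken by Broadcast
	case <-ctx.Done(): // caller gave up
	}
}

func (c *cond) Broadcast() {
	c.mu.Lock()
	close(c.ch) // wake everyone waiting on this generation
	c.ch = make(chan struct{})
	c.mu.Unlock()
}

func main() {
	c := newCond()
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	go func() {
		time.Sleep(50 * time.Millisecond)
		c.Broadcast() // work arrived
	}()

	start := time.Now()
	c.Wait(ctx)
	fmt.Println("waited", time.Since(start))
}
```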