From 35b5c320f21e9cb663bbc2ac5640e846e502651b Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Tue, 31 Oct 2023 17:32:37 +0100 Subject: [PATCH 1/2] Bring back RED metrics for querier when processing scheduler requests. --- pkg/loki/loki.go | 3 +- pkg/loki/modules.go | 11 +++--- pkg/querier/queryrange/codec.go | 19 +++++++++++ pkg/querier/queryrange/instrument.go | 50 +++++++++++++++++----------- 4 files changed, 55 insertions(+), 28 deletions(-) diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 99717057566cf..54a0a52275dcc 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -332,7 +332,8 @@ type Loki struct { HTTPAuthMiddleware middleware.Interface - Codec Codec + Codec Codec + Metrics *server.Metrics } // New makes a new Loki. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index fb9d32cfdde3b..3edaaee75db11 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -142,7 +142,9 @@ func (t *Loki) initServer() (services.Service, error) { // Loki handles signals on its own. DisableSignalHandling(&t.Cfg.Server) - serv, err := server.New(t.Cfg.Server) + + t.Metrics = server.NewServerMetrics(t.Cfg.Server) + serv, err := server.NewWithMetrics(t.Cfg.Server, t.Metrics) if err != nil { return nil, err } @@ -515,12 +517,7 @@ func (t *Loki) initQuerier() (services.Service, error) { internalHandler := queryrangebase.MergeMiddlewares( serverutil.RecoveryMiddleware, - queryrange.Instrument{ - QueryHandlerMetrics: queryrange.NewQueryHandlerMetrics( - prometheus.DefaultRegisterer, - t.Cfg.MetricsNamespace, - ), - }, + queryrange.Instrument{Metrics: t.Metrics}, ).Wrap(handler) svc, err := querier.InitWorkerService( diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 05f1f4379922f..42a5afe53ca82 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -702,6 +702,25 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht } } +func (c Codec) Path(r queryrangebase.Request) string { + switch request := r.(type) { + case *LokiRequest: + return "loki/api/v1/query_range" + case *LokiSeriesRequest: + return "loki/api/v1/series" + case *LabelRequest: + return request.Path() // NOTE: this could be either /label or /label/{name}/values endpoint. So forward the original path as it is. + case *LokiInstantRequest: + return "/loki/api/v1/query" + case *logproto.IndexStatsRequest: + return "/loki/api/v1/index/stats" + case *logproto.VolumeRequest: + return "/loki/api/v1/index/volume_range" + } + + return "other" +} + func (p RequestProtobufCodec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) { req, err := p.Codec.EncodeRequest(ctx, r) if err != nil { diff --git a/pkg/querier/queryrange/instrument.go b/pkg/querier/queryrange/instrument.go index 577bb1fd1f254..f4e3b7ec51e01 100644 --- a/pkg/querier/queryrange/instrument.go +++ b/pkg/querier/queryrange/instrument.go @@ -2,10 +2,15 @@ package queryrange import ( "context" - "fmt" + "strconv" + "time" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/grafana/dskit/grpcutil" + "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/instrument" + "github.com/grafana/dskit/middleware" + + "github.com/grafana/dskit/server" "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" ) @@ -14,22 +19,8 @@ const ( gRPC = "gRPC" ) -type QueryHandlerMetrics struct { - InflightRequests *prometheus.GaugeVec -} - -func NewQueryHandlerMetrics(registerer prometheus.Registerer, metricsNamespace string) *QueryHandlerMetrics { - return &QueryHandlerMetrics{ - InflightRequests: promauto.With(registerer).NewGaugeVec(prometheus.GaugeOpts{ - Namespace: metricsNamespace, - Name: "inflight_requests", - Help: "Current number of inflight requests.", - }, []string{"method", "route"}), - } -} - type Instrument struct { - *QueryHandlerMetrics + *server.Metrics } var _ queryrangebase.Middleware = Instrument{} @@ -37,11 +28,30 @@ var _ queryrangebase.Middleware = Instrument{} // Wrap implements the queryrangebase.Middleware func (i Instrument) Wrap(next queryrangebase.Handler) queryrangebase.Handler { return queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - route := fmt.Sprintf("%T", r) + route := DefaultCodec.Path(r) + route = middleware.MakeLabelValue(route) inflight := i.InflightRequests.WithLabelValues(gRPC, route) inflight.Inc() defer inflight.Dec() - return next.Do(ctx, r) + begin := time.Now() + result, err := next.Do(ctx, r) + i.observe(ctx, route, err, time.Since(begin)) + + return result, err }) } + +func (i Instrument) observe(ctx context.Context, method string, err error, duration time.Duration) { + respStatus := "success" + if err != nil { + if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { + respStatus = strconv.Itoa(int(errResp.Code)) + } else if grpcutil.IsCanceled(err) { + respStatus = "cancel" + } else { + respStatus = "error" + } + } + instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(gRPC, method, respStatus, "false"), duration.Seconds()) +} From c9c7266966445d725bf4ec42266b664b701e9617 Mon Sep 17 00:00:00 2001 From: Karsten Jeschkies Date: Wed, 1 Nov 2023 10:46:24 +0100 Subject: [PATCH 2/2] Handle errors better --- pkg/querier/queryrange/codec.go | 4 ++++ pkg/querier/queryrange/instrument.go | 15 ++++++--------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 42a5afe53ca82..7a604780219cc 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -497,6 +497,10 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest) // DecodeHTTPGrpcResponse decodes an httpgrp.HTTPResponse to queryrangebase.Response. func (Codec) DecodeHTTPGrpcResponse(r *httpgrpc.HTTPResponse, req queryrangebase.Request) (queryrangebase.Response, error) { + if r.Code/100 != 2 { + return nil, httpgrpc.Errorf(int(r.Code), string(r.Body)) + } + headers := make(http.Header) for _, header := range r.Headers { headers[header.Key] = header.Values diff --git a/pkg/querier/queryrange/instrument.go b/pkg/querier/queryrange/instrument.go index f4e3b7ec51e01..8c32fad4ca304 100644 --- a/pkg/querier/queryrange/instrument.go +++ b/pkg/querier/queryrange/instrument.go @@ -5,7 +5,6 @@ import ( "strconv" "time" - "github.com/grafana/dskit/grpcutil" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/instrument" "github.com/grafana/dskit/middleware" @@ -16,7 +15,7 @@ import ( ) const ( - gRPC = "gRPC" + method = "GET" ) type Instrument struct { @@ -30,7 +29,7 @@ func (i Instrument) Wrap(next queryrangebase.Handler) queryrangebase.Handler { return queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { route := DefaultCodec.Path(r) route = middleware.MakeLabelValue(route) - inflight := i.InflightRequests.WithLabelValues(gRPC, route) + inflight := i.InflightRequests.WithLabelValues(method, route) inflight.Inc() defer inflight.Dec() @@ -42,16 +41,14 @@ func (i Instrument) Wrap(next queryrangebase.Handler) queryrangebase.Handler { }) } -func (i Instrument) observe(ctx context.Context, method string, err error, duration time.Duration) { - respStatus := "success" +func (i Instrument) observe(ctx context.Context, route string, err error, duration time.Duration) { + respStatus := "200" if err != nil { if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok { respStatus = strconv.Itoa(int(errResp.Code)) - } else if grpcutil.IsCanceled(err) { - respStatus = "cancel" } else { - respStatus = "error" + respStatus = "500" } } - instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(gRPC, method, respStatus, "false"), duration.Seconds()) + instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(method, route, respStatus, "false"), duration.Seconds()) }