Skip to content

Commit

Permalink
Bring back RED metrics for querier when processing scheduler requests. (
Browse files Browse the repository at this point in the history
#11097)

**What this PR does / why we need it**:
A previous change removed the RED metrics for the querier. This adds
them back as part of a middleware.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)
  • Loading branch information
jeschkies committed Nov 1, 2023
1 parent d22aa45 commit 14d303a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 3 deletions.
3 changes: 2 additions & 1 deletion pkg/loki/loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,8 @@ type Loki struct {

HTTPAuthMiddleware middleware.Interface

Codec Codec
Codec Codec
Metrics *server.Metrics
}

// New makes a new Loki.
Expand Down
11 changes: 9 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ func (t *Loki) initServer() (services.Service, error) {

// Loki handles signals on its own.
DisableSignalHandling(&t.Cfg.Server)
serv, err := server.New(t.Cfg.Server)

t.Metrics = server.NewServerMetrics(t.Cfg.Server)
serv, err := server.NewWithMetrics(t.Cfg.Server, t.Metrics)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -511,10 +513,15 @@ func (t *Loki) initQuerier() (services.Service, error) {
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))

internalHandler := queryrangebase.MergeMiddlewares(
serverutil.RecoveryMiddleware,
queryrange.Instrument{Metrics: t.Metrics},
).Wrap(handler)

svc, err := querier.InitWorkerService(
querierWorkerServiceConfig,
prometheus.DefaultRegisterer,
serverutil.RecoveryMiddleware.Wrap(handler),
internalHandler,
t.Codec,
)
if err != nil {
Expand Down
23 changes: 23 additions & 0 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,10 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest)

// DecodeHTTPGrpcResponse decodes an httpgrp.HTTPResponse to queryrangebase.Response.
func (Codec) DecodeHTTPGrpcResponse(r *httpgrpc.HTTPResponse, req queryrangebase.Request) (queryrangebase.Response, error) {
if r.Code/100 != 2 {
return nil, httpgrpc.Errorf(int(r.Code), string(r.Body))
}

headers := make(http.Header)
for _, header := range r.Headers {
headers[header.Key] = header.Values
Expand Down Expand Up @@ -702,6 +706,25 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht
}
}

func (c Codec) Path(r queryrangebase.Request) string {
switch request := r.(type) {
case *LokiRequest:
return "loki/api/v1/query_range"
case *LokiSeriesRequest:
return "loki/api/v1/series"
case *LabelRequest:
return request.Path() // NOTE: this could be either /label or /label/{name}/values endpoint. So forward the original path as it is.
case *LokiInstantRequest:
return "/loki/api/v1/query"
case *logproto.IndexStatsRequest:
return "/loki/api/v1/index/stats"
case *logproto.VolumeRequest:
return "/loki/api/v1/index/volume_range"
}

return "other"
}

func (p RequestProtobufCodec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) {
req, err := p.Codec.EncodeRequest(ctx, r)
if err != nil {
Expand Down
54 changes: 54 additions & 0 deletions pkg/querier/queryrange/instrument.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package queryrange

import (
"context"
"strconv"
"time"

"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/instrument"
"github.com/grafana/dskit/middleware"

"github.com/grafana/dskit/server"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase"
)

const (
method = "GET"
)

type Instrument struct {
*server.Metrics
}

var _ queryrangebase.Middleware = Instrument{}

// Wrap implements the queryrangebase.Middleware
func (i Instrument) Wrap(next queryrangebase.Handler) queryrangebase.Handler {
return queryrangebase.HandlerFunc(func(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
route := DefaultCodec.Path(r)
route = middleware.MakeLabelValue(route)
inflight := i.InflightRequests.WithLabelValues(method, route)
inflight.Inc()
defer inflight.Dec()

begin := time.Now()
result, err := next.Do(ctx, r)
i.observe(ctx, route, err, time.Since(begin))

return result, err
})
}

func (i Instrument) observe(ctx context.Context, route string, err error, duration time.Duration) {
respStatus := "200"
if err != nil {
if errResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
respStatus = strconv.Itoa(int(errResp.Code))
} else {
respStatus = "500"
}
}
instrument.ObserveWithExemplar(ctx, i.RequestDuration.WithLabelValues(method, route, respStatus, "false"), duration.Seconds())
}

0 comments on commit 14d303a

Please sign in to comment.