Skip to content

Commit

Permalink
[k174] Propagate query metrics and cache num information. (#11179)
Browse files Browse the repository at this point in the history
Backport 979530b from #11176

---

**What this PR does / why we need it**:
#10858 removed the extraction of the
query time header on the querier side and the generation of the cache
number. This change adds them back and uses the headers of the response
format.

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)
- [ ] If the change is deprecating or removing a configuration option,
update the `deprecated-config.yaml` and `deleted-config.yaml` files
respectively in the `tools/deprecated-config-checker` directory.
[Example
PR](0d4416a)

Co-authored-by: Karsten Jeschkies <[email protected]>
  • Loading branch information
grafanabot and jeschkies authored Nov 8, 2023
1 parent 2515506 commit 968f37b
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 6 deletions.
11 changes: 9 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,10 +513,17 @@ func (t *Loki) initQuerier() (services.Service, error) {
t.Server.HTTP.Path("/loki/api/v1/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))
t.Server.HTTP.Path("/api/prom/tail").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.querierAPI.TailHandler)))

internalHandler := queryrangebase.MergeMiddlewares(
internalMiddlewares := []queryrangebase.Middleware{
serverutil.RecoveryMiddleware,
queryrange.Instrument{Metrics: t.Metrics},
).Wrap(handler)
}
if t.supportIndexDeleteRequest() && t.Cfg.CompactorConfig.RetentionEnabled {
internalMiddlewares = append(
internalMiddlewares,
queryrangebase.CacheGenNumberContextSetterMiddleware(t.cacheGenerationLoader),
)
}
internalHandler := queryrangebase.MergeMiddlewares(internalMiddlewares...).Wrap(handler)

svc, err := querier.InitWorkerService(
querierWorkerServiceConfig,
Expand Down
20 changes: 17 additions & 3 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ func (Codec) DecodeHTTPGrpcRequest(ctx context.Context, r *httpgrpc.HTTPRequest)
ctx = httpreq.InjectQueryTags(ctx, queryTags)
}

// Add query metrics
if queueTimeHeader := httpReq.Header.Get(string(httpreq.QueryQueueTimeHTTPHeader)); queueTimeHeader != "" {
queueTime, err := time.ParseDuration(queueTimeHeader)
if err == nil {
ctx = context.WithValue(ctx, httpreq.QueryQueueTimeHTTPHeader, queueTime)
}
}

// If there is not encoding flags in the context, we try the HTTP request.
if encFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx); encFlags == nil {
encFlags = httpreq.ExtractEncodingFlagsFromProto(r)
Expand Down Expand Up @@ -524,13 +532,19 @@ func (Codec) EncodeHTTPGrpcResponse(_ context.Context, req *httpgrpc.HTTPRequest
return nil, err
}

return &httpgrpc.HTTPResponse{
httpRes := &httpgrpc.HTTPResponse{
Code: int32(http.StatusOK),
Body: buf.Bytes(),
Headers: []*httpgrpc.Header{
{Key: "Content-Type", Values: []string{"application/json; charset=UTF-8"}},
},
}, nil
}

for _, h := range res.GetHeaders() {
httpRes.Headers = append(httpRes.Headers, &httpgrpc.Header{Key: h.Name, Values: h.Values})
}

return httpRes, nil
}

func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*http.Request, error) {
Expand Down Expand Up @@ -1593,7 +1607,7 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error)
},
}, nil
case *logproto.IndexStatsRequest:
return &logproto.IndexStatsResponse{}, nil
return &IndexStatsResponse{}, nil
case *logproto.VolumeRequest:
return &VolumeResponse{}, nil
default:
Expand Down
3 changes: 3 additions & 0 deletions pkg/querier/queryrange/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,6 +1421,9 @@ func (badResponse) Reset() {}
func (badResponse) String() string { return "noop" }
func (badResponse) ProtoMessage() {}
func (badResponse) GetHeaders() []*queryrangebase.PrometheusResponseHeader { return nil }
func (b badResponse) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
return b
}

type badReader struct{}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func Test_MaxQueryLookBack_Types(t *testing.T) {
From: model.Time(now.UnixMilli()),
Through: model.Time(now.Add(-90 * time.Minute).UnixMilli()),
},
expectedResponse: &logproto.IndexStatsResponse{},
expectedResponse: &IndexStatsResponse{},
},
{
request: &logproto.VolumeRequest{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ type Response interface {
proto.Message
// GetHeaders returns the HTTP headers in the response.
GetHeaders() []*PrometheusResponseHeader
WithHeaders([]PrometheusResponseHeader) Response
}
25 changes: 25 additions & 0 deletions pkg/querier/queryrange/queryrangebase/middleware.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package queryrangebase

import (
"context"
"net/http"

"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/tenant"

"github.com/grafana/loki/pkg/querier/queryrange/queryrangebase/definitions"
)

const (
Expand All @@ -28,3 +31,25 @@ func CacheGenNumberHeaderSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLo
})
})
}

func CacheGenNumberContextSetterMiddleware(cacheGenNumbersLoader CacheGenNumberLoader) Middleware {
return MiddlewareFunc(func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, req Request) (Response, error) {
userIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}

cacheGenNumber := cacheGenNumbersLoader.GetResultsCacheGenNumber(userIDs)

res, err := next.Do(ctx, req)
if err != nil {
return nil, err
}
header := definitions.PrometheusResponseHeader{
Name: ResultsCacheGenNumberHeaderName,
Values: []string{cacheGenNumber}}
return res.WithHeaders([]definitions.PrometheusResponseHeader{header}), nil
})
})
}
18 changes: 18 additions & 0 deletions pkg/querier/queryrange/queryrangebase/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,24 @@ func (resp *PrometheusResponse) minTime() int64 {
return result[0].Samples[0].TimestampMs
}

func convertPrometheusResponseHeadersToPointers(h []PrometheusResponseHeader) []*PrometheusResponseHeader {
if h == nil {
return nil
}

resp := make([]*PrometheusResponseHeader, len(h))
for i := range h {
resp[i] = &h[i]
}

return resp
}

func (resp *PrometheusResponse) WithHeaders(h []PrometheusResponseHeader) Response {
resp.Headers = convertPrometheusResponseHeadersToPointers(h)
return resp
}

// NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusResponse() *PrometheusResponse {
return &PrometheusResponse{
Expand Down
9 changes: 9 additions & 0 deletions pkg/querier/queryrange/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ func (v *LokiSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusRespon
return v.headers
}

func (v *LokiSeriesResponseView) WithHeaders(h []queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
v.headers = convertPrometheusResponseHeadersToPointers(h)
return v
}

// Implement proto.Message
func (v *LokiSeriesResponseView) Reset() {}
func (v *LokiSeriesResponseView) String() string { return "" }
Expand Down Expand Up @@ -240,6 +245,10 @@ func (v *MergedSeriesResponseView) GetHeaders() []*queryrangebase.PrometheusResp
return v.headers
}

func (v *MergedSeriesResponseView) WithHeaders([]queryrangebase.PrometheusResponseHeader) queryrangebase.Response {
return v
}

// Implement proto.Message
func (v *MergedSeriesResponseView) Reset() {}
func (v *MergedSeriesResponseView) String() string { return "" }
Expand Down

0 comments on commit 968f37b

Please sign in to comment.