From 3935fb8d70812d6ad25c3299d68e75a6bf1ea2be Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 27 Oct 2023 15:45:09 +0200 Subject: [PATCH] Improve observability of index queries (#11064) **What this PR does / why we need it**: Requests to the following APIs previously had limited or incorrect logging, which made understanding their behaviour at runtime difficult. [GET /loki/api/v1/labels](https://grafana.com/docs/loki/latest/reference/api/#list-labels-within-a-range-of-time) [GET /loki/api/v1/label//values](https://grafana.com/docs/loki/latest/reference/api/#list-label-values-within-a-range-of-time) [GET /loki/api/v1/series](https://grafana.com/docs/loki/latest/reference/api/#list-series) [GET /loki/api/v1/index/stats](https://grafana.com/docs/loki/latest/reference/api/#index-stats) [GET /loki/api/v1/index/volume](https://grafana.com/docs/loki/latest/reference/api/#volume) [GET /loki/api/v1/index/volume_range](https://grafana.com/docs/loki/latest/reference/api/#volume) All of these APIs now have querier and query-frontend logs; sharding and time-based splitting are applied to these requests, and it's valuable to see all requests. --- pkg/logql/metrics.go | 167 ++++++++++++++++++---------- pkg/logql/metrics_test.go | 10 +- pkg/querier/http.go | 45 +++++++- pkg/querier/queryrange/codec.go | 32 ++++++ pkg/querier/queryrange/roundtrip.go | 23 +++- pkg/querier/queryrange/stats.go | 17 ++- 6 files changed, 218 insertions(+), 76 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 341fb97fbe87e..049001b935fa6 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -18,8 +18,10 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/util/spanlogger" ) const ( @@ -28,6 +30,7 @@ const ( QueryTypeLimited = "limited" QueryTypeLabels = "labels" QueryTypeSeries = "series" + QueryTypeStats = "stats" QueryTypeVolume = "volume" latencyTypeSlow = "slow" @@ -89,7 +92,7 @@ func RecordRangeAndInstantQueryMetrics( result promql_parser.Value, ) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) rt = string(GetRangeType(p)) latencyType = latencyTypeFast returnedLines = 0 @@ -111,7 +114,7 @@ func RecordRangeAndInstantQueryMetrics( queryTags, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty. - logValues := make([]interface{}, 0, 30) + logValues := make([]interface{}, 0, 50) logValues = append(logValues, []interface{}{ "latency", latencyType, // this can be used to filter log lines. 
@@ -197,7 +200,7 @@ func RecordLabelQueryMetrics( stats logql_stats.Result, ) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) latencyType = latencyTypeFast queryType = QueryTypeLabels ) @@ -211,32 +214,32 @@ func RecordLabelQueryMetrics( level.Info(logger).Log( "latency", latencyType, "query_type", queryType, + "splits", stats.Summary.Splits, + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "label", label, "query", query, - "splits", stats.Summary.Splits, - "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), - "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), + "query_hash", HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned, ) - sharded := strconv.FormatBool(false) - if stats.Summary.Shards > 1 { - sharded = strconv.FormatBool(true) - } + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) +} - bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded). - Observe(float64(stats.Summary.BytesProcessedPerSecond)) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) - chunkDownloadLatency.WithLabelValues(status, queryType, ""). - Observe(stats.ChunksDownloadTime().Seconds()) - duplicatesTotal.Add(float64(stats.TotalDuplicates())) - chunkDownloadedTotal.WithLabelValues(status, queryType, ""). - Add(float64(stats.TotalChunksDownloaded())) - ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) +// fixLogger forces the given logger to include a caller=metrics.go kv pair. +// The given logger might be a spanlogger instance, in which case it only logs caller=spanlogger.go:. +// We use `caller=metrics.go` when querying our logs for performance issues, and some logs were missing. +func fixLogger(ctx context.Context, logger log.Logger) log.Logger { + nl := util_log.WithContext(ctx, logger) + if _, ok := logger.(*spanlogger.SpanLogger); ok { + return log.With(nl, "caller", "metrics.go") + } + return nl } func PrintMatches(matches []string) string { @@ -244,16 +247,9 @@ func PrintMatches(matches []string) string { return strings.Join(matches, ":") } -func RecordSeriesQueryMetrics( - ctx context.Context, - log log.Logger, - start, end time.Time, - match []string, - status string, - stats logql_stats.Result, -) { +func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) latencyType = latencyTypeFast queryType = QueryTypeSeries ) @@ -264,46 +260,72 @@ func RecordSeriesQueryMetrics( latencyType = latencyTypeSlow } - // we also log queries, useful for troubleshooting slow queries. 
- level.Info(logger).Log( + shard := extractShard(shards) + + logValues := make([]interface{}, 0, 15) + logValues = append(logValues, "latency", latencyType, "query_type", queryType, + "splits", stats.Summary.Splits, + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "match", PrintMatches(match), - "splits", stats.Summary.Splits, - "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), - "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), - "total_entries", stats.Summary.TotalEntriesReturned, + "query_hash", HashedQuery(PrintMatches(match)), + "total_entries", stats.Summary.TotalEntriesReturned) + + if shard != nil { + logValues = append(logValues, + "shard_num", shard.Shard, + "shard_count", shard.Of, + ) + } + + level.Info(logger).Log(logValues...) + + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) +} + +func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, stats logql_stats.Result) { + var ( + logger = fixLogger(ctx, log) + latencyType = latencyTypeFast + queryType = QueryTypeStats ) - sharded := strconv.FormatBool(false) - if stats.Summary.Shards > 1 { - sharded = strconv.FormatBool(true) + // Tag throughput metric by latency type based on a threshold. + // Latency below the threshold is fast, above is slow. + if stats.Summary.ExecTime > slowQueryThresholdSecond { + latencyType = latencyTypeSlow } - bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded). - Observe(float64(stats.Summary.BytesProcessedPerSecond)) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) - chunkDownloadLatency.WithLabelValues(status, queryType, ""). - Observe(stats.ChunksDownloadTime().Seconds()) - duplicatesTotal.Add(float64(stats.TotalDuplicates())) - chunkDownloadedTotal.WithLabelValues(status, queryType, ""). - Add(float64(stats.TotalChunksDownloaded())) - ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) + + logValues := make([]interface{}, 0, 15) + logValues = append(logValues, + "latency", latencyType, + "query_type", queryType, + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "length", end.Sub(start), + "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), + "status", status, + "query", query, + "query_hash", HashedQuery(query), + "total_entries", stats.Summary.TotalEntriesReturned) + + level.Info(logger).Log(logValues...) 
+ + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } -func RecordVolumeQueryMetrics( - ctx context.Context, - log log.Logger, - start, end time.Time, - query string, - status string, - stats logql_stats.Result, -) { +func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, limit uint32, step time.Duration, status string, stats logql_stats.Result) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) latencyType = latencyTypeFast queryType = QueryTypeVolume ) @@ -314,23 +336,36 @@ func RecordVolumeQueryMetrics( latencyType = latencyTypeSlow } + rangeType := "range" + if step == 0 { + rangeType = "instant" + } + level.Info(logger).Log( "latency", latencyType, "query_type", queryType, "query", query, "query_hash", HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "range_type", rangeType, + "step", step, + "limit", limit, "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "splits", stats.Summary.Splits, "total_entries", stats.Summary.TotalEntriesReturned, + // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, + "cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored, "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), ) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } func recordUsageStats(queryType string, stats logql_stats.Result) { @@ -391,3 +426,17 @@ func tagsToKeyValues(queryTags string) []interface{} { return res } + +func extractShard(shards []string) *astmapper.ShardAnnotation { + if len(shards) == 0 { + return nil + } + + var shard astmapper.ShardAnnotation + shard, err := astmapper.ParseShard(shards[0]) + if err != nil { + return nil + } + + return &shard +} diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index e3cd64bdf0b8f..950a16bb39a73 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -109,9 +109,9 @@ func TestLogLabelsQuery(t *testing.T) { TotalEntriesReturned: 12, }, }) - require.Equal(t, + require.Regexp(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo query= splits=0 throughput=100kB total_bytes=100kB total_entries=12\n", + "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels splits=0 start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 label=foo query= query_hash=2166136261 total_entries=12\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) @@ -127,7 +127,7 @@ func TestLogSeriesQuery(t *testing.T) { sp := opentracing.StartSpan("") ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp) now := time.Now() - RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", stats.Result{ + RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, 
[]string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", []string{}, stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 100000, ExecTime: 25.25, @@ -135,9 +135,9 @@ func TestLogSeriesQuery(t *testing.T) { TotalEntriesReturned: 10, }, }) - require.Equal(t, + require.Regexp(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series length=1h0m0s duration=25.25s status=200 match=\"{container_name=~\\\"prometheus.*\\\", component=\\\"server\\\"}:{app=\\\"loki\\\"}\" splits=0 throughput=100kB total_bytes=100kB total_entries=10\n", + "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series splits=0 start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 match=\"{container_name=.*\"}:{app=.*}\" query_hash=23523089 total_entries=10\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 61b2f640b44b7..3bf777659a898 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -98,7 +98,7 @@ func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.Lo // LabelHandler is a http.HandlerFunc for handling label queries. func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) { - timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("labels")) + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeLabels)) defer timer.ObserveDuration() start := time.Now() @@ -111,7 +111,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques if resp != nil { resLength = len(resp.Values) } - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, resLength) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -247,7 +246,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { // SeriesHandler returns the list of time series that match a certain label set. 
// See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, stats.Result, error) { - timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("series")) + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeSeries)) defer timer.ObserveDuration() start := time.Now() @@ -261,7 +260,6 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ resLength = len(resp.Series) } - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, resLength) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -271,21 +269,38 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), statResult) + logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) return resp, statResult, err } // IndexStatsHandler queries the index for the data statistics related to a query func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQuery) (*logproto.IndexStatsResponse, error) { + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeStats)) + defer timer.ObserveDuration() + + start := time.Now() + statsCtx, ctx := stats.NewContext(ctx) + // TODO(karsten): we might want to change IndexStats to receive a logproto.IndexStatsRequest instead - // TODO(owen-d): log metadata, record stats? resp, err := q.querier.IndexStats(ctx, req) if resp == nil { // Some stores don't implement this resp = &index_stats.Stats{} } + queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) + statResult := statsCtx.Result(time.Since(start), queueTime, 1) + log := spanlogger.FromContext(ctx) + statResult.Log(level.Debug(log)) + + status := 200 + if err != nil { + status, _ = serverutil.ClientHTTPStatusAndError(err) + } + + logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) + return resp, err } @@ -294,6 +309,12 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu // VolumeHandler queries the index label volumes related to the passed matchers and given time range. // Returns either N values where N is the time range / step and a single value for a time range depending on the request. 
func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeVolume)) + defer timer.ObserveDuration() + + start := time.Now() + statsCtx, ctx := stats.NewContext(ctx) + resp, err := q.querier.Volume(ctx, req) if err != nil { return nil, err @@ -302,6 +323,18 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ return &logproto.VolumeResponse{Volumes: []logproto.Volume{}}, nil } + queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) + statResult := statsCtx.Result(time.Since(start), queueTime, 1) + log := spanlogger.FromContext(ctx) + statResult.Log(level.Debug(log)) + + status := 200 + if err != nil { + status, _ = serverutil.ClientHTTPStatusAndError(err) + } + + logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) + return resp, nil } diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 27b15d6698aa7..05f1f4379922f 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -1357,6 +1357,10 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) { return ¶msLabelWrapper{ LabelRequest: r, }, nil + case *logproto.IndexStatsRequest: + return ¶msStatsWrapper{ + IndexStatsRequest: r, + }, nil default: return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got (%T)", r) } @@ -1476,6 +1480,34 @@ func (p paramsLabelWrapper) Shards() []string { return make([]string, 0) } +type paramsStatsWrapper struct { + *logproto.IndexStatsRequest +} + +func (p paramsStatsWrapper) Query() string { + return p.GetQuery() +} + +func (p paramsStatsWrapper) Start() time.Time { + return p.From.Time() +} + +func (p paramsStatsWrapper) End() time.Time { + return p.Through.Time() +} + +func (p paramsStatsWrapper) Step() time.Duration { + return time.Duration(p.GetStep() * 1e6) +} +func (p paramsStatsWrapper) Interval() time.Duration { return 0 } +func (p paramsStatsWrapper) Direction() logproto.Direction { + return logproto.FORWARD +} +func (p paramsStatsWrapper) Limit() uint32 { return 0 } +func (p paramsStatsWrapper) Shards() []string { + return make([]string, 0) +} + func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrangebase.PrometheusResponseHeader { var promHeaders []queryrangebase.PrometheusResponseHeader for h, hv := range httpHeaders { diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 60bdebe9f7dc3..3dd03750e3b1d 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -807,6 +807,22 @@ func NewVolumeTripperware( ), nil } +func statsTripperware(nextTW base.Middleware) base.Middleware { + return base.MiddlewareFunc(func(next base.Handler) base.Handler { + return base.HandlerFunc(func(ctx context.Context, r base.Request) (base.Response, error) { + cacheMiddlewares := []base.Middleware{ + StatsCollectorMiddleware(), + nextTW, + } + + // wrap nextRT with our new middleware + return base.MergeMiddlewares( + cacheMiddlewares..., + ).Wrap(next).Do(ctx, r) + }) + }) +} + func volumeRangeTripperware(nextTW base.Middleware) base.Middleware { return base.MiddlewareFunc(func(next base.Handler) base.Handler { return base.HandlerFunc(func(ctx context.Context, r 
base.Request) (base.Response, error) { @@ -888,7 +904,7 @@ func NewIndexStatsTripperware( } } - return sharedIndexTripperware( + tw, err := sharedIndexTripperware( cacheMiddleware, cfg, merger, @@ -897,6 +913,11 @@ func NewIndexStatsTripperware( metrics, schema, ) + if err != nil { + return nil, err + } + + return statsTripperware(tw), nil } func sharedIndexTripperware( diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 7e0dc49fe57df..0233d886c98f2 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -9,13 +9,13 @@ import ( "strconv" "time" - "github.com/grafana/loki/pkg/logproto" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/middleware" promql_parser "github.com/prometheus/prometheus/promql/parser" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -33,6 +33,7 @@ const ( queryTypeMetric = "metric" queryTypeSeries = "series" queryTypeLabel = "label" + queryTypeStats = "stats" queryTypeVolume = "volume" ) @@ -54,9 +55,11 @@ func recordQueryMetrics(data *queryData) { case queryTypeLabel: logql.RecordLabelQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.label, data.params.Query(), data.status, *data.statistics) case queryTypeSeries: - logql.RecordSeriesQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.match, data.status, *data.statistics) + logql.RecordSeriesQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.match, data.status, []string{}, *data.statistics) + case queryTypeStats: + logql.RecordStatsQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) case queryTypeVolume: - logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) + logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.params.Limit(), data.params.Step(), data.status, *data.statistics) default: level.Error(logger).Log("msg", "failed to record query metrics", "err", fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got %s", data.queryType)) } @@ -79,7 +82,7 @@ type queryData struct { result promql_parser.Value status string queryType string - match []string // used in `series` query. + match []string // used in `series` query label string // used in `labels` query recorded bool @@ -153,6 +156,10 @@ func StatsCollectorMiddleware() queryrangebase.Middleware { responseStats = &r.Statistics // TODO: this is always nil. See codec.DecodeResponse totalEntries = len(r.Data) queryType = queryTypeLabel + case *IndexStatsResponse: + responseStats = &stats.Result{} // TODO: support stats in proto + totalEntries = 1 + queryType = queryTypeStats default: level.Warn(logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp)) }
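
The change that ties the new per-endpoint logging together is fixLogger: when the logger passed to the Record*QueryMetrics helpers is a spanlogger.SpanLogger, the caller value produced by the underlying caller valuer points into spanlogger.go, so log queries filtering on caller=metrics.go would miss those lines. The following standalone Go sketch illustrates the mechanism using only the public github.com/go-kit/log API; spanLoggerStub and fixLoggerSketch are illustrative stand-ins written for this example, not Loki's actual types.

    // Standalone sketch (not part of the patch): why a literal caller pair is
    // appended when the logger is a span-style wrapper. Only the public
    // github.com/go-kit/log API is used; spanLoggerStub stands in for Loki's
    // spanlogger.SpanLogger.
    package main

    import (
        "os"

        "github.com/go-kit/log"
        "github.com/go-kit/log/level"
    )

    // spanLoggerStub forwards log entries to an inner logger, the way a span
    // logger mirrors entries onto a trace span. Because the forwarding happens
    // here, a caller valuer on the inner logger resolves to this file and line
    // (spanlogger.go in Loki) rather than to the real call site.
    type spanLoggerStub struct{ inner log.Logger }

    func (s spanLoggerStub) Log(keyvals ...interface{}) error {
        return s.inner.Log(keyvals...)
    }

    // fixLoggerSketch mirrors the intent of the patch's fixLogger: only for the
    // span-style wrapper, add an explicit caller=metrics.go pair so that log
    // queries filtering on it keep matching.
    func fixLoggerSketch(logger log.Logger) log.Logger {
        if _, ok := logger.(spanLoggerStub); ok {
            return log.With(logger, "caller", "metrics.go")
        }
        return logger
    }

    func main() {
        base := log.With(
            log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)),
            "caller", log.DefaultCaller, // resolved at log time, inside the wrapper
        )
        wrapped := spanLoggerStub{inner: base}

        // Without the fix the only caller pair is the one resolved by the valuer.
        level.Info(wrapped).Log("msg", "query stats", "query_type", "labels")

        // With the fix the line also carries a stable caller=metrics.go pair.
        level.Info(fixLoggerSketch(wrapped)).Log("msg", "query stats", "query_type", "labels")
    }

The second line printed by the sketch carries an additional, stable caller=metrics.go pair alongside the valuer-resolved one, which is what the patch relies on when querying for query performance logs.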
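
RecordSeriesQueryMetrics also gains shard_num and shard_count, pulled from the request's shards by the new extractShard helper, which hands the first shard string to astmapper.ParseShard. Assuming the n_of_m encoding Loki uses for shard annotations (for example 3_of_16), the simplified standalone sketch below shows how that string becomes the extra key/value pairs; parseShard here is a local stand-in written for this example, not the real astmapper function.

    // Standalone sketch (not part of the patch) of how a shard annotation
    // surfaces as shard_num / shard_count in the series query log line.
    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    type shardAnnotation struct {
        Shard int // zero-based shard index
        Of    int // total number of shards
    }

    // parseShard is a simplified stand-in for astmapper.ParseShard, assuming
    // the "<num>_of_<count>" encoding, e.g. "3_of_16".
    func parseShard(s string) (shardAnnotation, error) {
        parts := strings.Split(s, "_of_")
        if len(parts) != 2 {
            return shardAnnotation{}, fmt.Errorf("invalid shard %q", s)
        }
        num, err := strconv.Atoi(parts[0])
        if err != nil {
            return shardAnnotation{}, err
        }
        count, err := strconv.Atoi(parts[1])
        if err != nil {
            return shardAnnotation{}, err
        }
        return shardAnnotation{Shard: num, Of: count}, nil
    }

    // extractShard mirrors the helper in the patch: only the first shard is
    // inspected, and an absent or malformed shard simply yields nil.
    func extractShard(shards []string) *shardAnnotation {
        if len(shards) == 0 {
            return nil
        }
        shard, err := parseShard(shards[0])
        if err != nil {
            return nil
        }
        return &shard
    }

    func main() {
        logValues := []interface{}{"query_type", "series", "status", "200"}
        if shard := extractShard([]string{"3_of_16"}); shard != nil {
            logValues = append(logValues, "shard_num", shard.Shard, "shard_count", shard.Of)
        }
        fmt.Println(logValues) // [query_type series status 200 shard_num 3 shard_count 16]
    }

As in the patch, an empty or unparsable shard list simply results in no shard_num/shard_count pairs being appended to the log line.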