From a3467341b57647ca4546c51a8bd1bb3b753b8641 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 13:40:19 +0200 Subject: [PATCH 1/8] Always include caller=metrics.go in stats log lines Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 341fb97fbe87e..18bb2e20e9385 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -20,6 +20,7 @@ import ( logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" + "github.com/grafana/loki/pkg/util/spanlogger" ) const ( @@ -89,7 +90,7 @@ func RecordRangeAndInstantQueryMetrics( result promql_parser.Value, ) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) rt = string(GetRangeType(p)) latencyType = latencyTypeFast returnedLines = 0 @@ -197,7 +198,7 @@ func RecordLabelQueryMetrics( stats logql_stats.Result, ) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) latencyType = latencyTypeFast queryType = QueryTypeLabels ) @@ -239,6 +240,17 @@ func RecordLabelQueryMetrics( ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) } +// fixLogger forces the given logger to include a caller=metrics.go kv pair. +// The given logger might be a spanlogger instance, in which case it only logs caller=spanlogger.go:. +// We use `caller=metrics.go` when querying our logs for performance issues, and some logs were missing. +func fixLogger(ctx context.Context, logger log.Logger) log.Logger { + nl := util_log.WithContext(ctx, logger) + if _, ok := logger.(*spanlogger.SpanLogger); ok { + return log.With(nl, "caller", "metrics.go") + } + return nl +} + func PrintMatches(matches []string) string { // not using comma (,) as separator as matcher may already have comma (e.g: `{a="b", c="d"}`) return strings.Join(matches, ":") @@ -253,7 +265,7 @@ func RecordSeriesQueryMetrics( stats logql_stats.Result, ) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) latencyType = latencyTypeFast queryType = QueryTypeSeries ) @@ -303,7 +315,7 @@ func RecordVolumeQueryMetrics( stats logql_stats.Result, ) { var ( - logger = util_log.WithContext(ctx, log) + logger = fixLogger(ctx, log) latencyType = latencyTypeFast queryType = QueryTypeVolume ) From 55cdbc685375777d79e0d09a601f36c2f52a3cdd Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Wed, 25 Oct 2023 20:33:02 +0200 Subject: [PATCH 2/8] Including relevant detail in labels & series log lines Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 87 +++++++++++++++------------------ pkg/logql/metrics_test.go | 2 +- pkg/querier/http.go | 2 +- pkg/querier/queryrange/stats.go | 8 +-- 4 files changed, 46 insertions(+), 53 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 18bb2e20e9385..8639aafacf316 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/logql/syntax" "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/querier/astmapper" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/spanlogger" @@ -112,7 +113,7 @@ func RecordRangeAndInstantQueryMetrics( queryTags, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty. 
- logValues := make([]interface{}, 0, 30) + logValues := make([]interface{}, 0, 50) logValues = append(logValues, []interface{}{ "latency", latencyType, // this can be used to filter log lines. @@ -212,32 +213,19 @@ func RecordLabelQueryMetrics( level.Info(logger).Log( "latency", latencyType, "query_type", queryType, + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "label", label, "query", query, - "splits", stats.Summary.Splits, - "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), - "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), "total_entries", stats.Summary.TotalEntriesReturned, ) - sharded := strconv.FormatBool(false) - if stats.Summary.Shards > 1 { - sharded = strconv.FormatBool(true) - } - - bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded). - Observe(float64(stats.Summary.BytesProcessedPerSecond)) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) - chunkDownloadLatency.WithLabelValues(status, queryType, ""). - Observe(stats.ChunksDownloadTime().Seconds()) - duplicatesTotal.Add(float64(stats.TotalDuplicates())) - chunkDownloadedTotal.WithLabelValues(status, queryType, ""). - Add(float64(stats.TotalChunksDownloaded())) - ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } // fixLogger forces the given logger to include a caller=metrics.go kv pair. @@ -256,14 +244,7 @@ func PrintMatches(matches []string) string { return strings.Join(matches, ":") } -func RecordSeriesQueryMetrics( - ctx context.Context, - log log.Logger, - start, end time.Time, - match []string, - status string, - stats logql_stats.Result, -) { +func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) { var ( logger = fixLogger(ctx, log) latencyType = latencyTypeFast @@ -276,34 +257,32 @@ func RecordSeriesQueryMetrics( latencyType = latencyTypeSlow } - // we also log queries, useful for troubleshooting slow queries. 
- level.Info(logger).Log( + shard := extractShard(shards) + + logValues := make([]interface{}, 0, 15) + logValues = append(logValues, "latency", latencyType, "query_type", queryType, + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "match", PrintMatches(match), - "splits", stats.Summary.Splits, - "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), - "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), - "total_entries", stats.Summary.TotalEntriesReturned, - ) + "total_entries", stats.Summary.TotalEntriesReturned) - sharded := strconv.FormatBool(false) - if stats.Summary.Shards > 1 { - sharded = strconv.FormatBool(true) + if shard != nil { + logValues = append(logValues, + "shard_num", shard.Shard, + "shard_count", shard.Of, + ) } - bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded). - Observe(float64(stats.Summary.BytesProcessedPerSecond)) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) - chunkDownloadLatency.WithLabelValues(status, queryType, ""). - Observe(stats.ChunksDownloadTime().Seconds()) - duplicatesTotal.Add(float64(stats.TotalDuplicates())) - chunkDownloadedTotal.WithLabelValues(status, queryType, ""). - Add(float64(stats.TotalChunksDownloaded())) - ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent)) + + level.Info(logger).Log(logValues...) + + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } func RecordVolumeQueryMetrics( @@ -403,3 +382,17 @@ func tagsToKeyValues(queryTags string) []interface{} { return res } + +func extractShard(shards []string) *astmapper.ShardAnnotation { + if len(shards) == 0 { + return nil + } + + var shard astmapper.ShardAnnotation + shard, err := astmapper.ParseShard(shards[0]) + if err != nil { + return nil + } + + return &shard +} diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index e3cd64bdf0b8f..42b88a49a7398 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -127,7 +127,7 @@ func TestLogSeriesQuery(t *testing.T) { sp := opentracing.StartSpan("") ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp) now := time.Now() - RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", stats.Result{ + RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", []string{}, stats.Result{ Summary: stats.Summary{ BytesProcessedPerSecond: 100000, ExecTime: 25.25, diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 61b2f640b44b7..f3f7dc34770c2 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -271,7 +271,7 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ status, _ = serverutil.ClientHTTPStatusAndError(err) } - logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), statResult) + logql.RecordSeriesQueryMetrics(ctx, log, req.Start, req.End, req.Groups, strconv.Itoa(status), req.GetShards(), statResult) return resp, statResult, err } diff --git a/pkg/querier/queryrange/stats.go 
b/pkg/querier/queryrange/stats.go index 7e0dc49fe57df..49f49254530a3 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -9,13 +9,13 @@ import ( "strconv" "time" - "github.com/grafana/loki/pkg/logproto" - "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/grafana/dskit/middleware" promql_parser "github.com/prometheus/prometheus/promql/parser" + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" @@ -54,7 +54,7 @@ func recordQueryMetrics(data *queryData) { case queryTypeLabel: logql.RecordLabelQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.label, data.params.Query(), data.status, *data.statistics) case queryTypeSeries: - logql.RecordSeriesQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.match, data.status, *data.statistics) + logql.RecordSeriesQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.match, data.status, []string{}, *data.statistics) case queryTypeVolume: logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) default: @@ -79,7 +79,7 @@ type queryData struct { result promql_parser.Value status string queryType string - match []string // used in `series` query. + match []string // used in `series` query label string // used in `labels` query recorded bool From a66d1cbb18ec3da991f11b96ce33076a8e82ca00 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Thu, 26 Oct 2023 11:56:05 +0200 Subject: [PATCH 3/8] Adding logs for index stats requests Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 33 +++++++++++++++++++++++++++++ pkg/querier/http.go | 23 ++++++++++++++++++-- pkg/querier/queryrange/codec.go | 32 ++++++++++++++++++++++++++++ pkg/querier/queryrange/roundtrip.go | 23 +++++++++++++++++++- pkg/querier/queryrange/stats.go | 7 ++++++ 5 files changed, 115 insertions(+), 3 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 8639aafacf316..907038cd74631 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -30,6 +30,7 @@ const ( QueryTypeLimited = "limited" QueryTypeLabels = "labels" QueryTypeSeries = "series" + QueryTypeStats = "stats" QueryTypeVolume = "volume" latencyTypeSlow = "slow" @@ -285,6 +286,38 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } +func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, stats logql_stats.Result) { + var ( + logger = fixLogger(ctx, log) + latencyType = latencyTypeFast + queryType = QueryTypeStats + ) + + // Tag throughput metric by latency type based on a threshold. + // Latency below the threshold is fast, above is slow. 
+ if stats.Summary.ExecTime > slowQueryThresholdSecond { + latencyType = latencyTypeSlow + } + + logValues := make([]interface{}, 0, 15) + logValues = append(logValues, + "latency", latencyType, + "query_type", queryType, + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "length", end.Sub(start), + "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), + "status", status, + "query", query, + "total_entries", stats.Summary.TotalEntriesReturned) + + level.Info(logger).Log(logValues...) + + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) +} + func RecordVolumeQueryMetrics( ctx context.Context, log log.Logger, diff --git a/pkg/querier/http.go b/pkg/querier/http.go index f3f7dc34770c2..7004672fd4ac9 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -98,7 +98,7 @@ func (q *QuerierAPI) InstantQueryHandler(ctx context.Context, req *queryrange.Lo // LabelHandler is a http.HandlerFunc for handling label queries. func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) { - timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("labels")) + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeLabels)) defer timer.ObserveDuration() start := time.Now() @@ -247,7 +247,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { // SeriesHandler returns the list of time series that match a certain label set. // See https://prometheus.io/docs/prometheus/latest/querying/api/#finding-series-by-label-matchers func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, stats.Result, error) { - timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues("series")) + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeSeries)) defer timer.ObserveDuration() start := time.Now() @@ -278,6 +278,12 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ // IndexStatsHandler queries the index for the data statistics related to a query func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQuery) (*logproto.IndexStatsResponse, error) { + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeStats)) + defer timer.ObserveDuration() + + start := time.Now() + statsCtx, ctx := stats.NewContext(ctx) + // TODO(karsten): we might want to change IndexStats to receive a logproto.IndexStatsRequest instead // TODO(owen-d): log metadata, record stats? 
resp, err := q.querier.IndexStats(ctx, req) @@ -286,6 +292,19 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu resp = &index_stats.Stats{} } + queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) + // record stats about the label query + statResult := statsCtx.Result(time.Since(start), queueTime, 1) + log := spanlogger.FromContext(ctx) + statResult.Log(level.Debug(log)) + + status := 200 + if err != nil { + status, _ = serverutil.ClientHTTPStatusAndError(err) + } + + logql.RecordStatsQueryMetrics(ctx, log, req.Start, req.End, req.Query, strconv.Itoa(status), statResult) + return resp, err } diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 27b15d6698aa7..05f1f4379922f 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -1357,6 +1357,10 @@ func ParamsFromRequest(req queryrangebase.Request) (logql.Params, error) { return ¶msLabelWrapper{ LabelRequest: r, }, nil + case *logproto.IndexStatsRequest: + return ¶msStatsWrapper{ + IndexStatsRequest: r, + }, nil default: return nil, fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got (%T)", r) } @@ -1476,6 +1480,34 @@ func (p paramsLabelWrapper) Shards() []string { return make([]string, 0) } +type paramsStatsWrapper struct { + *logproto.IndexStatsRequest +} + +func (p paramsStatsWrapper) Query() string { + return p.GetQuery() +} + +func (p paramsStatsWrapper) Start() time.Time { + return p.From.Time() +} + +func (p paramsStatsWrapper) End() time.Time { + return p.Through.Time() +} + +func (p paramsStatsWrapper) Step() time.Duration { + return time.Duration(p.GetStep() * 1e6) +} +func (p paramsStatsWrapper) Interval() time.Duration { return 0 } +func (p paramsStatsWrapper) Direction() logproto.Direction { + return logproto.FORWARD +} +func (p paramsStatsWrapper) Limit() uint32 { return 0 } +func (p paramsStatsWrapper) Shards() []string { + return make([]string, 0) +} + func httpResponseHeadersToPromResponseHeaders(httpHeaders http.Header) []queryrangebase.PrometheusResponseHeader { var promHeaders []queryrangebase.PrometheusResponseHeader for h, hv := range httpHeaders { diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 60bdebe9f7dc3..3dd03750e3b1d 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -807,6 +807,22 @@ func NewVolumeTripperware( ), nil } +func statsTripperware(nextTW base.Middleware) base.Middleware { + return base.MiddlewareFunc(func(next base.Handler) base.Handler { + return base.HandlerFunc(func(ctx context.Context, r base.Request) (base.Response, error) { + cacheMiddlewares := []base.Middleware{ + StatsCollectorMiddleware(), + nextTW, + } + + // wrap nextRT with our new middleware + return base.MergeMiddlewares( + cacheMiddlewares..., + ).Wrap(next).Do(ctx, r) + }) + }) +} + func volumeRangeTripperware(nextTW base.Middleware) base.Middleware { return base.MiddlewareFunc(func(next base.Handler) base.Handler { return base.HandlerFunc(func(ctx context.Context, r base.Request) (base.Response, error) { @@ -888,7 +904,7 @@ func NewIndexStatsTripperware( } } - return sharedIndexTripperware( + tw, err := sharedIndexTripperware( cacheMiddleware, cfg, merger, @@ -897,6 +913,11 @@ func NewIndexStatsTripperware( metrics, schema, ) + if err != nil { + return nil, err + } + + return statsTripperware(tw), nil } func sharedIndexTripperware( diff --git 
a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index 49f49254530a3..f9e066607d3e9 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -33,6 +33,7 @@ const ( queryTypeMetric = "metric" queryTypeSeries = "series" queryTypeLabel = "label" + queryTypeStats = "stats" queryTypeVolume = "volume" ) @@ -55,6 +56,8 @@ func recordQueryMetrics(data *queryData) { logql.RecordLabelQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.label, data.params.Query(), data.status, *data.statistics) case queryTypeSeries: logql.RecordSeriesQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.match, data.status, []string{}, *data.statistics) + case queryTypeStats: + logql.RecordStatsQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) case queryTypeVolume: logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) default: @@ -153,6 +156,10 @@ func StatsCollectorMiddleware() queryrangebase.Middleware { responseStats = &r.Statistics // TODO: this is always nil. See codec.DecodeResponse totalEntries = len(r.Data) queryType = queryTypeLabel + case *IndexStatsResponse: + responseStats = &stats.Result{} // TODO: support stats in proto + totalEntries = 1 + queryType = queryTypeStats default: level.Warn(logger).Log("msg", fmt.Sprintf("cannot compute stats, unexpected type: %T", resp)) } From 6887bc2a648e67fa9ea1cb82d2987bffef25b4ee Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Thu, 26 Oct 2023 14:21:38 +0200 Subject: [PATCH 4/8] Add volume logs Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 35 +++++++++++++++++++++------------ pkg/querier/http.go | 22 +++++++++++++++++---- pkg/querier/queryrange/stats.go | 2 +- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 907038cd74631..68e217a613eb8 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -223,6 +223,7 @@ func RecordLabelQueryMetrics( "status", status, "label", label, "query", query, + "query_hash", HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned, ) @@ -272,6 +273,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "match", PrintMatches(match), + "query_hash", HashedQuery(PrintMatches(match)), "total_entries", stats.Summary.TotalEntriesReturned) if shard != nil { @@ -311,6 +313,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "query", query, + "query_hash", HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) level.Info(logger).Log(logValues...) 
@@ -318,14 +321,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } -func RecordVolumeQueryMetrics( - ctx context.Context, - log log.Logger, - start, end time.Time, - query string, - status string, - stats logql_stats.Result, -) { +func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, limit uint32, step time.Duration, status string, stats logql_stats.Result) { var ( logger = fixLogger(ctx, log) latencyType = latencyTypeFast @@ -338,23 +334,36 @@ func RecordVolumeQueryMetrics( latencyType = latencyTypeSlow } + rangeType := "range" + if step == 0 { + rangeType = "instant" + } + level.Info(logger).Log( "latency", latencyType, "query_type", queryType, "query", query, "query_hash", HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "range_type", rangeType, + "step", step, + "limit", limit, "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "splits", stats.Summary.Splits, "total_entries", stats.Summary.TotalEntriesReturned, - "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, - "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, - "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), + // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats + "cache_result_req", stats.Caches.VolumeResult.EntriesRequested, + "cache_result_hit", stats.Caches.VolumeResult.EntriesFound, + "cache_result_stored", stats.Caches.VolumeResult.EntriesStored, + "cache_result_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), ) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } func recordUsageStats(queryType string, stats logql_stats.Result) { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 7004672fd4ac9..3bf777659a898 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -111,7 +111,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques if resp != nil { resLength = len(resp.Values) } - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, resLength) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -261,7 +260,6 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ resLength = len(resp.Series) } - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, resLength) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -285,7 +283,6 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu statsCtx, ctx := stats.NewContext(ctx) // TODO(karsten): we might want to change IndexStats to receive a logproto.IndexStatsRequest instead - // TODO(owen-d): log metadata, record stats? 
resp, err := q.querier.IndexStats(ctx, req) if resp == nil { // Some stores don't implement this @@ -293,7 +290,6 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu } queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, 1) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -313,6 +309,12 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu // VolumeHandler queries the index label volumes related to the passed matchers and given time range. // Returns either N values where N is the time range / step and a single value for a time range depending on the request. func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeVolume)) + defer timer.ObserveDuration() + + start := time.Now() + statsCtx, ctx := stats.NewContext(ctx) + resp, err := q.querier.Volume(ctx, req) if err != nil { return nil, err @@ -321,6 +323,18 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ return &logproto.VolumeResponse{Volumes: []logproto.Volume{}}, nil } + queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) + statResult := statsCtx.Result(time.Since(start), queueTime, 1) + log := spanlogger.FromContext(ctx) + statResult.Log(level.Debug(log)) + + status := 200 + if err != nil { + status, _ = serverutil.ClientHTTPStatusAndError(err) + } + + logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) + return resp, nil } diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index f9e066607d3e9..0233d886c98f2 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -59,7 +59,7 @@ func recordQueryMetrics(data *queryData) { case queryTypeStats: logql.RecordStatsQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) case queryTypeVolume: - logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) + logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.params.Limit(), data.params.Step(), data.status, *data.statistics) default: level.Error(logger).Log("msg", "failed to record query metrics", "err", fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got %s", data.queryType)) } From ba72587b7b5e639eea63f685a12553c88d9039b4 Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 27 Oct 2023 09:35:56 +0200 Subject: [PATCH 5/8] Fixing tests Signed-off-by: Danny Kopping --- pkg/logql/metrics_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 42b88a49a7398..d75abdb8b041d 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -109,9 +109,9 @@ func TestLogLabelsQuery(t *testing.T) { TotalEntriesReturned: 12, }, }) - require.Equal(t, + require.Regexp(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 
label=foo query= splits=0 throughput=100kB total_bytes=100kB total_entries=12\n", + "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 label=foo query= query_hash=2166136261 total_entries=12\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) @@ -135,9 +135,9 @@ func TestLogSeriesQuery(t *testing.T) { TotalEntriesReturned: 10, }, }) - require.Equal(t, + require.Regexp(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series length=1h0m0s duration=25.25s status=200 match=\"{container_name=~\\\"prometheus.*\\\", component=\\\"server\\\"}:{app=\\\"loki\\\"}\" splits=0 throughput=100kB total_bytes=100kB total_entries=10\n", + "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 match=\"{container_name=.*\"}:{app=.*}\" query_hash=23523089 total_entries=10\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) From 9cabf56c7ab4ae82e7f2d5b0c8cf0ad4f794c09d Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 27 Oct 2023 14:50:25 +0200 Subject: [PATCH 6/8] Adding splits back into stats for labels & series queries Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 68e217a613eb8..09432e146e1f6 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -214,6 +214,7 @@ func RecordLabelQueryMetrics( level.Info(logger).Log( "latency", latencyType, "query_type", queryType, + "splits", stats.Summary.Splits, "start", start.Format(time.RFC3339Nano), "end", end.Format(time.RFC3339Nano), "start_delta", time.Since(start), @@ -265,6 +266,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti logValues = append(logValues, "latency", latencyType, "query_type", queryType, + "splits", stats.Summary.Splits, "start", start.Format(time.RFC3339Nano), "end", end.Format(time.RFC3339Nano), "start_delta", time.Since(start), From 94c819a7741913451e0500a8ef9edf46085d36ce Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 27 Oct 2023 15:06:54 +0200 Subject: [PATCH 7/8] Restore "volume" name in results fields for disambiguation Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 09432e146e1f6..049001b935fa6 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -359,10 +359,10 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti "splits", stats.Summary.Splits, "total_entries", stats.Summary.TotalEntriesReturned, // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats - "cache_result_req", stats.Caches.VolumeResult.EntriesRequested, - "cache_result_hit", stats.Caches.VolumeResult.EntriesFound, - "cache_result_stored", stats.Caches.VolumeResult.EntriesStored, - "cache_result_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), + "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, + "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, + "cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored, + "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), ) 
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) From 8c48386c80eeb0b4663078939c04e36c96f1418b Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Fri, 27 Oct 2023 15:31:10 +0200 Subject: [PATCH 8/8] Fixing tests Signed-off-by: Danny Kopping --- pkg/logql/metrics_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index d75abdb8b041d..950a16bb39a73 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -111,7 +111,7 @@ func TestLogLabelsQuery(t *testing.T) { }) require.Regexp(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 label=foo query= query_hash=2166136261 total_entries=12\n", + "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels splits=0 start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 label=foo query= query_hash=2166136261 total_entries=12\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String()) @@ -137,7 +137,7 @@ func TestLogSeriesQuery(t *testing.T) { }) require.Regexp(t, fmt.Sprintf( - "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 match=\"{container_name=.*\"}:{app=.*}\" query_hash=23523089 total_entries=10\n", + "level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series splits=0 start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 match=\"{container_name=.*\"}:{app=.*}\" query_hash=23523089 total_entries=10\n", sp.Context().(jaeger.SpanContext).SpanID().String(), ), buf.String())