From 6887bc2a648e67fa9ea1cb82d2987bffef25b4ee Mon Sep 17 00:00:00 2001 From: Danny Kopping Date: Thu, 26 Oct 2023 14:21:38 +0200 Subject: [PATCH] Add volume logs Signed-off-by: Danny Kopping --- pkg/logql/metrics.go | 35 +++++++++++++++++++++------------ pkg/querier/http.go | 22 +++++++++++++++++---- pkg/querier/queryrange/stats.go | 2 +- 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 907038cd74631..68e217a613eb8 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -223,6 +223,7 @@ func RecordLabelQueryMetrics( "status", status, "label", label, "query", query, + "query_hash", HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned, ) @@ -272,6 +273,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "match", PrintMatches(match), + "query_hash", HashedQuery(PrintMatches(match)), "total_entries", stats.Summary.TotalEntriesReturned) if shard != nil { @@ -311,6 +313,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "query", query, + "query_hash", HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) level.Info(logger).Log(logValues...) @@ -318,14 +321,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } -func RecordVolumeQueryMetrics( - ctx context.Context, - log log.Logger, - start, end time.Time, - query string, - status string, - stats logql_stats.Result, -) { +func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, limit uint32, step time.Duration, status string, stats logql_stats.Result) { var ( logger = fixLogger(ctx, log) latencyType = latencyTypeFast @@ -338,23 +334,36 @@ func RecordVolumeQueryMetrics( latencyType = latencyTypeSlow } + rangeType := "range" + if step == 0 { + rangeType = "instant" + } + level.Info(logger).Log( "latency", latencyType, "query_type", queryType, "query", query, "query_hash", HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "range_type", rangeType, + "step", step, + "limit", limit, "length", end.Sub(start), "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "splits", stats.Summary.Splits, "total_entries", stats.Summary.TotalEntriesReturned, - "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, - "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, - "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), + // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats + "cache_result_req", stats.Caches.VolumeResult.EntriesRequested, + "cache_result_hit", stats.Caches.VolumeResult.EntriesFound, + "cache_result_stored", stats.Caches.VolumeResult.EntriesStored, + "cache_result_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), ) - execLatency.WithLabelValues(status, queryType, ""). - Observe(stats.Summary.ExecTime) + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } func recordUsageStats(queryType string, stats logql_stats.Result) { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 7004672fd4ac9..3bf777659a898 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -111,7 +111,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques if resp != nil { resLength = len(resp.Values) } - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, resLength) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -261,7 +260,6 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ resLength = len(resp.Series) } - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, resLength) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -285,7 +283,6 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu statsCtx, ctx := stats.NewContext(ctx) // TODO(karsten): we might want to change IndexStats to receive a logproto.IndexStatsRequest instead - // TODO(owen-d): log metadata, record stats? resp, err := q.querier.IndexStats(ctx, req) if resp == nil { // Some stores don't implement this @@ -293,7 +290,6 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu } queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) - // record stats about the label query statResult := statsCtx.Result(time.Since(start), queueTime, 1) log := spanlogger.FromContext(ctx) statResult.Log(level.Debug(log)) @@ -313,6 +309,12 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu // VolumeHandler queries the index label volumes related to the passed matchers and given time range. // Returns either N values where N is the time range / step and a single value for a time range depending on the request. func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) { + timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeVolume)) + defer timer.ObserveDuration() + + start := time.Now() + statsCtx, ctx := stats.NewContext(ctx) + resp, err := q.querier.Volume(ctx, req) if err != nil { return nil, err @@ -321,6 +323,18 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ return &logproto.VolumeResponse{Volumes: []logproto.Volume{}}, nil } + queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration) + statResult := statsCtx.Result(time.Since(start), queueTime, 1) + log := spanlogger.FromContext(ctx) + statResult.Log(level.Debug(log)) + + status := 200 + if err != nil { + status, _ = serverutil.ClientHTTPStatusAndError(err) + } + + logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult) + return resp, nil } diff --git a/pkg/querier/queryrange/stats.go b/pkg/querier/queryrange/stats.go index f9e066607d3e9..0233d886c98f2 100644 --- a/pkg/querier/queryrange/stats.go +++ b/pkg/querier/queryrange/stats.go @@ -59,7 +59,7 @@ func recordQueryMetrics(data *queryData) { case queryTypeStats: logql.RecordStatsQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) case queryTypeVolume: - logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics) + logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.params.Limit(), data.params.Step(), data.status, *data.statistics) default: level.Error(logger).Log("msg", "failed to record query metrics", "err", fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got %s", data.queryType)) }