Skip to content

Commit

Permalink
Add volume logs
Browse files Browse the repository at this point in the history
Signed-off-by: Danny Kopping <[email protected]>
  • Loading branch information
Danny Kopping committed Oct 27, 2023
1 parent a66d1cb commit 6887bc2
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
35 changes: 22 additions & 13 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func RecordLabelQueryMetrics(
"status", status,
"label", label,
"query", query,
"query_hash", HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)

Expand Down Expand Up @@ -272,6 +273,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"query_hash", HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)

if shard != nil {
Expand Down Expand Up @@ -311,21 +313,15 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func RecordVolumeQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
query string,
status string,
stats logql_stats.Result,
) {
func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, limit uint32, step time.Duration, status string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
Expand All @@ -338,23 +334,36 @@ func RecordVolumeQueryMetrics(
latencyType = latencyTypeSlow
}

rangeType := "range"
if step == 0 {
rangeType = "instant"
}

level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"range_type", rangeType,
"step", step,
"limit", limit,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"splits", stats.Summary.Splits,
"total_entries", stats.Summary.TotalEntriesReturned,
"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested,
"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound,
"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
// cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats
"cache_result_req", stats.Caches.VolumeResult.EntriesRequested,
"cache_result_hit", stats.Caches.VolumeResult.EntriesFound,
"cache_result_stored", stats.Caches.VolumeResult.EntriesStored,
"cache_result_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
)

execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func recordUsageStats(queryType string, stats logql_stats.Result) {
Expand Down
22 changes: 18 additions & 4 deletions pkg/querier/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func (q *QuerierAPI) LabelHandler(ctx context.Context, req *logproto.LabelReques
if resp != nil {
resLength = len(resp.Values)
}
// record stats about the label query
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
Expand Down Expand Up @@ -261,7 +260,6 @@ func (q *QuerierAPI) SeriesHandler(ctx context.Context, req *logproto.SeriesRequ
resLength = len(resp.Series)
}

// record stats about the label query
statResult := statsCtx.Result(time.Since(start), queueTime, resLength)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
Expand All @@ -285,15 +283,13 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu
statsCtx, ctx := stats.NewContext(ctx)

// TODO(karsten): we might want to change IndexStats to receive a logproto.IndexStatsRequest instead
// TODO(owen-d): log metadata, record stats?
resp, err := q.querier.IndexStats(ctx, req)
if resp == nil {
// Some stores don't implement this
resp = &index_stats.Stats{}
}

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
// record stats about the label query
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))
Expand All @@ -313,6 +309,12 @@ func (q *QuerierAPI) IndexStatsHandler(ctx context.Context, req *loghttp.RangeQu
// VolumeHandler queries the index label volumes related to the passed matchers and given time range.
// Returns either N values where N is the time range / step and a single value for a time range depending on the request.
func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) {
timer := prometheus.NewTimer(logql.QueryTime.WithLabelValues(logql.QueryTypeVolume))
defer timer.ObserveDuration()

start := time.Now()
statsCtx, ctx := stats.NewContext(ctx)

resp, err := q.querier.Volume(ctx, req)
if err != nil {
return nil, err
Expand All @@ -321,6 +323,18 @@ func (q *QuerierAPI) VolumeHandler(ctx context.Context, req *logproto.VolumeRequ
return &logproto.VolumeResponse{Volumes: []logproto.Volume{}}, nil
}

queueTime, _ := ctx.Value(httpreq.QueryQueueTimeHTTPHeader).(time.Duration)
statResult := statsCtx.Result(time.Since(start), queueTime, 1)
log := spanlogger.FromContext(ctx)
statResult.Log(level.Debug(log))

status := 200
if err != nil {
status, _ = serverutil.ClientHTTPStatusAndError(err)
}

logql.RecordVolumeQueryMetrics(ctx, log, req.From.Time(), req.Through.Time(), req.GetQuery(), uint32(req.GetLimit()), time.Duration(req.GetStep()), strconv.Itoa(status), statResult)

return resp, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func recordQueryMetrics(data *queryData) {
case queryTypeStats:
logql.RecordStatsQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics)
case queryTypeVolume:
logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.status, *data.statistics)
logql.RecordVolumeQueryMetrics(data.ctx, logger, data.params.Start(), data.params.End(), data.params.Query(), data.params.Limit(), data.params.Step(), data.status, *data.statistics)
default:
level.Error(logger).Log("msg", "failed to record query metrics", "err", fmt.Errorf("expected one of the *LokiRequest, *LokiInstantRequest, *LokiSeriesRequest, *LokiLabelNamesRequest, got %s", data.queryType))
}
Expand Down

0 comments on commit 6887bc2

Please sign in to comment.