Improve observability of index queries (grafana#11064)
**What this PR does / why we need it**:
Requests to the following APIs previously had limited or incorrect
logging, which made understanding their behaviour at runtime difficult.

[GET /loki/api/v1/labels](https://grafana.com/docs/loki/latest/reference/api/#list-labels-within-a-range-of-time)
[GET /loki/api/v1/label/<name>/values](https://grafana.com/docs/loki/latest/reference/api/#list-label-values-within-a-range-of-time)
[GET /loki/api/v1/series](https://grafana.com/docs/loki/latest/reference/api/#list-series)
[GET /loki/api/v1/index/stats](https://grafana.com/docs/loki/latest/reference/api/#index-stats)
[GET /loki/api/v1/index/volume](https://grafana.com/docs/loki/latest/reference/api/#volume)
[GET /loki/api/v1/index/volume_range](https://grafana.com/docs/loki/latest/reference/api/#volume)

All of these APIs now have querier and query-frontend logs; since sharding and
time-based splitting are applied to these requests, it is valuable to see all of
the resulting requests.
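
For illustration, a minimal sketch (not part of this commit; the wrapper name, parameter bundling, and package name are assumptions) of how a caller might invoke two of the updated recording functions, whose signatures now carry extra request context (shards for series queries; limit and step for volume queries) that ends up in these log lines:

```go
package example

import (
	"context"
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/loki/pkg/logql"
	logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
)

// recordIndexQueryMetrics is a hypothetical wrapper; only the
// RecordSeriesQueryMetrics / RecordVolumeQueryMetrics signatures
// come from this change.
func recordIndexQueryMetrics(
	ctx context.Context,
	logger log.Logger,
	start, end time.Time,
	matchers []string, // e.g. `{app="loki"}`
	shards []string, // shard annotations parsed via astmapper.ParseShard; enables shard_num/shard_count in the log line
	volumeQuery string,
	limit uint32,
	step time.Duration, // 0 is logged as range_type=instant
	status string,
	st logql_stats.Result,
) {
	// Series: shards are now passed through so the shard annotation can be logged.
	logql.RecordSeriesQueryMetrics(ctx, logger, start, end, matchers, status, shards, st)

	// Volume: limit and step are now logged alongside the query.
	logql.RecordVolumeQueryMetrics(ctx, logger, start, end, volumeQuery, limit, step, status, st)
}
```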
Danny Kopping authored and rhnasc committed Apr 12, 2024
1 parent 3b87713 commit 3935fb8
Showing 6 changed files with 218 additions and 76 deletions.
167 changes: 108 additions & 59 deletions pkg/logql/metrics.go
@@ -18,8 +18,10 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

const (
@@ -28,6 +30,7 @@ const (
QueryTypeLimited = "limited"
QueryTypeLabels = "labels"
QueryTypeSeries = "series"
QueryTypeStats = "stats"
QueryTypeVolume = "volume"

latencyTypeSlow = "slow"
@@ -89,7 +92,7 @@ func RecordRangeAndInstantQueryMetrics(
result promql_parser.Value,
) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
rt = string(GetRangeType(p))
latencyType = latencyTypeFast
returnedLines = 0
@@ -111,7 +114,7 @@

queryTags, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty.

logValues := make([]interface{}, 0, 30)
logValues := make([]interface{}, 0, 50)

logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines.
@@ -197,7 +200,7 @@ func RecordLabelQueryMetrics(
stats logql_stats.Result,
) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeLabels
)
@@ -211,49 +214,42 @@
level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"splits", stats.Summary.Splits,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"label", label,
"query", query,
"splits", stats.Summary.Splits,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"query_hash", HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)

sharded := strconv.FormatBool(false)
if stats.Summary.Shards > 1 {
sharded = strconv.FormatBool(true)
}
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))
// fixLogger forces the given logger to include a caller=metrics.go kv pair.
// The given logger might be a spanlogger instance, in which case it only logs caller=spanlogger.go:<line>.
// We use `caller=metrics.go` when querying our logs for performance issues, and some logs were missing.
func fixLogger(ctx context.Context, logger log.Logger) log.Logger {
nl := util_log.WithContext(ctx, logger)
if _, ok := logger.(*spanlogger.SpanLogger); ok {
return log.With(nl, "caller", "metrics.go")
}
return nl
}

func PrintMatches(matches []string) string {
// not using comma (,) as separator as matcher may already have comma (e.g: `{a="b", c="d"}`)
return strings.Join(matches, ":")
}

func RecordSeriesQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
match []string,
status string,
stats logql_stats.Result,
) {
func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeSeries
)
@@ -264,46 +260,72 @@ func RecordSeriesQueryMetrics(
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
shard := extractShard(shards)

logValues := make([]interface{}, 0, 15)
logValues = append(logValues,
"latency", latencyType,
"query_type", queryType,
"splits", stats.Summary.Splits,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"splits", stats.Summary.Splits,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_entries", stats.Summary.TotalEntriesReturned,
"query_hash", HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)

if shard != nil {
logValues = append(logValues,
"shard_num", shard.Shard,
"shard_count", shard.Of,
)
}

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeStats
)

sharded := strconv.FormatBool(false)
if stats.Summary.Shards > 1 {
sharded = strconv.FormatBool(true)
// Tag throughput metric by latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}
bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))

logValues := make([]interface{}, 0, 15)
logValues = append(logValues,
"latency", latencyType,
"query_type", queryType,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func RecordVolumeQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
query string,
status string,
stats logql_stats.Result,
) {
func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, limit uint32, step time.Duration, status string, stats logql_stats.Result) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeVolume
)
@@ -314,23 +336,36 @@ func RecordVolumeQueryMetrics(
latencyType = latencyTypeSlow
}

rangeType := "range"
if step == 0 {
rangeType = "instant"
}

level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"range_type", rangeType,
"step", step,
"limit", limit,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"splits", stats.Summary.Splits,
"total_entries", stats.Summary.TotalEntriesReturned,
// cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats
"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested,
"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound,
"cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored,
"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
)

execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func recordUsageStats(queryType string, stats logql_stats.Result) {
@@ -391,3 +426,17 @@ func tagsToKeyValues(queryTags string) []interface{} {

return res
}

func extractShard(shards []string) *astmapper.ShardAnnotation {
if len(shards) == 0 {
return nil
}

var shard astmapper.ShardAnnotation
shard, err := astmapper.ParseShard(shards[0])
if err != nil {
return nil
}

return &shard
}
10 changes: 5 additions & 5 deletions pkg/logql/metrics_test.go
@@ -109,9 +109,9 @@ func TestLogLabelsQuery(t *testing.T) {
TotalEntriesReturned: 12,
},
})
require.Equal(t,
require.Regexp(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo query= splits=0 throughput=100kB total_bytes=100kB total_entries=12\n",
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels splits=0 start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 label=foo query= query_hash=2166136261 total_entries=12\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
@@ -127,17 +127,17 @@ func TestLogSeriesQuery(t *testing.T) {
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", stats.Result{
RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", []string{}, stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
TotalEntriesReturned: 10,
},
})
require.Equal(t,
require.Regexp(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series length=1h0m0s duration=25.25s status=200 match=\"{container_name=~\\\"prometheus.*\\\", component=\\\"server\\\"}:{app=\\\"loki\\\"}\" splits=0 throughput=100kB total_bytes=100kB total_entries=10\n",
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series splits=0 start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 match=\"{container_name=.*\"}:{app=.*}\" query_hash=23523089 total_entries=10\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())