Improve observability of index queries #11064

Merged Oct 27, 2023 (8 commits)
Changes from 7 commits
167 changes: 108 additions & 59 deletions pkg/logql/metrics.go
@@ -18,8 +18,10 @@ import (
"github.com/grafana/loki/pkg/logql/syntax"
"github.com/grafana/loki/pkg/logqlmodel"
logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/querier/astmapper"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/spanlogger"
)

const (
@@ -28,6 +30,7 @@ const (
QueryTypeLimited = "limited"
QueryTypeLabels = "labels"
QueryTypeSeries = "series"
QueryTypeStats = "stats"
QueryTypeVolume = "volume"

latencyTypeSlow = "slow"
@@ -89,7 +92,7 @@ func RecordRangeAndInstantQueryMetrics(
result promql_parser.Value,
) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
rt = string(GetRangeType(p))
latencyType = latencyTypeFast
returnedLines = 0
@@ -111,7 +114,7 @@

queryTags, _ := ctx.Value(httpreq.QueryTagsHTTPHeader).(string) // it's ok to be empty.

logValues := make([]interface{}, 0, 30)
logValues := make([]interface{}, 0, 50)

logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines.
@@ -197,7 +200,7 @@ func RecordLabelQueryMetrics(
stats logql_stats.Result,
) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeLabels
)
@@ -211,49 +214,42 @@
level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"splits", stats.Summary.Splits,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"label", label,
"query", query,
"splits", stats.Summary.Splits,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"query_hash", HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)

sharded := strconv.FormatBool(false)
if stats.Summary.Shards > 1 {
sharded = strconv.FormatBool(true)
}
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))
// fixLogger forces the given logger to include a caller=metrics.go kv pair.
// The given logger might be a spanlogger instance, in which case it only logs caller=spanlogger.go:<line>.
// We use `caller=metrics.go` when querying our logs for performance issues, and some logs were missing.
func fixLogger(ctx context.Context, logger log.Logger) log.Logger {
nl := util_log.WithContext(ctx, logger)
if _, ok := logger.(*spanlogger.SpanLogger); ok {
return log.With(nl, "caller", "metrics.go")
}
return nl
}
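
Aside: a minimal, self-contained sketch (not part of this diff) of the go-kit/log behavior fixLogger relies on. Keys bound with log.With are attached verbatim to every entry, so pinning the caller key yields a stable, greppable value even when a forwarding wrapper such as spanlogger would otherwise resolve the caller to its own file:

package main

import (
	"os"

	"github.com/go-kit/log"
)

func main() {
	// inner stands in for the logger a SpanLogger wraps.
	inner := log.NewLogfmtLogger(os.Stdout)

	// Pin the caller key, as fixLogger does for SpanLogger instances.
	pinned := log.With(inner, "caller", "metrics.go")
	pinned.Log("latency", "fast", "query_type", "labels")
	// Output: caller=metrics.go latency=fast query_type=labels
}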

func PrintMatches(matches []string) string {
// not using a comma (,) as separator since a matcher may already contain one (e.g. `{a="b", c="d"}`)
return strings.Join(matches, ":")
}
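
For illustration, with the matchers used by TestLogSeriesQuery further down, the colon separator keeps the joined value unambiguous even though each matcher already contains commas:

matches := []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}
fmt.Println(PrintMatches(matches))
// Output: {container_name=~"prometheus.*", component="server"}:{app="loki"}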

func RecordSeriesQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
match []string,
status string,
stats logql_stats.Result,
) {
func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeSeries
)
@@ -264,46 +260,72 @@ func RecordSeriesQueryMetrics(
latencyType = latencyTypeSlow
}

// we also log queries, useful for troubleshooting slow queries.
level.Info(logger).Log(
shard := extractShard(shards)

logValues := make([]interface{}, 0, 15)
logValues = append(logValues,
"latency", latencyType,
"query_type", queryType,
"splits", stats.Summary.Splits,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"splits", stats.Summary.Splits,
"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
"total_entries", stats.Summary.TotalEntriesReturned,
"query_hash", HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)

if shard != nil {
logValues = append(logValues,
"shard_num", shard.Shard,
"shard_count", shard.Of,
)
}

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, stats logql_stats.Result) {
var (
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeStats
)

sharded := strconv.FormatBool(false)
if stats.Summary.Shards > 1 {
sharded = strconv.FormatBool(true)
// Tag the log line with a latency type based on a threshold.
// Latency below the threshold is fast, above is slow.
if stats.Summary.ExecTime > slowQueryThresholdSecond {
latencyType = latencyTypeSlow
}
bytesPerSecond.WithLabelValues(status, queryType, "", latencyType, sharded).
Observe(float64(stats.Summary.BytesProcessedPerSecond))
execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
chunkDownloadLatency.WithLabelValues(status, queryType, "").
Observe(stats.ChunksDownloadTime().Seconds())
duplicatesTotal.Add(float64(stats.TotalDuplicates()))
chunkDownloadedTotal.WithLabelValues(status, queryType, "").
Add(float64(stats.TotalChunksDownloaded()))
ingesterLineTotal.Add(float64(stats.Ingester.TotalLinesSent))

logValues := make([]interface{}, 0, 15)
logValues = append(logValues,
"latency", latencyType,
"query_type", queryType,
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)

execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}
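
The duration value logged here (and by the other recorders) converts ExecTime, a float64 number of seconds, into a time.Duration by scaling to nanoseconds. A quick sketch of the arithmetic, using the 25.25s figure from the tests in this PR:

execTime := 25.25 // seconds, as in stats.Summary.ExecTime
d := time.Duration(int64(execTime * float64(time.Second)))
fmt.Println(d) // 25.25s, matching the duration=25.25s assertions in metrics_test.go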

func RecordVolumeQueryMetrics(
ctx context.Context,
log log.Logger,
start, end time.Time,
query string,
status string,
stats logql_stats.Result,
) {
func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, limit uint32, step time.Duration, status string, stats logql_stats.Result) {
var (
logger = util_log.WithContext(ctx, log)
logger = fixLogger(ctx, log)
latencyType = latencyTypeFast
queryType = QueryTypeVolume
)
@@ -314,23 +336,36 @@
latencyType = latencyTypeSlow
}

rangeType := "range"
if step == 0 {
rangeType = "instant"
}

level.Info(logger).Log(
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"range_type", rangeType,
"step", step,
"limit", limit,
"length", end.Sub(start),
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"splits", stats.Summary.Splits,
"total_entries", stats.Summary.TotalEntriesReturned,
// cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats
"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested,
"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound,
"cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored,
"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(),
)

execLatency.WithLabelValues(status, queryType, "").
Observe(stats.Summary.ExecTime)
execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime)
}

func recordUsageStats(queryType string, stats logql_stats.Result) {
@@ -391,3 +426,17 @@ func tagsToKeyValues(queryTags string) []interface{} {

return res
}

func extractShard(shards []string) *astmapper.ShardAnnotation {
if len(shards) == 0 {
return nil
}

var shard astmapper.ShardAnnotation
shard, err := astmapper.ParseShard(shards[0])
if err != nil {
return nil
}

return &shard
}
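
A usage sketch for extractShard; the "0_of_16" input is a hypothetical value assuming astmapper's <shard>_of_<total> string format (the Shard and Of fields are the ones logged as shard_num and shard_count above). Only the first shard label is inspected, and a parse failure degrades to nil rather than an error:

if shard := extractShard([]string{"0_of_16"}); shard != nil {
	fmt.Println(shard.Shard, shard.Of) // 0 16
}
fmt.Println(extractShard(nil)) // <nil>; the log line then simply omits shard_num/shard_count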
10 changes: 5 additions & 5 deletions pkg/logql/metrics_test.go
@@ -109,9 +109,9 @@ func TestLogLabelsQuery(t *testing.T) {
TotalEntriesReturned: 12,
},
})
require.Equal(t,
require.Regexp(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels length=1h0m0s duration=25.25s status=200 label=foo query= splits=0 throughput=100kB total_bytes=100kB total_entries=12\n",
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=labels start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 label=foo query= query_hash=2166136261 total_entries=12\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())
@@ -127,17 +127,17 @@ func TestLogSeriesQuery(t *testing.T) {
sp := opentracing.StartSpan("")
ctx := opentracing.ContextWithSpan(user.InjectOrgID(context.Background(), "foo"), sp)
now := time.Now()
RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", stats.Result{
RecordSeriesQueryMetrics(ctx, logger, now.Add(-1*time.Hour), now, []string{`{container_name=~"prometheus.*", component="server"}`, `{app="loki"}`}, "200", []string{}, stats.Result{
Summary: stats.Summary{
BytesProcessedPerSecond: 100000,
ExecTime: 25.25,
TotalBytesProcessed: 100000,
TotalEntriesReturned: 10,
},
})
require.Equal(t,
require.Regexp(t,
fmt.Sprintf(
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series length=1h0m0s duration=25.25s status=200 match=\"{container_name=~\\\"prometheus.*\\\", component=\\\"server\\\"}:{app=\\\"loki\\\"}\" splits=0 throughput=100kB total_bytes=100kB total_entries=10\n",
"level=info org_id=foo traceID=%s sampled=true latency=slow query_type=series start=.* end=.* start_delta=1h0m0.* end_delta=.* length=1h0m0s duration=25.25s status=200 match=\"{container_name=.*\"}:{app=.*}\" query_hash=23523089 total_entries=10\n",
sp.Context().(jaeger.SpanContext).SpanID().String(),
),
buf.String())