Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inflight-logging: Add extra metadata to inflight requests logging #11243

Merged
merged 7 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ BUILD_IMAGE_VERSION ?= 0.31.2
# Docker image info
IMAGE_PREFIX ?= grafana

BUILD_IMAGE_PREFIX ?= grafana

IMAGE_TAG ?= $(shell ./tools/image-tag)

# Version info for binaries
Expand Down Expand Up @@ -102,7 +104,7 @@ RM := --rm
TTY := --tty

DOCKER_BUILDKIT ?= 1
BUILD_IMAGE = BUILD_IMAGE=$(IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
BUILD_IMAGE = BUILD_IMAGE=$(BUILD_IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION)
PUSH_OCI=docker push
TAG_OCI=docker tag
ifeq ($(CI), true)
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/grafana/regexp"

logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/queryutil"
"github.com/grafana/loki/pkg/util/validation"
)

Expand Down Expand Up @@ -43,7 +44,7 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool {
for _, b := range blocks {

if b.Hash > 0 {
if b.Hash == HashedQuery(query) {
if b.Hash == queryutil.HashedQuery(query) {
level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query)
return qb.block(b, typ, logger)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/logql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/queryutil"
"github.com/grafana/loki/pkg/util/server"
"github.com/grafana/loki/pkg/util/spanlogger"
"github.com/grafana/loki/pkg/util/validation"
Expand Down Expand Up @@ -219,7 +220,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) {
)

if q.logExecQuery {
queryHash := HashedQuery(q.params.Query())
queryHash := queryutil.HashedQuery(q.params.Query())
if GetRangeType(q.params) == InstantType {
level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash)
} else {
Expand Down
18 changes: 6 additions & 12 deletions pkg/logql/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package logql

import (
"context"
"hash/fnv"
"strconv"
"strings"
"time"
Expand All @@ -22,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/queryutil"
"github.com/grafana/loki/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -120,7 +120,7 @@ func RecordRangeAndInstantQueryMetrics(
logValues = append(logValues, []interface{}{
"latency", latencyType, // this can be used to filter log lines.
"query", p.Query(),
"query_hash", HashedQuery(p.Query()),
"query_hash", queryutil.HashedQuery(p.Query()),
"query_type", queryType,
"range_type", rt,
"length", p.End().Sub(p.Start()),
Expand Down Expand Up @@ -187,12 +187,6 @@ func RecordRangeAndInstantQueryMetrics(
recordUsageStats(queryType, stats)
}

func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}

func RecordLabelQueryMetrics(
ctx context.Context,
log log.Logger,
Expand Down Expand Up @@ -225,7 +219,7 @@ func RecordLabelQueryMetrics(
"status", status,
"label", label,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", queryutil.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned,
)

Expand Down Expand Up @@ -276,7 +270,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"match", PrintMatches(match),
"query_hash", HashedQuery(PrintMatches(match)),
"query_hash", queryutil.HashedQuery(PrintMatches(match)),
"total_entries", stats.Summary.TotalEntriesReturned)

if shard != nil {
Expand Down Expand Up @@ -316,7 +310,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim
"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
"status", status,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", queryutil.HashedQuery(query),
"total_entries", stats.Summary.TotalEntriesReturned)

level.Info(logger).Log(logValues...)
Expand Down Expand Up @@ -346,7 +340,7 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti
"latency", latencyType,
"query_type", queryType,
"query", query,
"query_hash", HashedQuery(query),
"query_hash", queryutil.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
Expand Down
20 changes: 19 additions & 1 deletion pkg/querier/queryrange/queryrangebase/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

util_log "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/queryutil"
)

type RetryMiddlewareMetrics struct {
Expand Down Expand Up @@ -73,6 +74,11 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
MaxRetries: 0,
}
bk := backoff.New(ctx, cfg)

start := req.GetStart()
end := req.GetEnd()
query := req.GetQuery()

for ; tries < r.maxRetries; tries++ {
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -86,7 +92,19 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) {
httpResp, ok := httpgrpc.HTTPResponseFromError(err)
if !ok || httpResp.Code/100 == 5 {
lastErr = err
level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "query", req.GetQuery(), "retry_in", bk.NextDelay(), "err", err)
level.Error(util_log.WithContext(ctx, r.log)).Log(
"msg", "error processing request",
"try", tries,
"query", query,
"query_hash", queryutil.HashedQuery(query),
"start", start.Format(time.RFC3339Nano),
"end", end.Format(time.RFC3339Nano),
"start_delta", time.Since(start),
"end_delta", time.Since(end),
"length", end.Sub(start),
"retry_in", bk.NextDelay(),
"err", err,
)
bk.Wait()
continue
}
Expand Down
18 changes: 15 additions & 3 deletions pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/constants"
logutil "github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/queryutil"
)

// Config is the configuration for the queryrange tripperware
Expand Down Expand Up @@ -247,8 +248,19 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

queryHash := logql.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "range", "query", op.Query, "length", op.EndTs.Sub(op.StartTs), "step", op.Step, "query_hash", queryHash)
queryHash := queryutil.HashedQuery(op.Query)
level.Info(logger).Log(
"msg", "executing query",
"type", "range",
"query", op.Query,
"start", op.StartTs.Format(time.RFC3339Nano),
"end", op.EndTs.Format(time.RFC3339Nano),
"start_delta", time.Since(op.StartTs),
"end_delta", time.Since(op.EndTs),
"length", op.EndTs.Sub(op.StartTs),
"step", op.Step,
"query_hash", queryHash,
)

switch e := expr.(type) {
case syntax.SampleExpr:
Expand Down Expand Up @@ -296,7 +308,7 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response,
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

queryHash := logql.HashedQuery(op.Query)
queryHash := queryutil.HashedQuery(op.Query)
level.Info(logger).Log("msg", "executing query", "type", "instant", "query", op.Query, "query_hash", queryHash)

switch expr.(type) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/template"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
ruler "github.com/grafana/loki/pkg/ruler/base"
"github.com/grafana/loki/pkg/ruler/rulespb"
"github.com/grafana/loki/pkg/ruler/util"
"github.com/grafana/loki/pkg/util/queryutil"
)

// RulesLimits is the one function we need from limits.Overrides, and
Expand Down Expand Up @@ -60,7 +60,7 @@ type RulesLimits interface {
// and passing an altered timestamp.
func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc {
return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
hash := logql.HashedQuery(qs)
hash := queryutil.HashedQuery(qs)
detail := rules.FromOriginContext(ctx)
detailLog := log.With(logger, "rule_name", detail.Name, "rule_type", detail.Kind, "query", qs, "query_hash", hash)

Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/evaluator_jitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util/queryutil"
)

// EvaluatorWithJitter wraps a given Evaluator. It applies a consistent jitter based on a rule's query string by hashing
Expand Down Expand Up @@ -44,7 +44,7 @@ func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, hasher has
}

func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) {
logger := log.With(e.logger, "query", qs, "query_hash", logql.HashedQuery(qs))
logger := log.With(e.logger, "query", qs, "query_hash", queryutil.HashedQuery(qs))
jitter := e.calculateJitter(qs, logger)

if jitter > 0 {
Expand Down
4 changes: 2 additions & 2 deletions pkg/ruler/evaluator_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import (
"google.golang.org/grpc/keepalive"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/util/build"
"github.com/grafana/loki/pkg/util/constants"
"github.com/grafana/loki/pkg/util/httpreq"
"github.com/grafana/loki/pkg/util/queryutil"
"github.com/grafana/loki/pkg/util/spanlogger"
)

Expand Down Expand Up @@ -220,7 +220,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim
args.Set("time", ts.Format(time.RFC3339Nano))
}
body := []byte(args.Encode())
hash := logql.HashedQuery(query)
hash := queryutil.HashedQuery(query)

req := httpgrpc.HTTPRequest{
Method: http.MethodPost,
Expand Down
10 changes: 10 additions & 0 deletions pkg/util/queryutil/query.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package queryutil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This package doesn't match the naming convention we have in pkg/util. Maybe just move it to the util package, since we have pkg/util/hash_fp.go already.


import "hash/fnv"

// HashedQuery returns a unique hash value for the given `query`.
func HashedQuery(query string) uint32 {
h := fnv.New32()
_, _ = h.Write([]byte(query))
return h.Sum32()
}
Loading