From d4ac723f603fb3821d7f70166b4e651f90aa7da8 Mon Sep 17 00:00:00 2001 From: Kaviraj Kanagaraj Date: Mon, 20 Nov 2023 11:00:13 +0100 Subject: [PATCH] inflight-logging: Add extra metadata to inflight requests logging (#11243) **What this PR does / why we need it**: logging: Add extra metadata to inflight requests This adds extra metadata (similar to what we have in `metrics.go`) but for queries in in-flight (both started and retrying) Changes: Adds following data 1. Query Hash 2. Start and End time 3. Start and End delta 4. Length of the query 5. Moved the helper util to `queryutil` package because of cyclic dependencies with `logql` package. **Which issue(s) this PR fixes**: Fixes # **Special notes for your reviewer**: Find the screenshots of log entries looks like (both in `retry.go` and `roundtrip.go`) ![Screenshot 2023-11-16 at 13 01 32](https://github.com/grafana/loki/assets/3735252/177e97ed-6ee8-41dd-b088-2e4f49562ba0) ![Screenshot 2023-11-16 at 13 02 15](https://github.com/grafana/loki/assets/3735252/fb328a37-dbe3-483e-b083-f21327858029) **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [x] `CHANGELOG.md` updated - [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) - [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15) --------- Signed-off-by: Kaviraj --- CHANGELOG.md | 1 + Makefile | 4 +++- pkg/logql/blocker.go | 3 ++- pkg/logql/blocker_test.go | 5 +++-- pkg/logql/engine.go | 2 +- pkg/logql/engine_test.go | 2 +- pkg/logql/metrics.go | 18 ++++++----------- pkg/logql/metrics_test.go | 7 ++++--- .../queryrange/queryrangebase/retry.go | 20 ++++++++++++++++++- pkg/querier/queryrange/roundtrip.go | 18 ++++++++++++++--- pkg/ruler/compat.go | 8 ++++---- pkg/ruler/evaluator_jitter.go | 4 ++-- pkg/ruler/evaluator_remote.go | 4 ++-- pkg/util/hash_fp.go | 13 +++++++++++- 14 files changed, 75 insertions(+), 34 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 65c97f3d8dee5..13136a5b41cb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging. * [11110](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Change the default of the `metrics-namespace` flag to 'loki'. * [11086](https://github.com/grafana/loki/pull/11086) **kandrew5**: Helm: Allow topologySpreadConstraints * [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace. diff --git a/Makefile b/Makefile index b1cacb1135333..ee022ba2129f0 100644 --- a/Makefile +++ b/Makefile @@ -42,6 +42,8 @@ BUILD_IMAGE_VERSION ?= 0.31.2 # Docker image info IMAGE_PREFIX ?= grafana +BUILD_IMAGE_PREFIX ?= grafana + IMAGE_TAG ?= $(shell ./tools/image-tag) # Version info for binaries @@ -102,7 +104,7 @@ RM := --rm TTY := --tty DOCKER_BUILDKIT ?= 1 -BUILD_IMAGE = BUILD_IMAGE=$(IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION) +BUILD_IMAGE = BUILD_IMAGE=$(BUILD_IMAGE_PREFIX)/loki-build-image:$(BUILD_IMAGE_VERSION) PUSH_OCI=docker push TAG_OCI=docker tag ifeq ($(CI), true) diff --git a/pkg/logql/blocker.go b/pkg/logql/blocker.go index d38a640456c30..cbfdc6bf49e3b 100644 --- a/pkg/logql/blocker.go +++ b/pkg/logql/blocker.go @@ -8,6 +8,7 @@ import ( "github.com/go-kit/log/level" "github.com/grafana/regexp" + "github.com/grafana/loki/pkg/util" logutil "github.com/grafana/loki/pkg/util/log" "github.com/grafana/loki/pkg/util/validation" ) @@ -43,7 +44,7 @@ func (qb *queryBlocker) isBlocked(ctx context.Context, tenant string) bool { for _, b := range blocks { if b.Hash > 0 { - if b.Hash == HashedQuery(query) { + if b.Hash == util.HashedQuery(query) { level.Warn(logger).Log("msg", "query blocker matched with hash policy", "hash", b.Hash, "query", query) return qb.block(b, typ, logger) } diff --git a/pkg/logql/blocker_test.go b/pkg/logql/blocker_test.go index 3dc3b72c81599..e0dc00bf622e7 100644 --- a/pkg/logql/blocker_test.go +++ b/pkg/logql/blocker_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/validation" ) @@ -124,7 +125,7 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) { "correct FNV32 hash matches", defaultQuery, []*validation.BlockedQuery{ { - Hash: HashedQuery(defaultQuery), + Hash: util.HashedQuery(defaultQuery), }, }, logqlmodel.ErrBlocked, }, @@ -132,7 +133,7 @@ func TestEngine_ExecWithBlockedQueries(t *testing.T) { "incorrect FNV32 hash does not match", defaultQuery, []*validation.BlockedQuery{ { - Hash: HashedQuery(defaultQuery) + 1, + Hash: util.HashedQuery(defaultQuery) + 1, }, }, nil, }, diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 1b85ea05ea760..af680a33b9a97 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -219,7 +219,7 @@ func (q *query) Exec(ctx context.Context) (logqlmodel.Result, error) { ) if q.logExecQuery { - queryHash := HashedQuery(q.params.Query()) + queryHash := util.HashedQuery(q.params.Query()) if GetRangeType(q.params) == InstantType { level.Info(logutil.WithContext(ctx, q.logger)).Log("msg", "executing query", "type", "instant", "query", q.params.Query(), "query_hash", queryHash) } else { diff --git a/pkg/logql/engine_test.go b/pkg/logql/engine_test.go index ef7d5e0538e3d..548400644a31e 100644 --- a/pkg/logql/engine_test.go +++ b/pkg/logql/engine_test.go @@ -2669,7 +2669,7 @@ func TestHashingStability(t *testing.T) { {`sum (count_over_time({app="myapp",env="myenv"} |= "error" |= "metrics.go" | logfmt [10s])) by(query_hash)`}, } { params.qs = test.qs - expectedQueryHash := HashedQuery(test.qs) + expectedQueryHash := util.HashedQuery(test.qs) // check that both places will end up having the same query hash, even though they're emitting different log lines. require.Regexp(t, diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 3ba3a9c61535d..94a4c2f9dd408 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -2,7 +2,6 @@ package logql import ( "context" - "hash/fnv" "strconv" "strings" "time" @@ -19,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/logqlmodel" logql_stats "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/querier/astmapper" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" @@ -120,7 +120,7 @@ func RecordRangeAndInstantQueryMetrics( logValues = append(logValues, []interface{}{ "latency", latencyType, // this can be used to filter log lines. "query", p.Query(), - "query_hash", HashedQuery(p.Query()), + "query_hash", util.HashedQuery(p.Query()), "query_type", queryType, "range_type", rt, "length", p.End().Sub(p.Start()), @@ -187,12 +187,6 @@ func RecordRangeAndInstantQueryMetrics( recordUsageStats(queryType, stats) } -func HashedQuery(query string) uint32 { - h := fnv.New32() - _, _ = h.Write([]byte(query)) - return h.Sum32() -} - func RecordLabelQueryMetrics( ctx context.Context, log log.Logger, @@ -225,7 +219,7 @@ func RecordLabelQueryMetrics( "status", status, "label", label, "query", query, - "query_hash", HashedQuery(query), + "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned, ) @@ -276,7 +270,7 @@ func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end ti "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "match", PrintMatches(match), - "query_hash", HashedQuery(PrintMatches(match)), + "query_hash", util.HashedQuery(PrintMatches(match)), "total_entries", stats.Summary.TotalEntriesReturned) if shard != nil { @@ -316,7 +310,7 @@ func RecordStatsQueryMetrics(ctx context.Context, log log.Logger, start, end tim "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), "status", status, "query", query, - "query_hash", HashedQuery(query), + "query_hash", util.HashedQuery(query), "total_entries", stats.Summary.TotalEntriesReturned) level.Info(logger).Log(logValues...) @@ -346,7 +340,7 @@ func RecordVolumeQueryMetrics(ctx context.Context, log log.Logger, start, end ti "latency", latencyType, "query_type", queryType, "query", query, - "query_hash", HashedQuery(query), + "query_hash", util.HashedQuery(query), "start", start.Format(time.RFC3339Nano), "end", end.Format(time.RFC3339Nano), "start_delta", time.Since(start), diff --git a/pkg/logql/metrics_test.go b/pkg/logql/metrics_test.go index 950a16bb39a73..06d4e2699494e 100644 --- a/pkg/logql/metrics_test.go +++ b/pkg/logql/metrics_test.go @@ -18,6 +18,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/httpreq" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -191,11 +192,11 @@ func Test_testToKeyValues(t *testing.T) { } func TestQueryHashing(t *testing.T) { - h1 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) - h2 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`) + h1 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) + h2 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= logfmt |= "metrics.go"`) // check that it capture differences of order. require.NotEqual(t, h1, h2) - h3 := HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) + h3 := util.HashedQuery(`{app="myapp",env="myenv"} |= "error" |= "metrics.go" |= logfmt`) // check that it evaluate same queries as same hashes, even if evaluated at different timestamps. require.Equal(t, h1, h3) } diff --git a/pkg/querier/queryrange/queryrangebase/retry.go b/pkg/querier/queryrange/queryrangebase/retry.go index 5dbad8d82582a..d051363771bb9 100644 --- a/pkg/querier/queryrange/queryrangebase/retry.go +++ b/pkg/querier/queryrange/queryrangebase/retry.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/grafana/loki/pkg/util" util_log "github.com/grafana/loki/pkg/util/log" ) @@ -73,6 +74,11 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) { MaxRetries: 0, } bk := backoff.New(ctx, cfg) + + start := req.GetStart() + end := req.GetEnd() + query := req.GetQuery() + for ; tries < r.maxRetries; tries++ { if ctx.Err() != nil { return nil, ctx.Err() @@ -86,7 +92,19 @@ func (r retry) Do(ctx context.Context, req Request) (Response, error) { httpResp, ok := httpgrpc.HTTPResponseFromError(err) if !ok || httpResp.Code/100 == 5 { lastErr = err - level.Error(util_log.WithContext(ctx, r.log)).Log("msg", "error processing request", "try", tries, "query", req.GetQuery(), "retry_in", bk.NextDelay(), "err", err) + level.Error(util_log.WithContext(ctx, r.log)).Log( + "msg", "error processing request", + "try", tries, + "query", query, + "query_hash", util.HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "length", end.Sub(start), + "retry_in", bk.NextDelay(), + "err", err, + ) bk.Wait() continue } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index e2a2ed0021690..2b24ab4a917dc 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -24,6 +24,7 @@ import ( base "github.com/grafana/loki/pkg/querier/queryrange/queryrangebase" "github.com/grafana/loki/pkg/storage/chunk/cache" "github.com/grafana/loki/pkg/storage/config" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/constants" logutil "github.com/grafana/loki/pkg/util/log" ) @@ -247,8 +248,19 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response, return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - queryHash := logql.HashedQuery(op.Query) - level.Info(logger).Log("msg", "executing query", "type", "range", "query", op.Query, "length", op.EndTs.Sub(op.StartTs), "step", op.Step, "query_hash", queryHash) + queryHash := util.HashedQuery(op.Query) + level.Info(logger).Log( + "msg", "executing query", + "type", "range", + "query", op.Query, + "start", op.StartTs.Format(time.RFC3339Nano), + "end", op.EndTs.Format(time.RFC3339Nano), + "start_delta", time.Since(op.StartTs), + "end_delta", time.Since(op.EndTs), + "length", op.EndTs.Sub(op.StartTs), + "step", op.Step, + "query_hash", queryHash, + ) switch e := expr.(type) { case syntax.SampleExpr: @@ -296,7 +308,7 @@ func (r roundTripper) Do(ctx context.Context, req base.Request) (base.Response, return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } - queryHash := logql.HashedQuery(op.Query) + queryHash := util.HashedQuery(op.Query) level.Info(logger).Log("msg", "executing query", "type", "instant", "query", op.Query, "query_hash", queryHash) switch expr.(type) { diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index db6316e9986d0..8f70d314da884 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -24,11 +24,11 @@ import ( "github.com/prometheus/prometheus/rules" "github.com/prometheus/prometheus/template" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/syntax" ruler "github.com/grafana/loki/pkg/ruler/base" "github.com/grafana/loki/pkg/ruler/rulespb" - "github.com/grafana/loki/pkg/ruler/util" + rulerutil "github.com/grafana/loki/pkg/ruler/util" + "github.com/grafana/loki/pkg/util" ) // RulesLimits is the one function we need from limits.Overrides, and @@ -40,7 +40,7 @@ type RulesLimits interface { RulerRemoteWriteURL(userID string) string RulerRemoteWriteTimeout(userID string) time.Duration RulerRemoteWriteHeaders(userID string) map[string]string - RulerRemoteWriteRelabelConfigs(userID string) []*util.RelabelConfig + RulerRemoteWriteRelabelConfigs(userID string) []*rulerutil.RelabelConfig RulerRemoteWriteConfig(userID string, id string) *config.RemoteWriteConfig RulerRemoteWriteQueueCapacity(userID string) int RulerRemoteWriteQueueMinShards(userID string) int @@ -60,7 +60,7 @@ type RulesLimits interface { // and passing an altered timestamp. func queryFunc(evaluator Evaluator, checker readyChecker, userID string, logger log.Logger) rules.QueryFunc { return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) { - hash := logql.HashedQuery(qs) + hash := util.HashedQuery(qs) detail := rules.FromOriginContext(ctx) detailLog := log.With(logger, "rule_name", detail.Name, "rule_type", detail.Kind, "query", qs, "query_hash", hash) diff --git a/pkg/ruler/evaluator_jitter.go b/pkg/ruler/evaluator_jitter.go index ef337c73396be..449ca0e18011c 100644 --- a/pkg/ruler/evaluator_jitter.go +++ b/pkg/ruler/evaluator_jitter.go @@ -10,8 +10,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/util" ) // EvaluatorWithJitter wraps a given Evaluator. It applies a consistent jitter based on a rule's query string by hashing @@ -44,7 +44,7 @@ func NewEvaluatorWithJitter(inner Evaluator, maxJitter time.Duration, hasher has } func (e *EvaluatorWithJitter) Eval(ctx context.Context, qs string, now time.Time) (*logqlmodel.Result, error) { - logger := log.With(e.logger, "query", qs, "query_hash", logql.HashedQuery(qs)) + logger := log.With(e.logger, "query", qs, "query_hash", util.HashedQuery(qs)) jitter := e.calculateJitter(qs, logger) if jitter > 0 { diff --git a/pkg/ruler/evaluator_remote.go b/pkg/ruler/evaluator_remote.go index 4f953876d6c0f..97a0c1ce7f9dd 100644 --- a/pkg/ruler/evaluator_remote.go +++ b/pkg/ruler/evaluator_remote.go @@ -36,8 +36,8 @@ import ( "google.golang.org/grpc/keepalive" "github.com/grafana/loki/pkg/loghttp" - "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/build" "github.com/grafana/loki/pkg/util/constants" "github.com/grafana/loki/pkg/util/httpreq" @@ -220,7 +220,7 @@ func (r *RemoteEvaluator) query(ctx context.Context, orgID, query string, ts tim args.Set("time", ts.Format(time.RFC3339Nano)) } body := []byte(args.Encode()) - hash := logql.HashedQuery(query) + hash := util.HashedQuery(query) req := httpgrpc.HTTPRequest{ Method: http.MethodPost, diff --git a/pkg/util/hash_fp.go b/pkg/util/hash_fp.go index 209b8b45c0646..e7c0253865b65 100644 --- a/pkg/util/hash_fp.go +++ b/pkg/util/hash_fp.go @@ -1,6 +1,10 @@ package util -import "github.com/prometheus/common/model" +import ( + "hash/fnv" + + "github.com/prometheus/common/model" +) // HashFP simply moves entropy from the most significant 48 bits of the // fingerprint into the least significant 16 bits (by XORing) so that a simple @@ -12,3 +16,10 @@ import "github.com/prometheus/common/model" func HashFP(fp model.Fingerprint) uint32 { return uint32(fp ^ (fp >> 32) ^ (fp >> 16)) } + +// HashedQuery returns a unique hash value for the given `query`. +func HashedQuery(query string) uint32 { + h := fnv.New32() + _, _ = h.Write([]byte(query)) + return h.Sum32() +}