From f04d0db43457353dddc71239808583d1c9cbfdbd Mon Sep 17 00:00:00 2001
From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
Date: Fri, 5 Jan 2024 14:43:45 +0100
Subject: [PATCH] Add metrics.go log line when reading from an ingester
 (#11571)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

**What this PR does / why we need it**:
Add a metrics.go log line when a querier reads from an ingester. The output is like this:

```
level=info ts=2024-01-03T13:59:06.919965546Z caller=metrics.go:275 component=ingester org_id=fake traceID=0b216e7d014c5f87 latency=fast query_type=ingester_series start=2024-01-03T13:00:00Z end=2024-01-03T13:59:06.834Z start_delta=59m6.91996413s end_delta=85.964255ms length=59m6.834s duration=110.792µs status=200 query= query_hash=2166136261 total_entries=9
```

**Checklist**
- [X] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**)
- [ ] Documentation added
- [X] Tests updated
- [X] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add `add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. [Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213)
- [ ] If the change is deprecating or removing a configuration option, update the `deprecated-config.yaml` and `deleted-config.yaml` files respectively in the `tools/deprecated-config-checker` directory. [Example PR](https://github.com/grafana/loki/pull/10840/commits/0d4416a4b03739583349934b96f272fb4f685d15)

---------

Signed-off-by: Michel Hollands
---
 CHANGELOG.md                  |  3 +-
 pkg/ingester/ingester.go      | 37 ++++++++++++++++--
 pkg/ingester/instance.go      | 24 +++++++-----
 pkg/ingester/instance_test.go | 20 +++++-----
 pkg/logql/metrics.go          | 74 +++++++++++++++++++++++++++++++----
 5 files changed, 126 insertions(+), 32 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4002d76c032e9..9b9d26a935f37 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,8 @@
 
 ##### Enhancements
 
-* [11363](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
+* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester
+* [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
 * [11363](https://github.com/grafana/loki/pull/11363) **kavirajk**: bugfix(memcached): Make memcached batch fetch truely context aware.
 * [11319](https://github.com/grafana/loki/pull/11319) **someStrangerFromTheAbyss**: Helm: Add extraContainers to the write pods.
 * [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index adff06187c648..a6d252a733ec0 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -871,10 +871,13 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
 }
 
 // Query the ingests for log streams matching a set of matchers.
-func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
+func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) (err error) {
 	// initialize stats collection for ingester queries.
 	_, ctx := stats.NewContext(queryServer.Context())
 
+	start := time.Now().UTC()
+	var lines int32
+
 	if req.Plan == nil {
 		parsed, err := syntax.ParseLogSelector(req.Selector, true)
 		if err != nil {
@@ -885,6 +888,17 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		}
 	}
 
+	defer func() {
+		status := "successful"
+		if err != nil {
+			status = "failed"
+		}
+		statsCtx := stats.FromContext(ctx)
+		execTime := time.Since(start)
+		logql.RecordIngesterStreamsQueryMetrics(ctx, i.logger, req.Start, req.End, req.Selector, status, req.Limit, lines, req.Shards,
+			statsCtx.Result(execTime, time.Duration(0), 0))
+	}()
+
 	instanceID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return err
@@ -926,14 +940,17 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		batchLimit = -1
 	}
 
-	return sendBatches(ctx, it, queryServer, batchLimit)
+	lines, err = sendBatches(ctx, it, queryServer, batchLimit)
+	return err
 }
 
 // QuerySample the ingesters for series from logs matching a set of matchers.
-func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
+func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) (err error) {
 	// initialize stats collection for ingester queries.
 	_, ctx := stats.NewContext(queryServer.Context())
 	sp := opentracing.SpanFromContext(ctx)
+	start := time.Now().UTC()
+	var lines int32
 
 	// If the plan is empty we want all series to be returned.
 	if req.Plan == nil {
@@ -946,6 +963,17 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
 		}
 	}
 
+	defer func() {
+		status := "successful"
+		if err != nil {
+			status = "failed"
+		}
+		statsCtx := stats.FromContext(ctx)
+		execTime := time.Since(start)
+		logql.RecordIngesterSeriesQueryMetrics(ctx, i.logger, req.Start, req.End, req.Selector, status, lines, req.Shards,
+			statsCtx.Result(execTime, time.Duration(0), 0))
+	}()
+
 	instanceID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return err
@@ -984,7 +1012,8 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
 
 	defer util.LogErrorWithContext(ctx, "closing iterator", it.Close)
 
-	return sendSampleBatches(ctx, it, queryServer)
+	lines, err = sendSampleBatches(ctx, it, queryServer)
+	return err
 }
 
 // asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index f29628d85eeb8..4521daaf20123 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -949,8 +949,9 @@ type QuerierQueryServer interface {
 	Send(res *logproto.QueryResponse) error
 }
 
-func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
+func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) (int32, error) {
 	stats := stats.FromContext(ctx)
+	var lines int32
 
 	// send until the limit is reached.
 	for limit != 0 && !isDone(ctx) {
@@ -960,7 +961,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
 		}
 		batch, batchSize, err := iter.ReadBatch(i, fetchSize)
 		if err != nil {
-			return err
+			return lines, err
 		}
 
 		if limit > 0 {
@@ -969,46 +970,49 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
 
 		stats.AddIngesterBatch(int64(batchSize))
 		batch.Stats = stats.Ingester()
+		lines += int32(batchSize)
 
 		if isDone(ctx) {
 			break
 		}
 		if err := queryServer.Send(batch); err != nil && err != context.Canceled {
-			return err
+			return lines, err
 		}
 
 		// We check this after sending an empty batch to make sure stats are sent
 		if len(batch.Streams) == 0 {
-			return nil
+			return lines, nil
 		}
 
 		stats.Reset()
 	}
 
-	return nil
+	return lines, nil
 }
 
-func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
+func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) (int32, error) {
+	var lines int32
 	sp := opentracing.SpanFromContext(ctx)
 	stats := stats.FromContext(ctx)
 	for !isDone(ctx) {
 		batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
 		if err != nil {
-			return err
+			return lines, err
 		}
 		stats.AddIngesterBatch(int64(size))
 		batch.Stats = stats.Ingester()
+		lines += int32(size)
 
 		if isDone(ctx) {
 			break
 		}
 		if err := queryServer.Send(batch); err != nil && err != context.Canceled {
-			return err
+			return lines, err
 		}
 
 		// We check this after sending an empty batch to make sure stats are sent
 		if len(batch.Series) == 0 {
-			return nil
+			return lines, nil
 		}
 
 		stats.Reset()
@@ -1017,7 +1021,7 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
 		}
 	}
 
-	return nil
+	return lines, nil
 }
 
 func shouldConsiderStream(stream *stream, reqFrom, reqThrough time.Time) bool {
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 48c3a8b0bccd4..ea36cee5ddc98 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -614,16 +614,16 @@ func Test_Iterator(t *testing.T) {
 
 	// assert the order is preserved.
 	var res *logproto.QueryResponse
-	require.NoError(t,
-		sendBatches(context.TODO(), it,
-			fakeQueryServer(
-				func(qr *logproto.QueryResponse) error {
-					res = qr
-					return nil
-				},
-			),
-			int32(2)),
-	)
+	lines, err := sendBatches(context.TODO(), it,
+		fakeQueryServer(
+			func(qr *logproto.QueryResponse) error {
+				res = qr
+				return nil
+			},
+		),
+		int32(2))
+	require.NoError(t, err)
+	require.Equal(t, int32(2), lines)
 	require.Equal(t, 2, len(res.Streams))
 	// each entry translated into a unique stream
 	require.Equal(t, 1, len(res.Streams[0].Entries))
diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go
index 0a24e74145369..048fe0e0028a2 100644
--- a/pkg/logql/metrics.go
+++ b/pkg/logql/metrics.go
@@ -26,13 +26,15 @@ import (
 )
 
 const (
-	QueryTypeMetric  = "metric"
-	QueryTypeFilter  = "filter"
-	QueryTypeLimited = "limited"
-	QueryTypeLabels  = "labels"
-	QueryTypeSeries  = "series"
-	QueryTypeStats   = "stats"
-	QueryTypeVolume  = "volume"
+	QueryTypeMetric          = "metric"
+	QueryTypeFilter          = "filter"
+	QueryTypeLimited         = "limited"
+	QueryTypeLabels          = "labels"
+	QueryTypeSeries          = "series"
+	QueryTypeIngesterStreams = "ingester_streams"
+	QueryTypeIngesterSeries  = "ingester_series"
+	QueryTypeStats           = "stats"
+	QueryTypeVolume          = "volume"
 
 	latencyTypeSlow = "slow"
 	latencyTypeFast = "fast"
@@ -247,6 +249,64 @@ func PrintMatches(matches []string) string {
 	return strings.Join(matches, ":")
 }
 
+func RecordIngesterStreamsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, limit uint32, returnedLines int32, shards []string, stats logql_stats.Result) {
+	recordIngesterQueryMetrics(ctx, QueryTypeIngesterStreams, log, start, end, query, status, &limit, returnedLines, shards, stats)
+}
+
+func RecordIngesterSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, returnedLines int32, shards []string, stats logql_stats.Result) {
+	recordIngesterQueryMetrics(ctx, QueryTypeIngesterSeries, log, start, end, query, status, nil, returnedLines, shards, stats)
+}
+
+func recordIngesterQueryMetrics(ctx context.Context, queryType string, log log.Logger, start, end time.Time, query string, status string, limit *uint32, returnedLines int32, shards []string, stats logql_stats.Result) {
+	var (
+		logger      = fixLogger(ctx, log)
+		latencyType = latencyTypeFast
+	)
+
+	// Tag throughput metric by latency type based on a threshold.
+	// Latency below the threshold is fast, above is slow.
+	if stats.Summary.ExecTime > slowQueryThresholdSecond {
+		latencyType = latencyTypeSlow
+	}
+
+	logValues := make([]interface{}, 0, 23)
+	logValues = append(logValues,
+		"latency", latencyType,
+		"query_type", queryType,
+		"start", start.Format(time.RFC3339Nano),
+		"end", end.Format(time.RFC3339Nano),
+		"start_delta", time.Since(start),
+		"end_delta", time.Since(end),
+		"length", end.Sub(start),
+		"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
+		"status", status,
+		"query", query,
+		"query_hash", util.HashedQuery(query),
+		"returned_lines", returnedLines,
+		"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
+		"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
+		"total_bytes_structured_metadata", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)), " ", "", 1),
+		"lines_per_second", stats.Summary.LinesProcessedPerSecond,
+		"total_lines", stats.Summary.TotalLinesProcessed,
+		"post_filter_lines", stats.Summary.TotalPostFilterLines,
+		"total_entries", stats.Summary.TotalEntriesReturned,
+		"chunk_refs_fetch_time", stats.ChunkRefsFetchTime())
+
+	if limit != nil {
+		logValues = append(logValues,
+			"limit", *limit)
+	}
+	shard := extractShard(shards)
+	if shard != nil {
+		logValues = append(logValues,
+			"shard_num", shard.Shard,
+			"shard_count", shard.Of,
+		)
+	}
+
+	level.Info(logger).Log(logValues...)
+}
+
 func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) {
 	var (
 		logger = fixLogger(ctx, log)
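The core mechanism behind the diff above is compact: `Query` and `QuerySample` switch to a named `err` return so that a deferred closure can derive the request status once the handler finishes, and `lines` is assigned with `=` rather than declared inside the call so the closure observes the final batch count. Below is a minimal, self-contained sketch of that pattern; `recordQueryMetrics` and the `sendBatches` stub are hypothetical stand-ins for illustration, not the real Loki APIs:

```
package main

import (
	"errors"
	"fmt"
	"time"
)

// recordQueryMetrics is a hypothetical stand-in for
// logql.RecordIngesterStreamsQueryMetrics; it just prints a logfmt-style line.
func recordQueryMetrics(status string, lines int32, duration time.Duration) {
	fmt.Printf("level=info query_type=ingester_streams status=%s returned_lines=%d duration=%s\n",
		status, lines, duration)
}

// sendBatches is a hypothetical stub that reports how many lines it sent.
func sendBatches(fail bool) (int32, error) {
	if fail {
		return 0, errors.New("iterator error")
	}
	return 42, nil
}

// query mirrors the shape of Ingester.Query after this patch: the error
// return is named so the deferred closure can see whether the call failed.
func query(fail bool) (err error) {
	start := time.Now().UTC()
	var lines int32

	defer func() {
		status := "successful"
		if err != nil {
			status = "failed"
		}
		recordQueryMetrics(status, lines, time.Since(start))
	}()

	// Plain assignment (not :=) so the defer above reads the final values.
	lines, err = sendBatches(fail)
	return err
}

func main() {
	_ = query(false) // level=info ... status=successful returned_lines=42
	_ = query(true)  // level=info ... status=failed returned_lines=0
}
```

The named return is what makes a single deferred log line cover every exit path: an `err` or `lines` declared inside a nested block would be invisible to the closure, which is why the handlers assign to the variables declared at the top of the function.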