diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4002d76c032e9..9b9d26a935f37 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,7 +6,8 @@
 
 ##### Enhancements
 
-* [11363](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
+* [11571](https://github.com/grafana/loki/pull/11571) **MichelHollands**: Add a metrics.go log line for requests from querier to ingester
+* [11477](https://github.com/grafana/loki/pull/11477) **MichelHollands**: support GET for /ingester/shutdown
 * [11363](https://github.com/grafana/loki/pull/11363) **kavirajk**: bugfix(memcached): Make memcached batch fetch truely context aware.
 * [11319](https://github.com/grafana/loki/pull/11319) **someStrangerFromTheAbyss**: Helm: Add extraContainers to the write pods.
 * [11243](https://github.com/grafana/loki/pull/11243) **kavirajk**: Inflight-logging: Add extra metadata to inflight requests logging.
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index adff06187c648..a6d252a733ec0 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -871,10 +871,13 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { /
 }
 
 // Query the ingests for log streams matching a set of matchers.
-func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
+func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) (err error) {
 	// initialize stats collection for ingester queries.
 	_, ctx := stats.NewContext(queryServer.Context())
 
+	start := time.Now().UTC()
+	var lines int32
+
 	if req.Plan == nil {
 		parsed, err := syntax.ParseLogSelector(req.Selector, true)
 		if err != nil {
@@ -885,6 +888,17 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		}
 	}
 
+	defer func() {
+		status := "successful"
+		if err != nil {
+			status = "failed"
+		}
+		statsCtx := stats.FromContext(ctx)
+		execTime := time.Since(start)
+		logql.RecordIngesterStreamsQueryMetrics(ctx, i.logger, req.Start, req.End, req.Selector, status, req.Limit, lines, req.Shards,
+			statsCtx.Result(execTime, time.Duration(0), 0))
+	}()
+
 	instanceID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return err
@@ -926,14 +940,17 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
 		batchLimit = -1
 	}
 
-	return sendBatches(ctx, it, queryServer, batchLimit)
+	lines, err = sendBatches(ctx, it, queryServer, batchLimit)
+	return err
 }
 
 // QuerySample the ingesters for series from logs matching a set of matchers.
-func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) error {
+func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer logproto.Querier_QuerySampleServer) (err error) {
 	// initialize stats collection for ingester queries.
 	_, ctx := stats.NewContext(queryServer.Context())
 	sp := opentracing.SpanFromContext(ctx)
+	start := time.Now().UTC()
+	var lines int32
 
 	// If the plan is empty we want all series to be returned.
 	if req.Plan == nil {
@@ -946,6 +963,17 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
 		}
 	}
 
+	defer func() {
+		status := "successful"
+		if err != nil {
+			status = "failed"
+		}
+		statsCtx := stats.FromContext(ctx)
+		execTime := time.Since(start)
+		logql.RecordIngesterSeriesQueryMetrics(ctx, i.logger, req.Start, req.End, req.Selector, status, lines, req.Shards,
+			statsCtx.Result(execTime, time.Duration(0), 0))
+	}()
+
 	instanceID, err := tenant.TenantID(ctx)
 	if err != nil {
 		return err
@@ -984,7 +1012,8 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
 
 	defer util.LogErrorWithContext(ctx, "closing iterator", it.Close)
 
-	return sendSampleBatches(ctx, it, queryServer)
+	lines, err = sendSampleBatches(ctx, it, queryServer)
+	return err
 }
 
 // asyncStoreMaxLookBack returns a max look back period only if active index type is one of async index stores like `boltdb-shipper` and `tsdb`.
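The ingester changes above hinge on Go's named result parameter: because `Query` and `QuerySample` now return `(err error)`, the deferred closure observes the final values of `err` and `lines` on every exit path and can emit exactly one metrics line per request. A minimal, self-contained sketch of that pattern, with a hypothetical `logMetrics` helper standing in for `logql.RecordIngesterStreamsQueryMetrics`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// logMetrics is a hypothetical stand-in for logql.RecordIngesterStreamsQueryMetrics.
func logMetrics(status string, lines int32, execTime time.Duration) {
	fmt.Printf("status=%s returned_lines=%d duration=%s\n", status, lines, execTime)
}

// query mirrors the shape of Ingester.Query above: the named result `err` lets
// the deferred closure observe whichever error the function ultimately returns.
func query(fail bool) (err error) {
	start := time.Now().UTC()
	var lines int32

	defer func() {
		status := "successful"
		if err != nil {
			status = "failed"
		}
		logMetrics(status, lines, time.Since(start))
	}()

	if fail {
		return errors.New("boom")
	}
	lines = 42 // e.g. the count reported by sendBatches
	return nil
}

func main() {
	_ = query(false) // status=successful returned_lines=42 ...
	_ = query(true)  // status=failed returned_lines=0 ...
}
```

A plain `func query(...) error` would not work here: without the named result, the deferred closure has no way to see the value actually being returned.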
diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go
index f29628d85eeb8..4521daaf20123 100644
--- a/pkg/ingester/instance.go
+++ b/pkg/ingester/instance.go
@@ -949,8 +949,9 @@ type QuerierQueryServer interface {
 	Send(res *logproto.QueryResponse) error
 }
 
-func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) error {
+func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQueryServer, limit int32) (int32, error) {
 	stats := stats.FromContext(ctx)
+	var lines int32
 
 	// send until the limit is reached.
 	for limit != 0 && !isDone(ctx) {
@@ -960,7 +961,7 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
 		}
 		batch, batchSize, err := iter.ReadBatch(i, fetchSize)
 		if err != nil {
-			return err
+			return lines, err
 		}
 
 		if limit > 0 {
@@ -969,46 +970,49 @@ func sendBatches(ctx context.Context, i iter.EntryIterator, queryServer QuerierQ
 		stats.AddIngesterBatch(int64(batchSize))
 		batch.Stats = stats.Ingester()
+		lines += int32(batchSize)
 
 		if isDone(ctx) {
 			break
 		}
 		if err := queryServer.Send(batch); err != nil && err != context.Canceled {
-			return err
+			return lines, err
 		}
 
 		// We check this after sending an empty batch to make sure stats are sent
 		if len(batch.Streams) == 0 {
-			return nil
+			return lines, err
 		}
 
 		stats.Reset()
 	}
 
-	return nil
+	return lines, nil
 }
 
-func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) error {
+func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer logproto.Querier_QuerySampleServer) (int32, error) {
+	var lines int32
 	sp := opentracing.SpanFromContext(ctx)
 	stats := stats.FromContext(ctx)
 	for !isDone(ctx) {
 		batch, size, err := iter.ReadSampleBatch(it, queryBatchSampleSize)
 		if err != nil {
-			return err
+			return lines, err
 		}
 
 		stats.AddIngesterBatch(int64(size))
 		batch.Stats = stats.Ingester()
+		lines += int32(size)
 
 		if isDone(ctx) {
 			break
 		}
 		if err := queryServer.Send(batch); err != nil && err != context.Canceled {
-			return err
+			return lines, err
 		}
 
 		// We check this after sending an empty batch to make sure stats are sent
 		if len(batch.Series) == 0 {
-			return nil
+			return lines, nil
 		}
 
 		stats.Reset()
@@ -1017,7 +1021,7 @@ func sendSampleBatches(ctx context.Context, it iter.SampleIterator, queryServer
 		}
 	}
 
-	return nil
+	return lines, nil
 }
 
 func shouldConsiderStream(stream *stream, reqFrom, reqThrough time.Time) bool {
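Both batch senders now report the number of lines actually shipped, and they do so even on early error returns, so the deferred metrics log above reflects partial progress rather than zero. A rough sketch of this shape, where the `readBatch`/`send` closures are invented stand-ins for the entry iterator and the gRPC stream:

```go
package main

import (
	"errors"
	"fmt"
)

// sendAll drains batches and returns the number of lines sent so far,
// even when it exits early with an error, mirroring sendBatches above.
func sendAll(readBatch func() ([]string, error), send func([]string) error) (int32, error) {
	var lines int32
	for {
		batch, err := readBatch()
		if err != nil {
			return lines, err // the partial count survives the failure
		}
		lines += int32(len(batch))
		if err := send(batch); err != nil {
			return lines, err
		}
		// An empty batch signals the iterator is drained.
		if len(batch) == 0 {
			return lines, nil
		}
	}
}

func main() {
	batches := [][]string{{"a", "b"}, {"c"}, {}}
	i := 0
	read := func() ([]string, error) {
		if i >= len(batches) {
			return nil, errors.New("exhausted")
		}
		b := batches[i]
		i++
		return b, nil
	}
	send := func(b []string) error { return nil }

	lines, err := sendAll(read, send)
	fmt.Println(lines, err) // 3 <nil>
}
```

Returning the running count alongside the error keeps the caller's `lines, err = sendBatches(...)` assignment trivial.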
diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go
index 48c3a8b0bccd4..ea36cee5ddc98 100644
--- a/pkg/ingester/instance_test.go
+++ b/pkg/ingester/instance_test.go
@@ -614,16 +614,16 @@ func Test_Iterator(t *testing.T) {
 
 	// assert the order is preserved.
 	var res *logproto.QueryResponse
-	require.NoError(t,
-		sendBatches(context.TODO(), it,
-			fakeQueryServer(
-				func(qr *logproto.QueryResponse) error {
-					res = qr
-					return nil
-				},
-			),
-			int32(2)),
-	)
+	lines, err := sendBatches(context.TODO(), it,
+		fakeQueryServer(
+			func(qr *logproto.QueryResponse) error {
+				res = qr
+				return nil
+			},
+		),
+		int32(2))
+	require.NoError(t, err)
+	require.Equal(t, int32(2), lines)
 	require.Equal(t, 2, len(res.Streams)) // each entry translated into a unique stream
 	require.Equal(t, 1, len(res.Streams[0].Entries))
 
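Since `sendBatches` now returns `(int32, error)`, the call can no longer be inlined into `require.NoError`; the test captures both values and asserts each separately. The same testify idiom in isolation, with an invented `sum` helper standing in for the two-value function:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/require"
)

// sum is a hypothetical function with the same (value, error) shape as sendBatches.
func sum(xs []int32) (int32, error) {
	var total int32
	for _, x := range xs {
		total += x
	}
	return total, nil
}

// Capture both results, then assert the error and the value separately,
// exactly as the Test_Iterator change above does.
func TestSum(t *testing.T) {
	total, err := sum([]int32{1, 1})
	require.NoError(t, err)
	require.Equal(t, int32(2), total)
}
```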
diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go
index 0a24e74145369..048fe0e0028a2 100644
--- a/pkg/logql/metrics.go
+++ b/pkg/logql/metrics.go
@@ -26,13 +26,15 @@ import (
 )
 
 const (
-	QueryTypeMetric  = "metric"
-	QueryTypeFilter  = "filter"
-	QueryTypeLimited = "limited"
-	QueryTypeLabels  = "labels"
-	QueryTypeSeries  = "series"
-	QueryTypeStats   = "stats"
-	QueryTypeVolume  = "volume"
+	QueryTypeMetric          = "metric"
+	QueryTypeFilter          = "filter"
+	QueryTypeLimited         = "limited"
+	QueryTypeLabels          = "labels"
+	QueryTypeSeries          = "series"
+	QueryTypeIngesterStreams = "ingester_streams"
+	QueryTypeIngesterSeries  = "ingester_series"
+	QueryTypeStats           = "stats"
+	QueryTypeVolume          = "volume"
 
 	latencyTypeSlow = "slow"
 	latencyTypeFast = "fast"
@@ -247,6 +249,64 @@ func PrintMatches(matches []string) string {
 	return strings.Join(matches, ":")
 }
 
+func RecordIngesterStreamsQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, limit uint32, returnedLines int32, shards []string, stats logql_stats.Result) {
+	recordIngesterQueryMetrics(ctx, QueryTypeIngesterStreams, log, start, end, query, status, &limit, returnedLines, shards, stats)
+}
+
+func RecordIngesterSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, query string, status string, returnedLines int32, shards []string, stats logql_stats.Result) {
+	recordIngesterQueryMetrics(ctx, QueryTypeIngesterSeries, log, start, end, query, status, nil, returnedLines, shards, stats)
+}
+
+func recordIngesterQueryMetrics(ctx context.Context, queryType string, log log.Logger, start, end time.Time, query string, status string, limit *uint32, returnedLines int32, shards []string, stats logql_stats.Result) {
+	var (
+		logger      = fixLogger(ctx, log)
+		latencyType = latencyTypeFast
+	)
+
+	// Tag throughput metric by latency type based on a threshold.
+	// Latency below the threshold is fast, above is slow.
+	if stats.Summary.ExecTime > slowQueryThresholdSecond {
+		latencyType = latencyTypeSlow
+	}
+
+	logValues := make([]interface{}, 0, 23)
+	logValues = append(logValues,
+		"latency", latencyType,
+		"query_type", queryType,
+		"start", start.Format(time.RFC3339Nano),
+		"end", end.Format(time.RFC3339Nano),
+		"start_delta", time.Since(start),
+		"end_delta", time.Since(end),
+		"length", end.Sub(start),
+		"duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))),
+		"status", status,
+		"query", query,
+		"query_hash", util.HashedQuery(query),
+		"returned_lines", returnedLines,
+		"throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1),
+		"total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1),
+		"total_bytes_structured_metadata", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalStructuredMetadataBytesProcessed)), " ", "", 1),
+		"lines_per_second", stats.Summary.LinesProcessedPerSecond,
+		"total_lines", stats.Summary.TotalLinesProcessed,
+		"post_filter_lines", stats.Summary.TotalPostFilterLines,
+		"total_entries", stats.Summary.TotalEntriesReturned,
+		"chunk_refs_fetch_time", stats.ChunkRefsFetchTime())
+
+	if limit != nil {
+		logValues = append(logValues,
+			"limit", *limit)
+	}
+	shard := extractShard(shards)
+	if shard != nil {
+		logValues = append(logValues,
+			"shard_num", shard.Shard,
+			"shard_count", shard.Of,
+		)
+	}
+
+	level.Info(logger).Log(logValues...)
+}
+
 func RecordSeriesQueryMetrics(ctx context.Context, log log.Logger, start, end time.Time, match []string, status string, shards []string, stats logql_stats.Result) {
 	var (
 		logger = fixLogger(ctx, log)
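`recordIngesterQueryMetrics` accumulates key/value pairs in a slice so that optional fields (`limit` only for stream queries, hence the `*uint32` parameter; shard labels only when a shard is present) can be appended conditionally before a single `Log` call. A small sketch of that idiom using go-kit's logfmt logger, with only a few of the real keys:

```go
package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Collect the pairs first; the capacity is only a hint.
	logValues := make([]interface{}, 0, 8)
	logValues = append(logValues,
		"query_type", "ingester_streams",
		"status", "successful",
		"returned_lines", int32(42),
	)

	// Optional field: appended only when the caller supplied a limit,
	// matching the *uint32 parameter in recordIngesterQueryMetrics.
	v := uint32(100)
	limit := &v // nil for series queries
	if limit != nil {
		logValues = append(logValues, "limit", *limit)
	}

	level.Info(logger).Log(logValues...)
	// level=info query_type=ingester_streams status=successful returned_lines=42 limit=100
}
```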