From 5cbec46d2b1027ab2fe3985a17a3284a15c9f19f Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 28 Aug 2024 13:24:11 -0600 Subject: [PATCH] chore: rely on parsers to add _extracted suffix --- pkg/querier/querier.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index fdbda8a8977de..db261e17ab533 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -3,7 +3,6 @@ package querier import ( "context" "flag" - "fmt" "net/http" "sort" "strconv" @@ -1200,18 +1199,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p emtpyparser := "" for _, stream := range streams { - lbls, err := syntax.ParseLabels(stream.Labels) - if err != nil { - continue - } - for _, entry := range stream.Entries { structuredMetadata := getStructuredMetadata(entry) for k, vals := range structuredMetadata { - if lbls.Has(k) { - k = fmt.Sprintf("%s_extracted", k) - } - df, ok := detectedFields[k] if !ok && fieldCount < limit { df = newParsedFields(&emtpyparser) @@ -1236,12 +1226,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p } } - detected, parser := parseLine(entry.Line) + detected, parser := parseLine(entry.Line, stream) for k, vals := range detected { - if lbls.Has(k) { - k = fmt.Sprintf("%s_extracted", k) - } - df, ok := detectedFields[k] if !ok && fieldCount < limit { df = newParsedFields(parser) @@ -1297,11 +1283,16 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { return result } -func parseLine(line string) (map[string][]string, *string) { +func parseLine(line string, stream logproto.Stream) (map[string][]string, *string) { + streamLbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + streamLbls = labels.EmptyLabels() + } + parser := "logfmt" logFmtParser := logql_log.NewLogfmtParser(true, false) - lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0) + lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) if !logfmtSuccess || lbls.HasErr() { parser = "json" @@ -1315,6 +1306,10 @@ func parseLine(line string) (map[string][]string, *string) { parsedLabels := map[string]map[string]struct{}{} for _, lbl := range lbls.LabelsResult().Labels() { + // skip indexed labels, as we only want detected fields + if streamLbls.Has(lbl.Name) { + continue + } if values, ok := parsedLabels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else {