From c51511016efa3946d98f4fba06acc00b956304f7 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 28 Aug 2024 11:26:14 -0600 Subject: [PATCH 1/4] feat: add _extracted suffix to detected fields conflicts detected fields that conflict with indexed labels need the "extracted" suffix --- pkg/querier/querier.go | 19 ++++++- pkg/querier/querier_mock_test.go | 4 +- pkg/querier/querier_test.go | 93 ++++++++++++++++++++++++-------- 3 files changed, 92 insertions(+), 24 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index b320e5c5fd6ad..445ba10186cbd 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -3,6 +3,7 @@ package querier import ( "context" "flag" + "fmt" "net/http" "sort" "strconv" @@ -1100,6 +1101,17 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. return nil, err } + indexedLabels := map[string]struct{}{} + for _, stream := range streams { + lbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + continue + } + for _, lbl := range lbls { + indexedLabels[lbl.Name] = struct{}{} + } + } + detectedFields := parseDetectedFields(req.FieldLimit, streams) fields := make([]*logproto.DetectedField, len(detectedFields)) @@ -1111,8 +1123,13 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. continue } + name := k + if _, ok := indexedLabels[k]; ok { + name = fmt.Sprintf("%s_extracted", k) + } + fields[fieldCount] = &logproto.DetectedField{ - Label: k, + Label: name, Type: v.fieldType, Cardinality: v.Estimate(), Sketch: sketch, diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 4ddbab7ed2e59..c783b1bf11e34 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -575,7 +575,7 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream } func mockLogfmtStream(from int, quantity int) logproto.Stream { - return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`) + return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`) } func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream { @@ -586,7 +586,7 @@ func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Str entries = append(entries, logproto.Entry{ Timestamp: time.Unix(int64(i), 0), Line: fmt.Sprintf( - `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`, + `message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`, i, i, (i * 10), diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 7336c3b11bfaf..701f01bfefd38 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1777,15 +1777,16 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 7) + assert.Len(t, detectedFields, 8) expectedCardinality := map[string]uint64{ - "message": 5, - "count": 5, - "fake": 1, - "bytes": 5, - "duration": 5, - "percent": 5, - "even": 2, + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + "name_extracted": 1, } for _, d := range detectedFields { card := expectedCardinality[d.Label] @@ -1821,17 +1822,18 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 9) + assert.Len(t, detectedFields, 10) expectedCardinality := map[string]uint64{ - "variable": 5, - "constant": 1, - "message": 5, - "count": 5, - "fake": 1, - "bytes": 5, - "duration": 5, - "percent": 5, - "even": 2, + "variable": 5, + "constant": 1, + "message": 5, + "count": 5, + "fake": 1, + "bytes": 5, + "duration": 5, + "percent": 5, + "even": 2, + "name_extracted": 1, } for _, d := range detectedFields { card := expectedCardinality[d.Label] @@ -1867,7 +1869,7 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 7) + assert.Len(t, detectedFields, 8) var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField for _, field := range detectedFields { @@ -1923,7 +1925,7 @@ func TestQuerier_DetectedFields(t *testing.T) { detectedFields := resp.Fields // log lines come from querier_mock_test.go // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t - assert.Len(t, detectedFields, 9) + assert.Len(t, detectedFields, 10) var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField for _, field := range detectedFields { @@ -1955,7 +1957,56 @@ func TestQuerier_DetectedFields(t *testing.T) { assert.Equal(t, []string{"logfmt"}, evenField.Parsers) assert.Equal(t, []string{""}, constantField.Parsers) assert.Equal(t, []string{""}, variableField.Parsers) - }) + }, + ) + + t.Run( + "adds _extracted suffix to detected fields that conflict with indexed labels", + func(t *testing.T) { + store := newStoreMock() + store.On("SelectLogs", mock.Anything, mock.Anything). + Return(mockLogfmtStreamIterator(1, 2), nil) + + queryClient := newQueryClientMock() + queryClient.On("Recv"). + Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil) + + ingesterClient := newQuerierClientMock() + ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything). + Return(queryClient, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + store, limits) + require.NoError(t, err) + + resp, err := querier.DetectedFields(ctx, &request) + require.NoError(t, err) + + detectedFields := resp.Fields + // log lines come from querier_mock_test.go + // message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t + assert.Len(t, detectedFields, 10) + + var nameField *logproto.DetectedField + for _, field := range detectedFields { + switch field.Label { + case "name_extracted": + nameField = field + } + } + + assert.NotNil(t, nameField) + assert.Equal(t, "name_extracted", nameField.Label) + assert.Equal(t, logproto.DetectedFieldString, nameField.Type) + assert.Equal(t, []string{"logfmt"}, nameField.Parsers) + assert.Equal(t, uint64(1), nameField.Cardinality) + }, + ) } func BenchmarkQuerierDetectedFields(b *testing.B) { From 38607805c2bee7fc967dc8adc7e16d398be8f802 Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 28 Aug 2024 13:10:00 -0600 Subject: [PATCH 2/4] chore: move extracted label down this avoids an extra iteration over streams --- pkg/querier/querier.go | 31 ++++++++++++++----------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 445ba10186cbd..fdbda8a8977de 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1101,17 +1101,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. return nil, err } - indexedLabels := map[string]struct{}{} - for _, stream := range streams { - lbls, err := syntax.ParseLabels(stream.Labels) - if err != nil { - continue - } - for _, lbl := range lbls { - indexedLabels[lbl.Name] = struct{}{} - } - } - detectedFields := parseDetectedFields(req.FieldLimit, streams) fields := make([]*logproto.DetectedField, len(detectedFields)) @@ -1123,13 +1112,8 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto. continue } - name := k - if _, ok := indexedLabels[k]; ok { - name = fmt.Sprintf("%s_extracted", k) - } - fields[fieldCount] = &logproto.DetectedField{ - Label: name, + Label: k, Type: v.fieldType, Cardinality: v.Estimate(), Sketch: sketch, @@ -1216,9 +1200,18 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p emtpyparser := "" for _, stream := range streams { + lbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + continue + } + for _, entry := range stream.Entries { structuredMetadata := getStructuredMetadata(entry) for k, vals := range structuredMetadata { + if lbls.Has(k) { + k = fmt.Sprintf("%s_extracted", k) + } + df, ok := detectedFields[k] if !ok && fieldCount < limit { df = newParsedFields(&emtpyparser) @@ -1245,6 +1238,10 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p detected, parser := parseLine(entry.Line) for k, vals := range detected { + if lbls.Has(k) { + k = fmt.Sprintf("%s_extracted", k) + } + df, ok := detectedFields[k] if !ok && fieldCount < limit { df = newParsedFields(parser) From 5cbec46d2b1027ab2fe3985a17a3284a15c9f19f Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Wed, 28 Aug 2024 13:24:11 -0600 Subject: [PATCH 3/4] chore: rely on parsers to add _extracted suffix --- pkg/querier/querier.go | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index fdbda8a8977de..db261e17ab533 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -3,7 +3,6 @@ package querier import ( "context" "flag" - "fmt" "net/http" "sort" "strconv" @@ -1200,18 +1199,9 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p emtpyparser := "" for _, stream := range streams { - lbls, err := syntax.ParseLabels(stream.Labels) - if err != nil { - continue - } - for _, entry := range stream.Entries { structuredMetadata := getStructuredMetadata(entry) for k, vals := range structuredMetadata { - if lbls.Has(k) { - k = fmt.Sprintf("%s_extracted", k) - } - df, ok := detectedFields[k] if !ok && fieldCount < limit { df = newParsedFields(&emtpyparser) @@ -1236,12 +1226,8 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p } } - detected, parser := parseLine(entry.Line) + detected, parser := parseLine(entry.Line, stream) for k, vals := range detected { - if lbls.Has(k) { - k = fmt.Sprintf("%s_extracted", k) - } - df, ok := detectedFields[k] if !ok && fieldCount < limit { df = newParsedFields(parser) @@ -1297,11 +1283,16 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { return result } -func parseLine(line string) (map[string][]string, *string) { +func parseLine(line string, stream logproto.Stream) (map[string][]string, *string) { + streamLbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + streamLbls = labels.EmptyLabels() + } + parser := "logfmt" logFmtParser := logql_log.NewLogfmtParser(true, false) - lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0) + lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0) _, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls) if !logfmtSuccess || lbls.HasErr() { parser = "json" @@ -1315,6 +1306,10 @@ func parseLine(line string) (map[string][]string, *string) { parsedLabels := map[string]map[string]struct{}{} for _, lbl := range lbls.LabelsResult().Labels() { + // skip indexed labels, as we only want detected fields + if streamLbls.Has(lbl.Name) { + continue + } if values, ok := parsedLabels[lbl.Name]; ok { values[lbl.Value] = struct{}{} } else { From ffc64a619fa50b08a39560756b4932f7875ce3bf Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 29 Aug 2024 10:27:12 -0600 Subject: [PATCH 4/4] perf: move stream label extraction up a loop --- pkg/querier/querier.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index db261e17ab533..a03182ae2b942 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1199,6 +1199,11 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p emtpyparser := "" for _, stream := range streams { + streamLbls, err := syntax.ParseLabels(stream.Labels) + if err != nil { + streamLbls = labels.EmptyLabels() + } + for _, entry := range stream.Entries { structuredMetadata := getStructuredMetadata(entry) for k, vals := range structuredMetadata { @@ -1226,7 +1231,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p } } - detected, parser := parseLine(entry.Line, stream) + detected, parser := parseLine(entry.Line, streamLbls) for k, vals := range detected { df, ok := detectedFields[k] if !ok && fieldCount < limit { @@ -1283,12 +1288,7 @@ func getStructuredMetadata(entry push.Entry) map[string][]string { return result } -func parseLine(line string, stream logproto.Stream) (map[string][]string, *string) { - streamLbls, err := syntax.ParseLabels(stream.Labels) - if err != nil { - streamLbls = labels.EmptyLabels() - } - +func parseLine(line string, streamLbls labels.Labels) (map[string][]string, *string) { parser := "logfmt" logFmtParser := logql_log.NewLogfmtParser(true, false)