Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add _extracted suffix to detected fields conflicts
Browse files Browse the repository at this point in the history
detected fields that conflict with indexed labels need the "extracted"
suffix
trevorwhitney committed Aug 28, 2024

Verified

This commit was signed with the committer’s verified signature.
trevorwhitney Trevor Whitney
1 parent 8afdfd5 commit c515110
Showing 3 changed files with 92 additions and 24 deletions.
19 changes: 18 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package querier
import (
"context"
"flag"
"fmt"
"net/http"
"sort"
"strconv"
@@ -1100,6 +1101,17 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
return nil, err
}

indexedLabels := map[string]struct{}{}
for _, stream := range streams {
lbls, err := syntax.ParseLabels(stream.Labels)
if err != nil {
continue
}
for _, lbl := range lbls {
indexedLabels[lbl.Name] = struct{}{}
}
}

detectedFields := parseDetectedFields(req.FieldLimit, streams)

fields := make([]*logproto.DetectedField, len(detectedFields))
@@ -1111,8 +1123,13 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
continue
}

name := k
if _, ok := indexedLabels[k]; ok {
name = fmt.Sprintf("%s_extracted", k)
}

fields[fieldCount] = &logproto.DetectedField{
Label: k,
Label: name,
Type: v.fieldType,
Cardinality: v.Estimate(),
Sketch: sketch,
4 changes: 2 additions & 2 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
@@ -575,7 +575,7 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}

func mockLogfmtStream(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`)
return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`)
}

func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
@@ -586,7 +586,7 @@ func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Str
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`,
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
i,
i,
(i * 10),
93 changes: 72 additions & 21 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
@@ -1777,15 +1777,16 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
assert.Len(t, detectedFields, 8)
expectedCardinality := map[string]uint64{
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"name_extracted": 1,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
@@ -1821,17 +1822,18 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 9)
assert.Len(t, detectedFields, 10)
expectedCardinality := map[string]uint64{
"variable": 5,
"constant": 1,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"variable": 5,
"constant": 1,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"name_extracted": 1,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
@@ -1867,7 +1869,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
assert.Len(t, detectedFields, 8)

var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
for _, field := range detectedFields {
@@ -1923,7 +1925,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 9)
assert.Len(t, detectedFields, 10)

var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField
for _, field := range detectedFields {
@@ -1955,7 +1957,56 @@ func TestQuerier_DetectedFields(t *testing.T) {
assert.Equal(t, []string{"logfmt"}, evenField.Parsers)
assert.Equal(t, []string{""}, constantField.Parsers)
assert.Equal(t, []string{""}, variableField.Parsers)
})
},
)

t.Run(
"adds _extracted suffix to detected fields that conflict with indexed labels",
func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)

resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)

detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 10)

var nameField *logproto.DetectedField
for _, field := range detectedFields {
switch field.Label {
case "name_extracted":
nameField = field
}
}

assert.NotNil(t, nameField)
assert.Equal(t, "name_extracted", nameField.Label)
assert.Equal(t, logproto.DetectedFieldString, nameField.Type)
assert.Equal(t, []string{"logfmt"}, nameField.Parsers)
assert.Equal(t, uint64(1), nameField.Cardinality)
},
)
}

func BenchmarkQuerierDetectedFields(b *testing.B) {

0 comments on commit c515110

Please sign in to comment.