Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support categorized labels in Tailing #11079

Merged
merged 6 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/ingester/tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log

sp := t.pipeline.ForStream(lbs)
for _, e := range stream.Entries {
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line)
newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)...)
if !ok {
continue
}
Expand All @@ -163,8 +163,10 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log
streams[parsedLbs.Hash()] = stream
}
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: e.Timestamp,
Line: newLine,
Timestamp: e.Timestamp,
Line: newLine,
StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLbs.StructuredMetadata()),
Parsed: logproto.FromLabelsToLabelAdapters(parsedLbs.Parsed()),
})
}
streamsResult := make([]*logproto.Stream, 0, len(streams))
Expand Down
144 changes: 141 additions & 3 deletions pkg/ingester/tailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,25 @@ func Test_dropstream(t *testing.T) {
}
}

type fakeTailServer struct{}
type fakeTailServer struct {
responses []logproto.TailResponse
}

func (f *fakeTailServer) Send(response *logproto.TailResponse) error {
f.responses = append(f.responses, *response)
return nil

}

func (f *fakeTailServer) Context() context.Context { return context.Background() }

func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil }
func (f *fakeTailServer) Context() context.Context { return context.Background() }
func (f *fakeTailServer) GetResponses() []logproto.TailResponse {
return f.responses
}

func (f *fakeTailServer) Reset() {
f.responses = f.responses[:0]
}

func Test_TailerSendRace(t *testing.T) {
tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10)
Expand Down Expand Up @@ -137,3 +152,126 @@ func Test_IsMatching(t *testing.T) {
})
}
}

func Test_StructuredMetadata(t *testing.T) {
lbs := makeRandomLabels()

for _, tc := range []struct {
name string
query string
sentStream logproto.Stream
expectedResponses []logproto.TailResponse
}{
{
// Optimization will make the same stream to be returned regardless of structured metadata.
name: "noop pipeline",
query: `{app="foo"}`,
sentStream: logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
expectedResponses: []logproto.TailResponse{
{
Stream: &logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
DroppedStreams: nil,
},
},
},
{
name: "parse pipeline labels",
query: `{app="foo"} | logfmt`,
sentStream: logproto.Stream{
Labels: lbs.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
},
expectedResponses: []logproto.TailResponse{
{
Stream: &logproto.Stream{
Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")),
},
},
},
DroppedStreams: nil,
},
{
Stream: &logproto.Stream{
Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "2").Labels().String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")),
},
},
},
DroppedStreams: nil,
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
var server fakeTailServer
tail, err := newTailer("foo", tc.query, &server, 10)
require.NoError(t, err)

var wg sync.WaitGroup
wg.Add(1)
go func() {
tail.loop()
wg.Done()
}()

tail.send(tc.sentStream, lbs)

// Wait for the stream to be received by the server.
require.Eventually(t, func() bool {
return len(server.GetResponses()) > 0
}, 30*time.Second, 1*time.Second, "stream was not received")

responses := server.GetResponses()
require.ElementsMatch(t, tc.expectedResponses, responses)

tail.close()
wg.Wait()
})
}
}
71 changes: 71 additions & 0 deletions pkg/iter/categorized_labels_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package iter

import (
"fmt"

"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/syntax"
)

type categorizeLabelsIterator struct {
EntryIterator
currEntry logproto.Entry
currStreamLabels string
currHash uint64
currErr error
}

func NewCategorizeLabelsIterator(wrap EntryIterator) EntryIterator {
return &categorizeLabelsIterator{
EntryIterator: wrap,
}
}

func (c *categorizeLabelsIterator) Next() bool {
if !c.EntryIterator.Next() {
return false
}

c.currEntry = c.Entry()
if len(c.currEntry.StructuredMetadata) == 0 && len(c.currEntry.Parsed) == 0 {
c.currStreamLabels = c.EntryIterator.Labels()
c.currHash = c.EntryIterator.StreamHash()
return true
}

// We need to remove the structured metadata labels and parsed labels from the stream labels.
streamLabels := c.EntryIterator.Labels()
lbls, err := syntax.ParseLabels(streamLabels)
if err != nil {
c.currErr = fmt.Errorf("failed to parse series labels to categorize labels: %w", err)
return false
}

builder := labels.NewBuilder(lbls)
for _, label := range c.currEntry.StructuredMetadata {
builder.Del(label.Name)
}
for _, label := range c.currEntry.Parsed {
builder.Del(label.Name)
}

newLabels := builder.Labels()
c.currStreamLabels = newLabels.String()
c.currHash = newLabels.Hash()

return true
}

func (c *categorizeLabelsIterator) Error() error {
return c.currErr
}

func (c *categorizeLabelsIterator) Labels() string {
return c.currStreamLabels
}

func (c *categorizeLabelsIterator) StreamHash() uint64 {
return c.currHash
}
145 changes: 145 additions & 0 deletions pkg/iter/categorized_labels_iterator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package iter

import (
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
)

func TestNewCategorizeLabelsIterator(t *testing.T) {
for _, tc := range []struct {
name string
inner EntryIterator
expectedStreams []logproto.Stream
}{
{
name: "no structured metadata nor parsed labels",
inner: NewSortEntryIterator([]EntryIterator{
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
},
},
}),
}, logproto.FORWARD),
expectedStreams: []logproto.Stream{
{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
},
},
},
},
},
{
name: "structured metadata and parsed labels",
inner: NewSortEntryIterator([]EntryIterator{
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default", "traceID", "123").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default", "foo", "3").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 3),
Line: "foo=3",
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")),
},
},
}),
NewStreamIterator(logproto.Stream{
Labels: labels.FromStrings("namespace", "default", "traceID", "123", "foo", "4").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 4),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")),
},
},
}),
}, logproto.FORWARD),
expectedStreams: []logproto.Stream{
{
Labels: labels.FromStrings("namespace", "default").String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "foo=1",
},
{
Timestamp: time.Unix(0, 2),
Line: "foo=2",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
},
{
Timestamp: time.Unix(0, 3),
Line: "foo=3",
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")),
},
{
Timestamp: time.Unix(0, 4),
Line: "foo=4",
StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")),
Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")),
},
},
},
},
},
} {
t.Run(tc.name, func(t *testing.T) {
itr := NewCategorizeLabelsIterator(tc.inner)

streamsEntries := make(map[string][]logproto.Entry)
for itr.Next() {
streamsEntries[itr.Labels()] = append(streamsEntries[itr.Labels()], itr.Entry())
require.NoError(t, itr.Error())
}

var streams []logproto.Stream
for lbls, entries := range streamsEntries {
streams = append(streams, logproto.Stream{
Labels: lbls,
Entries: entries,
})
}

require.ElementsMatch(t, tc.expectedStreams, streams)
})
}
}
Loading