From 6e982d189c0620a854c41f3d9b8699868e517e7d Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 27 Oct 2023 17:07:29 +0200 Subject: [PATCH 1/6] Marshal --- pkg/util/marshal/marshal.go | 36 +++++++-- pkg/util/marshal/marshal_test.go | 105 ++++++++++++++++++++++++--- pkg/util/marshal/query.go | 58 ++++++++++++++- pkg/util/marshal/tail.go | 26 ------- pkg/util/unmarshal/unmarshal_test.go | 3 +- 5 files changed, 181 insertions(+), 47 deletions(-) diff --git a/pkg/util/marshal/marshal.go b/pkg/util/marshal/marshal.go index 562808b300232..fd28907d0579a 100644 --- a/pkg/util/marshal/marshal.go +++ b/pkg/util/marshal/marshal.go @@ -81,18 +81,38 @@ type WebsocketWriter interface { WriteMessage(int, []byte) error } -// WriteTailResponseJSON marshals the legacy.TailResponse to v1 loghttp JSON and -// then writes it to the provided connection. -func WriteTailResponseJSON(r legacy.TailResponse, c WebsocketWriter) error { - v1Response, err := NewTailResponse(r) +type websocketJSONWriter struct { + WebsocketWriter +} + +func (w *websocketJSONWriter) Write(p []byte) (n int, err error) { + err = w.WriteMessage(websocket.TextMessage, p) if err != nil { - return err + return 0, err } - data, err := jsoniter.Marshal(v1Response) + return len(p), nil +} + +func NewWebsocketJSONWriter(ws WebsocketWriter) io.Writer { + return &websocketJSONWriter{ws} +} + +// WriteTailResponseJSON marshals the legacy.TailResponse to v1 loghttp JSON and +// then writes it to the provided writer. +func WriteTailResponseJSON(r legacy.TailResponse, w io.Writer, encodeFlags httpreq.EncodingFlags) error { + // TODO(salvacorts): I think we can dismiss the new TailResponse and be an alias of legacy.TailResponse + // v1Response, err := NewTailResponse(r) + // if err != nil { + // return err + // } + s := jsoniter.ConfigFastest.BorrowStream(w) + defer jsoniter.ConfigFastest.ReturnStream(s) + + err := EncodeTailResult(r, s, encodeFlags) if err != nil { - return err + return fmt.Errorf("could not write JSON tail response: %w", err) } - return c.WriteMessage(websocket.TextMessage, data) + return s.Flush() } // WriteSeriesResponseJSON marshals a logproto.SeriesResponse to v1 loghttp JSON and then diff --git a/pkg/util/marshal/marshal_test.go b/pkg/util/marshal/marshal_test.go index 070f7b0ed4012..fa8cc5d8aa3e5 100644 --- a/pkg/util/marshal/marshal_test.go +++ b/pkg/util/marshal/marshal_test.go @@ -410,7 +410,6 @@ var labelTests = []struct { } // covers responses from /loki/api/v1/tail -// TODO(salvacorts): Support encoding flags. And fix serialized structured metadata labels which shouldn't be there unless the categorize flag is set. var tailTests = []struct { actual legacy.TailResponse expected string @@ -451,7 +450,7 @@ var tailTests = []struct { }, "values":[ [ "123456789012345", "super line"], - [ "123456789012346", "super line with labels", { "foo": "a", "bar": "b" } ] + [ "123456789012346", "super line with labels" ] ] } ], @@ -467,6 +466,90 @@ var tailTests = []struct { }, } +var tailTestWithEncodingFlags = []struct { + actual legacy.TailResponse + encodingFlags httpreq.EncodingFlags + expected string +}{ + { + actual: legacy.TailResponse{ + Streams: []logproto.Stream{ + { + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 123456789012345), + Line: "super line", + }, + { + Timestamp: time.Unix(0, 123456789012346), + Line: "super line with labels", + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: "a"}, + {Name: "bar", Value: "b"}, + }, + }, + { + Timestamp: time.Unix(0, 123456789012347), + Line: "super line with labels msg=text", + StructuredMetadata: []logproto.LabelAdapter{ + {Name: "foo", Value: "a"}, + {Name: "bar", Value: "b"}, + }, + Parsed: []logproto.LabelAdapter{ + {Name: "msg", Value: "text"}, + }, + }, + }, + Labels: `{test="test"}`, + }, + }, + DroppedEntries: []legacy.DroppedEntry{ + { + Timestamp: time.Unix(0, 123456789022345), + Labels: "{test=\"test\"}", + }, + }, + }, + encodingFlags: httpreq.NewEncodingFlags(httpreq.FlagCategorizeLabels), + expected: fmt.Sprintf(`{ + "streams": [ + { + "stream": { + "test": "test" + }, + "values":[ + [ "123456789012345", "super line"], + [ "123456789012346", "super line with labels", { + "structuredMetadata": { + "foo": "a", + "bar": "b" + } + }], + [ "123456789012347", "super line with labels msg=text", { + "structuredMetadata": { + "foo": "a", + "bar": "b" + }, + "parsed": { + "msg": "text" + } + }] + ] + } + ], + "dropped_entries": [ + { + "timestamp": "123456789022345", + "labels": { + "test": "test" + } + } + ], + "encodingFlags": ["%s"] + }`, httpreq.FlagCategorizeLabels), + }, +} + func Test_WriteQueryResponseJSON(t *testing.T) { for i, queryTest := range queryTests { var b bytes.Buffer @@ -515,15 +598,18 @@ func Test_WriteQueryResponseJSONWithError(t *testing.T) { func Test_MarshalTailResponse(t *testing.T) { for i, tailTest := range tailTests { - // convert logproto to model objects - model, err := NewTailResponse(tailTest.actual) + var b bytes.Buffer + err := WriteTailResponseJSON(tailTest.actual, &b, nil) require.NoError(t, err) - // marshal model object - bytes, err := json.Marshal(model) + require.JSONEqf(t, tailTest.expected, b.String(), "Tail Test %d failed", i) + } + for i, tailTest := range tailTestWithEncodingFlags { + var b bytes.Buffer + err := WriteTailResponseJSON(tailTest.actual, &b, tailTest.encodingFlags) require.NoError(t, err) - require.JSONEqf(t, tailTest.expected, string(bytes), "Tail Test %d failed", i) + require.JSONEqf(t, tailTest.expected, b.String(), "Tail Test %d failed", i) } } @@ -925,10 +1011,11 @@ func Test_WriteTailResponseJSON(t *testing.T) { {Timestamp: time.Unix(0, 2), Labels: `{app="dropped"}`}, }, }, - WebsocketWriterFunc(func(i int, b []byte) error { + NewWebsocketJSONWriter(WebsocketWriterFunc(func(i int, b []byte) error { require.Equal(t, `{"streams":[{"stream":{"app":"foo"},"values":[["1","foobar"]]}],"dropped_entries":[{"timestamp":"2","labels":{"app":"dropped"}}]}`, string(b)) return nil - }), + })), + nil, ), ) } diff --git a/pkg/util/marshal/query.go b/pkg/util/marshal/query.go index fb6aead8a76ee..164a792b97234 100644 --- a/pkg/util/marshal/query.go +++ b/pkg/util/marshal/query.go @@ -5,6 +5,7 @@ import ( "strconv" "unsafe" + legacy "github.com/grafana/loki/pkg/loghttp/legacy" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -191,6 +192,60 @@ func EncodeResult(data parser.Value, statistics stats.Result, s *jsoniter.Stream return nil } +func EncodeTailResult(data legacy.TailResponse, s *jsoniter.Stream, encodeFlags httpreq.EncodingFlags) error { + s.WriteObjectStart() + s.WriteObjectField("streams") + err := encodeStreams(data.Streams, s, encodeFlags) + if err != nil { + return err + } + + if len(data.DroppedEntries) > 0 { + s.WriteMore() + s.WriteObjectField("dropped_entries") + err = encodeDroppedEntries(data.DroppedEntries, s) + if err != nil { + return err + } + } + + if len(encodeFlags) > 0 { + s.WriteMore() + s.WriteObjectField("encodingFlags") + if err := encodeEncodingFlags(s, encodeFlags); err != nil { + return err + } + } + + s.WriteObjectEnd() + return nil +} + +func encodeDroppedEntries(entries []legacy.DroppedEntry, s *jsoniter.Stream) error { + s.WriteArrayStart() + defer s.WriteArrayEnd() + + for i, entry := range entries { + if i > 0 { + s.WriteMore() + } + + ds, err := NewDroppedStream(&entry) + if err != nil { + return err + } + + jsonEntry, err := ds.MarshalJSON() + if err != nil { + return err + } + + s.WriteRaw(string(jsonEntry)) + } + + return nil +} + func encodeEncodingFlags(s *jsoniter.Stream, flags httpreq.EncodingFlags) error { s.WriteArrayStart() defer s.WriteArrayEnd() @@ -329,7 +384,6 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream, encodeFlags httpre encodeLabels(logproto.FromLabelsToLabelAdapters(lbls), s) s.WriteObjectEnd() - s.Flush() s.WriteMore() s.WriteObjectField("values") @@ -373,8 +427,6 @@ func encodeStream(stream logproto.Stream, s *jsoniter.Stream, encodeFlags httpre s.WriteObjectEnd() } s.WriteArrayEnd() - - s.Flush() } s.WriteArrayEnd() diff --git a/pkg/util/marshal/tail.go b/pkg/util/marshal/tail.go index 5655aee09c288..222b76c046b7d 100644 --- a/pkg/util/marshal/tail.go +++ b/pkg/util/marshal/tail.go @@ -5,32 +5,6 @@ import ( legacy "github.com/grafana/loki/pkg/loghttp/legacy" ) -// NewTailResponse constructs a TailResponse from a legacy.TailResponse -func NewTailResponse(r legacy.TailResponse) (loghttp.TailResponse, error) { - var err error - ret := loghttp.TailResponse{ - Streams: make([]loghttp.Stream, len(r.Streams)), - DroppedStreams: make([]loghttp.DroppedStream, len(r.DroppedEntries)), - } - - for i, s := range r.Streams { - ret.Streams[i], err = NewStream(s) - - if err != nil { - return loghttp.TailResponse{}, err - } - } - - for i, d := range r.DroppedEntries { - ret.DroppedStreams[i], err = NewDroppedStream(&d) - if err != nil { - return loghttp.TailResponse{}, err - } - } - - return ret, nil -} - // NewDroppedStream constructs a DroppedStream from a legacy.DroppedEntry func NewDroppedStream(s *legacy.DroppedEntry) (loghttp.DroppedStream, error) { l, err := NewLabelSet(s.Labels) diff --git a/pkg/util/unmarshal/unmarshal_test.go b/pkg/util/unmarshal/unmarshal_test.go index 9fdaf27512127..582dc4de80188 100644 --- a/pkg/util/unmarshal/unmarshal_test.go +++ b/pkg/util/unmarshal/unmarshal_test.go @@ -224,6 +224,7 @@ func (ws *websocket) ReadMessage() (int, []byte, error) { func Test_ReadTailResponse(t *testing.T) { ws := &websocket{} + wsJson := marshal.NewWebsocketJSONWriter(ws) require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{ Streams: []logproto.Stream{ {Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}}, @@ -231,7 +232,7 @@ func Test_ReadTailResponse(t *testing.T) { DroppedEntries: []legacy_loghttp.DroppedEntry{ {Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`}, }, - }, ws)) + }, wsJson, nil)) res := &loghttp.TailResponse{} require.NoError(t, ReadTailResponseJSON(res, ws)) From 6bc1dc42820305f05db1c8759bdbf1df2b566df2 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Fri, 27 Oct 2023 17:07:56 +0200 Subject: [PATCH 2/6] Tail --- pkg/ingester/tailer.go | 10 +- pkg/ingester/tailer_test.go | 199 +++++++++++++++++++++++++- pkg/querier/http.go | 11 +- pkg/querier/querier.go | 5 +- pkg/querier/querier_mock_test.go | 2 +- pkg/querier/querier_test.go | 4 +- pkg/querier/tail.go | 42 +++++- pkg/querier/tail_test.go | 219 ++++++++++++++++++++++++++++- pkg/util/httpreq/encoding_flags.go | 20 ++- 9 files changed, 477 insertions(+), 35 deletions(-) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 106fe25bbfe4d..51e995807bbf5 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -136,6 +136,8 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream { // Optimization: skip filtering entirely, if no filter is set + // Note: returns the stream regardless of structured metadata. This is fine, + // the querier will take care of properly returning the structured metadata. if log.IsNoopPipeline(t.pipeline) { return []*logproto.Stream{&stream} } @@ -151,7 +153,7 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log sp := t.pipeline.ForStream(lbs) for _, e := range stream.Entries { - newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line) + newLine, parsedLbs, ok := sp.ProcessString(e.Timestamp.UnixNano(), e.Line, logproto.FromLabelAdaptersToLabels(e.StructuredMetadata)...) if !ok { continue } @@ -163,8 +165,10 @@ func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*log streams[parsedLbs.Hash()] = stream } stream.Entries = append(stream.Entries, logproto.Entry{ - Timestamp: e.Timestamp, - Line: newLine, + Timestamp: e.Timestamp, + Line: newLine, + StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLbs.StructuredMetadata()), + Parsed: logproto.FromLabelsToLabelAdapters(parsedLbs.Parsed()), }) } streamsResult := make([]*logproto.Stream, 0, len(streams)) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 9b06e4560018a..e814beea3411a 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -93,13 +93,28 @@ func Test_dropstream(t *testing.T) { } } -type fakeTailServer struct{} +type fakeTailServer struct { + responses []logproto.TailResponse +} + +func (f *fakeTailServer) Send(response *logproto.TailResponse) error { + f.responses = append(f.responses, *response) + return nil + +} + +func (f *fakeTailServer) Context() context.Context { return context.Background() } + +func (f *fakeTailServer) GetResponses() []logproto.TailResponse { + return f.responses +} -func (f *fakeTailServer) Send(*logproto.TailResponse) error { return nil } -func (f *fakeTailServer) Context() context.Context { return context.Background() } +func (f *fakeTailServer) Reset() { + f.responses = f.responses[:0] +} func Test_TailerSendRace(t *testing.T) { - tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10) + tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10, nil) require.NoError(t, err) var wg sync.WaitGroup @@ -137,3 +152,179 @@ func Test_IsMatching(t *testing.T) { }) } } + +func Test_CategorizeLabels(t *testing.T) { + lbs := makeRandomLabels() + + for _, tc := range []struct { + name string + query string + sentStream logproto.Stream + expectedResponses []logproto.TailResponse + }{ + { + // Optimization will make the same stream to be returned regardless of structured metadata. + name: "noop pipeline", + query: `{app="foo"}`, + sentStream: logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + expectedResponses: []logproto.TailResponse{ + { + Stream: &logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + DroppedStreams: nil, + }, + }, + }, + { + name: "filter pipeline", + query: `{app="foo"} |= "foo"`, + sentStream: logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + { + Timestamp: time.Unix(0, 3), + Line: "bar", + }, + }, + }, + expectedResponses: []logproto.TailResponse{ + { + Stream: &logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + }, + }, + DroppedStreams: nil, + }, + { + Stream: &logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + DroppedStreams: nil, + }, + }, + }, + { + name: "parse pipeline labels", + query: `{app="foo"} | logfmt`, + sentStream: logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + expectedResponses: []logproto.TailResponse{ + { + Stream: &logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("foo", "1").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "1")), + }, + }, + }, + DroppedStreams: nil, + }, + { + Stream: &logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "2").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "2")), + }, + }, + }, + DroppedStreams: nil, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var server fakeTailServer + tail, err := newTailer("foo", tc.query, &server, 10, nil) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(1) + go func() { + tail.loop() + wg.Done() + }() + + tail.send(tc.sentStream, lbs) + + // Wait for the stream to be received by the server. + require.Eventually(t, func() bool { + return len(server.GetResponses()) > 0 + }, 30*time.Second, 1*time.Second, "stream was not received") + + responses := server.GetResponses() + require.Equal(t, len(responses), len(tc.expectedResponses)) + for i := range responses { + require.Equal(t, tc.expectedResponses[i].Stream, responses[i].Stream) + require.Equal(t, tc.expectedResponses[i].DroppedStreams, responses[i].DroppedStreams) + } + + tail.close() + wg.Wait() + }) + } +} diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 61b2f640b44b7..5956d069e1a8f 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -146,6 +146,9 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { return } + encodingFlags := httpreq.ExtractEncodingFlags(r) + version := loghttp.GetVersion(r.RequestURI) + conn, err := upgrader.Upgrade(w, r, nil) if err != nil { level.Error(logger).Log("msg", "Error in upgrading websocket", "err", err) @@ -164,7 +167,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { } }() - tailer, err := q.querier.Tail(r.Context(), req) + tailer, err := q.querier.Tail(r.Context(), req, encodingFlags.Has(httpreq.FlagCategorizeLabels)) if err != nil { if err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseInternalServerErr, err.Error())); err != nil { level.Error(logger).Log("msg", "Error connecting to ingesters for tailing", "err", err) @@ -180,6 +183,8 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(wsPingPeriod) defer ticker.Stop() + connJsonWriter := marshal.NewWebsocketJSONWriter(conn) + var response *loghttp_legacy.TailResponse responseChan := tailer.getResponseChan() closeErrChan := tailer.getCloseErrorChan() @@ -210,8 +215,8 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { select { case response = <-responseChan: var err error - if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 { - err = marshal.WriteTailResponseJSON(*response, conn) + if version == loghttp.VersionV1 { + err = marshal.WriteTailResponseJSON(*response, connJsonWriter, encodingFlags) } else { err = marshal_legacy.WriteTailResponseJSON(*response, conn) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 8295f02c644c0..23e2c89f5e2b0 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -88,7 +88,7 @@ type Querier interface { logql.Querier Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) - Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) + Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) IndexStats(ctx context.Context, req *loghttp.RangeQuery) (*stats.Stats, error) Volume(ctx context.Context, req *logproto.VolumeRequest) (*logproto.VolumeResponse, error) } @@ -434,7 +434,7 @@ func (*SingleTenantQuerier) Check(_ context.Context, _ *grpc_health_v1.HealthChe } // Tail keeps getting matching logs from all ingesters for given query -func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) { +func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailRequest, categorizedLabels bool) (*Tailer, error) { err := q.checkTailRequestLimit(ctx) if err != nil { return nil, err @@ -496,6 +496,7 @@ func (q *SingleTenantQuerier) Tail(ctx context.Context, req *logproto.TailReques }, q.cfg.TailMaxDuration, tailerWaitEntryThrottle, + categorizedLabels, q.metrics, ), nil } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index aef47440326b9..fa2de7590465f 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -530,7 +530,7 @@ func (q *querierMock) Series(ctx context.Context, req *logproto.SeriesRequest) ( return args.Get(0).(func() *logproto.SeriesResponse)(), args.Error(1) } -func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest) (*Tailer, error) { +func (q *querierMock) Tail(_ context.Context, _ *logproto.TailRequest, _ bool) (*Tailer, error) { return nil, errors.New("querierMock.Tail() has not been mocked") } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index d89d24a1751b0..33493be0b9e2e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -117,7 +117,7 @@ func TestQuerier_Tail_QueryTimeoutConfigFlag(t *testing.T) { require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Tail(ctx, &request) + _, err = q.Tail(ctx, &request, false) require.NoError(t, err) calls := ingesterClient.GetMockedCallsByMethod("Query") @@ -511,7 +511,7 @@ func TestQuerier_concurrentTailLimits(t *testing.T) { require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "test") - _, err = q.Tail(ctx, &request) + _, err = q.Tail(ctx, &request, false) assert.Equal(t, testData.expectedError, err) }) } diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 09e785a13b3e6..66aa001236f91 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -7,7 +7,9 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/grafana/loki/pkg/logql/syntax" "github.com/pkg/errors" + "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/iter" loghttp "github.com/grafana/loki/pkg/loghttp/legacy" @@ -50,11 +52,12 @@ type Tailer struct { querierTailClients map[string]logproto.Querier_TailClient // addr -> grpc clients for tailing logs from ingesters querierTailClientsMtx sync.RWMutex - stopped bool - delayFor time.Duration - responseChan chan *loghttp.TailResponse - closeErrChan chan error - tailMaxDuration time.Duration + stopped bool + delayFor time.Duration + responseChan chan *loghttp.TailResponse + closeErrChan chan error + tailMaxDuration time.Duration + categorizeLabels bool // if we are not seeing any response from ingester, // how long do we want to wait by going into sleep @@ -247,8 +250,31 @@ func (t *Tailer) next() bool { return false } - t.currEntry = t.openStreamIterator.Entry() - t.currLabels = t.openStreamIterator.Labels() + entry := t.openStreamIterator.Entry() + streamLabels := t.openStreamIterator.Labels() + + // If categorizeLabels is true, We need to remove the structured metadata labels and parsed labels from the stream labels. + // TODO(salvacorts): If this is too slow, provided this is in the hot path, we can consider doing this in the iterator. + if t.categorizeLabels && (len(entry.StructuredMetadata) > 0 || len(entry.Parsed) > 0) { + lbls, err := syntax.ParseLabels(streamLabels) + if err != nil { + // TODO(salvacorts): Print log message here. + return false + } + + builder := labels.NewBuilder(lbls) + for _, label := range entry.StructuredMetadata { + builder.Del(label.Name) + } + for _, label := range entry.Parsed { + builder.Del(label.Name) + } + + streamLabels = builder.Labels().String() + } + + t.currEntry = entry + t.currLabels = streamLabels t.recordStream(t.openStreamIterator.StreamHash()) return true @@ -305,6 +331,7 @@ func newTailer( tailDisconnectedIngesters func([]string) (map[string]logproto.Querier_TailClient, error), tailMaxDuration time.Duration, waitEntryThrottle time.Duration, + categorizeLabels bool, m *Metrics, ) *Tailer { t := Tailer{ @@ -317,6 +344,7 @@ func newTailer( tailDisconnectedIngesters: tailDisconnectedIngesters, tailMaxDuration: tailMaxDuration, waitEntryThrottle: waitEntryThrottle, + categorizeLabels: categorizeLabels, metrics: m, } diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail_test.go index a2186cd614f73..94f857a3bdfc6 100644 --- a/pkg/querier/tail_test.go +++ b/pkg/querier/tail_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -161,7 +162,7 @@ func TestTailer(t *testing.T) { tailClients["test"] = test.tailClient } - tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, NewMetrics(nil)) + tailer := newTailer(0, tailClients, test.historicEntries, tailDisconnectedIngesters, timeout, throttle, false, NewMetrics(nil)) defer tailer.close() test.tester(t, tailer, test.tailClient) @@ -169,6 +170,222 @@ func TestTailer(t *testing.T) { } } +func TestCategorizedLabels(t *testing.T) { + lbs := labels.FromStrings("app", "foo") + historicalEntries := iter.NewStreamIterator(logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "foo=1", + }, + { + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }) + tailClients := map[string]*tailClientMock{ + "test1": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, + }, + })), + "test2": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + })), + "test3": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + })), + } + + for _, tc := range []struct { + name string + categorizeLabels bool + historicEntries iter.EntryIterator + tailClients map[string]*tailClientMock + expectedResponses []*loghttp.TailResponse + }{ + { + name: "without categorize", + categorizeLabels: false, + historicEntries: historicalEntries, + tailClients: tailClients, + expectedResponses: []*loghttp.TailResponse{ + { + Streams: logproto.Streams{ + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "foo=1", + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + }, + }, + { + Streams: logproto.Streams{ + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, + }, + }, + { + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + { + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + }, + }, + }, + }, + }, + { + name: "categorize", + categorizeLabels: true, + historicEntries: historicalEntries, + tailClients: tailClients, + expectedResponses: []*loghttp.TailResponse{ + { + Streams: logproto.Streams{ + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1, 0), + Line: "foo=1", + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + }, + }, + { + Streams: logproto.Streams{ + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + tailDisconnectedIngesters := func([]string) (map[string]logproto.Querier_TailClient, error) { + return map[string]logproto.Querier_TailClient{}, nil + } + + tailClients := map[string]logproto.Querier_TailClient{} + for k, v := range tc.tailClients { + tailClients[k] = v + } + + tailer := newTailer(0, tailClients, tc.historicEntries, tailDisconnectedIngesters, timeout, throttle, tc.categorizeLabels, NewMetrics(nil)) + defer tailer.close() + + // Make tail clients receive their responses + for _, client := range tc.tailClients { + client.triggerRecv() + } + + err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer) + require.NoError(t, err) + + responses, err := readFromTailer(tailer, maxEntriesPerTailResponse*maxBufferedTailResponses) + require.NoError(t, err) + + require.Equal(t, tc.expectedResponses, responses) + }) + } +} + func readFromTailer(tailer *Tailer, maxEntries int) ([]*loghttp.TailResponse, error) { responses := make([]*loghttp.TailResponse, 0) entriesCount := 0 diff --git a/pkg/util/httpreq/encoding_flags.go b/pkg/util/httpreq/encoding_flags.go index 89656618eb60d..232f2bc4e0f99 100644 --- a/pkg/util/httpreq/encoding_flags.go +++ b/pkg/util/httpreq/encoding_flags.go @@ -71,11 +71,7 @@ func AddEncodingFlagsToContext(ctx context.Context, flags EncodingFlags) context func ExtractEncodingFlags(req *http.Request) EncodingFlags { rawValue := req.Header.Get(LokiEncodingFlagsHeader) - if rawValue == "" { - return nil - } - - return parseEncodingFlags(rawValue) + return ParseEncodingFlags(rawValue) } func ExtractEncodingFlagsFromProto(req *httpgrpc.HTTPRequest) EncodingFlags { @@ -83,11 +79,7 @@ func ExtractEncodingFlagsFromProto(req *httpgrpc.HTTPRequest) EncodingFlags { for _, header := range req.GetHeaders() { if header.GetKey() == LokiEncodingFlagsHeader { rawValue = header.GetValues()[0] - if rawValue == "" { - return nil - } - - return parseEncodingFlags(rawValue) + return ParseEncodingFlags(rawValue) } } @@ -100,10 +92,14 @@ func ExtractEncodingFlagsFromCtx(ctx context.Context) EncodingFlags { return nil } - return parseEncodingFlags(rawValue) + return ParseEncodingFlags(rawValue) } -func parseEncodingFlags(rawFlags string) EncodingFlags { +func ParseEncodingFlags(rawFlags string) EncodingFlags { + if rawFlags == "" { + return nil + } + split := strings.Split(rawFlags, EncodeFlagsDelimiter) flags := make(EncodingFlags, len(split)) for _, rawFlag := range split { From 1100835614373b360212a69d24aaede164a76d76 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 30 Oct 2023 12:47:23 +0100 Subject: [PATCH 3/6] Use iterator --- pkg/ingester/tailer_test.go | 4 +- pkg/iter/categorized_labels_iterator.go | 70 +++++ pkg/iter/categorized_labels_iterator_test.go | 144 +++++++++++ pkg/logql/engine.go | 33 +-- pkg/querier/tail.go | 43 +--- pkg/querier/tail_test.go | 256 +++++++++---------- 6 files changed, 361 insertions(+), 189 deletions(-) create mode 100644 pkg/iter/categorized_labels_iterator.go create mode 100644 pkg/iter/categorized_labels_iterator_test.go diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index e814beea3411a..af0aa501c08e0 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -114,7 +114,7 @@ func (f *fakeTailServer) Reset() { } func Test_TailerSendRace(t *testing.T) { - tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10, nil) + tail, err := newTailer("foo", `{app="foo"} |= "foo"`, &fakeTailServer{}, 10) require.NoError(t, err) var wg sync.WaitGroup @@ -299,7 +299,7 @@ func Test_CategorizeLabels(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { var server fakeTailServer - tail, err := newTailer("foo", tc.query, &server, 10, nil) + tail, err := newTailer("foo", tc.query, &server, 10) require.NoError(t, err) var wg sync.WaitGroup diff --git a/pkg/iter/categorized_labels_iterator.go b/pkg/iter/categorized_labels_iterator.go new file mode 100644 index 0000000000000..1c673b9b155cf --- /dev/null +++ b/pkg/iter/categorized_labels_iterator.go @@ -0,0 +1,70 @@ +package iter + +import ( + "fmt" + + "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/syntax" + "github.com/prometheus/prometheus/model/labels" +) + +type categorizeLabelsIterator struct { + EntryIterator + currEntry logproto.Entry + currStreamLabels string + currHash uint64 + currErr error +} + +func NewCategorizeLabelsIterator(wrap EntryIterator) EntryIterator { + return &categorizeLabelsIterator{ + EntryIterator: wrap, + } +} + +func (c *categorizeLabelsIterator) Next() bool { + if !c.EntryIterator.Next() { + return false + } + + c.currEntry = c.Entry() + if len(c.currEntry.StructuredMetadata) == 0 && len(c.currEntry.Parsed) == 0 { + c.currStreamLabels = c.EntryIterator.Labels() + c.currHash = c.EntryIterator.StreamHash() + return true + } + + // We need to remove the structured metadata labels and parsed labels from the stream labels. + streamLabels := c.EntryIterator.Labels() + lbls, err := syntax.ParseLabels(streamLabels) + if err != nil { + c.currErr = fmt.Errorf("failed to parse series labels to categorize labels: %w", err) + return false + } + + builder := labels.NewBuilder(lbls) + for _, label := range c.currEntry.StructuredMetadata { + builder.Del(label.Name) + } + for _, label := range c.currEntry.Parsed { + builder.Del(label.Name) + } + + newLabels := builder.Labels() + c.currStreamLabels = newLabels.String() + c.currHash = newLabels.Hash() + + return true +} + +func (c *categorizeLabelsIterator) Error() error { + return c.currErr +} + +func (c *categorizeLabelsIterator) Labels() string { + return c.currStreamLabels +} + +func (c *categorizeLabelsIterator) StreamHash() uint64 { + return c.currHash +} diff --git a/pkg/iter/categorized_labels_iterator_test.go b/pkg/iter/categorized_labels_iterator_test.go new file mode 100644 index 0000000000000..5a5d909604510 --- /dev/null +++ b/pkg/iter/categorized_labels_iterator_test.go @@ -0,0 +1,144 @@ +package iter + +import ( + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" +) + +func TestNewCategorizeLabelsIterator(t *testing.T) { + for _, tc := range []struct { + name string + inner EntryIterator + expectedStreams []logproto.Stream + }{ + { + name: "no structured metadata nor parsed labels", + inner: NewSortEntryIterator([]EntryIterator{ + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + }, + }, + }), + }, logproto.FORWARD), + expectedStreams: []logproto.Stream{ + { + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + }, + }, + }, + }, + }, + { + name: "structured metadata and parsed labels", + inner: NewSortEntryIterator([]EntryIterator{ + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + }, + }), + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default", "traceID", "123").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + }), + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default", "foo", "3").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 3), + Line: "foo=3", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")), + }, + }, + }), + NewStreamIterator(logproto.Stream{ + Labels: labels.FromStrings("namespace", "default", "traceID", "123", "foo", "4").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 4), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")), + }, + }, + }), + }, logproto.FORWARD), + expectedStreams: []logproto.Stream{ + { + Labels: labels.FromStrings("namespace", "default").String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "foo=1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + { + Timestamp: time.Unix(0, 3), + Line: "foo=3", + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "3")), + }, + { + Timestamp: time.Unix(0, 4), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "4")), + }, + }, + }, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + itr := NewCategorizeLabelsIterator(tc.inner) + + streamsEntries := make(map[string][]logproto.Entry) + for itr.Next() { + streamsEntries[itr.Labels()] = append(streamsEntries[itr.Labels()], itr.Entry()) + require.NoError(t, itr.Error()) + } + + var streams []logproto.Stream + for lbls, entries := range streamsEntries { + streams = append(streams, logproto.Stream{ + Labels: lbls, + Entries: entries, + }) + } + + require.ElementsMatch(t, tc.expectedStreams, streams) + }) + } +} diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index 7729f2941f884..9c91924704a6c 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -286,16 +286,18 @@ func (q *query) Eval(ctx context.Context) (promql_parser.Value, error) { return value, err case syntax.LogSelectorExpr: - iter, err := q.evaluator.NewIterator(ctx, e, q.params) + itr, err := q.evaluator.NewIterator(ctx, e, q.params) if err != nil { return nil, err } encodingFlags := httpreq.ExtractEncodingFlagsFromCtx(ctx) - categorizeLabels := encodingFlags.Has(httpreq.FlagCategorizeLabels) + if encodingFlags.Has(httpreq.FlagCategorizeLabels) { + itr = iter.NewCategorizeLabelsIterator(itr) + } - defer util.LogErrorWithContext(ctx, "closing iterator", iter.Close) - streams, err := readStreams(iter, q.params.Limit(), q.params.Direction(), q.params.Interval(), categorizeLabels) + defer util.LogErrorWithContext(ctx, "closing iterator", itr.Close) + streams, err := readStreams(itr, q.params.Limit(), q.params.Direction(), q.params.Interval(), true) return streams, err default: return nil, fmt.Errorf("unexpected type (%T): cannot evaluate", e) @@ -512,7 +514,7 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte // value here because many unit tests start at time.Unix(0,0) lastEntry := lastEntryMinTime for respSize < size && i.Next() { - entry := i.Entry() + streamLabels, entry := i.Labels(), i.Entry() forwardShouldOutput := dir == logproto.FORWARD && (entry.Timestamp.Equal(lastEntry.Add(interval)) || entry.Timestamp.After(lastEntry.Add(interval))) @@ -523,27 +525,6 @@ func readStreams(i iter.EntryIterator, size uint32, dir logproto.Direction, inte // If lastEntry.Unix < 0 this is the first pass through the loop and we should output the line. // Then check to see if the entry is equal to, or past a forward or reverse step if interval == 0 || lastEntry.Unix() < 0 || forwardShouldOutput || backwardShouldOutput { - streamLabels := i.Labels() - - // If categorizeLabels is true, We need to remove the structured metadata labels and parsed labels from the stream labels. - // TODO(salvacorts): If this is too slow, provided this is in the hot path, we can consider doing this in the iterator. - if categorizeLabels && (len(entry.StructuredMetadata) > 0 || len(entry.Parsed) > 0) { - lbls, err := syntax.ParseLabels(streamLabels) - if err != nil { - return nil, fmt.Errorf("failed to parse series labels to categorize labels: %w", err) - } - - builder := labels.NewBuilder(lbls) - for _, label := range entry.StructuredMetadata { - builder.Del(label.Name) - } - for _, label := range entry.Parsed { - builder.Del(label.Name) - } - - streamLabels = builder.Labels().String() - } - stream, ok := streams[streamLabels] if !ok { stream = &logproto.Stream{ diff --git a/pkg/querier/tail.go b/pkg/querier/tail.go index 66aa001236f91..bccedfb7c532e 100644 --- a/pkg/querier/tail.go +++ b/pkg/querier/tail.go @@ -7,9 +7,7 @@ import ( "time" "github.com/go-kit/log/level" - "github.com/grafana/loki/pkg/logql/syntax" "github.com/pkg/errors" - "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/pkg/iter" loghttp "github.com/grafana/loki/pkg/loghttp/legacy" @@ -237,7 +235,12 @@ func (t *Tailer) pushTailResponseFromIngester(resp *logproto.TailResponse) { t.streamMtx.Lock() defer t.streamMtx.Unlock() - t.openStreamIterator.Push(iter.NewStreamIterator(*resp.Stream)) + itr := iter.NewStreamIterator(*resp.Stream) + if t.categorizeLabels { + itr = iter.NewCategorizeLabelsIterator(itr) + } + + t.openStreamIterator.Push(itr) } // finds oldest entry by peeking at open stream iterator. @@ -250,31 +253,8 @@ func (t *Tailer) next() bool { return false } - entry := t.openStreamIterator.Entry() - streamLabels := t.openStreamIterator.Labels() - - // If categorizeLabels is true, We need to remove the structured metadata labels and parsed labels from the stream labels. - // TODO(salvacorts): If this is too slow, provided this is in the hot path, we can consider doing this in the iterator. - if t.categorizeLabels && (len(entry.StructuredMetadata) > 0 || len(entry.Parsed) > 0) { - lbls, err := syntax.ParseLabels(streamLabels) - if err != nil { - // TODO(salvacorts): Print log message here. - return false - } - - builder := labels.NewBuilder(lbls) - for _, label := range entry.StructuredMetadata { - builder.Del(label.Name) - } - for _, label := range entry.Parsed { - builder.Del(label.Name) - } - - streamLabels = builder.Labels().String() - } - - t.currEntry = entry - t.currLabels = streamLabels + t.currEntry = t.openStreamIterator.Entry() + t.currLabels = t.openStreamIterator.Labels() t.recordStream(t.openStreamIterator.StreamHash()) return true @@ -334,8 +314,13 @@ func newTailer( categorizeLabels bool, m *Metrics, ) *Tailer { + historicEntriesIter := historicEntries + if categorizeLabels { + historicEntriesIter = iter.NewCategorizeLabelsIterator(historicEntries) + } + t := Tailer{ - openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntries}, logproto.FORWARD), + openStreamIterator: iter.NewMergeEntryIterator(context.Background(), []iter.EntryIterator{historicEntriesIter}, logproto.FORWARD), querierTailClients: querierTailClients, delayFor: delayFor, responseChan: make(chan *loghttp.TailResponse, maxBufferedTailResponses), diff --git a/pkg/querier/tail_test.go b/pkg/querier/tail_test.go index 94f857a3bdfc6..32c6bed36ed2d 100644 --- a/pkg/querier/tail_test.go +++ b/pkg/querier/tail_test.go @@ -171,121 +171,119 @@ func TestTailer(t *testing.T) { } func TestCategorizedLabels(t *testing.T) { + t.Parallel() + lbs := labels.FromStrings("app", "foo") - historicalEntries := iter.NewStreamIterator(logproto.Stream{ - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(1, 0), - Line: "foo=1", - }, - { - Timestamp: time.Unix(2, 0), - Line: "foo=2", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - }, - }) - tailClients := map[string]*tailClientMock{ - "test1": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + createHistoricalEntries := func() iter.EntryIterator { + return iter.NewStreamIterator(logproto.Stream{ Labels: lbs.String(), Entries: []logproto.Entry{ { - Timestamp: time.Unix(3, 0), - Line: "foo=3", + Timestamp: time.Unix(1, 0), + Line: "foo=1", }, - }, - })), - "test2": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ - Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), - Entries: []logproto.Entry{ { - Timestamp: time.Unix(4, 0), - Line: "foo=4", + Timestamp: time.Unix(2, 0), + Line: "foo=2", StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), }, }, - })), - "test3": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ - Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(5, 0), - Line: "foo=5", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }) + } + createTailClients := func() map[string]*tailClientMock { + return map[string]*tailClientMock{ + "test1": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: lbs.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3, 0), + Line: "foo=3", + }, }, - }, - })), + })), + "test2": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + }, + }, + })), + "test3": newTailClientMock().mockRecvWithTrigger(mockTailResponse(logproto.Stream{ + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), + }, + }, + })), + } } for _, tc := range []struct { - name string - categorizeLabels bool - historicEntries iter.EntryIterator - tailClients map[string]*tailClientMock - expectedResponses []*loghttp.TailResponse + name string + categorizeLabels bool + historicEntries iter.EntryIterator + tailClients map[string]*tailClientMock + expectedStreams []logproto.Stream }{ { name: "without categorize", categorizeLabels: false, - historicEntries: historicalEntries, - tailClients: tailClients, - expectedResponses: []*loghttp.TailResponse{ + historicEntries: createHistoricalEntries(), + tailClients: createTailClients(), + expectedStreams: []logproto.Stream{ { - Streams: logproto.Streams{ + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(1, 0), - Line: "foo=1", - }, - }, + Timestamp: time.Unix(1, 0), + Line: "foo=1", }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(2, 0), - Line: "foo=2", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - }, + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), }, }, }, { - Streams: logproto.Streams{ + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(3, 0), - Line: "foo=3", - }, - }, + Timestamp: time.Unix(3, 0), + Line: "foo=3", }, + }, + }, + { + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), + Entries: []logproto.Entry{ { - Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(4, 0), - Line: "foo=4", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - }, + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), }, + }, + }, + { + Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), + Entries: []logproto.Entry{ { - Labels: labels.NewBuilder(lbs).Set("traceID", "123").Set("foo", "5").Labels().String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(5, 0), - Line: "foo=5", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), - }, - }, + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), }, }, }, @@ -294,63 +292,55 @@ func TestCategorizedLabels(t *testing.T) { { name: "categorize", categorizeLabels: true, - historicEntries: historicalEntries, - tailClients: tailClients, - expectedResponses: []*loghttp.TailResponse{ + historicEntries: createHistoricalEntries(), + tailClients: createTailClients(), + expectedStreams: []logproto.Stream{ { - Streams: logproto.Streams{ + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(1, 0), - Line: "foo=1", - }, - }, + Timestamp: time.Unix(1, 0), + Line: "foo=1", }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(2, 0), - Line: "foo=2", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - }, + Timestamp: time.Unix(2, 0), + Line: "foo=2", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), }, }, }, { - Streams: logproto.Streams{ + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(3, 0), - Line: "foo=3", - }, - }, + Timestamp: time.Unix(3, 0), + Line: "foo=3", }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(4, 0), - Line: "foo=4", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - }, + Timestamp: time.Unix(4, 0), + Line: "foo=4", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), }, + }, + }, + { + Labels: lbs.String(), + Entries: []logproto.Entry{ { - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(5, 0), - Line: "foo=5", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), - }, - }, + Timestamp: time.Unix(5, 0), + Line: "foo=5", + StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), + Parsed: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", "5")), }, }, }, @@ -378,10 +368,12 @@ func TestCategorizedLabels(t *testing.T) { err := waitUntilTailerOpenStreamsHaveBeenConsumed(tailer) require.NoError(t, err) - responses, err := readFromTailer(tailer, maxEntriesPerTailResponse*maxBufferedTailResponses) + maxEntries := countEntriesInStreams(tc.expectedStreams) + responses, err := readFromTailer(tailer, maxEntries) require.NoError(t, err) - require.Equal(t, tc.expectedResponses, responses) + streams := flattenStreamsFromResponses(responses) + require.ElementsMatch(t, tc.expectedStreams, streams) }) } } @@ -421,7 +413,7 @@ func waitUntilTailerOpenStreamsHaveBeenConsumed(tailer *Tailer) error { select { case <-timeoutTicker.C: - return errors.New("timeout expired while reading responses from Tailer") + return errors.New("timeout expired while waiting for Tailer to consume open streams") default: time.Sleep(throttle) } From 9ab9ebbd7dca3ab342f4b177031fcf0da3fe78cb Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 30 Oct 2023 13:27:56 +0100 Subject: [PATCH 4/6] Fix test --- pkg/ingester/tailer_test.go | 51 +------------------------------------ 1 file changed, 1 insertion(+), 50 deletions(-) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index af0aa501c08e0..6e8f0309c2da5 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -153,7 +153,7 @@ func Test_IsMatching(t *testing.T) { } } -func Test_CategorizeLabels(t *testing.T) { +func Test_StructuredMetadata(t *testing.T) { lbs := makeRandomLabels() for _, tc := range []struct { @@ -200,55 +200,6 @@ func Test_CategorizeLabels(t *testing.T) { }, }, }, - { - name: "filter pipeline", - query: `{app="foo"} |= "foo"`, - sentStream: logproto.Stream{ - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 1), - Line: "foo=1", - }, - { - Timestamp: time.Unix(0, 2), - Line: "foo=2", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - { - Timestamp: time.Unix(0, 3), - Line: "bar", - }, - }, - }, - expectedResponses: []logproto.TailResponse{ - { - Stream: &logproto.Stream{ - Labels: lbs.String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 1), - Line: "foo=1", - }, - }, - }, - DroppedStreams: nil, - }, - { - Stream: &logproto.Stream{ - Labels: labels.NewBuilder(lbs).Set("traceID", "123").Labels().String(), - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(0, 2), - Line: "foo=2", - StructuredMetadata: logproto.FromLabelsToLabelAdapters(labels.FromStrings("traceID", "123")), - }, - }, - }, - DroppedStreams: nil, - }, - }, - }, { name: "parse pipeline labels", query: `{app="foo"} | logfmt`, From aec7e519bb93d1ecb5c53b2e633f94f336f97ee7 Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 30 Oct 2023 14:30:25 +0100 Subject: [PATCH 5/6] Lint fixes --- pkg/ingester/tailer.go | 2 -- pkg/iter/categorized_labels_iterator.go | 3 ++- pkg/iter/categorized_labels_iterator_test.go | 3 ++- pkg/querier/http.go | 4 ++-- pkg/util/marshal/query.go | 2 +- pkg/util/unmarshal/unmarshal_test.go | 4 ++-- 6 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/ingester/tailer.go b/pkg/ingester/tailer.go index 51e995807bbf5..72e7026e810e7 100644 --- a/pkg/ingester/tailer.go +++ b/pkg/ingester/tailer.go @@ -136,8 +136,6 @@ func (t *tailer) send(stream logproto.Stream, lbs labels.Labels) { func (t *tailer) processStream(stream logproto.Stream, lbs labels.Labels) []*logproto.Stream { // Optimization: skip filtering entirely, if no filter is set - // Note: returns the stream regardless of structured metadata. This is fine, - // the querier will take care of properly returning the structured metadata. if log.IsNoopPipeline(t.pipeline) { return []*logproto.Stream{&stream} } diff --git a/pkg/iter/categorized_labels_iterator.go b/pkg/iter/categorized_labels_iterator.go index 1c673b9b155cf..1e95cad09a16e 100644 --- a/pkg/iter/categorized_labels_iterator.go +++ b/pkg/iter/categorized_labels_iterator.go @@ -3,9 +3,10 @@ package iter import ( "fmt" + "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" - "github.com/prometheus/prometheus/model/labels" ) type categorizeLabelsIterator struct { diff --git a/pkg/iter/categorized_labels_iterator_test.go b/pkg/iter/categorized_labels_iterator_test.go index 5a5d909604510..18259edfbf169 100644 --- a/pkg/iter/categorized_labels_iterator_test.go +++ b/pkg/iter/categorized_labels_iterator_test.go @@ -4,9 +4,10 @@ import ( "testing" "time" - "github.com/grafana/loki/pkg/logproto" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" ) func TestNewCategorizeLabelsIterator(t *testing.T) { diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 5956d069e1a8f..fed3301b40274 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -183,7 +183,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { ticker := time.NewTicker(wsPingPeriod) defer ticker.Stop() - connJsonWriter := marshal.NewWebsocketJSONWriter(conn) + connWriter := marshal.NewWebsocketJSONWriter(conn) var response *loghttp_legacy.TailResponse responseChan := tailer.getResponseChan() @@ -216,7 +216,7 @@ func (q *QuerierAPI) TailHandler(w http.ResponseWriter, r *http.Request) { case response = <-responseChan: var err error if version == loghttp.VersionV1 { - err = marshal.WriteTailResponseJSON(*response, connJsonWriter, encodingFlags) + err = marshal.WriteTailResponseJSON(*response, connWriter, encodingFlags) } else { err = marshal_legacy.WriteTailResponseJSON(*response, conn) } diff --git a/pkg/util/marshal/query.go b/pkg/util/marshal/query.go index 164a792b97234..b048b0a952f87 100644 --- a/pkg/util/marshal/query.go +++ b/pkg/util/marshal/query.go @@ -5,7 +5,6 @@ import ( "strconv" "unsafe" - legacy "github.com/grafana/loki/pkg/loghttp/legacy" jsoniter "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/prometheus/common/model" @@ -14,6 +13,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/grafana/loki/pkg/loghttp" + legacy "github.com/grafana/loki/pkg/loghttp/legacy" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logqlmodel" "github.com/grafana/loki/pkg/logqlmodel/stats" diff --git a/pkg/util/unmarshal/unmarshal_test.go b/pkg/util/unmarshal/unmarshal_test.go index 582dc4de80188..93372f62ebef1 100644 --- a/pkg/util/unmarshal/unmarshal_test.go +++ b/pkg/util/unmarshal/unmarshal_test.go @@ -224,7 +224,7 @@ func (ws *websocket) ReadMessage() (int, []byte, error) { func Test_ReadTailResponse(t *testing.T) { ws := &websocket{} - wsJson := marshal.NewWebsocketJSONWriter(ws) + wsJSON := marshal.NewWebsocketJSONWriter(ws) require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{ Streams: []logproto.Stream{ {Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}}, @@ -232,7 +232,7 @@ func Test_ReadTailResponse(t *testing.T) { DroppedEntries: []legacy_loghttp.DroppedEntry{ {Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`}, }, - }, wsJson, nil)) + }, wsJSON, nil)) res := &loghttp.TailResponse{} require.NoError(t, ReadTailResponseJSON(res, ws)) From fec3c297fc5ccb3cfcdbcd63892e5885b99a1bbc Mon Sep 17 00:00:00 2001 From: Salva Corts Date: Mon, 30 Oct 2023 14:33:28 +0100 Subject: [PATCH 6/6] Fix test --- pkg/ingester/tailer_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/ingester/tailer_test.go b/pkg/ingester/tailer_test.go index 6e8f0309c2da5..59293352030df 100644 --- a/pkg/ingester/tailer_test.go +++ b/pkg/ingester/tailer_test.go @@ -268,11 +268,7 @@ func Test_StructuredMetadata(t *testing.T) { }, 30*time.Second, 1*time.Second, "stream was not received") responses := server.GetResponses() - require.Equal(t, len(responses), len(tc.expectedResponses)) - for i := range responses { - require.Equal(t, tc.expectedResponses[i].Stream, responses[i].Stream) - require.Equal(t, tc.expectedResponses[i].DroppedStreams, responses[i].DroppedStreams) - } + require.ElementsMatch(t, tc.expectedResponses, responses) tail.close() wg.Wait()