From 25eedf87937ae156818d8fddf773384161e478c4 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 3 Apr 2024 12:35:08 +0530 Subject: [PATCH 01/26] Fetch labels and values from store --- pkg/querier/multi_tenant_querier.go | 6 ++-- pkg/querier/querier.go | 45 +++++++++++++++++++++++++---- pkg/querier/querier_test.go | 14 ++++----- 3 files changed, 50 insertions(+), 15 deletions(-) diff --git a/pkg/querier/multi_tenant_querier.go b/pkg/querier/multi_tenant_querier.go index 12cb412a61cf9..cf469f3588ff8 100644 --- a/pkg/querier/multi_tenant_querier.go +++ b/pkg/querier/multi_tenant_querier.go @@ -283,7 +283,6 @@ func (q *MultiTenantQuerier) DetectedFields(ctx context.Context, req *logproto.D } func (q *MultiTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { - // TODO(shantanu) tenantIDs, err := tenant.TenantIDs(ctx) if err != nil { return nil, err @@ -293,7 +292,10 @@ func (q *MultiTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.D return q.Querier.DetectedLabels(ctx, req) } - //resp := make([]*logproto.DetectedLabels, len(tenantIDs)) + level.Debug(q.logger).Log( + "msg", "detected labels requested for multiple tenants, but not yet supported. returning static labels", + "tenantIDs", strings.Join(tenantIDs, ","), + ) return &logproto.DetectedLabelsResponse{ DetectedLabels: []*logproto.DetectedLabel{ diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 3c03cde0653b7..4fbc588220662 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -8,6 +8,7 @@ import ( "regexp" "sort" "strconv" + "strings" "time" "github.com/axiomhq/hyperloglog" @@ -910,11 +911,16 @@ func (q *SingleTenantQuerier) Volume(ctx context.Context, req *logproto.VolumeRe } func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { - var ingesterLabels *logproto.LabelToValuesResponse + userID, err := tenant.TenantID(ctx) + if err != nil { + return nil, err + } var detectedLabels []*logproto.DetectedLabel g, ctx := errgroup.WithContext(ctx) - ingesterQueryInterval, _ := q.buildQueryIntervals(*req.Start, *req.End) + ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End) + + var ingesterLabels *logproto.LabelToValuesResponse if !q.cfg.QueryStoreOnly && ingesterQueryInterval != nil { g.Go(func() error { var err error @@ -928,6 +934,33 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. }) } + if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { + var matchers []*labels.Matcher + if req.Query != "" { + matchers, err = syntax.ParseMatchers(req.Query, true) + if err != nil { + return nil, err + } + } + g.Go(func() error { + var err error + start := model.TimeFromUnixNano(storeQueryInterval.start.UnixNano()) + end := model.TimeFromUnixNano(storeQueryInterval.end.UnixNano()) + storeLabels, err := q.store.LabelNamesForMetricName(ctx, userID, start, end, "logs") + for _, label := range storeLabels { + values, err := q.store.LabelValuesForMetricName(ctx, userID, start, end, "logs", label, matchers...) + if err != nil { + return err + } + uniqValues := slices.CompactFunc(values, strings.EqualFold) + if q.isLabelRelevant(label, uniqValues) { + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) + } + } + return err + }) + } + if err := g.Wait(); err != nil { return nil, err } @@ -939,7 +972,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. } for label, values := range ingesterLabels.Labels { - if q.isLabelRelevant(label, values) { + if q.isLabelRelevant(label, values.Values) { detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(values.Values))}) } } @@ -949,13 +982,13 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. }, nil } -func (q *SingleTenantQuerier) isLabelRelevant(label string, values *logproto.UniqueLabelValues) bool { +func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string) bool { staticLabels := []string{"pod", "namespace", "cluster", "instance"} - cardinality := len(values.Values) + cardinality := len(values) // TODO(shantanu) make these values configurable if !slices.Contains(staticLabels, label) && (cardinality < 1 || cardinality > 50) || - containsAllIDTypes(values.Values) { + containsAllIDTypes(values) { return false } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index df87a72df3664..4408ca98a304b 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1373,37 +1373,37 @@ func TestQuerier_isLabelRelevant(t *testing.T) { for _, tc := range []struct { name string label string - values *logproto.UniqueLabelValues + values []string expected bool }{ { label: "uuidv4 values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}, expected: false, }, { label: "guid values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"57808f62-f117-4a22-84a0-bc3282c7f106", "5076e837-cd8d-4dd7-95ff-fecb087dccf6", "2e2a6554-1744-4399-b89a-88ae79c27096", "d3c31248-ec0c-4bc4-b11c-8fb1cfb42e62"}}, + values: []string{"57808f62-f117-4a22-84a0-bc3282c7f106", "5076e837-cd8d-4dd7-95ff-fecb087dccf6", "2e2a6554-1744-4399-b89a-88ae79c27096", "d3c31248-ec0c-4bc4-b11c-8fb1cfb42e62"}, expected: false, }, { label: "integer values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"1", "2", "3", "4"}}, + values: []string{"1", "2", "3", "4"}, expected: false, }, { label: "string values are relevant", - values: &logproto.UniqueLabelValues{Values: []string{"ingester", "querier", "query-frontend", "index-gateway"}}, + values: []string{"ingester", "querier", "query-frontend", "index-gateway"}, expected: true, }, { label: "guid with braces are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"{E9550CF7-58D9-48B9-8845-D9800C651AAC}", "{1617921B-1749-4FF0-A058-31AFB5D98149}", "{C119D92E-A4B9-48A3-A92C-6CA8AA8A6CCC}", "{228AAF1D-2DE7-4909-A4E9-246A7FA9D988}"}}, + values: []string{"{E9550CF7-58D9-48B9-8845-D9800C651AAC}", "{1617921B-1749-4FF0-A058-31AFB5D98149}", "{C119D92E-A4B9-48A3-A92C-6CA8AA8A6CCC}", "{228AAF1D-2DE7-4909-A4E9-246A7FA9D988}"}, expected: false, }, { label: "float values are not relevant", - values: &logproto.UniqueLabelValues{Values: []string{"1.2", "2.5", "3.3", "4.1"}}, + values: []string{"1.2", "2.5", "3.3", "4.1"}, expected: false, }, } { From 50b2c4b408121269f1e0b7f288e1583af67ecd7a Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 3 Apr 2024 13:12:13 +0530 Subject: [PATCH 02/26] Dedupe label and values from ingester and store --- pkg/querier/querier.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 4fbc588220662..c49b6477ad214 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -910,6 +910,7 @@ func (q *SingleTenantQuerier) Volume(ctx context.Context, req *logproto.VolumeRe return seriesvolume.Merge(responses, req.Limit), nil } +// DetectedLabels fetches labels and values from store and ingesters and filters them by relevance criteria as per logs app. func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { userID, err := tenant.TenantID(ctx) if err != nil { @@ -934,6 +935,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. }) } + storeLabelsMap := make(map[string][]string) if !q.cfg.QueryIngesterOnly && storeQueryInterval != nil { var matchers []*labels.Matcher if req.Query != "" { @@ -952,9 +954,8 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. if err != nil { return err } - uniqValues := slices.CompactFunc(values, strings.EqualFold) - if q.isLabelRelevant(label, uniqValues) { - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) + if q.isLabelRelevant(label, values) { + storeLabelsMap[label] = values } } return err @@ -973,7 +974,14 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. for label, values := range ingesterLabels.Labels { if q.isLabelRelevant(label, values.Values) { - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(values.Values))}) + combinedValues := values.Values + storeValues, storeHasLabel := storeLabelsMap[label] + if storeHasLabel { + combinedValues = append(combinedValues, storeValues...) + } + uniqValues := slices.CompactFunc(combinedValues, strings.EqualFold) + // TODO(shantanu): There's a bug here. Unique values can go above 50. Will need a bit of refactoring + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) } } @@ -982,6 +990,8 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. }, nil } +// isLabelRelevant returns if the label is relevant for logs app. A label is relevant if it is not of any numeric, UUID or GUID type +// It is also not relevant to return if the values are less than 1 or beyond 50. func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string) bool { staticLabels := []string{"pod", "namespace", "cluster", "instance"} cardinality := len(values) From 9398dfa4472b5eb7f5b0b60d93fe236dd098f254 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 3 Apr 2024 14:37:44 +0530 Subject: [PATCH 03/26] Enforce some validations and timeout --- pkg/querier/querier.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index c49b6477ad214..391bd667576c2 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -918,7 +918,15 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. } var detectedLabels []*logproto.DetectedLabel + // Enforce the query timeout while querying backends + queryTimeout := q.limits.QueryTimeout(ctx, userID) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(queryTimeout)) + defer cancel() g, ctx := errgroup.WithContext(ctx) + + if *req.Start, *req.End, err = validateQueryTimeRangeLimits(ctx, userID, q.limits, *req.Start, *req.End); err != nil { + return nil, err + } ingesterQueryInterval, storeQueryInterval := q.buildQueryIntervals(*req.Start, *req.End) var ingesterLabels *logproto.LabelToValuesResponse @@ -930,7 +938,6 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. splitReq.End = &ingesterQueryInterval.end ingesterLabels, err = q.ingesterQuerier.DetectedLabel(ctx, &splitReq) - level.Info(q.logger).Log("msg", ingesterLabels) return err }) } From 97cf8a3e7d80ff5acd522e942b3291df2262fec3 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 3 Apr 2024 16:24:52 +0530 Subject: [PATCH 04/26] Fetch label values from index --- pkg/ingester/instance.go | 23 +++++++++++++++++++++-- pkg/querier/queryrange/codec.go | 12 ------------ 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 99bf587b21e85..eb37350ddb426 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -589,9 +589,28 @@ type UniqueValues map[string]struct{} // LabelsWithValues returns the label names with all the unique values depending on the request func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, matchers ...*labels.Matcher) (map[string]UniqueValues, error) { - // TODO (shantanu): Figure out how to get the label names from index directly when no matchers are given. - labelMap := make(map[string]UniqueValues) + if len(matchers) == 0 { + labelsFromIndex, err := i.index.LabelNames(startTime, nil) + if err != nil { + return nil, err + } + + for _, label := range labelsFromIndex { + values, err := i.index.LabelValues(startTime, label, nil) + if err != nil { + return nil, err + } + existingValues, exists := labelMap[label] + if !exists { + existingValues = make(map[string]struct{}) + } + for _, v := range values { + existingValues[v] = struct{}{} + } + } + } + err := i.forMatchingStreams(ctx, startTime, matchers, nil, func(s *stream) error { for _, label := range s.labels { v, exists := labelMap[label.Name] diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 2d36eafc0a5da..80d8304928b24 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -268,18 +268,6 @@ type DetectedLabelsRequest struct { logproto.DetectedLabelsRequest } -// NewDetectedLabelsRequest creates a new request for detected labels -func NewDetectedLabelsRequest(start, end time.Time, query, path string) *DetectedLabelsRequest { - return &DetectedLabelsRequest{ - DetectedLabelsRequest: logproto.DetectedLabelsRequest{ - Start: &start, - End: &end, - Query: query, - }, - path: path, - } -} - func (r *DetectedLabelsRequest) AsProto() *logproto.DetectedLabelsRequest { return &r.DetectedLabelsRequest } From 30329f7a2d3fbaba7843096c2bda6f5ccfe047eb Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 3 Apr 2024 16:44:47 +0530 Subject: [PATCH 05/26] Fix passing matchers in ingesters --- pkg/ingester/ingester.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 937eceaa683e1..5f8da3be439f6 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1393,7 +1393,6 @@ func (i *Ingester) GetDetectedLabels(ctx context.Context, req *logproto.Detected if err != nil { return nil, err } - level.Info(i.logger).Log("msg", matchers) } labelMap, err := instance.LabelsWithValues(ctx, *req.Start, matchers...) From 4ac58f32581c39085b6856b4cb4b406a22d227de Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 8 Apr 2024 16:47:37 +0530 Subject: [PATCH 06/26] Add basic querier tests for Detected Labels --- pkg/querier/querier.go | 18 +++++--- pkg/querier/querier_mock_test.go | 10 +++++ pkg/querier/querier_test.go | 73 +++++++++++++++++++++++++++++--- 3 files changed, 89 insertions(+), 12 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index f4ff897e5ab85..836f3fb25109d 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -914,6 +914,7 @@ func (q *SingleTenantQuerier) Volume(ctx context.Context, req *logproto.VolumeRe func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto.DetectedLabelsRequest) (*logproto.DetectedLabelsResponse, error) { var ingesterLabels *logproto.LabelToValuesResponse var detectedLabels []*logproto.DetectedLabel + staticLabels := []string{"cluster", "namespace", "instance", "pod"} g, ctx := errgroup.WithContext(ctx) ingesterQueryInterval, _ := q.buildQueryIntervals(*req.Start, *req.End) @@ -940,8 +941,15 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. }, nil } + // append static labels before so they are in sorted order + for _, l := range staticLabels { + if values, present := ingesterLabels.Labels[l]; present { + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: l, Cardinality: uint64(len(values.Values))}) + } + } + for label, values := range ingesterLabels.Labels { - if q.isLabelRelevant(label, values) { + if q.isLabelRelevant(label, values, staticLabels) { detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(values.Values))}) } } @@ -967,12 +975,10 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP return res, err } -func (q *SingleTenantQuerier) isLabelRelevant(label string, values *logproto.UniqueLabelValues) bool { - staticLabels := []string{"pod", "namespace", "cluster", "instance"} +func (q *SingleTenantQuerier) isLabelRelevant(label string, values *logproto.UniqueLabelValues, staticLabels []string) bool { cardinality := len(values.Values) - // TODO(shantanu) make these values configurable - if !slices.Contains(staticLabels, label) && - (cardinality < 1 || cardinality > 50) || + if slices.Contains(staticLabels, label) || + (cardinality < 2 || cardinality > 50) || containsAllIDTypes(values.Values) { return false } diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go index 3d5edc50b8317..6d025a9e0db5e 100644 --- a/pkg/querier/querier_mock_test.go +++ b/pkg/querier/querier_mock_test.go @@ -111,6 +111,16 @@ func (c *querierClientMock) GetChunkIDs(ctx context.Context, in *logproto.GetChu return res.(*logproto.GetChunkIDsResponse), args.Error(1) } +func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto.DetectedLabelsRequest, opts ...grpc.CallOption) (*logproto.LabelToValuesResponse, error) { + args := c.Called(ctx, in, opts) + res := args.Get(0) + if res == nil { + return (*logproto.LabelToValuesResponse)(nil), args.Error(1) + } + return res.(*logproto.LabelToValuesResponse), args.Error(1) + +} + func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) { args := c.Called(ctx, in, opts) res := args.Get(0) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index df87a72df3664..54412b0ace076 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -14,11 +14,6 @@ import ( "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/user" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" @@ -28,6 +23,10 @@ import ( "github.com/grafana/loki/v3/pkg/storage" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/validation" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) const ( @@ -1148,6 +1147,13 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer }, }, }, nil) + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.DetectedLabelsResponse{ + DetectedLabels: []*logproto.DetectedLabel{ + {Label: "pod", Cardinality: 1}, + {Label: "namespace", Cardinality: 3}, + {Label: "customerId", Cardinality: 200}, + }, + }, nil) store := newStoreMock() store.On("SelectLogs", mock.Anything, mock.Anything).Return(mockStreamIterator(0, 1), nil) @@ -1409,8 +1415,63 @@ func TestQuerier_isLabelRelevant(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { querier := &SingleTenantQuerier{cfg: mockQuerierConfig()} - assert.Equal(t, tc.expected, querier.isLabelRelevant(tc.label, tc.values)) + assert.Equal(t, tc.expected, querier.isLabelRelevant(tc.label, tc.values, []string{"host", "cluster", "namespace", "instance", "pod"})) }) } } + +func TestQuerier_DetectedLabels(t *testing.T) { + start := time.Now() + end := time.Now() + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {[]string{"ingester"}}, + "foo": {[]string{"abc", "def", "ghi"}}, + "bar": {[]string{"cgi", "def"}}, + "all-ids": {[]string{"1", "2", "3", "5"}}, + "uuids": {[]string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + }} + + expectedResponse := logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ + { + Label: "cluster", + Cardinality: 1, + }, + { + Label: "foo", + Cardinality: 3, + }, + { + Label: "bar", + Cardinality: 2, + }, + }} + + query := "" + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "test") + request := logproto.DetectedLabelsRequest{ + Start: &start, + End: &end, + Query: query, + } + conf := mockQuerierConfig() + conf.IngesterQueryStoreMaxLookback = 0 + + ingesterClient := newQuerierClientMock() + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + newStoreMock(), limits) + + resp, err := querier.DetectedLabels(ctx, &request) + calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") + assert.Equal(t, 1, len(calls)) + require.Equal(t, &expectedResponse, resp) +} From adab10df3ccd17d956f9805621d45ecb7a8af919 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 8 Apr 2024 16:53:16 +0530 Subject: [PATCH 07/26] Log on error in type conversion in detected labels --- pkg/loki/modules.go | 3 ++- pkg/querier/ingester_querier.go | 17 +++++++++++++---- pkg/querier/ingester_querier_test.go | 6 ++++++ pkg/querier/querier_test.go | 12 +++++++----- 4 files changed, 28 insertions(+), 10 deletions(-) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 882c0d40d130d..c6499b97c702b 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -883,7 +883,8 @@ func (t *Loki) setupAsyncStore() error { } func (t *Loki) initIngesterQuerier() (_ services.Service, err error) { - t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace) + logger := log.With(util_log.Logger, "component", "querier") + t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger) if err != nil { return nil, err } diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index 8b5b4d14a729e..7fe103b2649eb 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "golang.org/x/exp/slices" "github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume" @@ -41,23 +43,25 @@ type IngesterQuerier struct { ring ring.ReadRing pool *ring_client.Pool extraQueryDelay time.Duration + logger log.Logger } -func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string) (*IngesterQuerier, error) { +func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { factory := func(addr string) (ring_client.PoolClient, error) { return client.New(clientCfg, addr) } - return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace) + return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger) } // newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory // used for testing purposes -func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string) (*IngesterQuerier, error) { +func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) { iq := IngesterQuerier{ ring: ring, pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace), extraQueryDelay: extraQueryDelay, + logger: logger, } err := services.StartAndAwaitRunning(context.Background(), iq.pool) @@ -364,12 +368,17 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec }) if err != nil { + level.Error(q.logger).Log("msg", "error getting detected labels", "err", err) return nil, err } labelMap := make(map[string][]string) for _, resp := range ingesterResponses { - thisIngester := resp.response.(*logproto.LabelToValuesResponse) + thisIngester, ok := resp.response.(*logproto.LabelToValuesResponse) + if !ok { + level.Error(q.logger).Log("msg", "Cannot convert response to LabelToValuesResponse in detectedlabels", + "response", resp) + } for label, thisIngesterValues := range thisIngester.Labels { var combinedValues []string diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index d2cb00d82ec59..cfe7e9e37c97b 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + util_log "github.com/grafana/loki/v3/pkg/util/log" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -110,6 +111,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, + util_log.Logger, ) require.NoError(t, err) @@ -210,6 +212,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, + util_log.Logger, ) require.NoError(t, err) @@ -308,6 +311,7 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, + util_log.Logger, ) require.NoError(t, err) @@ -371,6 +375,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, + util_log.Logger, ) require.NoError(t, err) @@ -392,6 +397,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, + util_log.Logger, ) require.NoError(t, err) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 54412b0ace076..48e8aba303a59 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -14,6 +14,12 @@ import ( "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/user" + util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" @@ -23,10 +29,6 @@ import ( "github.com/grafana/loki/v3/pkg/storage" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/validation" - "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) const ( @@ -1357,7 +1359,7 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) { } func newQuerier(cfg Config, clientCfg client.Config, clientFactory ring_client.PoolFactory, ring ring.ReadRing, dg *mockDeleteGettter, store storage.Store, limits *validation.Overrides) (*SingleTenantQuerier, error) { - iq, err := newIngesterQuerier(clientCfg, ring, cfg.ExtraQueryDelay, clientFactory, constants.Loki) + iq, err := newIngesterQuerier(clientCfg, ring, cfg.ExtraQueryDelay, clientFactory, constants.Loki, util_log.Logger) if err != nil { return nil, err } From a6cbec0ef52c50b3f2dbf66596a7df371655cce9 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 8 Apr 2024 17:55:40 +0530 Subject: [PATCH 08/26] Improve querier test --- pkg/querier/querier_test.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 48e8aba303a59..7b14ea876bdc1 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -5,6 +5,7 @@ import ( "errors" "io" "net/http" + "strconv" "testing" "time" @@ -1424,14 +1425,21 @@ func TestQuerier_isLabelRelevant(t *testing.T) { } func TestQuerier_DetectedLabels(t *testing.T) { + start := time.Now() end := time.Now() + manyValues := []string{} + for i := 0; i < 60; i++ { + manyValues = append(manyValues, "a"+strconv.Itoa(i)) + } ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "cluster": {[]string{"ingester"}}, - "foo": {[]string{"abc", "def", "ghi"}}, - "bar": {[]string{"cgi", "def"}}, - "all-ids": {[]string{"1", "2", "3", "5"}}, - "uuids": {[]string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "def", "ghi"}}, + "bar": {Values: []string{"cgi", "def"}}, + "all-ids": {Values: []string{"1", "2", "3", "5"}}, + "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + "manyvalues": {Values: manyValues}, + "namespace": {Values: manyValues}, }} expectedResponse := logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ @@ -1439,6 +1447,10 @@ func TestQuerier_DetectedLabels(t *testing.T) { Label: "cluster", Cardinality: 1, }, + { + Label: "namespace", + Cardinality: 60, + }, { Label: "foo", Cardinality: 3, @@ -1471,8 +1483,9 @@ func TestQuerier_DetectedLabels(t *testing.T) { mockReadRingWithOneActiveIngester(), &mockDeleteGettter{}, newStoreMock(), limits) - + require.NoError(t, err) resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") assert.Equal(t, 1, len(calls)) require.Equal(t, &expectedResponse, resp) From 6c830c9f967aaa9012c4e3140ff4b473dd2d0f83 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 8 Apr 2024 18:13:16 +0530 Subject: [PATCH 09/26] Add test for ingester_querier --- pkg/querier/ingester_querier_test.go | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index cfe7e9e37c97b..0ee33f20f97ae 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -407,3 +407,37 @@ func TestIngesterQuerier_Volume(t *testing.T) { require.Equal(t, []logproto.Volume(nil), volumes.Volumes) }) } + +func TestIngesterQuerier_DetectedLabels(t *testing.T) { + t.Run("it returns all unique detected labels from all ingesters", func(t *testing.T) { + req := logproto.DetectedLabelsRequest{} + res := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "all-ids": {Values: []string{"1", "3"}}, + "bar": {Values: []string{"cgi", "def"}}, + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "ghi"}}, + }} + + ingesterClient := newQuerierClientMock() + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "abc", "ghi"}}, + "bar": {Values: []string{"cgi", "def"}}, + "all-ids": {Values: []string{"1", "3", "3", "3"}}, + }}, nil) + + ingesterQuerier, err := newIngesterQuerier( + mockIngesterClientConfig(), + newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), + mockQuerierConfig().ExtraQueryDelay, + newIngesterClientMockFactory(ingesterClient), + constants.Loki, + util_log.Logger, + ) + require.NoError(t, err) + + detectedLabels, err := ingesterQuerier.DetectedLabel(context.Background(), &req) + require.NoError(t, err) + require.Equal(t, &res, detectedLabels) + }) +} From 78d831f01006b69318ac78e312374c1734cea66b Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 8 Apr 2024 19:17:20 +0530 Subject: [PATCH 10/26] Format --- pkg/querier/ingester_querier_test.go | 3 ++- pkg/querier/querier_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index 0ee33f20f97ae..5757e34665c8f 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -7,9 +7,10 @@ import ( "testing" "time" - util_log "github.com/grafana/loki/v3/pkg/util/log" "go.uber.org/atomic" + util_log "github.com/grafana/loki/v3/pkg/util/log" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 7b14ea876bdc1..0b47452153083 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -15,12 +15,13 @@ import ( "github.com/grafana/dskit/ring" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/user" - util_log "github.com/grafana/loki/v3/pkg/util/log" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/compactor/deletion" "github.com/grafana/loki/v3/pkg/ingester/client" "github.com/grafana/loki/v3/pkg/logproto" From 384a52ca903aef7e5ac04dde6dcb75d3feab2308 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 10 Apr 2024 14:23:55 +0530 Subject: [PATCH 11/26] Add tests to ingester instance --- pkg/ingester/instance_test.go | 55 +++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 0cd5838251248..1f780e2bd9fb2 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1480,6 +1480,61 @@ func insertData(t *testing.T, instance *instance) { } } +func TestInstance_LabelsWithValues(t *testing.T) { + instance, currentTime, _ := setupTestStreams(t) + start := []time.Time{currentTime.Add(11 * time.Nanosecond)}[0] + m, err := labels.NewMatcher(labels.MatchEqual, "app", "test") + require.NoError(t, err) + + tests := []struct { + name string + startTime time.Time + matchers []*labels.Matcher + expectedResponse map[string]UniqueValues + }{ + { + name: "label names with no matchers", + startTime: start, + expectedResponse: map[string]UniqueValues{ + "app": map[string]struct{}{ + "test": {}, + "test2": {}, + }, + "job": map[string]struct{}{ + "varlogs": {}, + "varlogs2": {}, + }, + }, + }, + { + name: "label names with matchers", + startTime: start, + matchers: []*labels.Matcher{m}, + expectedResponse: map[string]UniqueValues{ + "app": map[string]struct{}{ + "test": {}, + }, + "job": map[string]struct{}{ + "varlogs": {}, + "varlogs2": {}, + }, + }, + }, + { + name: "label names matchers and no start time", + matchers: []*labels.Matcher{m}, + expectedResponse: map[string]UniqueValues{}, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + res, err := instance.LabelsWithValues(context.Background(), tc.startTime, tc.matchers...) + require.NoError(t, err) + require.Equal(t, tc.expectedResponse, res) + }) + } +} + type fakeQueryServer func(*logproto.QueryResponse) error func (f fakeQueryServer) Send(res *logproto.QueryResponse) error { From 4f9d6c850cc22582f430159ce93e7cdcc0a85366 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 10 Apr 2024 15:53:33 +0530 Subject: [PATCH 12/26] Add ingester tests for detected labels --- pkg/ingester/ingester_test.go | 63 +++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 94fd5700c6809..c440287472fb0 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -784,6 +784,69 @@ func Test_InMemoryLabels(t *testing.T) { require.Equal(t, []string{"bar", "foo"}, res.Values) } +func TestIngester_GetDetectedLabels(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + + ingesterConfig := defaultIngesterTestConfig(t) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + store := &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Push labels + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + { + Labels: `{foo="bar1",bar="baz3"}`, + }, + { + Labels: `{foo="foo1",bar="baz1"}`, + }, + { + Labels: `{foo="foo",bar="baz1"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{ + Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0], + End: nil, + Query: "", + }) + + require.NoError(t, err) + fooValues, ok := res.Labels["foo"] + require.True(t, ok) + barValues, ok := res.Labels["bar"] + require.True(t, ok) + require.Equal(t, 4, len(fooValues.Values)) + require.Equal(t, 3, len(barValues.Values)) +} + func Test_DedupeIngester(t *testing.T) { var ( requests = int64(400) From 189c23b7ec5de0b0a99fa3e258c5352a7cc07eaa Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 10 Apr 2024 20:31:24 +0530 Subject: [PATCH 13/26] Add logic to fetch labels from store --- pkg/querier/querier.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 2193223926c2a..198e43e5161e9 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -975,25 +975,34 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. return nil, err } - if ingesterLabels == nil { + if ingesterLabels == nil && len(storeLabelsMap) == 0 { return &logproto.DetectedLabelsResponse{ DetectedLabels: []*logproto.DetectedLabel{}, }, nil } - for label, values := range ingesterLabels.Labels { - if q.isLabelRelevant(label, values.Values) { - combinedValues := values.Values - storeValues, storeHasLabel := storeLabelsMap[label] - if storeHasLabel { - combinedValues = append(combinedValues, storeValues...) + if ingesterLabels != nil { + for label, values := range ingesterLabels.Labels { + if q.isLabelRelevant(label, values.Values) { + combinedValues := values.Values + storeValues, storeHasLabel := storeLabelsMap[label] + if storeHasLabel { + combinedValues = append(combinedValues, storeValues...) + } + uniqValues := slices.CompactFunc(combinedValues, strings.EqualFold) + // TODO(shantanu): There's a bug here. Unique values can go above 50. Will need a bit of refactoring + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) + delete(storeLabelsMap, label) } - uniqValues := slices.CompactFunc(combinedValues, strings.EqualFold) - // TODO(shantanu): There's a bug here. Unique values can go above 50. Will need a bit of refactoring - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) } } + if storeLabelsMap != nil { + for label, values := range storeLabelsMap { + uniqValues := slices.CompactFunc(values, strings.EqualFold) + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) + } + } return &logproto.DetectedLabelsResponse{ DetectedLabels: detectedLabels, }, nil From 8f3c2979401b27065e9263ebb78a825115fdc3d6 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 10 Apr 2024 21:03:01 +0530 Subject: [PATCH 14/26] Fix querier tests --- pkg/querier/querier.go | 8 ++++---- pkg/querier/querier_test.go | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 5d11e8f607be2..29d1f243a70b9 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -964,7 +964,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. if err != nil { return err } - if q.isLabelRelevant(label, values) { + if q.isLabelRelevant(label, values, staticLabels) { storeLabelsMap[label] = values } } @@ -1034,11 +1034,11 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP // isLabelRelevant returns if the label is relevant for logs app. A label is relevant if it is not of any numeric, UUID or GUID type // It is also not relevant to return if the values are less than 1 or beyond 50. -func (q *SingleTenantQuerier) isLabelRelevant(label string, values *logproto.UniqueLabelValues, staticLabels []string) bool { - cardinality := len(values.Values) +func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string, staticLabels []string) bool { + cardinality := len(values) if slices.Contains(staticLabels, label) || (cardinality < 2 || cardinality > 50) || - containsAllIDTypes(values.Values) { + containsAllIDTypes(values) { return false } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 3376479fbcb63..22ad90f0c632d 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1426,7 +1426,6 @@ func TestQuerier_isLabelRelevant(t *testing.T) { } func TestQuerier_DetectedLabels(t *testing.T) { - start := time.Now() end := time.Now() manyValues := []string{} @@ -1460,6 +1459,14 @@ func TestQuerier_DetectedLabels(t *testing.T) { Label: "bar", Cardinality: 2, }, + { + Label: "label1", + Cardinality: 2, + }, + { + Label: "label2", + Cardinality: 2, + }, }} query := "" @@ -1477,13 +1484,19 @@ func TestQuerier_DetectedLabels(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). Return(&ingesterResponse, nil) + + storeClient := newStoreMock() + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"label1", "label2"}, nil) + storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "label1", mock.Anything).Return([]string{"val1", "val2"}, nil) + storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "label2", mock.Anything).Return([]string{"val3", "val4"}, nil) + querier, err := newQuerier( conf, mockIngesterClientConfig(), newIngesterClientMockFactory(ingesterClient), mockReadRingWithOneActiveIngester(), &mockDeleteGettter{}, - newStoreMock(), limits) + storeClient, limits) require.NoError(t, err) resp, err := querier.DetectedLabels(ctx, &request) require.NoError(t, err) From 5eb40be21fe1e0ed5f866b3a3d2277b04c6a84de Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 10 Apr 2024 21:54:46 +0530 Subject: [PATCH 15/26] Add tests --- pkg/querier/querier.go | 12 ++- pkg/querier/querier_test.go | 201 ++++++++++++++++++++++++------------ 2 files changed, 141 insertions(+), 72 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 29d1f243a70b9..fc78602c30655 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -8,7 +8,6 @@ import ( "regexp" "sort" "strconv" - "strings" "time" "github.com/axiomhq/hyperloglog" @@ -997,9 +996,11 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. if storeHasLabel { combinedValues = append(combinedValues, storeValues...) } - uniqValues := slices.CompactFunc(combinedValues, strings.EqualFold) + + slices.Sort(combinedValues) + uniqueValues := slices.Compact(combinedValues) // TODO(shantanu): There's a bug here. Unique values can go above 50. Will need a bit of refactoring - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))}) delete(storeLabelsMap, label) } } @@ -1007,8 +1008,9 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. if storeLabelsMap != nil { for label, values := range storeLabelsMap { - uniqValues := slices.CompactFunc(values, strings.EqualFold) - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqValues))}) + slices.Sort(values) + uniqueValues := slices.Compact(values) + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))}) } } return &logproto.DetectedLabelsResponse{ diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 22ad90f0c632d..1c98752633591 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1426,81 +1426,148 @@ func TestQuerier_isLabelRelevant(t *testing.T) { } func TestQuerier_DetectedLabels(t *testing.T) { - start := time.Now() - end := time.Now() manyValues := []string{} for i := 0; i < 60; i++ { manyValues = append(manyValues, "a"+strconv.Itoa(i)) } - ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "cluster": {Values: []string{"ingester"}}, - "foo": {Values: []string{"abc", "def", "ghi"}}, - "bar": {Values: []string{"cgi", "def"}}, - "all-ids": {Values: []string{"1", "2", "3", "5"}}, - "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, - "manyvalues": {Values: manyValues}, - "namespace": {Values: manyValues}, - }} - - expectedResponse := logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ - { - Label: "cluster", - Cardinality: 1, - }, - { - Label: "namespace", - Cardinality: 60, - }, - { - Label: "foo", - Cardinality: 3, - }, - { - Label: "bar", - Cardinality: 2, - }, - { - Label: "label1", - Cardinality: 2, - }, + tests := []struct { + name string + start time.Time + end time.Time + ingesterResponse logproto.LabelToValuesResponse + storeResponse map[string][]string + query string + expectedResponse logproto.DetectedLabelsResponse + }{ { - Label: "label2", - Cardinality: 2, + name: "Both store and ingester responses are present and don't overlap", + start: time.Now(), + end: time.Now(), + ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "def", "ghi", "abc"}}, + "all-ids": {Values: []string{"1", "2", "3", "5"}}, + "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + "manyvalues": {Values: manyValues}, + "namespace": {Values: manyValues}, + }}, + expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ + { + Label: "cluster", + Cardinality: 1, + }, + { + Label: "namespace", + Cardinality: 60, + }, + { + Label: "foo", + Cardinality: 3, + }, + { + Label: "a", + Cardinality: 2, + }, + }}, + storeResponse: map[string][]string{"a": {"val1", "val2"}}, + }, { + name: "Only ingester response is present", + start: time.Now(), + end: time.Now(), + ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "foo": {Values: []string{"abc", "def", "ghi"}}, + "all-ids": {Values: []string{"1", "2", "3", "5"}}, + }}, + expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ + { + Label: "foo", + Cardinality: 3, + }, + }}, + }, { + name: "Only store response is present", + storeResponse: map[string][]string{"foo": {"val1", "val2"}}, + expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ + { + Label: "foo", + Cardinality: 2, + }, + }}, + }, { + name: "Both store and ingester responses are present and overlap", + start: time.Now(), + end: time.Now(), + ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "def", "ghi", "abc"}}, + "all-ids": {Values: []string{"1", "2", "3", "5"}}, + "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + "manyvalues": {Values: manyValues}, + "namespace": {Values: manyValues}, + }}, + expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ + { + Label: "cluster", + Cardinality: 1, + }, + { + Label: "namespace", + Cardinality: 60, + }, + { + Label: "foo", + Cardinality: 4, + }, + }}, + storeResponse: map[string][]string{"foo": {"abc", "lmo"}}, }, - }} - - query := "" - limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) - require.NoError(t, err) - ctx := user.InjectOrgID(context.Background(), "test") - request := logproto.DetectedLabelsRequest{ - Start: &start, - End: &end, - Query: query, } - conf := mockQuerierConfig() - conf.IngesterQueryStoreMaxLookback = 0 - ingesterClient := newQuerierClientMock() - ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&ingesterResponse, nil) + for _, tc := range tests { + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "test") + request := logproto.DetectedLabelsRequest{ + Start: &tc.start, + End: &tc.end, + Query: tc.query, + } + conf := mockQuerierConfig() + conf.IngesterQueryStoreMaxLookback = 0 - storeClient := newStoreMock() - storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{"label1", "label2"}, nil) - storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "label1", mock.Anything).Return([]string{"val1", "val2"}, nil) - storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "label2", mock.Anything).Return([]string{"val3", "val4"}, nil) + ingesterClient := newQuerierClientMock() + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&tc.ingesterResponse, nil) + storeClient := newStoreMock() + storeLabels := []string{} + if tc.storeResponse != nil { + for l := range tc.storeResponse { + storeLabels = append(storeLabels, l) + } + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(storeLabels, nil) - querier, err := newQuerier( - conf, - mockIngesterClientConfig(), - newIngesterClientMockFactory(ingesterClient), - mockReadRingWithOneActiveIngester(), - &mockDeleteGettter{}, - storeClient, limits) - require.NoError(t, err) - resp, err := querier.DetectedLabels(ctx, &request) - require.NoError(t, err) - calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") - assert.Equal(t, 1, len(calls)) - require.Equal(t, &expectedResponse, resp) + for _, l := range storeLabels { + vals, _ := tc.storeResponse[l] + storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, l, mock.Anything). + Return(vals, nil) + } + } else { + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(storeLabels, nil) + } + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") + assert.Equal(t, 1, len(calls)) + require.Equal(t, &tc.expectedResponse, resp) + } } From 949928cb53a1e506ffd9d86c8655dfa40debcdd7 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 10 Apr 2024 22:19:25 +0530 Subject: [PATCH 16/26] Fix lint --- pkg/querier/querier.go | 11 +++++------ pkg/querier/querier_test.go | 2 +- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index fc78602c30655..212d5d8133c41 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -1006,13 +1006,12 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. } } - if storeLabelsMap != nil { - for label, values := range storeLabelsMap { - slices.Sort(values) - uniqueValues := slices.Compact(values) - detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))}) - } + for label, values := range storeLabelsMap { + slices.Sort(values) + uniqueValues := slices.Compact(values) + detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: label, Cardinality: uint64(len(uniqueValues))}) } + return &logproto.DetectedLabelsResponse{ DetectedLabels: detectedLabels, }, nil diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 1c98752633591..76394cb65ff0e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1548,7 +1548,7 @@ func TestQuerier_DetectedLabels(t *testing.T) { Return(storeLabels, nil) for _, l := range storeLabels { - vals, _ := tc.storeResponse[l] + vals := tc.storeResponse[l] storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, l, mock.Anything). Return(vals, nil) } From 46102d1dab54ddfdc91f5124a706639efc39fd80 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Thu, 11 Apr 2024 15:50:39 +0530 Subject: [PATCH 17/26] Add metrics to detected labels --- pkg/logql/metrics.go | 41 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index e9921a07c2944..c07317bc9a455 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -579,6 +579,43 @@ func extractShard(shards []string) *astmapper.ShardAnnotation { return &shard } -func RecordDetectedLabelsQueryMetrics(_ context.Context, _ log.Logger, _ time.Time, _ time.Time, _ string, _ string, _ logql_stats.Result) { - // TODO(shantanu) log metrics here +func RecordDetectedLabelsQueryMetrics(ctx context.Context, log log.Logger, start time.Time, end time.Time, query string, status string, stats logql_stats.Result) { + var ( + logger = fixLogger(ctx, log) + latencyType = latencyTypeFast + queryType = QueryTypeVolume + ) + + // Tag throughput metric by latency type based on a threshold. + // Latency below the threshold is fast, above is slow. + if stats.Summary.ExecTime > slowQueryThresholdSecond { + latencyType = latencyTypeSlow + } + + rangeType := "range" + + level.Info(logger).Log( + "latency", latencyType, + "query_type", queryType, + "query", query, + "query_hash", util.HashedQuery(query), + "start", start.Format(time.RFC3339Nano), + "end", end.Format(time.RFC3339Nano), + "start_delta", time.Since(start), + "end_delta", time.Since(end), + "range_type", rangeType, + "length", end.Sub(start), + "duration", time.Duration(int64(stats.Summary.ExecTime*float64(time.Second))), + "status", status, + "splits", stats.Summary.Splits, + "total_entries", stats.Summary.TotalEntriesReturned, + // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats + "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, + "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, + "cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored, + "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), + "cache_volume_results_query_length_served", stats.Caches.VolumeResult.CacheQueryLengthServed(), + ) + + execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) } From 9d81685f8dda5741e607aea04a5b3079457d0108 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 15 Apr 2024 07:35:33 +0530 Subject: [PATCH 18/26] Fix assigning back to map in detected labels --- pkg/ingester/instance.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 65b7e287deafd..eb98f8a39b630 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -607,7 +607,10 @@ func (i *instance) LabelsWithValues(ctx context.Context, startTime time.Time, ma for _, v := range values { existingValues[v] = struct{}{} } + labelMap[label] = existingValues } + + return labelMap, nil } err := i.forMatchingStreams(ctx, startTime, matchers, nil, func(s *stream) error { From 8ceac343cb002392727a653ee79417f3f4ef1c1a Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 15 Apr 2024 07:41:43 +0530 Subject: [PATCH 19/26] Fixes to ingester_querier and tests --- pkg/querier/ingester_querier.go | 2 +- pkg/querier/ingester_querier_test.go | 15 +++++++-------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index 7fe103b2649eb..3bd5f0459c91a 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -376,7 +376,7 @@ func (q *IngesterQuerier) DetectedLabel(ctx context.Context, req *logproto.Detec for _, resp := range ingesterResponses { thisIngester, ok := resp.response.(*logproto.LabelToValuesResponse) if !ok { - level.Error(q.logger).Log("msg", "Cannot convert response to LabelToValuesResponse in detectedlabels", + level.Warn(q.logger).Log("msg", "Cannot convert response to LabelToValuesResponse in detectedlabels", "response", resp) } diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index 5757e34665c8f..d1dea6fc23dfc 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -7,10 +7,9 @@ import ( "testing" "time" + "github.com/go-kit/log" "go.uber.org/atomic" - util_log "github.com/grafana/loki/v3/pkg/util/log" - "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -112,7 +111,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, - util_log.Logger, + log.NewNopLogger(), ) require.NoError(t, err) @@ -213,7 +212,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, - util_log.Logger, + log.NewNopLogger(), ) require.NoError(t, err) @@ -312,7 +311,7 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, - util_log.Logger, + log.NewNopLogger(), ) require.NoError(t, err) @@ -376,7 +375,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, - util_log.Logger, + log.NewNopLogger(), ) require.NoError(t, err) @@ -398,7 +397,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, - util_log.Logger, + log.NewNopLogger(), ) require.NoError(t, err) @@ -433,7 +432,7 @@ func TestIngesterQuerier_DetectedLabels(t *testing.T) { mockQuerierConfig().ExtraQueryDelay, newIngesterClientMockFactory(ingesterClient), constants.Loki, - util_log.Logger, + log.NewNopLogger(), ) require.NoError(t, err) From f2244fa2cd3d7f85316a0df8e44a4599c82bf008 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 15 Apr 2024 08:19:27 +0530 Subject: [PATCH 20/26] Minor improvements as per PR comments --- pkg/querier/ingester_querier_test.go | 81 +++++++++------------------- pkg/querier/querier_test.go | 14 ++--- 2 files changed, 33 insertions(+), 62 deletions(-) diff --git a/pkg/querier/ingester_querier_test.go b/pkg/querier/ingester_querier_test.go index d1dea6fc23dfc..713c170f7dea2 100644 --- a/pkg/querier/ingester_querier_test.go +++ b/pkg/querier/ingester_querier_test.go @@ -105,14 +105,8 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { } else { ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn) } - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock(ringIngesters, 1), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - log.NewNopLogger(), - ) + + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(ringIngesters, 1), ingesterClient) require.NoError(t, err) wg.Add(3) @@ -206,14 +200,7 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) { } else { ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn) } - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock(ringIngesters, 1), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - log.NewNopLogger(), - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(ringIngesters, 1), ingesterClient) require.NoError(t, err) wg.Add(3) @@ -305,14 +292,7 @@ func TestQuerier_tailDisconnectedIngesters(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("Tail", mock.Anything, &req, mock.Anything).Return(newTailClientMock(), nil) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock(testData.ringIngesters, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - log.NewNopLogger(), - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock(testData.ringIngesters, 0), ingesterClient) require.NoError(t, err) actualClients, err := ingesterQuerier.TailDisconnectedIngesters(context.Background(), &req, testData.connectedIngestersAddr) @@ -369,14 +349,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("GetVolume", mock.Anything, mock.Anything, mock.Anything).Return(ret, nil) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - log.NewNopLogger(), - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), ingesterClient) require.NoError(t, err) volumes, err := ingesterQuerier.Volume(context.Background(), "", 0, 1, 10, nil, "labels") @@ -391,14 +364,7 @@ func TestIngesterQuerier_Volume(t *testing.T) { ingesterClient := newQuerierClientMock() ingesterClient.On("GetVolume", mock.Anything, mock.Anything, mock.Anything).Return(nil, status.Error(codes.Unimplemented, "something bad")) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - log.NewNopLogger(), - ) + ingesterQuerier, err := newTestIngesterQuerier(newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), ingesterClient) require.NoError(t, err) volumes, err := ingesterQuerier.Volume(context.Background(), "", 0, 1, 10, nil, "labels") @@ -411,12 +377,6 @@ func TestIngesterQuerier_Volume(t *testing.T) { func TestIngesterQuerier_DetectedLabels(t *testing.T) { t.Run("it returns all unique detected labels from all ingesters", func(t *testing.T) { req := logproto.DetectedLabelsRequest{} - res := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "all-ids": {Values: []string{"1", "3"}}, - "bar": {Values: []string{"cgi", "def"}}, - "cluster": {Values: []string{"ingester"}}, - "foo": {Values: []string{"abc", "ghi"}}, - }} ingesterClient := newQuerierClientMock() ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything).Return(&logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ @@ -426,18 +386,29 @@ func TestIngesterQuerier_DetectedLabels(t *testing.T) { "all-ids": {Values: []string{"1", "3", "3", "3"}}, }}, nil) - ingesterQuerier, err := newIngesterQuerier( - mockIngesterClientConfig(), - newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0), - mockQuerierConfig().ExtraQueryDelay, - newIngesterClientMockFactory(ingesterClient), - constants.Loki, - log.NewNopLogger(), - ) + readRingMock := newReadRingMock([]ring.InstanceDesc{mockInstanceDesc("1.1.1.1", ring.ACTIVE), mockInstanceDesc("3.3.3.3", ring.ACTIVE)}, 0) + ingesterQuerier, err := newTestIngesterQuerier(readRingMock, ingesterClient) require.NoError(t, err) detectedLabels, err := ingesterQuerier.DetectedLabel(context.Background(), &req) require.NoError(t, err) - require.Equal(t, &res, detectedLabels) + + require.Equal(t, &logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "all-ids": {Values: []string{"1", "3"}}, + "bar": {Values: []string{"cgi", "def"}}, + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "ghi"}}, + }}, detectedLabels) }) } + +func newTestIngesterQuerier(readRingMock *readRingMock, ingesterClient *querierClientMock) (*IngesterQuerier, error) { + return newIngesterQuerier( + mockIngesterClientConfig(), + readRingMock, + mockQuerierConfig().ExtraQueryDelay, + newIngesterClientMockFactory(ingesterClient), + constants.Loki, + log.NewNopLogger(), + ) +} diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 76394cb65ff0e..95093e5ae1944 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1444,12 +1444,12 @@ func TestQuerier_DetectedLabels(t *testing.T) { start: time.Now(), end: time.Now(), ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "cluster": {Values: []string{"ingester"}}, - "foo": {Values: []string{"abc", "def", "ghi", "abc"}}, - "all-ids": {Values: []string{"1", "2", "3", "5"}}, - "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, - "manyvalues": {Values: manyValues}, - "namespace": {Values: manyValues}, + "cluster": {Values: []string{"ingester"}}, + "foo": {Values: []string{"abc", "def", "ghi", "abc"}}, + "all-ids": {Values: []string{"1", "2", "3", "5"}}, + "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + "not-relevant": {Values: manyValues}, + "namespace": {Values: manyValues}, }}, expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ { @@ -1486,7 +1486,7 @@ func TestQuerier_DetectedLabels(t *testing.T) { }}, }, { name: "Only store response is present", - storeResponse: map[string][]string{"foo": {"val1", "val2"}}, + storeResponse: map[string][]string{"foo": {"val1", "val2", "val1"}, "all-ids": {"1", "2", "3", "5"}}, expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ { Label: "foo", From cab219f581338e10ad5c2a23f3ee51f43264bfbb Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 17 Apr 2024 14:55:45 +0530 Subject: [PATCH 21/26] Convert table tests to methods --- pkg/ingester/instance_test.go | 84 ++++++++++++++++------------------- 1 file changed, 39 insertions(+), 45 deletions(-) diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 1f780e2bd9fb2..acc5864fc5573 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -1486,53 +1486,47 @@ func TestInstance_LabelsWithValues(t *testing.T) { m, err := labels.NewMatcher(labels.MatchEqual, "app", "test") require.NoError(t, err) - tests := []struct { - name string - startTime time.Time - matchers []*labels.Matcher - expectedResponse map[string]UniqueValues - }{ - { - name: "label names with no matchers", - startTime: start, - expectedResponse: map[string]UniqueValues{ - "app": map[string]struct{}{ - "test": {}, - "test2": {}, - }, - "job": map[string]struct{}{ - "varlogs": {}, - "varlogs2": {}, - }, + t.Run("label names with no matchers returns all detected labels", func(t *testing.T) { + var matchers []*labels.Matcher + res, err := instance.LabelsWithValues(context.Background(), start, matchers...) + completeResponse := map[string]UniqueValues{ + "app": map[string]struct{}{ + "test": {}, + "test2": {}, }, - }, - { - name: "label names with matchers", - startTime: start, - matchers: []*labels.Matcher{m}, - expectedResponse: map[string]UniqueValues{ - "app": map[string]struct{}{ - "test": {}, - }, - "job": map[string]struct{}{ - "varlogs": {}, - "varlogs2": {}, - }, + "job": map[string]struct{}{ + "varlogs": {}, + "varlogs2": {}, }, - }, - { - name: "label names matchers and no start time", - matchers: []*labels.Matcher{m}, - expectedResponse: map[string]UniqueValues{}, - }, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - res, err := instance.LabelsWithValues(context.Background(), tc.startTime, tc.matchers...) - require.NoError(t, err) - require.Equal(t, tc.expectedResponse, res) - }) - } + } + require.NoError(t, err) + require.Equal(t, completeResponse, res) + }) + + t.Run("label names with matcher returns response with matching detected labels", func(t *testing.T) { + matchers := []*labels.Matcher{m} + res, err := instance.LabelsWithValues(context.Background(), start, matchers...) + responseWithMatchingLabel := map[string]UniqueValues{ + "app": map[string]struct{}{ + "test": {}, + }, + "job": map[string]struct{}{ + "varlogs": {}, + "varlogs2": {}, + }, + } + require.NoError(t, err) + require.Equal(t, responseWithMatchingLabel, res) + }) + + t.Run("label names matchers and no start time returns a empty response", func(t *testing.T) { + matchers := []*labels.Matcher{m} + var st time.Time + res, err := instance.LabelsWithValues(context.Background(), st, matchers...) + + require.NoError(t, err) + require.Equal(t, map[string]UniqueValues{}, res) + }) } type fakeQueryServer func(*logproto.QueryResponse) error From e26c62d1b2885815cd9987db31b7a7d96e59c268 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Wed, 17 Apr 2024 16:33:57 +0530 Subject: [PATCH 22/26] Convert querier table tests to methods --- pkg/querier/querier_test.go | 355 ++++++++++++++++++++++++------------ 1 file changed, 235 insertions(+), 120 deletions(-) diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 95093e5ae1944..879d7fea3b90b 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1427,134 +1427,79 @@ func TestQuerier_isLabelRelevant(t *testing.T) { func TestQuerier_DetectedLabels(t *testing.T) { manyValues := []string{} + now := time.Now() for i := 0; i < 60; i++ { manyValues = append(manyValues, "a"+strconv.Itoa(i)) } - tests := []struct { - name string - start time.Time - end time.Time - ingesterResponse logproto.LabelToValuesResponse - storeResponse map[string][]string - query string - expectedResponse logproto.DetectedLabelsResponse - }{ - { - name: "Both store and ingester responses are present and don't overlap", - start: time.Now(), - end: time.Now(), - ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "cluster": {Values: []string{"ingester"}}, - "foo": {Values: []string{"abc", "def", "ghi", "abc"}}, - "all-ids": {Values: []string{"1", "2", "3", "5"}}, - "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, - "not-relevant": {Values: manyValues}, - "namespace": {Values: manyValues}, - }}, - expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ - { - Label: "cluster", - Cardinality: 1, - }, - { - Label: "namespace", - Cardinality: 60, - }, - { - Label: "foo", - Cardinality: 3, - }, - { - Label: "a", - Cardinality: 2, - }, - }}, - storeResponse: map[string][]string{"a": {"val1", "val2"}}, - }, { - name: "Only ingester response is present", - start: time.Now(), - end: time.Now(), - ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "foo": {Values: []string{"abc", "def", "ghi"}}, - "all-ids": {Values: []string{"1", "2", "3", "5"}}, - }}, - expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ - { - Label: "foo", - Cardinality: 3, - }, - }}, - }, { - name: "Only store response is present", - storeResponse: map[string][]string{"foo": {"val1", "val2", "val1"}, "all-ids": {"1", "2", "3", "5"}}, - expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ - { - Label: "foo", - Cardinality: 2, - }, - }}, - }, { - name: "Both store and ingester responses are present and overlap", - start: time.Now(), - end: time.Now(), - ingesterResponse: logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ - "cluster": {Values: []string{"ingester"}}, - "foo": {Values: []string{"abc", "def", "ghi", "abc"}}, - "all-ids": {Values: []string{"1", "2", "3", "5"}}, - "uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, - "manyvalues": {Values: manyValues}, - "namespace": {Values: manyValues}, - }}, - expectedResponse: logproto.DetectedLabelsResponse{DetectedLabels: []*logproto.DetectedLabel{ - { - Label: "cluster", - Cardinality: 1, - }, - { - Label: "namespace", - Cardinality: 60, - }, - { - Label: "foo", - Cardinality: 4, - }, - }}, - storeResponse: map[string][]string{"foo": {"abc", "lmo"}}, - }, + + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + ctx := user.InjectOrgID(context.Background(), "test") + + conf := mockQuerierConfig() + conf.IngesterQueryStoreMaxLookback = 0 + + request := logproto.DetectedLabelsRequest{ + Start: &now, + End: &now, + Query: "", } - for _, tc := range tests { - limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) - require.NoError(t, err) - ctx := user.InjectOrgID(context.Background(), "test") - request := logproto.DetectedLabelsRequest{ - Start: &tc.start, - End: &tc.end, - Query: tc.query, - } - conf := mockQuerierConfig() - conf.IngesterQueryStoreMaxLookback = 0 + t.Run("when both store and ingester responses are present, a combined response is returned", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "ingesterLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + }} ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(&tc.ingesterResponse, nil) + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{"storeLabel"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel", mock.Anything). + Return([]string{"val1", "val2"}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") + assert.Equal(t, 1, len(calls)) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 3) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel", Cardinality: 2}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "ingesterLabel", Cardinality: 3}) + }) + + t.Run("when both store and ingester responses are present, duplicates are removed", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "ingesterLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + "commonLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + }} + + ingesterClient := newQuerierClientMock() storeClient := newStoreMock() - storeLabels := []string{} - if tc.storeResponse != nil { - for l := range tc.storeResponse { - storeLabels = append(storeLabels, l) - } - storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). - Return(storeLabels, nil) - for _, l := range storeLabels { - vals := tc.storeResponse[l] - storeClient.On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, l, mock.Anything). - Return(vals, nil) - } - } else { - storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(storeLabels, nil) - } + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{"storeLabel", "commonLabel"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel", mock.Anything). + Return([]string{"val1", "val2"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "commonLabel", mock.Anything). + Return([]string{"def", "xyz", "lmo", "abc"}, nil) querier, err := newQuerier( conf, @@ -1564,10 +1509,180 @@ func TestQuerier_DetectedLabels(t *testing.T) { &mockDeleteGettter{}, storeClient, limits) require.NoError(t, err) + resp, err := querier.DetectedLabels(ctx, &request) require.NoError(t, err) + calls := ingesterClient.GetMockedCallsByMethod("GetDetectedLabels") assert.Equal(t, 1, len(calls)) - require.Equal(t, &tc.expectedResponse, resp) - } + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 4) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel", Cardinality: 2}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "ingesterLabel", Cardinality: 3}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "commonLabel", Cardinality: 5}) + }) + + t.Run("returns a response when ingester data is empty", func(t *testing.T) { + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&logproto.LabelToValuesResponse{}, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{"storeLabel1", "storeLabel2"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel1", mock.Anything). + Return([]string{"val1", "val2"}, nil). + On("LabelValuesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, "storeLabel2", mock.Anything). + Return([]string{"val1", "val2"}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 2) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel1", Cardinality: 2}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "storeLabel2", Cardinality: 2}) + }) + + t.Run("returns a response when store data is empty", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"ingester"}}, + "ingesterLabel": {Values: []string{"abc", "def", "ghi", "abc"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 2) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "cluster", Cardinality: 1}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "ingesterLabel", Cardinality: 3}) + }) + + t.Run("id types like uuids, guids and numbers are not relevant detected labels", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "all-ints": {Values: []string{"1", "2", "3", "4"}}, + "all-floats": {Values: []string{"1.2", "2.3", "3.4", "4.5"}}, + "all-uuids": {Values: []string{"751e8ee6-b377-4b2e-b7b5-5508fbe980ef", "6b7e2663-8ecb-42e1-8bdc-0c5de70185b3", "2e1e67ff-be4f-47b8-aee1-5d67ff1ddabf", "c95b2d62-74ed-4ed7-a8a1-eb72fc67946e"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 0) + }) + + t.Run("labels with more than required cardinality are not relevant", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "less-than-m-values": {Values: []string{"val1"}}, + "more-than-n-values": {Values: manyValues}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 0) + }) + + t.Run("static labels are always returned no matter their cardinality or value types", func(t *testing.T) { + ingesterResponse := logproto.LabelToValuesResponse{Labels: map[string]*logproto.UniqueLabelValues{ + "cluster": {Values: []string{"val1"}}, + "namespace": {Values: manyValues}, + "pod": {Values: []string{"1", "2", "3", "4"}}, + }} + + ingesterClient := newQuerierClientMock() + storeClient := newStoreMock() + + ingesterClient.On("GetDetectedLabels", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(&ingesterResponse, nil) + storeClient.On("LabelNamesForMetricName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return([]string{}, nil) + request := logproto.DetectedLabelsRequest{ + Start: &now, + End: &now, + Query: "", + } + + querier, err := newQuerier( + conf, + mockIngesterClientConfig(), + newIngesterClientMockFactory(ingesterClient), + mockReadRingWithOneActiveIngester(), + &mockDeleteGettter{}, + storeClient, limits) + require.NoError(t, err) + + resp, err := querier.DetectedLabels(ctx, &request) + require.NoError(t, err) + + detectedLabels := resp.DetectedLabels + assert.Len(t, detectedLabels, 3) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "cluster", Cardinality: 1}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "pod", Cardinality: 4}) + assert.Contains(t, detectedLabels, &logproto.DetectedLabel{Label: "namespace", Cardinality: 60}) + }) } From 24d90c4046afce5d70500d4ad39409bf3f4fd1ec Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 15 Apr 2024 07:06:05 +0530 Subject: [PATCH 23/26] Add detected label ingester test with query --- pkg/ingester/ingester_test.go | 60 +++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index c440287472fb0..6722e6cfccf3a 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -847,6 +847,66 @@ func TestIngester_GetDetectedLabels(t *testing.T) { require.Equal(t, 3, len(barValues.Values)) } +func TestIngester_GetDetectedLabelsWithQuery(t *testing.T) { + ctx := user.InjectOrgID(context.Background(), "test") + + ingesterConfig := defaultIngesterTestConfig(t) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + store := &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger()) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + // Push labels + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{foo="bar",bar="baz1"}`, + }, + { + Labels: `{foo="bar",bar="baz2"}`, + }, + { + Labels: `{foo="bar1",bar="baz3"}`, + }, + { + Labels: `{foo="foo1",bar="baz4"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.GetDetectedLabels(ctx, &logproto.DetectedLabelsRequest{ + Start: &[]time.Time{time.Now().Add(11 * time.Nanosecond)}[0], + End: nil, + Query: `{foo="bar"}`, + }) + + require.NoError(t, err) + fooValues, ok := res.Labels["foo"] + require.True(t, ok) + barValues, ok := res.Labels["bar"] + require.True(t, ok) + require.Equal(t, 1, len(fooValues.Values)) + require.Equal(t, 2, len(barValues.Values)) +} + func Test_DedupeIngester(t *testing.T) { var ( requests = int64(400) From a075f57a28160d1c4890adba509da6e3aef89a3c Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Fri, 19 Apr 2024 14:16:40 +0530 Subject: [PATCH 24/26] Add detected labels tripperware --- pkg/logql/metrics.go | 1 + pkg/querier/queryrange/roundtrip.go | 43 ++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 10d158f845ed4..b3b22ff43c02d 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -596,6 +596,7 @@ func RecordDetectedLabelsQueryMetrics(ctx context.Context, log log.Logger, start rangeType := "range" level.Info(logger).Log( + "api", "detected_labels", "latency", latencyType, "query_type", queryType, "query", query, diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index 228f20a514057..f2ff2a485ef20 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -250,6 +250,19 @@ func NewMiddleware( return nil, nil, err } + detectedLabelsTripperware, err := NewDetectedLabelsTripperware( + cfg, + engineOpts, + log, + limits, + schema, + metrics, + indexStatsTripperware, + metricsNamespace) + + if err != nil { + return nil, nil, err + } return base.MiddlewareFunc(func(next base.Handler) base.Handler { var ( metricRT = metricsTripperware.Wrap(next) @@ -261,13 +274,41 @@ func NewMiddleware( statsRT = indexStatsTripperware.Wrap(next) seriesVolumeRT = seriesVolumeTripperware.Wrap(next) detectedFieldsRT = detectedFieldsTripperware.Wrap(next) - detectedLabelsRT = next // TODO(shantanu): add middlewares + detectedLabelsRT = detectedLabelsTripperware.Wrap(next) ) return newRoundTripper(log, next, limitedRT, logFilterRT, metricRT, seriesRT, labelsRT, instantRT, statsRT, seriesVolumeRT, detectedFieldsRT, detectedLabelsRT, limits) }), StopperWrapper{resultsCache, statsCache, volumeCache}, nil } +func NewDetectedLabelsTripperware(cfg Config, opts logql.EngineOpts, logger log.Logger, l Limits, schema config.SchemaConfig, metrics *Metrics, mw base.Middleware, namespace string) (base.Middleware, error) { + return base.MiddlewareFunc(func(next base.Handler) base.Handler { + statsHandler := mw.Wrap(next) + + queryRangeMiddleware := []base.Middleware{ + StatsCollectorMiddleware(), + NewLimitsMiddleware(l), + NewQuerySizeLimiterMiddleware(schema.Configs, opts, logger, l, statsHandler), + base.InstrumentMiddleware("split_by_interval", metrics.InstrumentMiddlewareMetrics), + } + + // The sharding middleware takes care of enforcing this limit for both shardable and non-shardable queries. + // If we are not using sharding, we enforce the limit by adding this middleware after time splitting. + queryRangeMiddleware = append(queryRangeMiddleware, + NewQuerierSizeLimiterMiddleware(schema.Configs, opts, logger, l, statsHandler), + ) + + if cfg.MaxRetries > 0 { + queryRangeMiddleware = append( + queryRangeMiddleware, base.InstrumentMiddleware("retry", metrics.InstrumentMiddlewareMetrics), + base.NewRetryMiddleware(logger, cfg.MaxRetries, metrics.RetryMiddlewareMetrics, namespace), + ) + } + + return NewLimitedRoundTripper(next, l, schema.Configs, queryRangeMiddleware...) + }), nil +} + type roundTripper struct { logger log.Logger From cdc11f9072d1338408eef60d99fc299a9a8f1736 Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Fri, 19 Apr 2024 14:18:41 +0530 Subject: [PATCH 25/26] Comment cache metrics --- pkg/logql/metrics.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index b3b22ff43c02d..052446c6b5b74 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -612,11 +612,11 @@ func RecordDetectedLabelsQueryMetrics(ctx context.Context, log log.Logger, start "splits", stats.Summary.Splits, "total_entries", stats.Summary.TotalEntriesReturned, // cache is accumulated by middleware used by the frontend only; logs from the queriers will not show cache stats - "cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, - "cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, - "cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored, - "cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), - "cache_volume_results_query_length_served", stats.Caches.VolumeResult.CacheQueryLengthServed(), + //"cache_volume_results_req", stats.Caches.VolumeResult.EntriesRequested, + //"cache_volume_results_hit", stats.Caches.VolumeResult.EntriesFound, + //"cache_volume_results_stored", stats.Caches.VolumeResult.EntriesStored, + //"cache_volume_results_download_time", stats.Caches.VolumeResult.CacheDownloadTime(), + //"cache_volume_results_query_length_served", stats.Caches.VolumeResult.CacheQueryLengthServed(), ) execLatency.WithLabelValues(status, queryType, "").Observe(stats.Summary.ExecTime) From 4f4921984fa775d001df7f50841559fcf689a32f Mon Sep 17 00:00:00 2001 From: shantanualshi Date: Mon, 22 Apr 2024 12:08:31 +0530 Subject: [PATCH 26/26] Change static labels to map --- pkg/querier/querier.go | 10 +++++----- pkg/querier/querier_test.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 6c99b99893f47..bee850fd82d69 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -916,7 +916,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. return nil, err } var detectedLabels []*logproto.DetectedLabel - staticLabels := []string{"cluster", "namespace", "instance", "pod"} + staticLabels := map[string]struct{}{"cluster": {}, "namespace": {}, "instance": {}, "pod": {}} // Enforce the query timeout while querying backends queryTimeout := q.limits.QueryTimeout(ctx, userID) @@ -980,7 +980,7 @@ func (q *SingleTenantQuerier) DetectedLabels(ctx context.Context, req *logproto. } // append static labels before so they are in sorted order - for _, l := range staticLabels { + for l := range staticLabels { if values, present := ingesterLabels.Labels[l]; present { detectedLabels = append(detectedLabels, &logproto.DetectedLabel{Label: l, Cardinality: uint64(len(values.Values))}) } @@ -1033,10 +1033,10 @@ func (q *SingleTenantQuerier) Patterns(ctx context.Context, req *logproto.QueryP // isLabelRelevant returns if the label is relevant for logs app. A label is relevant if it is not of any numeric, UUID or GUID type // It is also not relevant to return if the values are less than 1 or beyond 50. -func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string, staticLabels []string) bool { +func (q *SingleTenantQuerier) isLabelRelevant(label string, values []string, staticLabels map[string]struct{}) bool { cardinality := len(values) - if slices.Contains(staticLabels, label) || - (cardinality < 2 || cardinality > 50) || + _, isStaticLabel := staticLabels[label] + if isStaticLabel || (cardinality < 2 || cardinality > 50) || containsAllIDTypes(values) { return false } diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go index 879d7fea3b90b..e6c228f04920e 100644 --- a/pkg/querier/querier_test.go +++ b/pkg/querier/querier_test.go @@ -1419,7 +1419,7 @@ func TestQuerier_isLabelRelevant(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { querier := &SingleTenantQuerier{cfg: mockQuerierConfig()} - assert.Equal(t, tc.expected, querier.isLabelRelevant(tc.label, tc.values, []string{"host", "cluster", "namespace", "instance", "pod"})) + assert.Equal(t, tc.expected, querier.isLabelRelevant(tc.label, tc.values, map[string]struct{}{"host": {}, "cluster": {}, "namespace": {}, "instance": {}, "pod": {}})) }) }