From 77264f9d47bfe45b4a511293895ef4930a90aeb0 Mon Sep 17 00:00:00 2001 From: Ashwanth Goli Date: Fri, 9 Feb 2024 17:46:52 +0530 Subject: [PATCH] fix and refactor label/series cache test --- pkg/querier/queryrange/labels_cache_test.go | 251 +++++++---------- pkg/querier/queryrange/series_cache_test.go | 284 +++++++++----------- 2 files changed, 222 insertions(+), 313 deletions(-) diff --git a/pkg/querier/queryrange/labels_cache_test.go b/pkg/querier/queryrange/labels_cache_test.go index 048bede2fade3..90b85cb1faf82 100644 --- a/pkg/querier/queryrange/labels_cache_test.go +++ b/pkg/querier/queryrange/labels_cache_test.go @@ -81,173 +81,124 @@ func TestLabelsCache(t *testing.T) { return cacheMiddleware } - cacheMiddleware := setupCacheMW() - for _, values := range []bool{false, true} { - prefix := "labels" - if values { - prefix = "label values" - } - t.Run(prefix+": cache the response for the same request", func(t *testing.T) { - start := testTime.Truncate(time.Millisecond) - end := start.Add(time.Hour) - - labelsReq := LabelRequest{ - LabelRequest: logproto.LabelRequest{ - Start: &start, - End: &end, - }, - } - - if values { - labelsReq.Values = true - labelsReq.Name = "foo" - labelsReq.Query = `{cluster="eu-west1"}` - } - - labelsResp := &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"bar", "buzz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, + composeLabelsResp := func(lbls []string, splits int64) *LokiLabelNamesResponse { + return &LokiLabelNamesResponse{ + Status: "success", + Version: uint32(loghttp.VersionV1), + Data: lbls, + Statistics: stats.Result{ + Summary: stats.Summary{ + Splits: splits, }, - } - - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, labelsReq.GetStart(), r.GetStart()) - require.Equal(t, labelsReq.GetEnd(), r.GetEnd()) - - got := r.(*LabelRequest) - require.Equal(t, labelsReq.GetName(), got.GetName()) - require.Equal(t, labelsReq.GetValues(), got.GetValues()) - require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) - - return labelsResp, nil - })) + }, + } - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, &labelsReq) - require.NoError(t, err) - require.Equal(t, 1, called) // called actual handler, as not cached. - require.Equal(t, labelsResp, got) + } - // Doing same request again shouldn't change anything. - called = 0 - got, err = handler.Do(ctx, &labelsReq) - require.NoError(t, err) - require.Equal(t, 0, called) - require.Equal(t, labelsResp, got) - }) + start := testTime.Truncate(time.Millisecond) + end := start.Add(time.Hour) + labelsReq := &LabelRequest{ + LabelRequest: logproto.LabelRequest{ + Start: &start, + End: &end, + }, } + labelsResp := composeLabelsResp([]string{"bar", "buzz"}, 1) + + var downstreamHandlerFunc func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) + downstreamHandler := &mockDownstreamHandler{fn: func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + return downstreamHandlerFunc(ctx, req) + }} - // reset cacheMiddleware - cacheMiddleware = setupCacheMW() for _, values := range []bool{false, true} { + labelsReq := labelsReq prefix := "labels" + if values { - prefix = "label values" + prefix = "label values: " + labelsReq.Values = true + labelsReq.Name = "foo" + labelsReq.Query = `{cluster="eu-west1"}` } - t.Run(prefix+": a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) { - cacheMiddleware := setupCacheMW() - - start := testTime.Truncate(time.Millisecond) - end := start.Add(time.Hour) - - labelsReq1 := LabelRequest{ - LabelRequest: logproto.LabelRequest{ - Start: &start, - End: &end, - }, - } - - if values { - labelsReq1.Values = true - labelsReq1.Name = "foo" - labelsReq1.Query = `{cluster="eu-west1"}` - } - - labelsResp1 := &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"bar", "buzz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - } - - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, labelsReq1.GetStart(), r.GetStart()) - require.Equal(t, labelsReq1.GetEnd(), r.GetEnd()) + for _, tc := range []struct { + name string + req queryrangebase.Request + expectedQueryStart, expectedQueryEnd time.Time + downstreamResponse *LokiLabelNamesResponse + downstreamCalls int + expectedReponse *LokiLabelNamesResponse + }{ + { + name: "return cached response for the same request", + downstreamCalls: 0, + expectedReponse: labelsResp, + req: labelsReq, + }, + { + name: "a new request with overlapping time range should reuse results of the previous request", + req: labelsReq.WithStartEnd(labelsReq.GetStart(), labelsReq.GetEnd().Add(15*time.Minute)), + expectedQueryStart: labelsReq.GetEnd(), + expectedQueryEnd: labelsReq.GetEnd().Add(15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeLabelsResp([]string{"fizz"}, 1), + expectedReponse: composeLabelsResp([]string{"bar", "buzz", "fizz"}, 2), + }, + { + // To avoid returning incorrect results, we only use extents that are entirely within the requested query range. + name: "cached response not entirely within the requested range", + req: labelsReq.WithStartEnd(labelsReq.GetStart().Add(15*time.Minute), labelsReq.GetEnd().Add(-15*time.Minute)), + expectedQueryStart: labelsReq.GetStart().Add(15 * time.Minute), + expectedQueryEnd: labelsReq.GetEnd().Add(-15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeLabelsResp([]string{"buzz", "fizz"}, 1), + expectedReponse: composeLabelsResp([]string{"buzz", "fizz"}, 1), + }, + } { + t.Run(prefix+tc.name, func(t *testing.T) { + cacheMiddleware := setupCacheMW() + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + // should request the entire length with no partitioning as nothing is cached yet. + require.Equal(t, labelsReq.GetStart(), r.GetStart()) + require.Equal(t, labelsReq.GetEnd(), r.GetEnd()) + + got := r.(*LabelRequest) + require.Equal(t, labelsReq.GetName(), got.GetName()) + require.Equal(t, labelsReq.GetValues(), got.GetValues()) + require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) + + return labelsResp, nil + } - got := r.(*LabelRequest) - require.Equal(t, labelsReq1.GetName(), got.GetName()) - require.Equal(t, labelsReq1.GetValues(), got.GetValues()) - require.Equal(t, labelsReq1.GetQuery(), got.GetQuery()) + handler := cacheMiddleware.Wrap(downstreamHandler) - return labelsResp1, nil - })) + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, labelsReq) + require.NoError(t, err) + require.Equal(t, 1, downstreamHandler.Called()) // call downstream handler, as not cached. + require.Equal(t, labelsResp, got) - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, &labelsReq1) - require.NoError(t, err) - require.Equal(t, 1, called) - require.Equal(t, labelsResp1, got) + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + require.Equal(t, tc.expectedQueryStart, r.GetStart()) + require.Equal(t, tc.expectedQueryEnd, r.GetEnd()) - labelsReq2 := labelsReq1.WithStartEnd(labelsReq1.GetStart().Add(15*time.Minute), labelsReq1.GetEnd().Add(15*time.Minute)) + got := r.(*LabelRequest) + require.Equal(t, labelsReq.GetName(), got.GetName()) + require.Equal(t, labelsReq.GetValues(), got.GetValues()) + require.Equal(t, labelsReq.GetQuery(), got.GetQuery()) - called = 0 - handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ + return tc.downstreamResponse, nil + } - // make downstream request only for the non-overlapping portion of the query. - require.Equal(t, labelsReq1.GetEnd(), r.GetStart()) - require.Equal(t, labelsReq2.GetEnd(), r.GetEnd()) - - got := r.(*LabelRequest) - require.Equal(t, labelsReq1.GetName(), got.GetName()) - require.Equal(t, labelsReq1.GetValues(), got.GetValues()) - require.Equal(t, labelsReq1.GetQuery(), got.GetQuery()) - - return &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"fizz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - }, nil - })) + got, err = handler.Do(ctx, tc.req) + require.NoError(t, err) + require.Equal(t, tc.downstreamCalls, downstreamHandler.Called()) + require.Equal(t, tc.expectedReponse, got) - got, err = handler.Do(ctx, labelsReq2) - require.NoError(t, err) - require.Equal(t, 1, called) - // two splits as we merge the results from the extent and downstream request - labelsResp1.Statistics.Summary.Splits = 2 - require.Equal(t, &LokiLabelNamesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []string{"bar", "buzz", "fizz"}, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 2, - }, - }, - }, got) - }) + }) + } } } diff --git a/pkg/querier/queryrange/series_cache_test.go b/pkg/querier/queryrange/series_cache_test.go index 79b1750b16417..6ba869a69411a 100644 --- a/pkg/querier/queryrange/series_cache_test.go +++ b/pkg/querier/queryrange/series_cache_test.go @@ -90,195 +90,135 @@ func TestSeriesCache(t *testing.T) { return cacheMiddleware } - t.Run("caches the response for the same request", func(t *testing.T) { - cacheMiddleware := setupCacheMW() - from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) - - seriesReq := &LokiSeriesRequest{ - StartTs: from.Time(), - EndTs: through.Time(), - Match: []string{`{namespace=~".*"}`}, - Path: seriesAPIPath, - } - - seriesResp := &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, + composeSeriesResp := func(series [][]logproto.SeriesIdentifier_LabelsEntry, splits int64) *LokiSeriesResponse { + var data []logproto.SeriesIdentifier + for _, v := range series { + data = append(data, logproto.SeriesIdentifier{Labels: v}) } - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, seriesReq.GetStart(), r.GetStart()) - require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) - - return seriesResp, nil - })) - - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, seriesReq) - require.NoError(t, err) - require.Equal(t, 1, called) // called actual handler, as not cached. - require.Equal(t, seriesResp, got) - - // Doing same request again shouldn't change anything. - called = 0 - got, err = handler.Do(ctx, seriesReq) - require.NoError(t, err) - require.Equal(t, 0, called) - require.Equal(t, seriesResp, got) - }) - - t.Run("a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) { - cacheMiddleware := setupCacheMW() - - from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) - req1 := &LokiSeriesRequest{ - StartTs: from.Time(), - EndTs: through.Time(), - Match: []string{`{namespace=~".*"}`}, - Path: seriesAPIPath, - } - resp1 := &LokiSeriesResponse{ + return &LokiSeriesResponse{ Status: "success", Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, - }, - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - }, + Data: data, Statistics: stats.Result{ Summary: stats.Summary{ - Splits: 1, + Splits: splits, }, }, } + } - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length with no partitioning as nothing is cached yet. - require.Equal(t, req1.GetStart(), r.GetStart()) - require.Equal(t, req1.GetEnd(), r.GetEnd()) + var downstreamHandlerFunc func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) + downstreamHandler := &mockDownstreamHandler{fn: func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + return downstreamHandlerFunc(ctx, req) + }} - return resp1, nil - })) + from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) + seriesReq := &LokiSeriesRequest{ + StartTs: from.Time(), + EndTs: through.Time(), + Match: []string{`{namespace=~".*"}`}, + Path: seriesAPIPath, + } + seriesResp := composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, + {{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, + }, 1) + + for _, tc := range []struct { + name string + req queryrangebase.Request + expectedQueryStart, expectedQueryEnd time.Time + downstreamResponse *LokiSeriesResponse + downstreamCalls int + expectedReponse *LokiSeriesResponse + }{ + { + name: "return cached response for the same request", + downstreamCalls: 0, + expectedReponse: seriesResp, + req: seriesReq, + }, + { + name: "a new request with overlapping time range should reuse results of the previous request", + req: seriesReq.WithStartEnd(seriesReq.GetStart(), seriesReq.GetEnd().Add(15*time.Minute)), + expectedQueryStart: seriesReq.GetEnd(), + expectedQueryEnd: seriesReq.GetEnd().Add(15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 1), + expectedReponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, + {{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 2), + }, + { + // To avoid returning incorrect results, we only use extents that are entirely within the requested query range. + name: "cached response not entirely within the requested range", + req: seriesReq.WithStartEnd(seriesReq.GetStart().Add(15*time.Minute), seriesReq.GetEnd().Add(-15*time.Minute)), + expectedQueryStart: seriesReq.GetStart().Add(15 * time.Minute), + expectedQueryEnd: seriesReq.GetEnd().Add(-15 * time.Minute), + downstreamCalls: 1, + downstreamResponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 1), + expectedReponse: composeSeriesResp([][]logproto.SeriesIdentifier_LabelsEntry{ + {{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, + }, 1), + }, + } { + t.Run(tc.name, func(t *testing.T) { + cacheMiddleware := setupCacheMW() + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + require.Equal(t, seriesReq.GetStart(), r.GetStart()) + require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) - ctx := user.InjectOrgID(context.Background(), "fake") - got, err := handler.Do(ctx, req1) - require.NoError(t, err) - require.Equal(t, 1, called) - require.Equal(t, resp1, got) + return seriesResp, nil + } - req2 := req1.WithStartEnd(req1.GetStart().Add(15*time.Minute), req1.GetEnd().Add(15*time.Minute)) + handler := cacheMiddleware.Wrap(downstreamHandler) - called = 0 - handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ + ctx := user.InjectOrgID(context.Background(), "fake") + got, err := handler.Do(ctx, seriesReq) + require.NoError(t, err) + require.Equal(t, 1, downstreamHandler.Called()) // calls downstream handler, as not cached. + require.Equal(t, seriesResp, got) - // make downstream request only for the non-overlapping portion of the query. - require.Equal(t, req1.GetEnd(), r.GetStart()) - require.Equal(t, req1.GetEnd().Add(15*time.Minute), r.GetEnd()) + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { + require.Equal(t, tc.expectedQueryStart, r.GetStart()) + require.Equal(t, tc.expectedQueryEnd, r.GetEnd()) - return &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - }, nil - })) + return tc.downstreamResponse, nil + } - got, err = handler.Do(ctx, req2) - require.NoError(t, err) - require.Equal(t, 1, called) - // two splits as we merge the results from the extent and downstream request - resp1.Statistics.Summary.Splits = 2 - require.Equal(t, &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "dev"}}, - }, - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "us-central"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 2, - }, - }, - }, got) - }) + got, err = handler.Do(ctx, tc.req) + require.NoError(t, err) + require.Equal(t, tc.downstreamCalls, downstreamHandler.Called()) + require.Equal(t, tc.expectedReponse, got) + }) + } t.Run("caches are only valid for the same request parameters", func(t *testing.T) { cacheMiddleware := setupCacheMW() - - from, through := util.RoundToMilliseconds(testTime, testTime.Add(1*time.Hour)) - seriesReq := &LokiSeriesRequest{ - StartTs: from.Time(), - EndTs: through.Time(), - Match: []string{`{namespace=~".*"}`}, - Path: seriesAPIPath, - } - seriesResp := &LokiSeriesResponse{ - Status: "success", - Version: uint32(loghttp.VersionV1), - Data: []logproto.SeriesIdentifier{ - { - Labels: []logproto.SeriesIdentifier_LabelsEntry{{Key: "cluster", Value: "eu-west"}, {Key: "namespace", Value: "prod"}}, - }, - }, - Statistics: stats.Result{ - Summary: stats.Summary{ - Splits: 1, - }, - }, - } - - called := 0 - handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { - called++ - - // should request the entire length as none of the subsequent queries hit the cache. + downstreamHandler.ResetCount() + downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) { require.Equal(t, seriesReq.GetStart(), r.GetStart()) require.Equal(t, seriesReq.GetEnd(), r.GetEnd()) + return seriesResp, nil - })) + } + + handler := cacheMiddleware.Wrap(downstreamHandler) // initial call to fill cache ctx := user.InjectOrgID(context.Background(), "fake") _, err := handler.Do(ctx, seriesReq) require.NoError(t, err) - require.Equal(t, 1, called) + require.Equal(t, 1, downstreamHandler.Called()) type testCase struct { fn func(*LokiSeriesRequest) @@ -296,7 +236,7 @@ func TestSeriesCache(t *testing.T) { } for name, tc := range testCases { - called = 0 + downstreamHandler.ResetCount() seriesReq := seriesReq if tc.fn != nil { @@ -309,7 +249,7 @@ func TestSeriesCache(t *testing.T) { _, err = handler.Do(ctx, seriesReq) require.NoError(t, err) - require.Equal(t, 1, called, name) + require.Equal(t, 1, downstreamHandler.Called(), name) } }) } @@ -490,3 +430,21 @@ func TestSeriesQueryCacheKey(t *testing.T) { }) } } + +type mockDownstreamHandler struct { + called int + fn func(context.Context, queryrangebase.Request) (queryrangebase.Response, error) +} + +func (m *mockDownstreamHandler) Called() int { + return m.called +} + +func (m *mockDownstreamHandler) ResetCount() { + m.called = 0 +} + +func (m *mockDownstreamHandler) Do(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + m.called++ + return m.fn(ctx, req) +}