Skip to content

Commit

Permalink
fix and refactor label/series cache test
Browse files Browse the repository at this point in the history
  • Loading branch information
ashwanthgoli committed Feb 9, 2024
1 parent 79f6a11 commit 77264f9
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 313 deletions.
251 changes: 101 additions & 150 deletions pkg/querier/queryrange/labels_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,173 +81,124 @@ func TestLabelsCache(t *testing.T) {
return cacheMiddleware
}

cacheMiddleware := setupCacheMW()
for _, values := range []bool{false, true} {
prefix := "labels"
if values {
prefix = "label values"
}
t.Run(prefix+": cache the response for the same request", func(t *testing.T) {
start := testTime.Truncate(time.Millisecond)
end := start.Add(time.Hour)

labelsReq := LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
}

if values {
labelsReq.Values = true
labelsReq.Name = "foo"
labelsReq.Query = `{cluster="eu-west1"}`
}

labelsResp := &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []string{"bar", "buzz"},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
composeLabelsResp := func(lbls []string, splits int64) *LokiLabelNamesResponse {
return &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: lbls,
Statistics: stats.Result{
Summary: stats.Summary{
Splits: splits,
},
}

called := 0
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++

// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, labelsReq.GetStart(), r.GetStart())
require.Equal(t, labelsReq.GetEnd(), r.GetEnd())

got := r.(*LabelRequest)
require.Equal(t, labelsReq.GetName(), got.GetName())
require.Equal(t, labelsReq.GetValues(), got.GetValues())
require.Equal(t, labelsReq.GetQuery(), got.GetQuery())

return labelsResp, nil
}))
},
}

ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, &labelsReq)
require.NoError(t, err)
require.Equal(t, 1, called) // called actual handler, as not cached.
require.Equal(t, labelsResp, got)
}

// Doing same request again shouldn't change anything.
called = 0
got, err = handler.Do(ctx, &labelsReq)
require.NoError(t, err)
require.Equal(t, 0, called)
require.Equal(t, labelsResp, got)
})
start := testTime.Truncate(time.Millisecond)
end := start.Add(time.Hour)
labelsReq := &LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
}
labelsResp := composeLabelsResp([]string{"bar", "buzz"}, 1)

var downstreamHandlerFunc func(context.Context, queryrangebase.Request) (queryrangebase.Response, error)
downstreamHandler := &mockDownstreamHandler{fn: func(ctx context.Context, req queryrangebase.Request) (queryrangebase.Response, error) {
return downstreamHandlerFunc(ctx, req)
}}

// reset cacheMiddleware
cacheMiddleware = setupCacheMW()
for _, values := range []bool{false, true} {
labelsReq := labelsReq
prefix := "labels"

if values {
prefix = "label values"
prefix = "label values: "
labelsReq.Values = true
labelsReq.Name = "foo"
labelsReq.Query = `{cluster="eu-west1"}`
}
t.Run(prefix+": a new request with overlapping time range should reuse part of the previous request for the overlap", func(t *testing.T) {
cacheMiddleware := setupCacheMW()

start := testTime.Truncate(time.Millisecond)
end := start.Add(time.Hour)

labelsReq1 := LabelRequest{
LabelRequest: logproto.LabelRequest{
Start: &start,
End: &end,
},
}

if values {
labelsReq1.Values = true
labelsReq1.Name = "foo"
labelsReq1.Query = `{cluster="eu-west1"}`
}

labelsResp1 := &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []string{"bar", "buzz"},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
},
}

called := 0
handler := cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++

// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, labelsReq1.GetStart(), r.GetStart())
require.Equal(t, labelsReq1.GetEnd(), r.GetEnd())
for _, tc := range []struct {
name string
req queryrangebase.Request
expectedQueryStart, expectedQueryEnd time.Time
downstreamResponse *LokiLabelNamesResponse
downstreamCalls int
expectedReponse *LokiLabelNamesResponse
}{
{
name: "return cached response for the same request",
downstreamCalls: 0,
expectedReponse: labelsResp,
req: labelsReq,
},
{
name: "a new request with overlapping time range should reuse results of the previous request",
req: labelsReq.WithStartEnd(labelsReq.GetStart(), labelsReq.GetEnd().Add(15*time.Minute)),
expectedQueryStart: labelsReq.GetEnd(),
expectedQueryEnd: labelsReq.GetEnd().Add(15 * time.Minute),
downstreamCalls: 1,
downstreamResponse: composeLabelsResp([]string{"fizz"}, 1),
expectedReponse: composeLabelsResp([]string{"bar", "buzz", "fizz"}, 2),
},
{
// To avoid returning incorrect results, we only use extents that are entirely within the requested query range.
name: "cached response not entirely within the requested range",
req: labelsReq.WithStartEnd(labelsReq.GetStart().Add(15*time.Minute), labelsReq.GetEnd().Add(-15*time.Minute)),
expectedQueryStart: labelsReq.GetStart().Add(15 * time.Minute),
expectedQueryEnd: labelsReq.GetEnd().Add(-15 * time.Minute),
downstreamCalls: 1,
downstreamResponse: composeLabelsResp([]string{"buzz", "fizz"}, 1),
expectedReponse: composeLabelsResp([]string{"buzz", "fizz"}, 1),
},
} {
t.Run(prefix+tc.name, func(t *testing.T) {
cacheMiddleware := setupCacheMW()
downstreamHandler.ResetCount()
downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
// should request the entire length with no partitioning as nothing is cached yet.
require.Equal(t, labelsReq.GetStart(), r.GetStart())
require.Equal(t, labelsReq.GetEnd(), r.GetEnd())

got := r.(*LabelRequest)
require.Equal(t, labelsReq.GetName(), got.GetName())
require.Equal(t, labelsReq.GetValues(), got.GetValues())
require.Equal(t, labelsReq.GetQuery(), got.GetQuery())

return labelsResp, nil
}

got := r.(*LabelRequest)
require.Equal(t, labelsReq1.GetName(), got.GetName())
require.Equal(t, labelsReq1.GetValues(), got.GetValues())
require.Equal(t, labelsReq1.GetQuery(), got.GetQuery())
handler := cacheMiddleware.Wrap(downstreamHandler)

return labelsResp1, nil
}))
ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, labelsReq)
require.NoError(t, err)
require.Equal(t, 1, downstreamHandler.Called()) // call downstream handler, as not cached.
require.Equal(t, labelsResp, got)

ctx := user.InjectOrgID(context.Background(), "fake")
got, err := handler.Do(ctx, &labelsReq1)
require.NoError(t, err)
require.Equal(t, 1, called)
require.Equal(t, labelsResp1, got)
downstreamHandler.ResetCount()
downstreamHandlerFunc = func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
require.Equal(t, tc.expectedQueryStart, r.GetStart())
require.Equal(t, tc.expectedQueryEnd, r.GetEnd())

labelsReq2 := labelsReq1.WithStartEnd(labelsReq1.GetStart().Add(15*time.Minute), labelsReq1.GetEnd().Add(15*time.Minute))
got := r.(*LabelRequest)
require.Equal(t, labelsReq.GetName(), got.GetName())
require.Equal(t, labelsReq.GetValues(), got.GetValues())
require.Equal(t, labelsReq.GetQuery(), got.GetQuery())

called = 0
handler = cacheMiddleware.Wrap(queryrangebase.HandlerFunc(func(_ context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
called++
return tc.downstreamResponse, nil
}

// make downstream request only for the non-overlapping portion of the query.
require.Equal(t, labelsReq1.GetEnd(), r.GetStart())
require.Equal(t, labelsReq2.GetEnd(), r.GetEnd())

got := r.(*LabelRequest)
require.Equal(t, labelsReq1.GetName(), got.GetName())
require.Equal(t, labelsReq1.GetValues(), got.GetValues())
require.Equal(t, labelsReq1.GetQuery(), got.GetQuery())

return &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []string{"fizz"},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 1,
},
},
}, nil
}))
got, err = handler.Do(ctx, tc.req)
require.NoError(t, err)
require.Equal(t, tc.downstreamCalls, downstreamHandler.Called())
require.Equal(t, tc.expectedReponse, got)

got, err = handler.Do(ctx, labelsReq2)
require.NoError(t, err)
require.Equal(t, 1, called)
// two splits as we merge the results from the extent and downstream request
labelsResp1.Statistics.Summary.Splits = 2
require.Equal(t, &LokiLabelNamesResponse{
Status: "success",
Version: uint32(loghttp.VersionV1),
Data: []string{"bar", "buzz", "fizz"},
Statistics: stats.Result{
Summary: stats.Summary{
Splits: 2,
},
},
}, got)
})
})
}
}
}

Expand Down
Loading

0 comments on commit 77264f9

Please sign in to comment.