Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: (Bug) correct resultType when storing instant query results in cache #12312

Merged
merged 4 commits into from
Mar 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-with-memcached.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ common:
kvstore:
store: inmemory

limits_config:
split_instant_metric_queries_by_interval: '10m'


query_range:
align_queries_with_step: true
cache_index_stats_results: true
Expand Down
14 changes: 11 additions & 3 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
json "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"

"github.com/grafana/loki/pkg/loghttp"
Expand Down Expand Up @@ -1271,15 +1272,22 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
return nil, errors.New("merging responses requires at least one response")
}
var mergedStats stats.Result
switch responses[0].(type) {
switch res := responses[0].(type) {
// LokiPromResponse type is used for both instant and range queries.
// Meaning, values that are merged can be either vector or matrix types.
case *LokiPromResponse:

codec := queryrangebase.PrometheusCodecForRangeQueries
if res.Response.Data.ResultType == model.ValVector.String() {
codec = queryrangebase.PrometheusCodecForInstantQueries
}

promResponses := make([]queryrangebase.Response, 0, len(responses))
for _, res := range responses {
mergedStats.MergeSplit(res.(*LokiPromResponse).Statistics)
promResponses = append(promResponses, res.(*LokiPromResponse).Response)
}
promRes, err := queryrangebase.PrometheusCodec.MergeResponse(promResponses...)
promRes, err := codec.MergeResponse(promResponses...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1800,7 +1808,7 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error)
}
if _, ok := expr.(syntax.SampleExpr); ok {
return &LokiPromResponse{
Response: queryrangebase.NewEmptyPrometheusResponse(),
Response: queryrangebase.NewEmptyPrometheusResponse(model.ValMatrix), // range metric query
}, nil
}
return &LokiResponse{
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/queryrange/instant_metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func (cfg *InstantMetricCacheConfig) Validate() error {
return cfg.ResultsCacheConfig.Validate()
}

type instantMetricExtractor struct{}

func NewInstantMetricCacheMiddleware(
log log.Logger,
limits Limits,
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func Test_MaxQueryParallelism(t *testing.T) {
defer count.Dec()
// simulate some work
time.Sleep(20 * time.Millisecond)
return base.NewEmptyPrometheusResponse(), nil
return base.NewEmptyPrometheusResponse(model.ValMatrix), nil
})
ctx := user.InjectOrgID(context.Background(), "foo")

Expand Down Expand Up @@ -271,7 +271,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
h := base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
// simulate some work
time.Sleep(20 * time.Millisecond)
return base.NewEmptyPrometheusResponse(), nil
return base.NewEmptyPrometheusResponse(model.ValMatrix), nil
})
ctx := user.InjectOrgID(context.Background(), "foo")

Expand All @@ -298,7 +298,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) {
h := base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
// simulate some work
time.Sleep(20 * time.Millisecond)
return base.NewEmptyPrometheusResponse(), nil
return base.NewEmptyPrometheusResponse(model.ValMatrix), nil
})
ctx := user.InjectOrgID(context.Background(), "foo")

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/queryrangebase/marshaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
_, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
_, err := PrometheusCodecForRangeQueries.DecodeResponse(context.Background(), &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader(encodedRes)),
ContentLength: int64(len(encodedRes)),
Expand All @@ -51,7 +51,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
_, err := PrometheusCodec.EncodeResponse(context.Background(), nil, res)
_, err := PrometheusCodecForRangeQueries.EncodeResponse(context.Background(), nil, res)
require.NoError(b, err)
}
}
Expand Down
28 changes: 20 additions & 8 deletions pkg/querier/queryrange/queryrangebase/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,26 @@ var (
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per time series. Try increasing the value of the step parameter")

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
PrometheusCodec = &prometheusCodec{}
// PrometheusCodecForRangeQueries is a codec to encode and decode Loki range metric query requests and responses.
PrometheusCodecForRangeQueries = &prometheusCodec{
resultType: model.ValMatrix,
}

// PrometheusCodecForInstantQueries is a codec to encode and decode Loki range metric query requests and responses.
PrometheusCodecForInstantQueries = &prometheusCodec{
resultType: model.ValVector,
}

// Name of the cache control header.
cacheControlHeader = "Cache-Control"
)

type prometheusCodec struct{}
type prometheusCodec struct {
// prometheusCodec is used to merge multiple response of either range (matrix) or instant queries(vector).
// when creating empty responses during merge, it need to be aware what kind of valueType it should create with.
// helps other middlewares to filter the correct result type.
resultType model.ValueType
}

// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
func (q *PrometheusRequest) WithStartEnd(start, end time.Time) Request {
Expand Down Expand Up @@ -125,19 +137,19 @@ func (resp *PrometheusResponse) SetHeader(name, value string) {
}

// NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusResponse() *PrometheusResponse {
func NewEmptyPrometheusResponse(v model.ValueType) *PrometheusResponse {
return &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
ResultType: v.String(),
Result: []SampleStream{},
},
}
}

func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
func (p prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
return NewEmptyPrometheusResponse(p.resultType), nil
}

promResponses := make([]*PrometheusResponse, 0, len(responses))
Expand All @@ -155,7 +167,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
response := PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
ResultType: p.resultType.String(),
Result: matrixMerge(promResponses),
},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/queryrangebase/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestResponse(t *testing.T) {
Header: http.Header{"Content-Type": []string{"application/json"}},
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))),
}
resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil)
resp, err := PrometheusCodecForRangeQueries.DecodeResponse(context.Background(), response, nil)
require.NoError(t, err)
assert.Equal(t, tc.expected, resp)

Expand All @@ -44,7 +44,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))),
ContentLength: int64(len(tc.body)),
}
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp)
resp2, err := PrometheusCodecForRangeQueries.EncodeResponse(context.Background(), nil, resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestMergeAPIResponses(t *testing.T) {
},
}} {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(tc.input...)
output, err := PrometheusCodecForRangeQueries.MergeResponse(tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/queryrangebase/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestResultsCache(t *testing.T) {
c,
resultscache.ConstSplitter(day),
mockLimits{},
PrometheusCodec,
PrometheusCodecForRangeQueries,
PrometheusResponseExtractor{},
nil,
nil,
Expand Down Expand Up @@ -461,7 +461,7 @@ func TestResultsCacheRecent(t *testing.T) {
c,
resultscache.ConstSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusCodecForRangeQueries,
PrometheusResponseExtractor{},
nil,
nil,
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
c,
resultscache.ConstSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusCodecForRangeQueries,
PrometheusResponseExtractor{},
nil,
tc.shouldCache,
Expand Down
Loading
Loading