Skip to content

Commit

Permalink
fix: (Bug) correct resultType when storing instant query results in c…
Browse files Browse the repository at this point in the history
…ache (#12312)

Signed-off-by: Kaviraj <[email protected]>
  • Loading branch information
kavirajk authored Mar 31, 2024
1 parent 246623f commit 7480468
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 28 deletions.
4 changes: 4 additions & 0 deletions cmd/loki/loki-local-with-memcached.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ common:
kvstore:
store: inmemory

limits_config:
split_instant_metric_queries_by_interval: '10m'


query_range:
align_queries_with_step: true
cache_index_stats_results: true
Expand Down
14 changes: 11 additions & 3 deletions pkg/querier/queryrange/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
json "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"

"github.com/grafana/loki/pkg/loghttp"
Expand Down Expand Up @@ -1271,15 +1272,22 @@ func (Codec) MergeResponse(responses ...queryrangebase.Response) (queryrangebase
return nil, errors.New("merging responses requires at least one response")
}
var mergedStats stats.Result
switch responses[0].(type) {
switch res := responses[0].(type) {
// LokiPromResponse type is used for both instant and range queries.
// Meaning, values that are merged can be either vector or matrix types.
case *LokiPromResponse:

codec := queryrangebase.PrometheusCodecForRangeQueries
if res.Response.Data.ResultType == model.ValVector.String() {
codec = queryrangebase.PrometheusCodecForInstantQueries
}

promResponses := make([]queryrangebase.Response, 0, len(responses))
for _, res := range responses {
mergedStats.MergeSplit(res.(*LokiPromResponse).Statistics)
promResponses = append(promResponses, res.(*LokiPromResponse).Response)
}
promRes, err := queryrangebase.PrometheusCodec.MergeResponse(promResponses...)
promRes, err := codec.MergeResponse(promResponses...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1800,7 +1808,7 @@ func NewEmptyResponse(r queryrangebase.Request) (queryrangebase.Response, error)
}
if _, ok := expr.(syntax.SampleExpr); ok {
return &LokiPromResponse{
Response: queryrangebase.NewEmptyPrometheusResponse(),
Response: queryrangebase.NewEmptyPrometheusResponse(model.ValMatrix), // range metric query
}, nil
}
return &LokiResponse{
Expand Down
2 changes: 0 additions & 2 deletions pkg/querier/queryrange/instant_metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ func (cfg *InstantMetricCacheConfig) Validate() error {
return cfg.ResultsCacheConfig.Validate()
}

type instantMetricExtractor struct{}

func NewInstantMetricCacheMiddleware(
log log.Logger,
limits Limits,
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/limits_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func Test_MaxQueryParallelism(t *testing.T) {
defer count.Dec()
// simulate some work
time.Sleep(20 * time.Millisecond)
return base.NewEmptyPrometheusResponse(), nil
return base.NewEmptyPrometheusResponse(model.ValMatrix), nil
})
ctx := user.InjectOrgID(context.Background(), "foo")

Expand Down Expand Up @@ -271,7 +271,7 @@ func Test_MaxQueryParallelismLateScheduling(t *testing.T) {
h := base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
// simulate some work
time.Sleep(20 * time.Millisecond)
return base.NewEmptyPrometheusResponse(), nil
return base.NewEmptyPrometheusResponse(model.ValMatrix), nil
})
ctx := user.InjectOrgID(context.Background(), "foo")

Expand All @@ -298,7 +298,7 @@ func Test_MaxQueryParallelismDisable(t *testing.T) {
h := base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
// simulate some work
time.Sleep(20 * time.Millisecond)
return base.NewEmptyPrometheusResponse(), nil
return base.NewEmptyPrometheusResponse(model.ValMatrix), nil
})
ctx := user.InjectOrgID(context.Background(), "foo")

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/queryrange/queryrangebase/marshaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func BenchmarkPrometheusCodec_DecodeResponse(b *testing.B) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
_, err := PrometheusCodec.DecodeResponse(context.Background(), &http.Response{
_, err := PrometheusCodecForRangeQueries.DecodeResponse(context.Background(), &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader(encodedRes)),
ContentLength: int64(len(encodedRes)),
Expand All @@ -51,7 +51,7 @@ func BenchmarkPrometheusCodec_EncodeResponse(b *testing.B) {
b.ReportAllocs()

for n := 0; n < b.N; n++ {
_, err := PrometheusCodec.EncodeResponse(context.Background(), nil, res)
_, err := PrometheusCodecForRangeQueries.EncodeResponse(context.Background(), nil, res)
require.NoError(b, err)
}
}
Expand Down
28 changes: 20 additions & 8 deletions pkg/querier/queryrange/queryrangebase/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,26 @@ var (
errNegativeStep = httpgrpc.Errorf(http.StatusBadRequest, "zero or negative query resolution step widths are not accepted. Try a positive integer")
errStepTooSmall = httpgrpc.Errorf(http.StatusBadRequest, "exceeded maximum resolution of 11,000 points per time series. Try increasing the value of the step parameter")

// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
PrometheusCodec = &prometheusCodec{}
// PrometheusCodecForRangeQueries is a codec to encode and decode Loki range metric query requests and responses.
PrometheusCodecForRangeQueries = &prometheusCodec{
resultType: model.ValMatrix,
}

// PrometheusCodecForInstantQueries is a codec to encode and decode Loki range metric query requests and responses.
PrometheusCodecForInstantQueries = &prometheusCodec{
resultType: model.ValVector,
}

// Name of the cache control header.
cacheControlHeader = "Cache-Control"
)

type prometheusCodec struct{}
type prometheusCodec struct {
// prometheusCodec is used to merge multiple response of either range (matrix) or instant queries(vector).
// when creating empty responses during merge, it need to be aware what kind of valueType it should create with.
// helps other middlewares to filter the correct result type.
resultType model.ValueType
}

// WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp.
func (q *PrometheusRequest) WithStartEnd(start, end time.Time) Request {
Expand Down Expand Up @@ -125,19 +137,19 @@ func (resp *PrometheusResponse) SetHeader(name, value string) {
}

// NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusResponse() *PrometheusResponse {
func NewEmptyPrometheusResponse(v model.ValueType) *PrometheusResponse {
return &PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
ResultType: v.String(),
Result: []SampleStream{},
},
}
}

func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
func (p prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
return NewEmptyPrometheusResponse(p.resultType), nil
}

promResponses := make([]*PrometheusResponse, 0, len(responses))
Expand All @@ -155,7 +167,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
response := PrometheusResponse{
Status: StatusSuccess,
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
ResultType: p.resultType.String(),
Result: matrixMerge(promResponses),
},
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/queryrangebase/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestResponse(t *testing.T) {
Header: http.Header{"Content-Type": []string{"application/json"}},
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))),
}
resp, err := PrometheusCodec.DecodeResponse(context.Background(), response, nil)
resp, err := PrometheusCodecForRangeQueries.DecodeResponse(context.Background(), response, nil)
require.NoError(t, err)
assert.Equal(t, tc.expected, resp)

Expand All @@ -44,7 +44,7 @@ func TestResponse(t *testing.T) {
Body: io.NopCloser(bytes.NewBuffer([]byte(tc.body))),
ContentLength: int64(len(tc.body)),
}
resp2, err := PrometheusCodec.EncodeResponse(context.Background(), nil, resp)
resp2, err := PrometheusCodecForRangeQueries.EncodeResponse(context.Background(), nil, resp)
require.NoError(t, err)
assert.Equal(t, response, resp2)
})
Expand Down Expand Up @@ -262,7 +262,7 @@ func TestMergeAPIResponses(t *testing.T) {
},
}} {
t.Run(tc.name, func(t *testing.T) {
output, err := PrometheusCodec.MergeResponse(tc.input...)
output, err := PrometheusCodecForRangeQueries.MergeResponse(tc.input...)
require.NoError(t, err)
require.Equal(t, tc.expected, output)
})
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/queryrange/queryrangebase/results_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func TestResultsCache(t *testing.T) {
c,
resultscache.ConstSplitter(day),
mockLimits{},
PrometheusCodec,
PrometheusCodecForRangeQueries,
PrometheusResponseExtractor{},
nil,
nil,
Expand Down Expand Up @@ -461,7 +461,7 @@ func TestResultsCacheRecent(t *testing.T) {
c,
resultscache.ConstSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusCodecForRangeQueries,
PrometheusResponseExtractor{},
nil,
nil,
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestResultsCacheShouldCacheFunc(t *testing.T) {
c,
resultscache.ConstSplitter(day),
mockLimits{maxCacheFreshness: 10 * time.Minute},
PrometheusCodec,
PrometheusCodecForRangeQueries,
PrometheusResponseExtractor{},
nil,
tc.shouldCache,
Expand Down
Loading

0 comments on commit 7480468

Please sign in to comment.