support split and align of instant subquery for cache reuse
Fix test cases that failed with these changes

Signed-off-by: Kaviraj <[email protected]>
kavirajk committed Feb 12, 2024
1 parent 634d7f8 commit d538f67
Showing 11 changed files with 272 additions and 80 deletions.
78 changes: 78 additions & 0 deletions pkg/logql/rangemapper.go
@@ -57,6 +57,19 @@ type RangeMapper struct {
splitByInterval time.Duration
metrics *MapperMetrics
stats *MapperStats

// SplitAlign
splitAlignTs time.Time
}

func NewRangeMapperWithSplitAlign(interval time.Duration, splitAlign time.Time, metrics *MapperMetrics, stats *MapperStats) (RangeMapper, error) {
rm, err := NewRangeMapper(interval, metrics, stats)
if err != nil {
return RangeMapper{}, err
}
rm.splitAlignTs = splitAlign

return rm, nil
}

// NewRangeMapper creates a new RangeMapper instance with the given duration as
@@ -327,6 +340,71 @@ func (m RangeMapper) getOriginalOffset(expr syntax.SampleExpr) (offset time.Dura
// rangeInterval should be greater than m.splitByInterval, otherwise the resultant expression
// will have an unnecessary aggregation operation
func (m RangeMapper) mapConcatSampleExpr(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
if m.splitAlignTs.IsZero() {
return m.rangeSplit(expr, rangeInterval, recorder)
}
return m.rangeSplitAlign(expr, rangeInterval, recorder)
}

// rangeSplitAlign splits the given `rangeInterval` into units of `m.splitByInterval`, aligning as many of the resulting subqueries as possible with `m.splitByInterval`.
// Consider the following example of a real use case.
// Instant Query: `sum(rate({foo="bar"}[3h]))`
// execTs: 12:34:00
// splitBy: 1h
// Given the above parameters, the query will be split into the following subqueries:
// 1. sum(rate({foo="bar"}[34m]))
// 2. sum(rate({foo="bar"}[1h] offset 34m))
// 3. sum(rate({foo="bar"}[1h] offset 1h34m))
// 4. sum(rate({foo="bar"}[26m] offset 2h34m))
// All with execTs: 12:34:00. Here the range values of (2) and (3) (1h) are aligned with splitByInterval (1h).
func (m RangeMapper) rangeSplitAlign(
expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder,
) syntax.SampleExpr {
if rangeInterval <= m.splitByInterval {
return expr
}

originalOffset, err := m.getOriginalOffset(expr)
if err != nil {
return expr
}

splits := int(math.Ceil(float64(rangeInterval) / float64(m.splitByInterval))) // number of subqueries, not counting the first unaligned one
align := m.splitAlignTs.Sub(m.splitAlignTs.Truncate(m.splitByInterval)) // say, 12:34:00 - 12:00:00(truncated) = 34m
if align != 0 {
splits += 1
}

var (
newRng time.Duration = align
newOffset time.Duration = originalOffset
downstreams *ConcatSampleExpr
)

// first subquery
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)

newOffset += align // e.g: offset 34m
newRng = m.splitByInterval // [1h]

// Rest of the subqueries. Always aligned with splitBy
for i := 0; i < splits-2; i++ {
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)
newOffset += m.splitByInterval
}

// last subquery
newRng = m.splitByInterval - align // e.g: [26m]
downstreams = appendDownstream(downstreams, expr, newRng, newOffset)

// update stats and metrics
m.stats.AddSplitQueries(splits)
recorder.Add(splits, MetricsKey)

return downstreams
}

func (m RangeMapper) rangeSplit(expr syntax.SampleExpr, rangeInterval time.Duration, recorder *downstreamRecorder) syntax.SampleExpr {
splitCount := int(math.Ceil(float64(rangeInterval) / float64(m.splitByInterval)))
if splitCount <= 1 {
return expr
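To make the alignment arithmetic above concrete, here is a minimal, runnable Go sketch of what rangeSplitAlign's doc comment describes. The `alignedRanges` helper, its string output, and the omission of any pre-existing offset on the expression (and of the rangeInterval <= splitBy guard in the real code) are illustrative assumptions, not code from this commit:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// alignedRanges mirrors the arithmetic in rangeSplitAlign: the first subquery
// covers the unaligned remainder of execTs, the middle subqueries each cover
// one full splitBy interval aligned to the clock, and the last subquery covers
// the remaining splitBy-align portion.
func alignedRanges(execTs time.Time, rangeInterval, splitBy time.Duration) []string {
	align := execTs.Sub(execTs.Truncate(splitBy)) // e.g. 12:34:00 -> 34m
	splits := int(math.Ceil(float64(rangeInterval) / float64(splitBy)))
	if align != 0 {
		splits++ // one extra subquery for the unaligned head
	}

	out := make([]string, 0, splits)
	offset := time.Duration(0)

	// first, unaligned subquery
	out = append(out, fmt.Sprintf("[%v]", align))
	offset += align

	// aligned middle subqueries
	for i := 0; i < splits-2; i++ {
		out = append(out, fmt.Sprintf("[%v] offset %v", splitBy, offset))
		offset += splitBy
	}

	// last subquery covers what remains of the splitBy interval
	out = append(out, fmt.Sprintf("[%v] offset %v", splitBy-align, offset))
	return out
}

func main() {
	execTs := time.Date(2024, 2, 12, 12, 34, 0, 0, time.UTC)
	for _, r := range alignedRanges(execTs, 3*time.Hour, time.Hour) {
		fmt.Println(r)
	}
	// Output:
	// [34m0s]
	// [1h0m0s] offset 34m0s
	// [1h0m0s] offset 1h34m0s
	// [26m0s] offset 2h34m0s
}
```

The middle subqueries start and end exactly on splitBy boundaries, which is what allows their results to be cached and reused across instant queries issued at different execution times.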
1 change: 0 additions & 1 deletion pkg/loki/config_wrapper.go
@@ -650,7 +650,6 @@ func applyEmbeddedCacheConfig(r *ConfigWrapper) {
instantMetricCacheConfig := r.QueryRange.InstantMetricCacheConfig.CacheConfig
if !cache.IsCacheConfigured(instantMetricCacheConfig) {
prefix := instantMetricCacheConfig.Prefix
// We use the same config as the query range results cache.
r.QueryRange.InstantMetricCacheConfig.CacheConfig = r.QueryRange.ResultsCacheConfig.CacheConfig
r.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix = prefix
}
43 changes: 43 additions & 0 deletions pkg/loki/config_wrapper_test.go
@@ -1055,6 +1055,49 @@ query_range:
})
})

t.Run("for the instant-metric results cache config", func(t *testing.T) {
t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) {
configFileString := `---
query_range:
instant_metric_results_cache:
cache:
redis:
endpoint: endpoint.redis.org`

config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, "endpoint.redis.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Redis.Endpoint)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})

t.Run("no embedded cache enabled by default if Memcache is set", func(t *testing.T) {
configFileString := `---
query_range:
instant_metric_results_cache:
cache:
memcached_client:
host: memcached.host.org`

config, _, _ := configWrapperFromYAML(t, configFileString, nil)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.MemcacheClient.Host)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})

t.Run("embedded cache is enabled by default if no other cache is set", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, minimalConfig, nil)
assert.True(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
})

t.Run("gets results cache config if not configured directly", func(t *testing.T) {
config, _, _ := configWrapperFromYAML(t, defaultResulsCacheString, nil)
assert.EqualValues(t, "memcached.host.org", config.QueryRange.InstantMetricCacheConfig.CacheConfig.MemcacheClient.Host)
assert.EqualValues(t, "frontend.instant-metric-results-cache.", config.QueryRange.InstantMetricCacheConfig.CacheConfig.Prefix)
assert.False(t, config.QueryRange.InstantMetricCacheConfig.CacheConfig.EmbeddedCache.Enabled)
})
})

t.Run("for the labels results cache config", func(t *testing.T) {
t.Run("no embedded cache enabled by default if Redis is set", func(t *testing.T) {
configFileString := `---
110 changes: 64 additions & 46 deletions pkg/querier/queryrange/codec_test.go
@@ -427,10 +427,12 @@ func Test_codec_DecodeResponse(t *testing.T) {
func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
// test fixtures from pkg/util/marshal_test
var queryTests = []struct {
name string
actual parser.Value
expected string
}{
{
"basic",
logqlmodel.Streams{
logproto.Stream{
Entries: []logproto.Entry{
@@ -462,6 +464,7 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
},
// vector test
{
"vector",
promql.Vector{
{
T: 1568404331324,
@@ -524,6 +527,7 @@ func Test_codec_DecodeProtobufResponseParity(t *testing.T) {
},
// matrix test
{
"matrix",
promql.Matrix{
{
Floats: []promql.FPoint{
@@ -607,50 +611,53 @@
}
codec := RequestProtobufCodec{}
for i, queryTest := range queryTests {
params := url.Values{
"query": []string{`{app="foo"}`},
}
u := &url.URL{
Path: "/loki/api/v1/query_range",
RawQuery: params.Encode(),
}
httpReq := &http.Request{
Method: "GET",
RequestURI: u.String(),
URL: u,
}
req, err := codec.DecodeRequest(context.TODO(), httpReq, nil)
require.NoError(t, err)
i := i
t.Run(queryTest.name, func(t *testing.T) {
params := url.Values{
"query": []string{`{app="foo"}`},
}
u := &url.URL{
Path: "/loki/api/v1/query_range",
RawQuery: params.Encode(),
}
httpReq := &http.Request{
Method: "GET",
RequestURI: u.String(),
URL: u,
}
req, err := codec.DecodeRequest(context.TODO(), httpReq, nil)
require.NoError(t, err)

// parser.Value -> queryrange.QueryResponse
var b bytes.Buffer
result := logqlmodel.Result{
Data: queryTest.actual,
Statistics: statsResult,
}
err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b)
require.NoError(t, err)
// parser.Value -> queryrange.QueryResponse
var b bytes.Buffer
result := logqlmodel.Result{
Data: queryTest.actual,
Statistics: statsResult,
}
err = WriteQueryResponseProtobuf(&logql.LiteralParams{}, result, &b)
require.NoError(t, err)

// queryrange.QueryResponse -> queryrangebase.Response
querierResp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(&b),
Header: http.Header{
"Content-Type": []string{ProtobufType},
},
}
resp, err := codec.DecodeResponse(context.TODO(), querierResp, req)
require.NoError(t, err)
// queryrange.QueryResponse -> queryrangebase.Response
querierResp := &http.Response{
StatusCode: 200,
Body: io.NopCloser(&b),
Header: http.Header{
"Content-Type": []string{ProtobufType},
},
}
resp, err := codec.DecodeResponse(context.TODO(), querierResp, req)
require.NoError(t, err)

// queryrange.Response -> JSON
ctx := user.InjectOrgID(context.Background(), "1")
httpResp, err := codec.EncodeResponse(ctx, httpReq, resp)
require.NoError(t, err)
// queryrange.Response -> JSON
ctx := user.InjectOrgID(context.Background(), "1")
httpResp, err := codec.EncodeResponse(ctx, httpReq, resp)
require.NoError(t, err)

body, _ := io.ReadAll(httpResp.Body)
require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i)
body, err := io.ReadAll(httpResp.Body)
require.NoError(t, err)
require.JSONEqf(t, queryTest.expected, string(body), "Protobuf Decode Query Test %d failed", i)
})
}

}

func Test_codec_EncodeRequest(t *testing.T) {
@@ -1645,6 +1652,16 @@ var (
"downloadTime": 0,
"queryLengthServed": 0
},
"instantMetricResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0,
"queryLengthServed": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,
@@ -2027,13 +2044,14 @@
},

Caches: stats.Caches{
Chunk: stats.Cache{},
Index: stats.Cache{},
StatsResult: stats.Cache{},
VolumeResult: stats.Cache{},
SeriesResult: stats.Cache{},
LabelResult: stats.Cache{},
Result: stats.Cache{},
Chunk: stats.Cache{},
Index: stats.Cache{},
StatsResult: stats.Cache{},
VolumeResult: stats.Cache{},
SeriesResult: stats.Cache{},
LabelResult: stats.Cache{},
Result: stats.Cache{},
InstantMetricResult: stats.Cache{},
},
}
)
15 changes: 12 additions & 3 deletions pkg/querier/queryrange/downstreamer.go
@@ -35,6 +35,8 @@
type DownstreamHandler struct {
limits Limits
next queryrangebase.Handler

splitAlign bool
}

func ParamsToLokiRequest(params logql.Params) queryrangebase.Request {
@@ -94,6 +96,7 @@ func (h DownstreamHandler) Downstreamer(ctx context.Context) logql.Downstreamer
parallelism: p,
locks: locks,
handler: h.next,
splitAlign: h.splitAlign,
}
}

@@ -102,6 +105,8 @@ type instance struct {
parallelism int
locks chan struct{}
handler queryrangebase.Handler

splitAlign bool
}

// withoutOffset returns the given query string with offsets removed and the timestamp adjusted accordingly. If no offset is present in the original query, it is returned as is.
@@ -132,9 +137,13 @@ func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) {

func (in instance) Downstream(ctx context.Context, queries []logql.DownstreamQuery) ([]logqlmodel.Result, error) {
return in.For(ctx, queries, func(qry logql.DownstreamQuery) (logqlmodel.Result, error) {
qs, newStart, newEnd := withoutOffset(qry)

req := ParamsToLokiRequest(qry.Params).WithQuery(qs).WithStartEnd(newStart, newEnd)
var req queryrangebase.Request
if in.splitAlign {
qs, newStart, newEnd := withoutOffset(qry)
req = ParamsToLokiRequest(qry.Params).WithQuery(qs).WithStartEnd(newStart, newEnd)
} else {
req = ParamsToLokiRequest(qry.Params).WithQuery(qry.Params.GetExpression().String())
}
sp, ctx := opentracing.StartSpanFromContext(ctx, "DownstreamHandler.instance")
defer sp.Finish()
logger := spanlogger.FromContext(ctx)
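For context on the downstreamer change above: when splitAlign is enabled, the subquery's offset is stripped and its start/end are shifted instead, so each downstream request ends on an aligned boundary. A minimal sketch of that equivalence, assuming a hypothetical `shiftWindow` helper (not part of this commit):

```go
package main

import (
	"fmt"
	"time"
)

// shiftWindow sketches the idea behind withoutOffset: evaluating
// `expr[rng] offset off` at time ts reads the same window of data as
// evaluating `expr[rng]` at ts-off, so a downstream request can drop the
// offset and shift its start/end instead. For the aligned subqueries in the
// example above, the shifted end lands exactly on a splitBy boundary, which
// is what makes the downstream result cacheable and reusable.
func shiftWindow(start, end time.Time, offset time.Duration) (time.Time, time.Time) {
	return start.Add(-offset), end.Add(-offset)
}

func main() {
	execTs := time.Date(2024, 2, 12, 12, 34, 0, 0, time.UTC)
	// subquery (3) from the example: sum(rate({foo="bar"}[1h] offset 1h34m))
	_, end := shiftWindow(execTs, execTs, 1*time.Hour+34*time.Minute)
	fmt.Println(end) // 2024-02-12 11:00:00 +0000 UTC, aligned to the hour
}
```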
10 changes: 10 additions & 0 deletions pkg/querier/queryrange/prometheus_test.go
@@ -118,6 +118,16 @@ var emptyStats = `"stats": {
"downloadTime": 0,
"queryLengthServed": 0
},
"instantMetricResult": {
"entriesFound": 0,
"entriesRequested": 0,
"entriesStored": 0,
"bytesReceived": 0,
"bytesSent": 0,
"requests": 0,
"downloadTime": 0,
"queryLengthServed": 0
},
"result": {
"entriesFound": 0,
"entriesRequested": 0,