diff --git a/pkg/querier/queryrange/downstreamer.go b/pkg/querier/queryrange/downstreamer.go index 051471576d582..89d58b0632b2c 100644 --- a/pkg/querier/queryrange/downstreamer.go +++ b/pkg/querier/queryrange/downstreamer.go @@ -114,12 +114,15 @@ func withoutOffset(query logql.DownstreamQuery) (string, time.Time, time.Time) { switch rng := e.(type) { case *syntax.RangeAggregationExpr: off := rng.Left.Offset - rng.Left.Offset = 0 // remove offset - newEnd = query.Params.End().Add(-off) + newEnd = query.Params.End() newStart = query.Params.Start() - if query.Params.Start() == query.Params.End() { // instant query - newStart = newStart.Add(-off) + if off != 0 { + rng.Left.Offset = 0 // remove offset + newEnd.Add(-off) + if query.Params.Start() == query.Params.End() { // instant query + newStart = newStart.Add(-off) + } } } }) diff --git a/pkg/querier/queryrange/downstreamer_test.go b/pkg/querier/queryrange/downstreamer_test.go index 007166c30c305..2aa9ba211fef5 100644 --- a/pkg/querier/queryrange/downstreamer_test.go +++ b/pkg/querier/queryrange/downstreamer_test.go @@ -14,6 +14,7 @@ import ( "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" @@ -390,6 +391,67 @@ func TestInstanceDownstream(t *testing.T) { require.Nil(t, err) require.Equal(t, 1, len(results)) require.Equal(t, expected.Data, results[0].Data) + + t.Run("Downstream with offset removed", func(t *testing.T) { + params, err := logql.NewLiteralParams( + `sum(rate({foo="bar"}[2h] offset 1h))`, + time.Now(), + time.Now(), + 0, + 0, + logproto.BACKWARD, + 1000, + nil, + ) + require.NoError(t, err) + + expectedResp := func() *LokiResponse { + return &LokiResponse{ + Data: LokiData{ + Result: []logproto.Stream{{ + Labels: `{foo="bar"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 0), Line: "foo"}, + }, + }}, + }, + Statistics: stats.Result{ + Summary: stats.Summary{QueueTime: 1, ExecTime: 2}, + }, + } + } + + queries := []logql.DownstreamQuery{ + { + Params: params, + }, + } + + var got queryrangebase.Request + var want queryrangebase.Request + handler := queryrangebase.HandlerFunc( + func(_ context.Context, req queryrangebase.Request) (queryrangebase.Response, error) { + // for some reason these seemingly can't be checked in their own goroutines, + // so we assign them to scoped variables for later comparison. + got = req + want = ParamsToLokiRequest(params).WithQuery(`sum(rate({foo="bar"}[2h]))`) // without offset + + return expectedResp(), nil + }, + ) + + results, err := DownstreamHandler{ + limits: fakeLimits{}, + next: handler, + }.Downstreamer(context.Background()).Downstream(context.Background(), queries) + + assert.Equal(t, got, want) + + require.Nil(t, err) + require.Equal(t, 1, len(results)) + require.Equal(t, expected.Data, results[0].Data) + + }) } func TestCancelWhileWaitingResponse(t *testing.T) {