From fa0fc274aae814360e00be9a508fc054a289b82e Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 3 Nov 2023 14:00:53 -0600 Subject: [PATCH 1/2] always align volume range timestamp to end of step --- pkg/querier/queryrange/volume.go | 12 ++++-------- pkg/querier/queryrange/volume_test.go | 20 ++++++++++++++++++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/querier/queryrange/volume.go b/pkg/querier/queryrange/volume.go index 305397ff6d6e0..b12fbd48d2459 100644 --- a/pkg/querier/queryrange/volume.go +++ b/pkg/querier/queryrange/volume.go @@ -34,14 +34,10 @@ func NewVolumeMiddleware() queryrangebase.Middleware { interval := time.Duration(volReq.Step * 1e6) util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) { - // Range query buckets are aligned to the starting timestamp - // Instant queries are for "this instant", which aligns to the end of the requested range - bucket := start - if interval == 0 { - bucket = end - } - - reqs[bucket] = &logproto.VolumeRequest{ + // Always align to the end of the requested range + // For range queries, this aligns to the end of the period we're returning a bytes aggregation for + // For instant queries, which are for "this instant", this aligns to the end of the requested range + reqs[end] = &logproto.VolumeRequest{ From: model.TimeFromUnix(start.Unix()), Through: model.TimeFromUnix(end.Unix()), Matchers: volReq.Matchers, diff --git a/pkg/querier/queryrange/volume_test.go b/pkg/querier/queryrange/volume_test.go index 5cbce28ac9e95..8d8b8d48a3f23 100644 --- a/pkg/querier/queryrange/volume_test.go +++ b/pkg/querier/queryrange/volume_test.go @@ -328,4 +328,24 @@ func Test_VolumeMiddleware(t *testing.T) { require.Equal(t, 1, len(promResp.Data.Result)) require.Equal(t, 2, len(promResp.Data.Result[0].Samples)) }) + + t.Run("timestamps are aligned with the end of steps", func(t *testing.T) { + volumeReq := &logproto.VolumeRequest{ + From: 1000000000000, + Through: 1000000005000, // 5s range + Matchers: `{foo="bar"}`, + Limit: seriesvolume.DefaultLimit, + Step: 1000, // 1s + AggregateBy: seriesvolume.Series, + } + promResp := makeVolumeRequest(volumeReq) + + require.Equal(t, int64(1000000000999), + promResp.Data.Result[0].Samples[0].TimestampMs, + "first timestamp should be one millisecond before the end of the first step") + require.Equal(t, + int64(1000000005000), + promResp.Data.Result[0].Samples[4].TimestampMs, + "last timestamp should be equal to the end of the requested query range") + }) } From 42fbe29729edf6fb64ad28e166e5a66fd6db27df Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Fri, 3 Nov 2023 14:51:40 -0600 Subject: [PATCH 2/2] fix test to match new range query ts logic --- pkg/querier/queryrange/roundtrip_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/querier/queryrange/roundtrip_test.go b/pkg/querier/queryrange/roundtrip_test.go index 375b624ae5b85..4c19a1ffc1202 100644 --- a/pkg/querier/queryrange/roundtrip_test.go +++ b/pkg/querier/queryrange/roundtrip_test.go @@ -509,7 +509,7 @@ func TestVolumeTripperware(t *testing.T) { From: model.TimeFromUnixNano(testTime.Add(-25 * time.Hour).UnixNano()), // bigger than split by interval limit Through: model.TimeFromUnixNano(testTime.UnixNano()), Limit: 10, - Step: 0, // Travis/Trevor: this should be ignored and set to 0. Karsten: Why? + Step: 0, // Setting this value to 0 is what makes this an instant query AggregateBy: seriesvolume.DefaultAggregateBy, } @@ -567,7 +567,7 @@ func TestVolumeTripperware(t *testing.T) { Matchers: `{job="varlogs"}`, From: model.TimeFromUnixNano(start.UnixNano()), // bigger than split by interval limit Through: model.TimeFromUnixNano(end.UnixNano()), - Step: time.Hour.Milliseconds(), + Step: time.Hour.Milliseconds(), // a non-zero value makes this a range query Limit: 10, AggregateBy: seriesvolume.DefaultAggregateBy, } @@ -587,10 +587,10 @@ func TestVolumeTripperware(t *testing.T) { require.Equal(t, 6, *count) // 6 queries from splitting into step buckets barBazExpectedSamples := []logproto.LegacySample{} - util.ForInterval(time.Hour, start, end, true, func(s, _ time.Time) { + util.ForInterval(time.Hour, start, end, true, func(_, e time.Time) { barBazExpectedSamples = append(barBazExpectedSamples, logproto.LegacySample{ Value: 3350, - TimestampMs: s.Unix() * 1e3, + TimestampMs: e.UnixMilli(), }) }) sort.Slice(barBazExpectedSamples, func(i, j int) bool { @@ -598,10 +598,10 @@ func TestVolumeTripperware(t *testing.T) { }) fooBarExpectedSamples := []logproto.LegacySample{} - util.ForInterval(time.Hour, start, end, true, func(s, _ time.Time) { + util.ForInterval(time.Hour, start, end, true, func(_, e time.Time) { fooBarExpectedSamples = append(fooBarExpectedSamples, logproto.LegacySample{ Value: 1024, - TimestampMs: s.Unix() * 1e3, + TimestampMs: e.UnixMilli(), }) }) sort.Slice(fooBarExpectedSamples, func(i, j int) bool {