Skip to content

Commit

Permalink
always align volume range timestamp to end of step (#11136)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
We had originally assumed that it would be weird for a user to get back
a timeseries response from `volume_range` that didn't include a
datapoint with a timestamp at the start of their requested range. This
was a bad assumption. The `volume_range` endpoint is inherently doing
aggregations. For each step, a datapoint is calculated representing the
percent of chunk volumes that the selector appeared in. It therefore
stands to reason that the steps volume was not that value until the end
of the step time range, as we are to assume volume is monotonically
increasing. The fact that we were using the start of the range made it
difficult to turn this data into a per-second rate, which this PR aims
to fix.

**Which issue(s) this PR fixes**:
Fixes #11134
  • Loading branch information
trevorwhitney authored Nov 8, 2023
1 parent fa378ac commit b1f9be5
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 14 deletions.
12 changes: 6 additions & 6 deletions pkg/querier/queryrange/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func TestVolumeTripperware(t *testing.T) {
From: model.TimeFromUnixNano(testTime.Add(-25 * time.Hour).UnixNano()), // bigger than split by interval limit
Through: model.TimeFromUnixNano(testTime.UnixNano()),
Limit: 10,
Step: 0, // Travis/Trevor: this should be ignored and set to 0. Karsten: Why?
Step: 0, // Setting this value to 0 is what makes this an instant query
AggregateBy: seriesvolume.DefaultAggregateBy,
}

Expand Down Expand Up @@ -567,7 +567,7 @@ func TestVolumeTripperware(t *testing.T) {
Matchers: `{job="varlogs"}`,
From: model.TimeFromUnixNano(start.UnixNano()), // bigger than split by interval limit
Through: model.TimeFromUnixNano(end.UnixNano()),
Step: time.Hour.Milliseconds(),
Step: time.Hour.Milliseconds(), // a non-zero value makes this a range query
Limit: 10,
AggregateBy: seriesvolume.DefaultAggregateBy,
}
Expand All @@ -587,21 +587,21 @@ func TestVolumeTripperware(t *testing.T) {
require.Equal(t, 6, *count) // 6 queries from splitting into step buckets

barBazExpectedSamples := []logproto.LegacySample{}
util.ForInterval(time.Hour, start, end, true, func(s, _ time.Time) {
util.ForInterval(time.Hour, start, end, true, func(_, e time.Time) {
barBazExpectedSamples = append(barBazExpectedSamples, logproto.LegacySample{
Value: 3350,
TimestampMs: s.Unix() * 1e3,
TimestampMs: e.UnixMilli(),
})
})
sort.Slice(barBazExpectedSamples, func(i, j int) bool {
return barBazExpectedSamples[i].TimestampMs < barBazExpectedSamples[j].TimestampMs
})

fooBarExpectedSamples := []logproto.LegacySample{}
util.ForInterval(time.Hour, start, end, true, func(s, _ time.Time) {
util.ForInterval(time.Hour, start, end, true, func(_, e time.Time) {
fooBarExpectedSamples = append(fooBarExpectedSamples, logproto.LegacySample{
Value: 1024,
TimestampMs: s.Unix() * 1e3,
TimestampMs: e.UnixMilli(),
})
})
sort.Slice(fooBarExpectedSamples, func(i, j int) bool {
Expand Down
12 changes: 4 additions & 8 deletions pkg/querier/queryrange/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,10 @@ func NewVolumeMiddleware() queryrangebase.Middleware {
interval := time.Duration(volReq.Step * 1e6)

util.ForInterval(interval, startTS, endTS, true, func(start, end time.Time) {
// Range query buckets are aligned to the starting timestamp
// Instant queries are for "this instant", which aligns to the end of the requested range
bucket := start
if interval == 0 {
bucket = end
}

reqs[bucket] = &logproto.VolumeRequest{
// Always align to the end of the requested range
// For range queries, this aligns to the end of the period we're returning a bytes aggregation for
// For instant queries, which are for "this instant", this aligns to the end of the requested range
reqs[end] = &logproto.VolumeRequest{
From: model.TimeFromUnix(start.Unix()),
Through: model.TimeFromUnix(end.Unix()),
Matchers: volReq.Matchers,
Expand Down
20 changes: 20 additions & 0 deletions pkg/querier/queryrange/volume_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,4 +328,24 @@ func Test_VolumeMiddleware(t *testing.T) {
require.Equal(t, 1, len(promResp.Data.Result))
require.Equal(t, 2, len(promResp.Data.Result[0].Samples))
})

t.Run("timestamps are aligned with the end of steps", func(t *testing.T) {
volumeReq := &logproto.VolumeRequest{
From: 1000000000000,
Through: 1000000005000, // 5s range
Matchers: `{foo="bar"}`,
Limit: seriesvolume.DefaultLimit,
Step: 1000, // 1s
AggregateBy: seriesvolume.Series,
}
promResp := makeVolumeRequest(volumeReq)

require.Equal(t, int64(1000000000999),
promResp.Data.Result[0].Samples[0].TimestampMs,
"first timestamp should be one millisecond before the end of the first step")
require.Equal(t,
int64(1000000005000),
promResp.Data.Result[0].Samples[4].TimestampMs,
"last timestamp should be equal to the end of the requested query range")
})
}

0 comments on commit b1f9be5

Please sign in to comment.