Skip to content

Commit

Permalink
Fix reset handling by checking against previous value instead of rese…
Browse files Browse the repository at this point in the history
…t value. (#263)
  • Loading branch information
jkschulz authored Jan 12, 2021
1 parent 1361301 commit 799f9c5
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 5 deletions.
19 changes: 17 additions & 2 deletions retrieval/series_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,21 @@ type seriesCacheEntry struct {
suffix string
hash uint64

hasReset bool
// Whether the series has been reset/initialized yet. This is false only for
// the first sample of a new series in the cache, which causes the initial
// "reset". After that, it is always true.
hasReset bool

// The value and timestamp of the latest reset. The timestamp is when it
// occurred, and the value is what it was reset to. resetValue will initially
// be the value of the first sample, and then 0 for every subsequent reset.
resetValue float64
resetTimestamp int64

// Value of the most recent point seen for the time series. If a new value is
// less than the previous, then the series has reset.
previousValue float64

// maxSegment indicates the maximum WAL segment index in which
// the series was first logged.
// By providing it as an upper bound, we can safely delete a series entry
Expand Down Expand Up @@ -295,19 +307,22 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f
if !hasReset {
e.resetTimestamp = t
e.resetValue = v
e.previousValue = v
// If we just initialized the reset timestamp, this sample should be skipped.
// We don't know the window over which the current cumulative value was built up over.
// The next sample for will be considered from this point onwards.
return 0, 0, false
}
if v < e.resetValue {
if v < e.previousValue {
// If the value has dropped, there's been a reset.
// If the series was reset, set the reset timestamp to be one millisecond
// before the timestamp of the current sample.
// We don't know the true reset time but this ensures the range is non-zero
// while unlikely to conflict with any previous sample.
e.resetValue = 0
e.resetTimestamp = t - 1
}
e.previousValue = v
return e.resetTimestamp, v - e.resetValue, true
}

Expand Down
164 changes: 161 additions & 3 deletions retrieval/transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestSampleBuilder(t *testing.T) {
{Ref: 2, T: 2000, V: 5.5},
{Ref: 2, T: 3000, V: 8},
{Ref: 2, T: 4000, V: 9},
{Ref: 2, T: 5000, V: 3},
{Ref: 2, T: 5000, V: 7},
{Ref: 1, T: 1000, V: 200},
{Ref: 3, T: 3000, V: 1},
{Ref: 4, T: 4000, V: 2},
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestSampleBuilder(t *testing.T) {
},
}},
},
{ // 3
{ // 3: Reset series since sample's value is less than previous value.
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
Expand All @@ -201,7 +201,7 @@ func TestSampleBuilder(t *testing.T) {
EndTime: &timestamp_pb.Timestamp{Seconds: 5},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_DoubleValue{3},
Value: &monitoring_pb.TypedValue_DoubleValue{7},
},
}},
},
Expand Down Expand Up @@ -973,6 +973,164 @@ func TestSampleBuilder(t *testing.T) {
},
},
},
// Samples resulting in multiple resets for a single time series.
{
targets: targetMap{
"job1/instance1": &targets.Target{
Labels: promlabels.FromStrings("job", "job1", "instance", "instance1"),
DiscoveredLabels: promlabels.FromStrings("__resource_a", "resource2_a"),
},
},
series: seriesMap{
1: labels.FromStrings("job", "job1", "instance", "instance1", "__name__", "metric1_count"),
},
metadata: metadataMap{
"job1/instance1/metric1": &metadata.Entry{Metric: "metric1_count", MetricType: textparse.MetricTypeSummary, ValueType: metric_pb.MetricDescriptor_DOUBLE},
},
metricPrefix: "test.googleapis.com",
input: []tsdb.RefSample{
// The first result will always be nil due to reset timestamp handling.
{Ref: 1, T: 2000, V: 5}, // reset since first value; use as baseline
{Ref: 1, T: 3000, V: 8},
{Ref: 1, T: 4000, V: 9},
{Ref: 1, T: 5000, V: 8}, // reset since value dropped (8<9)
{Ref: 1, T: 6000, V: 4}, // reset since value dropped (4<8)
{Ref: 1, T: 7000, V: 12},
{Ref: 1, T: 8000, V: 1}, // reset since value dropped (1<12)
},
result: []*monitoring_pb.TimeSeries{
nil, // first sample of new series is always nil; used as reset baseline
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 2},
EndTime: &timestamp_pb.Timestamp{Seconds: 3},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{3},
},
}},
},
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 2},
EndTime: &timestamp_pb.Timestamp{Seconds: 4},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{4},
},
}},
},
// reset since value dropped (8<9)
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 4, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 5},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{8},
},
}},
},
// reset since value dropped (4<8)
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 5, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 6},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{4},
},
}},
},
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 5, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 7},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{12},
},
}},
},
// reset since value dropped (1<12)
{
Resource: &monitoredres_pb.MonitoredResource{
Type: "resource2",
Labels: map[string]string{"resource_a": "resource2_a"},
},
Metric: &metric_pb.Metric{
Type: "test.googleapis.com/metric1_count",
Labels: map[string]string{},
},
MetricKind: metric_pb.MetricDescriptor_CUMULATIVE,
ValueType: metric_pb.MetricDescriptor_INT64,
Points: []*monitoring_pb.Point{{
Interval: &monitoring_pb.TimeInterval{
StartTime: &timestamp_pb.Timestamp{Seconds: 7, Nanos: 1e9 - 1e6},
EndTime: &timestamp_pb.Timestamp{Seconds: 8},
},
Value: &monitoring_pb.TypedValue{
Value: &monitoring_pb.TypedValue_Int64Value{1},
},
}},
},
},
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 799f9c5

Please sign in to comment.