From 799f9c57481f287099eb00e572ec308b25bf414f Mon Sep 17 00:00:00 2001 From: jkschulz <35477443+jkschulz@users.noreply.github.com> Date: Tue, 12 Jan 2021 16:39:59 -0500 Subject: [PATCH] Fix reset handling by checking against previous value instead of reset value. (#263) --- retrieval/series_cache.go | 19 ++++- retrieval/transform_test.go | 164 +++++++++++++++++++++++++++++++++++- 2 files changed, 178 insertions(+), 5 deletions(-) diff --git a/retrieval/series_cache.go b/retrieval/series_cache.go index e68154cc..0fd7c538 100644 --- a/retrieval/series_cache.go +++ b/retrieval/series_cache.go @@ -100,9 +100,21 @@ type seriesCacheEntry struct { suffix string hash uint64 - hasReset bool + // Whether the series has been reset/initialized yet. This is false only for + // the first sample of a new series in the cache, which causes the initial + // "reset". After that, it is always true. + hasReset bool + + // The value and timestamp of the latest reset. The timestamp is when it + // occurred, and the value is what it was reset to. resetValue will initially + // be the value of the first sample, and then 0 for every subsequent reset. resetValue float64 resetTimestamp int64 + + // Value of the most recent point seen for the time series. If a new value is + // less than the previous, then the series has reset. + previousValue float64 + // maxSegment indicates the maximum WAL segment index in which // the series was first logged. // By providing it as an upper bound, we can safely delete a series entry @@ -295,12 +307,14 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f if !hasReset { e.resetTimestamp = t e.resetValue = v + e.previousValue = v // If we just initialized the reset timestamp, this sample should be skipped. // We don't know the window over which the current cumulative value was built up over. // The next sample for will be considered from this point onwards. return 0, 0, false } - if v < e.resetValue { + if v < e.previousValue { + // If the value has dropped, there's been a reset. // If the series was reset, set the reset timestamp to be one millisecond // before the timestamp of the current sample. // We don't know the true reset time but this ensures the range is non-zero @@ -308,6 +322,7 @@ func (c *seriesCache) getResetAdjusted(ref uint64, t int64, v float64) (int64, f e.resetValue = 0 e.resetTimestamp = t - 1 } + e.previousValue = v return e.resetTimestamp, v - e.resetValue, true } diff --git a/retrieval/transform_test.go b/retrieval/transform_test.go index b0d79f7a..b446042c 100644 --- a/retrieval/transform_test.go +++ b/retrieval/transform_test.go @@ -128,7 +128,7 @@ func TestSampleBuilder(t *testing.T) { {Ref: 2, T: 2000, V: 5.5}, {Ref: 2, T: 3000, V: 8}, {Ref: 2, T: 4000, V: 9}, - {Ref: 2, T: 5000, V: 3}, + {Ref: 2, T: 5000, V: 7}, {Ref: 1, T: 1000, V: 200}, {Ref: 3, T: 3000, V: 1}, {Ref: 4, T: 4000, V: 2}, @@ -184,7 +184,7 @@ func TestSampleBuilder(t *testing.T) { }, }}, }, - { // 3 + { // 3: Reset series since sample's value is less than previous value. Resource: &monitoredres_pb.MonitoredResource{ Type: "resource2", Labels: map[string]string{"resource_a": "resource2_a"}, @@ -201,7 +201,7 @@ func TestSampleBuilder(t *testing.T) { EndTime: ×tamp_pb.Timestamp{Seconds: 5}, }, Value: &monitoring_pb.TypedValue{ - Value: &monitoring_pb.TypedValue_DoubleValue{3}, + Value: &monitoring_pb.TypedValue_DoubleValue{7}, }, }}, }, @@ -973,6 +973,164 @@ func TestSampleBuilder(t *testing.T) { }, }, }, + // Samples resulting in multiple resets for a single time series. + { + targets: targetMap{ + "job1/instance1": &targets.Target{ + Labels: promlabels.FromStrings("job", "job1", "instance", "instance1"), + DiscoveredLabels: promlabels.FromStrings("__resource_a", "resource2_a"), + }, + }, + series: seriesMap{ + 1: labels.FromStrings("job", "job1", "instance", "instance1", "__name__", "metric1_count"), + }, + metadata: metadataMap{ + "job1/instance1/metric1": &metadata.Entry{Metric: "metric1_count", MetricType: textparse.MetricTypeSummary, ValueType: metric_pb.MetricDescriptor_DOUBLE}, + }, + metricPrefix: "test.googleapis.com", + input: []tsdb.RefSample{ + // The first result will always be nil due to reset timestamp handling. + {Ref: 1, T: 2000, V: 5}, // reset since first value; use as baseline + {Ref: 1, T: 3000, V: 8}, + {Ref: 1, T: 4000, V: 9}, + {Ref: 1, T: 5000, V: 8}, // reset since value dropped (8<9) + {Ref: 1, T: 6000, V: 4}, // reset since value dropped (4<8) + {Ref: 1, T: 7000, V: 12}, + {Ref: 1, T: 8000, V: 1}, // reset since value dropped (1<12) + }, + result: []*monitoring_pb.TimeSeries{ + nil, // first sample of new series is always nil; used as reset baseline + { + Resource: &monitoredres_pb.MonitoredResource{ + Type: "resource2", + Labels: map[string]string{"resource_a": "resource2_a"}, + }, + Metric: &metric_pb.Metric{ + Type: "test.googleapis.com/metric1_count", + Labels: map[string]string{}, + }, + MetricKind: metric_pb.MetricDescriptor_CUMULATIVE, + ValueType: metric_pb.MetricDescriptor_INT64, + Points: []*monitoring_pb.Point{{ + Interval: &monitoring_pb.TimeInterval{ + StartTime: ×tamp_pb.Timestamp{Seconds: 2}, + EndTime: ×tamp_pb.Timestamp{Seconds: 3}, + }, + Value: &monitoring_pb.TypedValue{ + Value: &monitoring_pb.TypedValue_Int64Value{3}, + }, + }}, + }, + { + Resource: &monitoredres_pb.MonitoredResource{ + Type: "resource2", + Labels: map[string]string{"resource_a": "resource2_a"}, + }, + Metric: &metric_pb.Metric{ + Type: "test.googleapis.com/metric1_count", + Labels: map[string]string{}, + }, + MetricKind: metric_pb.MetricDescriptor_CUMULATIVE, + ValueType: metric_pb.MetricDescriptor_INT64, + Points: []*monitoring_pb.Point{{ + Interval: &monitoring_pb.TimeInterval{ + StartTime: ×tamp_pb.Timestamp{Seconds: 2}, + EndTime: ×tamp_pb.Timestamp{Seconds: 4}, + }, + Value: &monitoring_pb.TypedValue{ + Value: &monitoring_pb.TypedValue_Int64Value{4}, + }, + }}, + }, + // reset since value dropped (8<9) + { + Resource: &monitoredres_pb.MonitoredResource{ + Type: "resource2", + Labels: map[string]string{"resource_a": "resource2_a"}, + }, + Metric: &metric_pb.Metric{ + Type: "test.googleapis.com/metric1_count", + Labels: map[string]string{}, + }, + MetricKind: metric_pb.MetricDescriptor_CUMULATIVE, + ValueType: metric_pb.MetricDescriptor_INT64, + Points: []*monitoring_pb.Point{{ + Interval: &monitoring_pb.TimeInterval{ + StartTime: ×tamp_pb.Timestamp{Seconds: 4, Nanos: 1e9 - 1e6}, + EndTime: ×tamp_pb.Timestamp{Seconds: 5}, + }, + Value: &monitoring_pb.TypedValue{ + Value: &monitoring_pb.TypedValue_Int64Value{8}, + }, + }}, + }, + // reset since value dropped (4<8) + { + Resource: &monitoredres_pb.MonitoredResource{ + Type: "resource2", + Labels: map[string]string{"resource_a": "resource2_a"}, + }, + Metric: &metric_pb.Metric{ + Type: "test.googleapis.com/metric1_count", + Labels: map[string]string{}, + }, + MetricKind: metric_pb.MetricDescriptor_CUMULATIVE, + ValueType: metric_pb.MetricDescriptor_INT64, + Points: []*monitoring_pb.Point{{ + Interval: &monitoring_pb.TimeInterval{ + StartTime: ×tamp_pb.Timestamp{Seconds: 5, Nanos: 1e9 - 1e6}, + EndTime: ×tamp_pb.Timestamp{Seconds: 6}, + }, + Value: &monitoring_pb.TypedValue{ + Value: &monitoring_pb.TypedValue_Int64Value{4}, + }, + }}, + }, + { + Resource: &monitoredres_pb.MonitoredResource{ + Type: "resource2", + Labels: map[string]string{"resource_a": "resource2_a"}, + }, + Metric: &metric_pb.Metric{ + Type: "test.googleapis.com/metric1_count", + Labels: map[string]string{}, + }, + MetricKind: metric_pb.MetricDescriptor_CUMULATIVE, + ValueType: metric_pb.MetricDescriptor_INT64, + Points: []*monitoring_pb.Point{{ + Interval: &monitoring_pb.TimeInterval{ + StartTime: ×tamp_pb.Timestamp{Seconds: 5, Nanos: 1e9 - 1e6}, + EndTime: ×tamp_pb.Timestamp{Seconds: 7}, + }, + Value: &monitoring_pb.TypedValue{ + Value: &monitoring_pb.TypedValue_Int64Value{12}, + }, + }}, + }, + // reset since value dropped (1<12) + { + Resource: &monitoredres_pb.MonitoredResource{ + Type: "resource2", + Labels: map[string]string{"resource_a": "resource2_a"}, + }, + Metric: &metric_pb.Metric{ + Type: "test.googleapis.com/metric1_count", + Labels: map[string]string{}, + }, + MetricKind: metric_pb.MetricDescriptor_CUMULATIVE, + ValueType: metric_pb.MetricDescriptor_INT64, + Points: []*monitoring_pb.Point{{ + Interval: &monitoring_pb.TimeInterval{ + StartTime: ×tamp_pb.Timestamp{Seconds: 7, Nanos: 1e9 - 1e6}, + EndTime: ×tamp_pb.Timestamp{Seconds: 8}, + }, + Value: &monitoring_pb.TypedValue{ + Value: &monitoring_pb.TypedValue_Int64Value{1}, + }, + }}, + }, + }, + }, } ctx, cancel := context.WithCancel(context.Background()) defer cancel()