diff --git a/pkg/expr/functions/timeLag/function.go b/pkg/expr/functions/timeLag/function.go index 3126f1cf..8ab6666c 100644 --- a/pkg/expr/functions/timeLag/function.go +++ b/pkg/expr/functions/timeLag/function.go @@ -41,31 +41,25 @@ func MakeTimeLag(consumerMetric, producerMetric *types.MetricData, name string) r.Values = make([]float64, len(consumerMetric.Values)) r.IsAbsent = make([]bool, len(consumerMetric.Values)) - var pIndex int32 = 0 + // Set initial producer index to -1 + var pIndex int32 = -1 for i, v := range consumerMetric.Values { // reset producer offset and scan it again if consumer offset decreased if i > 0 && consumerMetric.Values[i-1] > v { - pIndex = 0 + pIndex = -1 } + if consumerMetric.IsAbsent[i] || len(producerMetric.Values) == 0 { r.IsAbsent[i] = true continue } - npIndex := pIndex - // npIndex: find first index in producer metric that is higher than v - for (producerMetric.IsAbsent[npIndex] || producerMetric.Values[npIndex] <= v) && (npIndex+1) < pLen { - npIndex++ - for producerMetric.IsAbsent[npIndex] && (npIndex+1) < pLen { - npIndex++ - } - // maintain: pIndex is highest index for which producer metric <= v - if !producerMetric.IsAbsent[npIndex] && producerMetric.Values[npIndex] <= v { - pIndex = npIndex - } + // Move Producer index to the right as much as possible + for pIndex < (int32)(i) && (pIndex+1) < pLen && producerMetric.Values[pIndex+1] <= v { + pIndex += 1 } - // we can't compute timeLag for the value that is lower than the smallest data point in producer metric - if producerMetric.IsAbsent[pIndex] || producerMetric.Values[pIndex] > v { + + if pIndex == -1 || producerMetric.IsAbsent[pIndex] { r.IsAbsent[i] = true continue } diff --git a/pkg/expr/functions/timeLag/function_test.go b/pkg/expr/functions/timeLag/function_test.go index f16a3a5e..4831b0f4 100644 --- a/pkg/expr/functions/timeLag/function_test.go +++ b/pkg/expr/functions/timeLag/function_test.go @@ -49,6 +49,36 @@ func TestTimeLagSeriesMultiReturn(t *testing.T) { "timeLagSeries(metric2,metric2)": {types.MakeMetricData("timeLagSeries(metric2,metric2)", []float64{0, 0, 0, 0, 0}, 1, now32)}, }, }, + { + "timeLagSeries(metric1,metric2)", + map[parser.MetricRequest][]*types.MetricData{ + {"metric1", 0, 1}: { + types.MakeMetricData("metric1", []float64{1, 30, 40, 60, 80, 100, 100, 100, 100, 100, 150, 200, 200, 200}, 1, now32), + }, + {"metric2", 0, 1}: { + types.MakeMetricData("metric2", []float64{1, 100, 100, 100, 100, 100, 100, 100, 100, 200, 200, 200, 200, 200}, 1, now32), + }, + }, + "timeLagSeries", + map[string][]*types.MetricData{ + "timeLagSeries(metric1,metric2)": {types.MakeMetricData("timeLagSeries(metric1,metric2)", []float64{0, 1, 2, 3, 4, 0, 0, 0, 0, 1, 2, 0, 0, 0}, 1, now32)}, + }, + }, + { + "timeLagSeries(metric1,metric2)", + map[parser.MetricRequest][]*types.MetricData{ + {"metric1", 0, 1}: { + types.MakeMetricData("metric1", []float64{50, 50, 50, 60, 70, 75, 80, 90, 90, 90}, 1, now32), + }, + {"metric2", 0, 1}: { + types.MakeMetricData("metric2", []float64{50, 50, 50, 90, 90, 90, 90, 90, 90, 90}, 1, now32), + }, + }, + "timeLagSeries", + map[string][]*types.MetricData{ + "timeLagSeries(metric1,metric2)": {types.MakeMetricData("timeLagSeries(metric1,metric2)", []float64{0, 0, 0, 1, 2, 3, 4, 0, 0, 0}, 1, now32)}, + }, + }, } for _, tt := range tests { @@ -58,7 +88,6 @@ func TestTimeLagSeriesMultiReturn(t *testing.T) { th.TestMultiReturnEvalExpr(t, &tt) }) } - } func TestTimeLagSeries(t *testing.T) { @@ -72,7 +101,16 @@ func TestTimeLagSeries(t *testing.T) { {"metric2", 0, 1}: {types.MakeMetricData("metric2", []float64{2, math.NaN(), 3, math.NaN(), 5, 12}, 1, now32)}, }, []*types.MetricData{types.MakeMetricData("timeLagSeries(metric1,metric2)", - []float64{math.NaN(), math.NaN(), math.NaN(), 1, 2, 0}, 1, now32)}, + []float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 0}, 1, now32)}, + }, + { + "timeLagSeries(metric1,metric2)", + map[parser.MetricRequest][]*types.MetricData{ + {"metric1", 0, 1}: {types.MakeMetricData("metric1", []float64{4857060, 4857060, 4857060, 4857200, 4858101, 4859001, 4859901}, 1, now32)}, + {"metric2", 0, 1}: {types.MakeMetricData("metric2", []float64{4857060, 4857060, 4857060, 4884065, 4884065, 4884065, 4884065}, 1, now32)}, + }, + []*types.MetricData{types.MakeMetricData("timeLagSeries(metric1,metric2)", + []float64{0, 0, 0, 1, 2, 3, 4}, 1, now32)}, }, { "timeLagSeries(metric[12])", @@ -83,7 +121,7 @@ func TestTimeLagSeries(t *testing.T) { }, }, []*types.MetricData{types.MakeMetricData("timeLagSeries(metric[12])", - []float64{math.NaN(), math.NaN(), math.NaN(), 1, 2, 0}, 1, now32)}, + []float64{math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 0}, 1, now32)}, }, { "timeLagSeries(metric1,metric2)", @@ -108,6 +146,7 @@ func TestTimeLagSeries(t *testing.T) { for _, tt := range tests { tt := tt testName := tt.Target + t.Run(testName, func(t *testing.T) { th.TestEvalExpr(t, &tt) })