From 47eeb45e1ae2079cff3519d4efd125a712c61f31 Mon Sep 17 00:00:00 2001 From: yomek33 Date: Wed, 31 Jul 2024 04:28:26 +0900 Subject: [PATCH 1/4] feat: Add spike mode for dynamic series adjustment Signed-off-by: yomek33 --- cmd/avalanche.go | 14 +++++++++--- metrics/serve.go | 40 ++++++++++++++++++++++++++++++++- metrics/serve_test.go | 51 ++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 98 insertions(+), 7 deletions(-) diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 2e5af7d..11ea64a 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -37,14 +37,15 @@ var ( seriesChangeRate = kingpin.Flag("series-change-rate", "The rate at which the number of active series changes over time. Applies to 'gradual-change' mode.").Default("100").Int() maxSeriesCount = kingpin.Flag("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").Int() minSeriesCount = kingpin.Flag("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").Int() + spikeMultiplier = kingpin.Flag("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").Float64() metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings() valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int() labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() - seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change' and 'double-halve' modes.").Default("30").Int() - seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve'").Default("default").String() + seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change', 'double-halve' and 'spike' modes.").Default("30").Int() + seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").String() port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() @@ -74,6 +75,13 @@ func main() { " Description: This mode gradually increases the series count by seriesChangeRate on each tick up to maxSeriesCount,\n" + " then decreases it back to the minSeriesCount, and repeats this cycle indefinitely.\n" + " The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." + "\n" + + " spike:\n" + + " Periodically spikes the series count by a given multiplier.\n" + + " Usage: ./avalanche --series-operation-mode=spike --series-change-interval=180 --series-count=100 --spike-multiplier=1.5\n" + + " Description: This mode periodically increases the series count by a spike multiplier on one tick and\n" + + " then returns it to the original count on the next tick. This pattern repeats indefinitely,\n" + + " creating a spiking effect in the series count.\n" kingpin.Parse() if *maxSeriesCount <= *minSeriesCount { @@ -91,7 +99,7 @@ func main() { stop := make(chan struct{}) defer close(stop) - updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *seriesOperationMode, *constLabels, stop) + updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *spikeMultiplier, *seriesOperationMode, *constLabels, stop) if err != nil { log.Fatal(err) } diff --git a/metrics/serve.go b/metrics/serve.go index 30dcac1..4b95e33 100644 --- a/metrics/serve.go +++ b/metrics/serve.go @@ -198,8 +198,35 @@ func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle } } +func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) { + inSpike := false + initialSeriesCount := *currentSeriesCount + for tick := range changeSeriesChan { + metricsMux.Lock() + unregisterMetrics() + registerMetrics(metricCount, metricLength, metricCycle, labelKeys) + cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) + metricsMux.Unlock() + + if inSpike { + inSpike = false + *currentSeriesCount = initialSeriesCount + } else { + inSpike = true + *currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier) + } + + fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount) + + select { + case updateNotify <- struct{}{}: + default: + } + } +} + // RunMetrics creates a set of Prometheus test series that update over time -func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) { +func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, spikeMultiplier float64, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) { labelKeys := make([]string, labelCount) for idx := 0; idx < labelCount; idx++ { labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", labelLength), idx) @@ -249,6 +276,17 @@ func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSerie go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + case "spike": + if spikeMultiplier < 1 { + return nil, fmt.Errorf("error: spikeMultiplier must be greater than or equal to 1, got %f", spikeMultiplier) + } + registerMetrics(metricCount, metricLength, metricCycle, labelKeys) + cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) + fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, spikeMultiplier, seriesChangeInterval) + go handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount, spikeMultiplier, changeSeriesTick.C, updateNotify) + go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) + go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + default: registerMetrics(metricCount, metricLength, metricCycle, labelKeys) cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) diff --git a/metrics/serve_test.go b/metrics/serve_test.go index fd8800c..fba6a74 100644 --- a/metrics/serve_test.go +++ b/metrics/serve_test.go @@ -44,6 +44,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) { labelCount = 1 maxSeriesCount = 10 minSeriesCount = 1 + spikeMultiplier = 1.5 seriesChangeRate = 1 metricLength = 1 labelLength = 1 @@ -60,7 +61,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) { promRegistry = prometheus.NewRegistry() - _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop) + _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) assert.NoError(t, err) time.Sleep(2 * time.Second) for i := 0; i < 4; i++ { @@ -84,6 +85,7 @@ func TestRunMetricsGradualChange(t *testing.T) { seriesCount = 100 maxSeriesCount = 30 minSeriesCount = 10 + spikeMultiplier = 1.5 seriesChangeRate = 10 metricLength = 1 labelLength = 1 @@ -100,7 +102,7 @@ func TestRunMetricsGradualChange(t *testing.T) { promRegistry = prometheus.NewRegistry() - _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop) + _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) assert.NoError(t, err) time.Sleep(2 * time.Second) @@ -142,6 +144,7 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) { seriesCount = 100 maxSeriesCount = 10 minSeriesCount = 100 + spikeMultiplier = 1.5 seriesChangeRate = 10 metricLength = 1 labelLength = 1 @@ -158,6 +161,48 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) { promRegistry = prometheus.NewRegistry() - _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop) + _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) assert.Error(t, err) } + +func TestRunMetricsSpikeChange(t *testing.T) { + const ( + metricCount = 1 + labelCount = 1 + initialSeriesCount = 100 + maxSeriesCount = 30 + minSeriesCount = 10 + spikeMultiplier = 1.5 + seriesChangeRate = 10 + metricLength = 1 + labelLength = 1 + valueInterval = 100 + seriesInterval = 100 + metricInterval = 100 + seriesChangeInterval = 10 + operationMode = "spike" + constLabel = "constLabel=test" + ) + + stop := make(chan struct{}) + defer close(stop) + + promRegistry = prometheus.NewRegistry() + + _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) + assert.NoError(t, err) + + time.Sleep(2 * time.Second) + for i := 0; i < 4; i++ { + time.Sleep(time.Duration(seriesChangeInterval) * time.Second) + if i%2 == 0 { + currentCount := countSeries(t, promRegistry) + expectedCount := initialSeriesCount + assert.Equal(t, expectedCount, currentCount, "Halved series count should be %d but got %d", int(expectedCount), currentCount) + } else { + currentCount := countSeries(t, promRegistry) + expectedCount := initialSeriesCount * spikeMultiplier + assert.Equal(t, int(expectedCount), currentCount, "Doubled series count should be %d but got %d", int(expectedCount), float64(currentCount)) + } + } +} From 84b0abb75d57846ac27a5c7d3ecce381d292093e Mon Sep 17 00:00:00 2001 From: yomek33 Date: Fri, 2 Aug 2024 21:48:29 +0900 Subject: [PATCH 2/4] FIX logic better Signed-off-by: yomek33 --- metrics/serve.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/metrics/serve.go b/metrics/serve.go index 4b95e33..e0d033c 100644 --- a/metrics/serve.go +++ b/metrics/serve.go @@ -199,7 +199,6 @@ func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle } func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) { - inSpike := false initialSeriesCount := *currentSeriesCount for tick := range changeSeriesChan { metricsMux.Lock() @@ -208,11 +207,9 @@ func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, la cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) metricsMux.Unlock() - if inSpike { - inSpike = false + if *currentSeriesCount > initialSeriesCount { *currentSeriesCount = initialSeriesCount } else { - inSpike = true *currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier) } From b7277dcf0199acbeb19a7047d6f21477e119a2b5 Mon Sep 17 00:00:00 2001 From: yomek33 Date: Wed, 7 Aug 2024 13:39:26 +0900 Subject: [PATCH 3/4] format Signed-off-by: yomek33 --- cmd/avalanche.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 11ea64a..7d00b5c 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -74,7 +74,7 @@ func main() { " Usage: ./avalanche --series-operation-mode=gradual-change --series-change-interval=30 --series-change-rate=100 --max-series-count=2000 --min-series-count=200\n" + " Description: This mode gradually increases the series count by seriesChangeRate on each tick up to maxSeriesCount,\n" + " then decreases it back to the minSeriesCount, and repeats this cycle indefinitely.\n" + - " The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." + " The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." + "\n" + " spike:\n" + " Periodically spikes the series count by a given multiplier.\n" + From d4f9640cfb7c5af935faa0581e6825b111e37eb2 Mon Sep 17 00:00:00 2001 From: yomek33 Date: Thu, 22 Aug 2024 20:20:32 +0900 Subject: [PATCH 4/4] Improve text in TestRunMetricsSpikeChange Signed-off-by: yomek33 --- metrics/serve_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metrics/serve_test.go b/metrics/serve_test.go index fba6a74..357f82e 100644 --- a/metrics/serve_test.go +++ b/metrics/serve_test.go @@ -198,11 +198,11 @@ func TestRunMetricsSpikeChange(t *testing.T) { if i%2 == 0 { currentCount := countSeries(t, promRegistry) expectedCount := initialSeriesCount - assert.Equal(t, expectedCount, currentCount, "Halved series count should be %d but got %d", int(expectedCount), currentCount) + assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Halved series count should be %d but got %d", expectedCount, currentCount)) } else { currentCount := countSeries(t, promRegistry) - expectedCount := initialSeriesCount * spikeMultiplier - assert.Equal(t, int(expectedCount), currentCount, "Doubled series count should be %d but got %d", int(expectedCount), float64(currentCount)) + expectedCount := int(float64(initialSeriesCount) * spikeMultiplier) + assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Multiplied the series count by %.1f, should be %d but got %d", spikeMultiplier, expectedCount, currentCount)) } } }