diff --git a/cmd/avalanche.go b/cmd/avalanche.go index 65fe770..04e527d 100644 --- a/cmd/avalanche.go +++ b/cmd/avalanche.go @@ -37,6 +37,7 @@ var ( seriesChangeRate = kingpin.Flag("series-change-rate", "The rate at which the number of active series changes over time. Applies to 'gradual-change' mode.").Default("100").Int() maxSeriesCount = kingpin.Flag("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").Int() minSeriesCount = kingpin.Flag("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").Int() + spikeMultiplier = kingpin.Flag("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").Float64() metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int() labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int() constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings() @@ -44,7 +45,7 @@ var ( labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int() metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int() seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change' and 'double-halve' modes.").Default("10").Int() - seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve'").Default("default").String() + seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").String() port = kingpin.Flag("port", "Port to serve at").Default("9001").Int() remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL() remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList() @@ -73,7 +74,14 @@ func main() { " Usage: ./avalanche --operation-mode=gradual-change --series-change-interval=30 --series-change-rate=10 --series-count=20\n" + " Description: This mode gradually increases the series count by seriesChangeRate on each tick up to maxSeriesCount,\n" + " then decreases it back to the starting value, and repeats this cycle indefinitely.\n" + - " The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." + " The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." + + "\n" + + " spike:\n" + + " Periodically spikes the series count by a given multiplier.\n" + + " Usage: ./avalanche --series-operation-mode=spike --series-change-interval=180 --series-count=100 --spike-multiplier=1.5\n" + + " Description: This mode periodically increases the series count by a spike multiplier on one tick and\n" + + " then returns it to the original count on the next tick. This pattern repeats indefinitely,\n" + + " creating a spiking effect in the series count.\n" kingpin.Parse() if *maxSeriesCount <= *minSeriesCount { @@ -91,7 +99,7 @@ func main() { stop := make(chan struct{}) defer close(stop) - updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *seriesOperationMode, *constLabels, stop) + updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *spikeMultiplier, *seriesOperationMode, *constLabels, stop) if err != nil { log.Fatal(err) } diff --git a/metrics/serve.go b/metrics/serve.go index 30dcac1..4b95e33 100644 --- a/metrics/serve.go +++ b/metrics/serve.go @@ -198,8 +198,35 @@ func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle } } +func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) { + inSpike := false + initialSeriesCount := *currentSeriesCount + for tick := range changeSeriesChan { + metricsMux.Lock() + unregisterMetrics() + registerMetrics(metricCount, metricLength, metricCycle, labelKeys) + cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle) + metricsMux.Unlock() + + if inSpike { + inSpike = false + *currentSeriesCount = initialSeriesCount + } else { + inSpike = true + *currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier) + } + + fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount) + + select { + case updateNotify <- struct{}{}: + default: + } + } +} + // RunMetrics creates a set of Prometheus test series that update over time -func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) { +func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, spikeMultiplier float64, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) { labelKeys := make([]string, labelCount) for idx := 0; idx < labelCount; idx++ { labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", labelLength), idx) @@ -249,6 +276,17 @@ func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSerie go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + case "spike": + if spikeMultiplier < 1 { + return nil, fmt.Errorf("error: spikeMultiplier must be greater than or equal to 1, got %f", spikeMultiplier) + } + registerMetrics(metricCount, metricLength, metricCycle, labelKeys) + cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) + fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, spikeMultiplier, seriesChangeInterval) + go handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, ¤tSeriesCount, spikeMultiplier, changeSeriesTick.C, updateNotify) + go handleValueTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, valueTick) + go handleSeriesTicks(&labelKeys, &labelValues, ¤tSeriesCount, &seriesCycle, updateNotify, seriesTick) + default: registerMetrics(metricCount, metricLength, metricCycle, labelKeys) cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle) diff --git a/metrics/serve_test.go b/metrics/serve_test.go index fd8800c..fba6a74 100644 --- a/metrics/serve_test.go +++ b/metrics/serve_test.go @@ -44,6 +44,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) { labelCount = 1 maxSeriesCount = 10 minSeriesCount = 1 + spikeMultiplier = 1.5 seriesChangeRate = 1 metricLength = 1 labelLength = 1 @@ -60,7 +61,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) { promRegistry = prometheus.NewRegistry() - _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop) + _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) assert.NoError(t, err) time.Sleep(2 * time.Second) for i := 0; i < 4; i++ { @@ -84,6 +85,7 @@ func TestRunMetricsGradualChange(t *testing.T) { seriesCount = 100 maxSeriesCount = 30 minSeriesCount = 10 + spikeMultiplier = 1.5 seriesChangeRate = 10 metricLength = 1 labelLength = 1 @@ -100,7 +102,7 @@ func TestRunMetricsGradualChange(t *testing.T) { promRegistry = prometheus.NewRegistry() - _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop) + _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) assert.NoError(t, err) time.Sleep(2 * time.Second) @@ -142,6 +144,7 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) { seriesCount = 100 maxSeriesCount = 10 minSeriesCount = 100 + spikeMultiplier = 1.5 seriesChangeRate = 10 metricLength = 1 labelLength = 1 @@ -158,6 +161,48 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) { promRegistry = prometheus.NewRegistry() - _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop) + _, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) assert.Error(t, err) } + +func TestRunMetricsSpikeChange(t *testing.T) { + const ( + metricCount = 1 + labelCount = 1 + initialSeriesCount = 100 + maxSeriesCount = 30 + minSeriesCount = 10 + spikeMultiplier = 1.5 + seriesChangeRate = 10 + metricLength = 1 + labelLength = 1 + valueInterval = 100 + seriesInterval = 100 + metricInterval = 100 + seriesChangeInterval = 10 + operationMode = "spike" + constLabel = "constLabel=test" + ) + + stop := make(chan struct{}) + defer close(stop) + + promRegistry = prometheus.NewRegistry() + + _, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop) + assert.NoError(t, err) + + time.Sleep(2 * time.Second) + for i := 0; i < 4; i++ { + time.Sleep(time.Duration(seriesChangeInterval) * time.Second) + if i%2 == 0 { + currentCount := countSeries(t, promRegistry) + expectedCount := initialSeriesCount + assert.Equal(t, expectedCount, currentCount, "Halved series count should be %d but got %d", int(expectedCount), currentCount) + } else { + currentCount := countSeries(t, promRegistry) + expectedCount := initialSeriesCount * spikeMultiplier + assert.Equal(t, int(expectedCount), currentCount, "Doubled series count should be %d but got %d", int(expectedCount), float64(currentCount)) + } + } +}