Skip to content

Commit

Permalink
feat: Add spike mode for dynamic series adjustment
Browse files Browse the repository at this point in the history
  • Loading branch information
yomek33 committed Jul 30, 2024
1 parent 6918552 commit 9804cf3
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 7 deletions.
14 changes: 11 additions & 3 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ var (
seriesChangeRate = kingpin.Flag("series-change-rate", "The rate at which the number of active series changes over time. Applies to 'gradual-change' mode.").Default("100").Int()
maxSeriesCount = kingpin.Flag("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").Int()
minSeriesCount = kingpin.Flag("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").Int()
spikeMultiplier = kingpin.Flag("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").Float64()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change' and 'double-halve' modes.").Default("10").Int()
seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve'").Default("default").String()
seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").String()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
Expand Down Expand Up @@ -73,7 +74,14 @@ func main() {
" Usage: ./avalanche --operation-mode=gradual-change --series-change-interval=30 --series-change-rate=10 --series-count=20\n" +
" Description: This mode gradually increases the series count by seriesChangeRate on each tick up to maxSeriesCount,\n" +
" then decreases it back to the starting value, and repeats this cycle indefinitely.\n" +
" The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1."
" The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." +
"\n" +
" spike:\n" +
" Periodically spikes the series count by a given multiplier.\n" +
" Usage: ./avalanche --series-operation-mode=spike --series-change-interval=180 --series-count=100 --spike-multiplier=1.5\n" +
" Description: This mode periodically increases the series count by a spike multiplier on one tick and\n" +
" then returns it to the original count on the next tick. This pattern repeats indefinitely,\n" +
" creating a spiking effect in the series count.\n"

kingpin.Parse()
if *maxSeriesCount <= *minSeriesCount {
Expand All @@ -91,7 +99,7 @@ func main() {

stop := make(chan struct{})
defer close(stop)
updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *seriesOperationMode, *constLabels, stop)
updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *spikeMultiplier, *seriesOperationMode, *constLabels, stop)
if err != nil {
log.Fatal(err)
}
Expand Down
40 changes: 39 additions & 1 deletion metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,35 @@ func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle
}
}

func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) {
inSpike := false
initialSeriesCount := *currentSeriesCount
for tick := range changeSeriesChan {
metricsMux.Lock()
unregisterMetrics()
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
metricsMux.Unlock()

if inSpike {
inSpike = false
*currentSeriesCount = initialSeriesCount
} else {
inSpike = true
*currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
}

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)

select {
case updateNotify <- struct{}{}:
default:
}
}
}

// RunMetrics creates a set of Prometheus test series that update over time
func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) {
func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, spikeMultiplier float64, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) {
labelKeys := make([]string, labelCount)
for idx := 0; idx < labelCount; idx++ {
labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", labelLength), idx)
Expand Down Expand Up @@ -249,6 +276,17 @@ func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSerie
go handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, valueTick)
go handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, seriesTick)

case "spike":
if spikeMultiplier < 1 {
return nil, fmt.Errorf("error: spikeMultiplier must be greater than or equal to 1, got %f", spikeMultiplier)
}
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, spikeMultiplier, seriesChangeInterval)
go handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount, spikeMultiplier, changeSeriesTick.C, updateNotify)
go handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, valueTick)
go handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, seriesTick)

default:
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
Expand Down
51 changes: 48 additions & 3 deletions metrics/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) {
labelCount = 1
maxSeriesCount = 10
minSeriesCount = 1
spikeMultiplier = 1.5
seriesChangeRate = 1
metricLength = 1
labelLength = 1
Expand All @@ -60,7 +61,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)
time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
Expand All @@ -84,6 +85,7 @@ func TestRunMetricsGradualChange(t *testing.T) {
seriesCount = 100
maxSeriesCount = 30
minSeriesCount = 10
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
Expand All @@ -100,7 +102,7 @@ func TestRunMetricsGradualChange(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -142,6 +144,7 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) {
seriesCount = 100
maxSeriesCount = 10
minSeriesCount = 100
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
Expand All @@ -158,6 +161,48 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.Error(t, err)
}

func TestRunMetricsSpikeChange(t *testing.T) {
const (
metricCount = 1
labelCount = 1
initialSeriesCount = 100
maxSeriesCount = 30
minSeriesCount = 10
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
valueInterval = 100
seriesInterval = 100
metricInterval = 100
seriesChangeInterval = 10
operationMode = "spike"
constLabel = "constLabel=test"
)

stop := make(chan struct{})
defer close(stop)

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
time.Sleep(time.Duration(seriesChangeInterval) * time.Second)
if i%2 == 0 {
currentCount := countSeries(t, promRegistry)
expectedCount := initialSeriesCount
assert.Equal(t, expectedCount, currentCount, "Halved series count should be %d but got %d", int(expectedCount), currentCount)
} else {
currentCount := countSeries(t, promRegistry)
expectedCount := initialSeriesCount * spikeMultiplier
assert.Equal(t, int(expectedCount), currentCount, "Doubled series count should be %d but got %d", int(expectedCount), float64(currentCount))
}
}
}

0 comments on commit 9804cf3

Please sign in to comment.