Skip to content

Commit

Permalink
Merge pull request #70 from yomek33/series-spike
Browse files Browse the repository at this point in the history
 spike mode for dynamic series adjustment
  • Loading branch information
cstyan authored Aug 22, 2024
2 parents 4b732e1 + d4f9640 commit 3558d56
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 8 deletions.
16 changes: 12 additions & 4 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ var (
seriesChangeRate = kingpin.Flag("series-change-rate", "The rate at which the number of active series changes over time. Applies to 'gradual-change' mode.").Default("100").Int()
maxSeriesCount = kingpin.Flag("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").Int()
minSeriesCount = kingpin.Flag("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").Int()
spikeMultiplier = kingpin.Flag("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").Float64()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change' and 'double-halve' modes.").Default("30").Int()
seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve'").Default("default").String()
seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change', 'double-halve' and 'spike' modes.").Default("30").Int()
seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").String()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
Expand Down Expand Up @@ -73,7 +74,14 @@ func main() {
" Usage: ./avalanche --series-operation-mode=gradual-change --series-change-interval=30 --series-change-rate=100 --max-series-count=2000 --min-series-count=200\n" +
" Description: This mode gradually increases the series count by seriesChangeRate on each tick up to maxSeriesCount,\n" +
" then decreases it back to the minSeriesCount, and repeats this cycle indefinitely.\n" +
" The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1."
" The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." +
"\n" +
" spike:\n" +
" Periodically spikes the series count by a given multiplier.\n" +
" Usage: ./avalanche --series-operation-mode=spike --series-change-interval=180 --series-count=100 --spike-multiplier=1.5\n" +
" Description: This mode periodically increases the series count by a spike multiplier on one tick and\n" +
" then returns it to the original count on the next tick. This pattern repeats indefinitely,\n" +
" creating a spiking effect in the series count.\n"

kingpin.Parse()
if *maxSeriesCount <= *minSeriesCount {
Expand All @@ -91,7 +99,7 @@ func main() {

stop := make(chan struct{})
defer close(stop)
updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *seriesOperationMode, *constLabels, stop)
updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *spikeMultiplier, *seriesOperationMode, *constLabels, stop)
if err != nil {
log.Fatal(err)
}
Expand Down
37 changes: 36 additions & 1 deletion metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,32 @@ func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle
}
}

func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) {
initialSeriesCount := *currentSeriesCount
for tick := range changeSeriesChan {
metricsMux.Lock()
unregisterMetrics()
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
metricsMux.Unlock()

if *currentSeriesCount > initialSeriesCount {
*currentSeriesCount = initialSeriesCount
} else {
*currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
}

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)

select {
case updateNotify <- struct{}{}:
default:
}
}
}

// RunMetrics creates a set of Prometheus test series that update over time
func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) {
func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, spikeMultiplier float64, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) {
labelKeys := make([]string, labelCount)
for idx := 0; idx < labelCount; idx++ {
labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", labelLength), idx)
Expand Down Expand Up @@ -249,6 +273,17 @@ func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSerie
go handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, valueTick)
go handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, seriesTick)

case "spike":
if spikeMultiplier < 1 {
return nil, fmt.Errorf("error: spikeMultiplier must be greater than or equal to 1, got %f", spikeMultiplier)
}
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, spikeMultiplier, seriesChangeInterval)
go handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount, spikeMultiplier, changeSeriesTick.C, updateNotify)
go handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, valueTick)
go handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, seriesTick)

default:
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
Expand Down
51 changes: 48 additions & 3 deletions metrics/serve_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) {
labelCount = 1
maxSeriesCount = 10
minSeriesCount = 1
spikeMultiplier = 1.5
seriesChangeRate = 1
metricLength = 1
labelLength = 1
Expand All @@ -60,7 +61,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)
time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
Expand All @@ -84,6 +85,7 @@ func TestRunMetricsGradualChange(t *testing.T) {
seriesCount = 100
maxSeriesCount = 30
minSeriesCount = 10
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
Expand All @@ -100,7 +102,7 @@ func TestRunMetricsGradualChange(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
Expand Down Expand Up @@ -142,6 +144,7 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) {
seriesCount = 100
maxSeriesCount = 10
minSeriesCount = 100
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
Expand All @@ -158,6 +161,48 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.Error(t, err)
}

func TestRunMetricsSpikeChange(t *testing.T) {
const (
metricCount = 1
labelCount = 1
initialSeriesCount = 100
maxSeriesCount = 30
minSeriesCount = 10
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
valueInterval = 100
seriesInterval = 100
metricInterval = 100
seriesChangeInterval = 10
operationMode = "spike"
constLabel = "constLabel=test"
)

stop := make(chan struct{})
defer close(stop)

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
time.Sleep(time.Duration(seriesChangeInterval) * time.Second)
if i%2 == 0 {
currentCount := countSeries(t, promRegistry)
expectedCount := initialSeriesCount
assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Halved series count should be %d but got %d", expectedCount, currentCount))
} else {
currentCount := countSeries(t, promRegistry)
expectedCount := int(float64(initialSeriesCount) * spikeMultiplier)
assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Multiplied the series count by %.1f, should be %d but got %d", spikeMultiplier, expectedCount, currentCount))
}
}
}

0 comments on commit 3558d56

Please sign in to comment.