spike mode for dynamic series adjustment #70

Merged 4 commits on Aug 22, 2024
16 changes: 12 additions & 4 deletions cmd/avalanche.go
@@ -37,14 +37,15 @@ var (
seriesChangeRate = kingpin.Flag("series-change-rate", "The rate at which the number of active series changes over time. Applies to 'gradual-change' mode.").Default("100").Int()
maxSeriesCount = kingpin.Flag("max-series-count", "Maximum number of series to serve. Applies to 'gradual-change' mode.").Default("1000").Int()
minSeriesCount = kingpin.Flag("min-series-count", "Minimum number of series to serve. Applies to 'gradual-change' mode.").Default("100").Int()
spikeMultiplier = kingpin.Flag("spike-multiplier", "Multiplier for the spike mode.").Default("1.5").Float64()
metricLength = kingpin.Flag("metricname-length", "Modify length of metric names.").Default("5").Int()
labelLength = kingpin.Flag("labelname-length", "Modify length of label names.").Default("5").Int()
constLabels = kingpin.Flag("const-label", "Constant label to add to every metric. Format is labelName=labelValue. Flag can be specified multiple times.").Strings()
valueInterval = kingpin.Flag("value-interval", "Change series values every {interval} seconds.").Default("30").Int()
labelInterval = kingpin.Flag("series-interval", "Change series_id label values every {interval} seconds.").Default("60").Int()
metricInterval = kingpin.Flag("metric-interval", "Change __name__ label values every {interval} seconds.").Default("120").Int()
seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change' and 'double-halve' modes.").Default("30").Int()
seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve'").Default("default").String()
seriesChangeInterval = kingpin.Flag("series-change-interval", "Change the number of series every {interval} seconds. Applies to 'gradual-change', 'double-halve' and 'spike' modes.").Default("30").Int()
seriesOperationMode = kingpin.Flag("series-operation-mode", "Mode of operation: 'gradual-change', 'double-halve', 'spike'").Default("default").String()
port = kingpin.Flag("port", "Port to serve at").Default("9001").Int()
remoteURL = kingpin.Flag("remote-url", "URL to send samples via remote_write API.").URL()
remotePprofURLs = kingpin.Flag("remote-pprof-urls", "a list of urls to download pprofs during the remote write: --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/heap --remote-pprof-urls=http://127.0.0.1:10902/debug/pprof/profile").URLList()
@@ -73,7 +74,14 @@ func main() {
" Usage: ./avalanche --series-operation-mode=gradual-change --series-change-interval=30 --series-change-rate=100 --max-series-count=2000 --min-series-count=200\n" +
" Description: This mode gradually increases the series count by seriesChangeRate on each tick up to maxSeriesCount,\n" +
" then decreases it back to the minSeriesCount, and repeats this cycle indefinitely.\n" +
" The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1."
" The series count is incremented by seriesChangeRate on each tick, ensuring it never drops below 1." +
"\n" +
" spike:\n" +
" Periodically spikes the series count by a given multiplier.\n" +
" Usage: ./avalanche --series-operation-mode=spike --series-change-interval=180 --series-count=100 --spike-multiplier=1.5\n" +
" Description: This mode periodically increases the series count by a spike multiplier on one tick and\n" +
" then returns it to the original count on the next tick. This pattern repeats indefinitely,\n" +
" creating a spiking effect in the series count.\n"

kingpin.Parse()
if *maxSeriesCount <= *minSeriesCount {
@@ -91,7 +99,7 @@ func main() {

stop := make(chan struct{})
defer close(stop)
updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *seriesOperationMode, *constLabels, stop)
updateNotify, err := metrics.RunMetrics(*metricCount, *labelCount, *seriesCount, *seriesChangeRate, *maxSeriesCount, *minSeriesCount, *metricLength, *labelLength, *valueInterval, *labelInterval, *metricInterval, *seriesChangeInterval, *spikeMultiplier, *seriesOperationMode, *constLabels, stop)
Contributor:
We don't need to do it now, but we should look at breaking RunMetrics into separate functions for each of the run modes. Those functions, such as this new handleSpikeMode, could create the other goroutines that we currently create within RunMetrics.

This would mean we don't have to pass so many parameters around across a few functions.
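[Editor's note: a minimal, standalone sketch of that idea; modeConfig, runSpikeMode, and runMetrics are illustrative names, not code from this PR. Each mode gets a runner that owns its goroutine and takes a single config value, so the dispatcher no longer threads a long parameter list through every helper.]

package modesketch

import (
	"fmt"
	"time"
)

// modeConfig bundles the parameters that RunMetrics currently passes to every
// handle* function (an illustrative subset only).
type modeConfig struct {
	seriesCount          int
	spikeMultiplier      float64
	seriesChangeInterval time.Duration
	updateNotify         chan struct{}
	stop                 chan struct{}
}

// runSpikeMode owns the spike-mode goroutine; other modes would get their own runner.
func runSpikeMode(cfg modeConfig) {
	go func() {
		ticker := time.NewTicker(cfg.seriesChangeInterval)
		defer ticker.Stop()
		current := cfg.seriesCount
		for {
			select {
			case <-ticker.C:
				// Alternate between the baseline count and the spiked count.
				if current > cfg.seriesCount {
					current = cfg.seriesCount
				} else {
					current = int(float64(cfg.seriesCount) * cfg.spikeMultiplier)
				}
				fmt.Printf("series count is now %d\n", current)
				select { // non-blocking notify, as in the PR
				case cfg.updateNotify <- struct{}{}:
				default:
				}
			case <-cfg.stop:
				return
			}
		}
	}()
}

// runMetrics then only dispatches on the mode.
func runMetrics(mode string, cfg modeConfig) error {
	switch mode {
	case "spike":
		runSpikeMode(cfg)
	default:
		return fmt.Errorf("unknown series-operation-mode %q", mode)
	}
	return nil
}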

if err != nil {
log.Fatal(err)
}
37 changes: 36 additions & 1 deletion metrics/serve.go
@@ -198,8 +198,32 @@ func handleGradualChangeMode(metricCount, metricLength, metricCycle, seriesCycle
}
}

func handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64, changeSeriesChan <-chan time.Time, updateNotify chan struct{}) {
initialSeriesCount := *currentSeriesCount
for tick := range changeSeriesChan {
metricsMux.Lock()
unregisterMetrics()
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
metricsMux.Unlock()

if *currentSeriesCount > initialSeriesCount {
*currentSeriesCount = initialSeriesCount
} else {
*currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
}

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)

select {
case updateNotify <- struct{}{}:
default:
Comment on lines +218 to +220
Contributor:
Not sure what's happening here; looking at the other handle functions, we're doing the same. What do we need this select and the updateNotify channel for?

Contributor (author):
The updateNotify channel is consumed here:

avalanche/metrics/write.go, lines 140 to 149 (at 4b732e1):

select {
case <-c.config.UpdateNotify:
log.Println("updating remote write metrics")
tss, err = collectMetrics()
if err != nil {
merr.Add(err)
}
default:
tss = updateTimetamps(tss)
}

Each time a notification is received, the metrics are updated and logged.
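[Editor's note: for context, a minimal standalone sketch of that notification pattern; this is not the PR's code and the names are illustrative. The producer does a non-blocking send so the metrics loop never stalls when nothing is reading, and the consumer prefers a pending notification over the cheap default path, mirroring the select in metrics/write.go quoted above.]

package main

import "fmt"

func main() {
	// Buffered with capacity 1 so a single pending notification can be kept
	// while the consumer is between iterations (an assumption of this sketch).
	updateNotify := make(chan struct{}, 1)

	// Producer side: signal "series changed", but never block if nobody
	// (e.g. no remote-write loop) is listening.
	notifySeriesChanged := func() {
		select {
		case updateNotify <- struct{}{}:
		default: // drop the signal rather than stall the metrics goroutine
		}
	}

	// Consumer side: rebuild state when notified, otherwise take the cheap path.
	consumeOnce := func() {
		select {
		case <-updateNotify:
			fmt.Println("updating remote write metrics")
		default:
			fmt.Println("refreshing timestamps only")
		}
	}

	notifySeriesChanged()
	consumeOnce() // prints "updating remote write metrics"
	consumeOnce() // prints "refreshing timestamps only"
}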

Contributor:
You're right! We learned something. It looks like updateNotify is only used for the write mode, where avalanche is remote writing somewhere. So this isn't needed for what we're trying to do with prombench, but it is required in general 👍

}
}
}

// RunMetrics creates a set of Prometheus test series that update over time
func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) {
func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval int, spikeMultiplier float64, seriesOperationMode string, constLabels []string, stop chan struct{}) (chan struct{}, error) {
labelKeys := make([]string, labelCount)
for idx := 0; idx < labelCount; idx++ {
labelKeys[idx] = fmt.Sprintf("label_key_%s_%v", strings.Repeat("k", labelLength), idx)
@@ -249,6 +273,17 @@ func RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSerie
go handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, valueTick)
go handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, seriesTick)

case "spike":
if spikeMultiplier < 1 {
return nil, fmt.Errorf("error: spikeMultiplier must be greater than or equal to 1, got %f", spikeMultiplier)
}
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, spikeMultiplier, seriesChangeInterval)
go handleSpikeMode(metricCount, metricLength, metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount, spikeMultiplier, changeSeriesTick.C, updateNotify)
go handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, valueTick)
go handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle, updateNotify, seriesTick)

default:
registerMetrics(metricCount, metricLength, metricCycle, labelKeys)
cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
51 changes: 48 additions & 3 deletions metrics/serve_test.go
@@ -44,6 +44,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) {
labelCount = 1
maxSeriesCount = 10
minSeriesCount = 1
spikeMultiplier = 1.5
seriesChangeRate = 1
metricLength = 1
labelLength = 1
@@ -60,7 +61,7 @@ func TestRunMetricsSeriesCountChangeDoubleHalve(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)
time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
@@ -84,6 +85,7 @@ func TestRunMetricsGradualChange(t *testing.T) {
seriesCount = 100
maxSeriesCount = 30
minSeriesCount = 10
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
@@ -100,7 +102,7 @@ func TestRunMetricsGradualChange(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
@@ -142,6 +144,7 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) {
seriesCount = 100
maxSeriesCount = 10
minSeriesCount = 100
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
@@ -158,6 +161,48 @@ func TestRunMetricsWithInvalidSeriesCounts(t *testing.T) {

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, operationMode, []string{constLabel}, stop)
_, err := RunMetrics(metricCount, labelCount, seriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.Error(t, err)
}

func TestRunMetricsSpikeChange(t *testing.T) {
const (
metricCount = 1
labelCount = 1
initialSeriesCount = 100
maxSeriesCount = 30
minSeriesCount = 10
spikeMultiplier = 1.5
seriesChangeRate = 10
metricLength = 1
labelLength = 1
valueInterval = 100
seriesInterval = 100
metricInterval = 100
seriesChangeInterval = 10
operationMode = "spike"
constLabel = "constLabel=test"
)

stop := make(chan struct{})
defer close(stop)

promRegistry = prometheus.NewRegistry()

_, err := RunMetrics(metricCount, labelCount, initialSeriesCount, seriesChangeRate, maxSeriesCount, minSeriesCount, metricLength, labelLength, valueInterval, seriesInterval, metricInterval, seriesChangeInterval, spikeMultiplier, operationMode, []string{constLabel}, stop)
assert.NoError(t, err)

time.Sleep(2 * time.Second)
for i := 0; i < 4; i++ {
time.Sleep(time.Duration(seriesChangeInterval) * time.Second)
if i%2 == 0 {
currentCount := countSeries(t, promRegistry)
expectedCount := initialSeriesCount
assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Halved series count should be %d but got %d", expectedCount, currentCount))
} else {
currentCount := countSeries(t, promRegistry)
expectedCount := int(float64(initialSeriesCount) * spikeMultiplier)
assert.Equal(t, expectedCount, currentCount, fmt.Sprintf("Multiplied the series count by %.1f, should be %d but got %d", spikeMultiplier, expectedCount, currentCount))
}
}
}