Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed regressions around series leak due to races; refactor for clarity #87

Merged
merged 1 commit into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 97 additions & 107 deletions metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Collector struct {
histograms []*prometheus.HistogramVec
nativeHistograms []*prometheus.HistogramVec
summaries []*prometheus.SummaryVec
labelKeys []string

updateNotifyCh chan struct{}
stopCh chan struct{}
Expand Down Expand Up @@ -198,22 +199,23 @@ func help(mName string) string {
return fmt.Sprintf("Metric %v is generated by https://github.com/prometheus-community/avalanche project allowing you to load test your Prometheus or Prometheus-compatible systems. It's not too long, not too short to simulate, often chunky descriptions on user metrics. It also contains metric name, so help is slighly different across metrics.", mName)
}

func (c *Collector) recreateMetrics(metricCycle int, labelKeys []string) {
func (c *Collector) recreateMetrics(unsafeGetState readOnlyStateFn) {
c.mu.Lock()
defer c.mu.Unlock()
s := unsafeGetState()
for id := range c.gauges {
mName := fmt.Sprintf("avalanche_gauge_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_gauge_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: mName, Help: help(mName)},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.gauges[id] = gauge
}
for id := range c.counters {
mName := fmt.Sprintf("avalanche_counter_metric_%s_%v_%v_total", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_counter_metric_%s_%v_%v_total", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
counter := prometheus.NewCounterVec(
prometheus.CounterOpts{Name: mName, Help: help(mName)},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.counters[id] = counter
}
Expand All @@ -223,28 +225,28 @@ func (c *Collector) recreateMetrics(metricCycle int, labelKeys []string) {
bkts[i] = 0.0001 * math.Pow10(i)
}
for id := range c.histograms {
mName := fmt.Sprintf("avalanche_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
histogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: mName, Help: help(mName), Buckets: bkts},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.histograms[id] = histogram
}

for id := range c.nativeHistograms {
mName := fmt.Sprintf("avalanche_native_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_native_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
histogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: mName, Help: help(mName), NativeHistogramBucketFactor: 1.1},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.nativeHistograms[id] = histogram
}

for id := range c.summaries {
mName := fmt.Sprintf("avalanche_summary_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_summary_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
summary := prometheus.NewSummaryVec(
prometheus.SummaryOpts{Name: mName, Help: help(mName)},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.summaries[id] = summary
}
Expand All @@ -265,58 +267,47 @@ type seriesDeleter interface {
Delete(labels prometheus.Labels) bool
}

func deleteValues[T seriesDeleter](metrics []T, labelKeys, labelValues []string, seriesCount, seriesCycle int) {
func deleteValues[T seriesDeleter](metrics []T, labelKeys []string, s metricState) {
for _, metric := range metrics {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for idx := 0; idx < s.seriesCount; idx++ {
labels := seriesLabels(idx, s.seriesCycle, labelKeys, s.labelValues)
metric.Delete(labels)
}
}
}

func (c *Collector) cycleValues(labelKeys, labelValues []string, seriesCount, seriesCycle int) {
func (c *Collector) cycleValues(unsafeGetState readOnlyStateFn) {
c.mu.Lock()
defer c.mu.Unlock()

for _, metric := range c.gauges {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
s := unsafeGetState()
for idx := 0; idx < s.seriesCount; idx++ {
labels := seriesLabels(idx, s.seriesCycle, c.labelKeys, s.labelValues)
for _, metric := range c.gauges {
metric.With(labels).Set(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.counters {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.counters {
metric.With(labels).Add(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.histograms {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.histograms {
metric.With(labels).Observe(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.nativeHistograms {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.nativeHistograms {
metric.With(labels).Observe(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.summaries {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.summaries {
metric.With(labels).Observe(float64(c.valGen.Intn(100)))
}
}
}

func (c *Collector) handleValueTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int) {
func (c *Collector) handleValueTicks(unsafeGetState readOnlyStateFn) {
if c.valueTick == nil {
return
}
for tick := range c.valueTick.C {
fmt.Printf("%v: refreshing metric values\n", tick)
c.cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
c.cycleValues(unsafeGetState)

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -325,21 +316,22 @@ func (c *Collector) handleValueTicks(labelKeys, labelValues *[]string, currentSe
}
}

func (c *Collector) handleSeriesTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int) {
func (c *Collector) handleSeriesTicks(seriesCycle *int, unsafeGetState readOnlyStateFn) {
if c.seriesTick == nil {
return
}
for tick := range c.seriesTick.C {
c.mu.Lock()
fmt.Printf("%v: refreshing series cycle\n", tick)
deleteValues(c.gauges, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.counters, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.histograms, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.nativeHistograms, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.summaries, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
(*seriesCycle)++
s := unsafeGetState()
deleteValues(c.gauges, c.labelKeys, s)
deleteValues(c.counters, c.labelKeys, s)
deleteValues(c.histograms, c.labelKeys, s)
deleteValues(c.nativeHistograms, c.labelKeys, s)
deleteValues(c.summaries, c.labelKeys, s)
*seriesCycle++
c.mu.Unlock()
c.cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
c.cycleValues(unsafeGetState)

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -348,34 +340,34 @@ func (c *Collector) handleSeriesTicks(labelKeys, labelValues *[]string, currentS
}
}

func (c *Collector) handleMetricTicks(metricCycle *int, labelKeys *[]string) {
func (c *Collector) handleMetricTicks(metricCycle *int, unsafeGetState readOnlyStateFn) {
if c.metricTick == nil {
return
}

for tick := range c.metricTick.C {
fmt.Printf("%v: refreshing metric cycle\n", tick)
(*metricCycle)++
c.recreateMetrics(*metricCycle, *labelKeys)
*metricCycle++
c.recreateMetrics(unsafeGetState)
select {
case c.updateNotifyCh <- struct{}{}:
default:
}
}
}

func changeSeriesGradual(seriesChangeRate, maxSeriesCount, minSeriesCount, currentSeriesCount *int, seriesIncrease *bool) {
func changeSeriesGradual(seriesChangeRate, maxSeriesCount, minSeriesCount int, currentSeriesCount *int, seriesIncrease *bool) {
fmt.Printf("Current series count: %d\n", *currentSeriesCount)
if *seriesIncrease {
*currentSeriesCount += *seriesChangeRate
if *currentSeriesCount >= *maxSeriesCount {
*currentSeriesCount = *maxSeriesCount
*currentSeriesCount += seriesChangeRate
if *currentSeriesCount >= maxSeriesCount {
*currentSeriesCount = maxSeriesCount
*seriesIncrease = false
}
} else {
*currentSeriesCount -= *seriesChangeRate
if *currentSeriesCount < *minSeriesCount {
*currentSeriesCount = *minSeriesCount
*currentSeriesCount -= seriesChangeRate
if *currentSeriesCount < minSeriesCount {
*currentSeriesCount = minSeriesCount
*seriesIncrease = true
}
}
Expand All @@ -393,19 +385,20 @@ func changeSeriesDoubleHalve(currentSeriesCount *int, seriesIncrease *bool) {
*seriesIncrease = !*seriesIncrease
}

func (c *Collector) handleDoubleHalveMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int) {
func (c *Collector) handleDoubleHalveMode(seriesCount *int, unsafeGetState readOnlyStateFn) {
if c.changeSeriesTick == nil {
return
}

seriesIncrease := true
for tick := range c.changeSeriesTick.C {
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
c.recreateMetrics(unsafeGetState)
c.cycleValues(unsafeGetState)

changeSeriesDoubleHalve(currentSeriesCount, &seriesIncrease)

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)
c.mu.Lock()
changeSeriesDoubleHalve(seriesCount, &seriesIncrease)
fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount)
c.mu.Unlock()

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -414,20 +407,20 @@ func (c *Collector) handleDoubleHalveMode(metricCycle, seriesCycle int, labelKey
}
}

func (c *Collector) handleGradualChangeMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, seriesCount *int) {
func (c *Collector) handleGradualChangeMode(seriesCount *int, unsafeGetState readOnlyStateFn) {
if c.changeSeriesTick == nil {
return
}

*seriesCount = c.cfg.MinSeriesCount
seriesIncrease := true
for tick := range c.changeSeriesTick.C {
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, *seriesCount, seriesCycle)

changeSeriesGradual(&c.cfg.SeriesChangeRate, &c.cfg.MaxSeriesCount, &c.cfg.MinSeriesCount, seriesCount, &seriesIncrease)
c.recreateMetrics(unsafeGetState)
c.cycleValues(unsafeGetState)

c.mu.Lock()
changeSeriesGradual(c.cfg.SeriesChangeRate, c.cfg.MaxSeriesCount, c.cfg.MinSeriesCount, seriesCount, &seriesIncrease)
fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount)
c.mu.Unlock()

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -436,23 +429,24 @@ func (c *Collector) handleGradualChangeMode(metricCycle, seriesCycle int, labelK
}
}

func (c *Collector) handleSpikeMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64) {
func (c *Collector) handleSpikeMode(seriesCount *int, unsafeGetState readOnlyStateFn, spikeMultiplier float64) {
if c.changeSeriesTick == nil {
return
}

initialSeriesCount := *currentSeriesCount
initialSeriesCount := *seriesCount
for tick := range c.changeSeriesTick.C {
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
c.recreateMetrics(unsafeGetState)
c.cycleValues(unsafeGetState)

if *currentSeriesCount > initialSeriesCount {
*currentSeriesCount = initialSeriesCount
c.mu.Lock()
if *seriesCount > initialSeriesCount {
*seriesCount = initialSeriesCount
} else {
*currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
*seriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
}

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)
fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount)
c.mu.Unlock()

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -461,6 +455,15 @@ func (c *Collector) handleSpikeMode(metricCycle, seriesCycle int, labelKeys, lab
}
}

// metricState represents current state of ids, cycles and current series.
type metricState struct {
seriesCount, seriesCycle, metricCycle int

labelValues []string
}

type readOnlyStateFn func() metricState

// Run creates a set of Prometheus test series that update over time.
// NOTE: Only one execution of RunMetrics is currently expected.
func (c *Collector) Run() error {
Expand All @@ -478,54 +481,41 @@ func (c *Collector) Run() error {
labelValues = append(labelValues, split[1])
}

metricCycle := 0
seriesCycle := 0
currentSeriesCount := c.cfg.SeriesCount
mutableState := &metricState{seriesCount: c.cfg.SeriesCount}
// unsafe means you need to lock c.mu to use it.
unsafeReadOnlyGetState := func() metricState { return *mutableState }

c.mu.Lock()
c.mu.Lock() // Just to make race detector happy, not really needed in practice.
c.gauges = make([]*prometheus.GaugeVec, c.cfg.GaugeMetricCount)
c.counters = make([]*prometheus.CounterVec, c.cfg.CounterMetricCount)
c.histograms = make([]*prometheus.HistogramVec, c.cfg.HistogramMetricCount)
c.nativeHistograms = make([]*prometheus.HistogramVec, c.cfg.NativeHistogramMetricCount)
c.summaries = make([]*prometheus.SummaryVec, c.cfg.SummaryMetricCount)
c.mu.Unlock()

c.recreateMetrics(unsafeReadOnlyGetState)

switch c.cfg.SeriesOperationMode {
case "double-halve":
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting double-halve mode; starting series: %d, change series interval: %d seconds\n", currentSeriesCount, c.cfg.SeriesChangeInterval)
go c.handleDoubleHalveMode(metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)
fmt.Printf("Starting double-halve mode; starting series: %d, change series interval: %d seconds\n", c.cfg.SeriesCount, c.cfg.SeriesChangeInterval)
go c.handleDoubleHalveMode(&mutableState.seriesCount, unsafeReadOnlyGetState)

case "gradual-change":
fmt.Printf("Starting gradual-change mode; min series: %d, max series: %d, series change rate: %d, change series interval: %d seconds\n", c.cfg.MinSeriesCount, c.cfg.MaxSeriesCount, c.cfg.SeriesChangeRate, c.cfg.SeriesChangeInterval)

c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, c.cfg.MinSeriesCount, seriesCycle)
go c.handleGradualChangeMode(metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)
c.mu.Lock()
mutableState.seriesCount = c.cfg.MinSeriesCount
c.mu.Unlock()
go c.handleGradualChangeMode(&mutableState.seriesCount, unsafeReadOnlyGetState)

case "spike":
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, c.cfg.SpikeMultiplier, c.cfg.SeriesChangeInterval)
go c.handleSpikeMode(metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount, c.cfg.SpikeMultiplier)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)

default:
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", c.cfg.SeriesCount, c.cfg.SpikeMultiplier, c.cfg.SeriesChangeInterval)
go c.handleSpikeMode(&mutableState.seriesCount, unsafeReadOnlyGetState, c.cfg.SpikeMultiplier)
}
c.cycleValues(unsafeReadOnlyGetState)

go c.handleValueTicks(unsafeReadOnlyGetState)
go c.handleSeriesTicks(&mutableState.seriesCycle, unsafeReadOnlyGetState)
go c.handleMetricTicks(&mutableState.metricCycle, unsafeReadOnlyGetState)

<-c.stopCh
return nil
Expand Down
Loading
Loading