Skip to content

Commit

Permalink
fixed regressions around series leak due to races; refactor for clari…
Browse files Browse the repository at this point in the history
…ty (#87)

Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka authored Sep 26, 2024
1 parent b7db738 commit 1b88ab7
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 109 deletions.
204 changes: 97 additions & 107 deletions metrics/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Collector struct {
histograms []*prometheus.HistogramVec
nativeHistograms []*prometheus.HistogramVec
summaries []*prometheus.SummaryVec
labelKeys []string

updateNotifyCh chan struct{}
stopCh chan struct{}
Expand Down Expand Up @@ -198,22 +199,23 @@ func help(mName string) string {
return fmt.Sprintf("Metric %v is generated by https://github.com/prometheus-community/avalanche project allowing you to load test your Prometheus or Prometheus-compatible systems. It's not too long, not too short to simulate, often chunky descriptions on user metrics. It also contains metric name, so help is slighly different across metrics.", mName)
}

func (c *Collector) recreateMetrics(metricCycle int, labelKeys []string) {
func (c *Collector) recreateMetrics(unsafeGetState readOnlyStateFn) {
c.mu.Lock()
defer c.mu.Unlock()
s := unsafeGetState()
for id := range c.gauges {
mName := fmt.Sprintf("avalanche_gauge_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_gauge_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
gauge := prometheus.NewGaugeVec(
prometheus.GaugeOpts{Name: mName, Help: help(mName)},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.gauges[id] = gauge
}
for id := range c.counters {
mName := fmt.Sprintf("avalanche_counter_metric_%s_%v_%v_total", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_counter_metric_%s_%v_%v_total", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
counter := prometheus.NewCounterVec(
prometheus.CounterOpts{Name: mName, Help: help(mName)},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.counters[id] = counter
}
Expand All @@ -223,28 +225,28 @@ func (c *Collector) recreateMetrics(metricCycle int, labelKeys []string) {
bkts[i] = 0.0001 * math.Pow10(i)
}
for id := range c.histograms {
mName := fmt.Sprintf("avalanche_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
histogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: mName, Help: help(mName), Buckets: bkts},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.histograms[id] = histogram
}

for id := range c.nativeHistograms {
mName := fmt.Sprintf("avalanche_native_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_native_histogram_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
histogram := prometheus.NewHistogramVec(
prometheus.HistogramOpts{Name: mName, Help: help(mName), NativeHistogramBucketFactor: 1.1},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.nativeHistograms[id] = histogram
}

for id := range c.summaries {
mName := fmt.Sprintf("avalanche_summary_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), metricCycle, id)
mName := fmt.Sprintf("avalanche_summary_metric_%s_%v_%v", strings.Repeat("m", c.cfg.MetricLength), s.metricCycle, id)
summary := prometheus.NewSummaryVec(
prometheus.SummaryOpts{Name: mName, Help: help(mName)},
append([]string{"series_id", "cycle_id"}, labelKeys...),
append([]string{"series_id", "cycle_id"}, c.labelKeys...),
)
c.summaries[id] = summary
}
Expand All @@ -265,58 +267,47 @@ type seriesDeleter interface {
Delete(labels prometheus.Labels) bool
}

func deleteValues[T seriesDeleter](metrics []T, labelKeys, labelValues []string, seriesCount, seriesCycle int) {
func deleteValues[T seriesDeleter](metrics []T, labelKeys []string, s metricState) {
for _, metric := range metrics {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for idx := 0; idx < s.seriesCount; idx++ {
labels := seriesLabels(idx, s.seriesCycle, labelKeys, s.labelValues)
metric.Delete(labels)
}
}
}

func (c *Collector) cycleValues(labelKeys, labelValues []string, seriesCount, seriesCycle int) {
func (c *Collector) cycleValues(unsafeGetState readOnlyStateFn) {
c.mu.Lock()
defer c.mu.Unlock()

for _, metric := range c.gauges {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
s := unsafeGetState()
for idx := 0; idx < s.seriesCount; idx++ {
labels := seriesLabels(idx, s.seriesCycle, c.labelKeys, s.labelValues)
for _, metric := range c.gauges {
metric.With(labels).Set(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.counters {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.counters {
metric.With(labels).Add(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.histograms {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.histograms {
metric.With(labels).Observe(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.nativeHistograms {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.nativeHistograms {
metric.With(labels).Observe(float64(c.valGen.Intn(100)))
}
}
for _, metric := range c.summaries {
for idx := 0; idx < seriesCount; idx++ {
labels := seriesLabels(idx, seriesCycle, labelKeys, labelValues)
for _, metric := range c.summaries {
metric.With(labels).Observe(float64(c.valGen.Intn(100)))
}
}
}

func (c *Collector) handleValueTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int) {
func (c *Collector) handleValueTicks(unsafeGetState readOnlyStateFn) {
if c.valueTick == nil {
return
}
for tick := range c.valueTick.C {
fmt.Printf("%v: refreshing metric values\n", tick)
c.cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
c.cycleValues(unsafeGetState)

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -325,21 +316,22 @@ func (c *Collector) handleValueTicks(labelKeys, labelValues *[]string, currentSe
}
}

func (c *Collector) handleSeriesTicks(labelKeys, labelValues *[]string, currentSeriesCount, seriesCycle *int) {
func (c *Collector) handleSeriesTicks(seriesCycle *int, unsafeGetState readOnlyStateFn) {
if c.seriesTick == nil {
return
}
for tick := range c.seriesTick.C {
c.mu.Lock()
fmt.Printf("%v: refreshing series cycle\n", tick)
deleteValues(c.gauges, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.counters, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.histograms, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.nativeHistograms, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
deleteValues(c.summaries, *labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
(*seriesCycle)++
s := unsafeGetState()
deleteValues(c.gauges, c.labelKeys, s)
deleteValues(c.counters, c.labelKeys, s)
deleteValues(c.histograms, c.labelKeys, s)
deleteValues(c.nativeHistograms, c.labelKeys, s)
deleteValues(c.summaries, c.labelKeys, s)
*seriesCycle++
c.mu.Unlock()
c.cycleValues(*labelKeys, *labelValues, *currentSeriesCount, *seriesCycle)
c.cycleValues(unsafeGetState)

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -348,34 +340,34 @@ func (c *Collector) handleSeriesTicks(labelKeys, labelValues *[]string, currentS
}
}

func (c *Collector) handleMetricTicks(metricCycle *int, labelKeys *[]string) {
func (c *Collector) handleMetricTicks(metricCycle *int, unsafeGetState readOnlyStateFn) {
if c.metricTick == nil {
return
}

for tick := range c.metricTick.C {
fmt.Printf("%v: refreshing metric cycle\n", tick)
(*metricCycle)++
c.recreateMetrics(*metricCycle, *labelKeys)
*metricCycle++
c.recreateMetrics(unsafeGetState)
select {
case c.updateNotifyCh <- struct{}{}:
default:
}
}
}

func changeSeriesGradual(seriesChangeRate, maxSeriesCount, minSeriesCount, currentSeriesCount *int, seriesIncrease *bool) {
func changeSeriesGradual(seriesChangeRate, maxSeriesCount, minSeriesCount int, currentSeriesCount *int, seriesIncrease *bool) {
fmt.Printf("Current series count: %d\n", *currentSeriesCount)
if *seriesIncrease {
*currentSeriesCount += *seriesChangeRate
if *currentSeriesCount >= *maxSeriesCount {
*currentSeriesCount = *maxSeriesCount
*currentSeriesCount += seriesChangeRate
if *currentSeriesCount >= maxSeriesCount {
*currentSeriesCount = maxSeriesCount
*seriesIncrease = false
}
} else {
*currentSeriesCount -= *seriesChangeRate
if *currentSeriesCount < *minSeriesCount {
*currentSeriesCount = *minSeriesCount
*currentSeriesCount -= seriesChangeRate
if *currentSeriesCount < minSeriesCount {
*currentSeriesCount = minSeriesCount
*seriesIncrease = true
}
}
Expand All @@ -393,19 +385,20 @@ func changeSeriesDoubleHalve(currentSeriesCount *int, seriesIncrease *bool) {
*seriesIncrease = !*seriesIncrease
}

func (c *Collector) handleDoubleHalveMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int) {
func (c *Collector) handleDoubleHalveMode(seriesCount *int, unsafeGetState readOnlyStateFn) {
if c.changeSeriesTick == nil {
return
}

seriesIncrease := true
for tick := range c.changeSeriesTick.C {
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
c.recreateMetrics(unsafeGetState)
c.cycleValues(unsafeGetState)

changeSeriesDoubleHalve(currentSeriesCount, &seriesIncrease)

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)
c.mu.Lock()
changeSeriesDoubleHalve(seriesCount, &seriesIncrease)
fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount)
c.mu.Unlock()

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -414,20 +407,20 @@ func (c *Collector) handleDoubleHalveMode(metricCycle, seriesCycle int, labelKey
}
}

func (c *Collector) handleGradualChangeMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, seriesCount *int) {
func (c *Collector) handleGradualChangeMode(seriesCount *int, unsafeGetState readOnlyStateFn) {
if c.changeSeriesTick == nil {
return
}

*seriesCount = c.cfg.MinSeriesCount
seriesIncrease := true
for tick := range c.changeSeriesTick.C {
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, *seriesCount, seriesCycle)

changeSeriesGradual(&c.cfg.SeriesChangeRate, &c.cfg.MaxSeriesCount, &c.cfg.MinSeriesCount, seriesCount, &seriesIncrease)
c.recreateMetrics(unsafeGetState)
c.cycleValues(unsafeGetState)

c.mu.Lock()
changeSeriesGradual(c.cfg.SeriesChangeRate, c.cfg.MaxSeriesCount, c.cfg.MinSeriesCount, seriesCount, &seriesIncrease)
fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount)
c.mu.Unlock()

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -436,23 +429,24 @@ func (c *Collector) handleGradualChangeMode(metricCycle, seriesCycle int, labelK
}
}

func (c *Collector) handleSpikeMode(metricCycle, seriesCycle int, labelKeys, labelValues []string, currentSeriesCount *int, spikeMultiplier float64) {
func (c *Collector) handleSpikeMode(seriesCount *int, unsafeGetState readOnlyStateFn, spikeMultiplier float64) {
if c.changeSeriesTick == nil {
return
}

initialSeriesCount := *currentSeriesCount
initialSeriesCount := *seriesCount
for tick := range c.changeSeriesTick.C {
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, *currentSeriesCount, seriesCycle)
c.recreateMetrics(unsafeGetState)
c.cycleValues(unsafeGetState)

if *currentSeriesCount > initialSeriesCount {
*currentSeriesCount = initialSeriesCount
c.mu.Lock()
if *seriesCount > initialSeriesCount {
*seriesCount = initialSeriesCount
} else {
*currentSeriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
*seriesCount = int(float64(initialSeriesCount) * spikeMultiplier)
}

fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *currentSeriesCount)
fmt.Printf("%v: Adjusting series count. New count: %d\n", tick, *seriesCount)
c.mu.Unlock()

select {
case c.updateNotifyCh <- struct{}{}:
Expand All @@ -461,6 +455,15 @@ func (c *Collector) handleSpikeMode(metricCycle, seriesCycle int, labelKeys, lab
}
}

// metricState represents current state of ids, cycles and current series.
type metricState struct {
seriesCount, seriesCycle, metricCycle int

labelValues []string
}

type readOnlyStateFn func() metricState

// Run creates a set of Prometheus test series that update over time.
// NOTE: Only one execution of RunMetrics is currently expected.
func (c *Collector) Run() error {
Expand All @@ -478,54 +481,41 @@ func (c *Collector) Run() error {
labelValues = append(labelValues, split[1])
}

metricCycle := 0
seriesCycle := 0
currentSeriesCount := c.cfg.SeriesCount
mutableState := &metricState{seriesCount: c.cfg.SeriesCount}
// unsafe means you need to lock c.mu to use it.
unsafeReadOnlyGetState := func() metricState { return *mutableState }

c.mu.Lock()
c.mu.Lock() // Just to make race detector happy, not really needed in practice.
c.gauges = make([]*prometheus.GaugeVec, c.cfg.GaugeMetricCount)
c.counters = make([]*prometheus.CounterVec, c.cfg.CounterMetricCount)
c.histograms = make([]*prometheus.HistogramVec, c.cfg.HistogramMetricCount)
c.nativeHistograms = make([]*prometheus.HistogramVec, c.cfg.NativeHistogramMetricCount)
c.summaries = make([]*prometheus.SummaryVec, c.cfg.SummaryMetricCount)
c.mu.Unlock()

c.recreateMetrics(unsafeReadOnlyGetState)

switch c.cfg.SeriesOperationMode {
case "double-halve":
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting double-halve mode; starting series: %d, change series interval: %d seconds\n", currentSeriesCount, c.cfg.SeriesChangeInterval)
go c.handleDoubleHalveMode(metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)
fmt.Printf("Starting double-halve mode; starting series: %d, change series interval: %d seconds\n", c.cfg.SeriesCount, c.cfg.SeriesChangeInterval)
go c.handleDoubleHalveMode(&mutableState.seriesCount, unsafeReadOnlyGetState)

case "gradual-change":
fmt.Printf("Starting gradual-change mode; min series: %d, max series: %d, series change rate: %d, change series interval: %d seconds\n", c.cfg.MinSeriesCount, c.cfg.MaxSeriesCount, c.cfg.SeriesChangeRate, c.cfg.SeriesChangeInterval)

c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, c.cfg.MinSeriesCount, seriesCycle)
go c.handleGradualChangeMode(metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)
c.mu.Lock()
mutableState.seriesCount = c.cfg.MinSeriesCount
c.mu.Unlock()
go c.handleGradualChangeMode(&mutableState.seriesCount, unsafeReadOnlyGetState)

case "spike":
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", currentSeriesCount, c.cfg.SpikeMultiplier, c.cfg.SeriesChangeInterval)
go c.handleSpikeMode(metricCycle, seriesCycle, labelKeys, labelValues, &currentSeriesCount, c.cfg.SpikeMultiplier)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)

default:
c.recreateMetrics(metricCycle, labelKeys)
c.cycleValues(labelKeys, labelValues, currentSeriesCount, seriesCycle)
go c.handleValueTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleSeriesTicks(&labelKeys, &labelValues, &currentSeriesCount, &seriesCycle)
go c.handleMetricTicks(&metricCycle, &labelKeys)
fmt.Printf("Starting spike mode; initial series: %d, spike multiplier: %f, spike interval: %v\n", c.cfg.SeriesCount, c.cfg.SpikeMultiplier, c.cfg.SeriesChangeInterval)
go c.handleSpikeMode(&mutableState.seriesCount, unsafeReadOnlyGetState, c.cfg.SpikeMultiplier)
}
c.cycleValues(unsafeReadOnlyGetState)

go c.handleValueTicks(unsafeReadOnlyGetState)
go c.handleSeriesTicks(&mutableState.seriesCycle, unsafeReadOnlyGetState)
go c.handleMetricTicks(&mutableState.metricCycle, unsafeReadOnlyGetState)

<-c.stopCh
return nil
Expand Down
Loading

0 comments on commit 1b88ab7

Please sign in to comment.