Skip to content

Commit

Permalink
Out-of-order Flag : Added option for sending out of order samples (#68)
Browse files Browse the repository at this point in the history
* changed implementation of shuffleTimeStamps

Signed-off-by: Vanshikav123 <[email protected]>

* resolving conflicts and improve test

Signed-off-by: Vanshikav123 <[email protected]>

---------

Signed-off-by: Vanshikav123 <[email protected]>
Signed-off-by: Vanshika <[email protected]>
  • Loading branch information
Vanshikav123 authored Sep 16, 2024
1 parent 7534117 commit 8494cab
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 6 deletions.
6 changes: 4 additions & 2 deletions cmd/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var (
remoteTenant = kingpin.Flag("remote-tenant", "Tenant ID to include in remote_write send").Default("0").String()
tlsClientInsecure = kingpin.Flag("tls-client-insecure", "Skip certificate check on tls connection").Default("false").Bool()
remoteTenantHeader = kingpin.Flag("remote-tenant-header", "Tenant ID to include in remote_write send. The default, is the default tenant header expected by Cortex.").Default("X-Scope-OrgID").String()
outOfOrder = kingpin.Flag("out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").Bool()
)

func main() {
Expand Down Expand Up @@ -123,6 +124,7 @@ func main() {
InsecureSkipVerify: *tlsClientInsecure,
},
TenantHeader: *remoteTenantHeader,
OutOfOrder: *outOfOrder,
}

// Collect Pprof during the write only if not collecting within a regular interval.
Expand All @@ -136,7 +138,7 @@ func main() {
)
if *remotePprofInterval > 0 {
if len(*remotePprofURLs) == 0 {
log.Fatal("remote profiling interval specified wihout any remote pprof urls")
log.Fatal("remote profiling interval specified without any remote pprof urls")
}
suffix := rand.Intn(1000)
go func() {
Expand Down Expand Up @@ -170,7 +172,7 @@ func main() {
return
}

fmt.Printf("Serving ur metrics at localhost:%v/metrics\n", *port)
fmt.Printf("Serving your metrics at localhost:%v/metrics\n", *port)
err = metrics.ServeMetrics(*port)
if err != nil {
log.Fatal(err)
Expand Down
23 changes: 19 additions & 4 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type ConfigWrite struct {
Tenant string
TLSClientConfig tls.Config
TenantHeader string
OutOfOrder bool
}

// Client for the remote write requests.
Expand Down Expand Up @@ -108,7 +109,7 @@ func cloneRequest(r *http.Request) *http.Request {
}

func (c *Client) write() error {
tss, err := collectMetrics()
tss, err := collectMetrics(c.config.OutOfOrder)
if err != nil {
return err
}
Expand Down Expand Up @@ -140,7 +141,7 @@ func (c *Client) write() error {
select {
case <-c.config.UpdateNotify:
log.Println("updating remote write metrics")
tss, err = collectMetrics()
tss, err = collectMetrics(c.config.OutOfOrder)
if err != nil {
merr.Add(err)
}
Expand Down Expand Up @@ -193,14 +194,28 @@ func updateTimetamps(tss []prompb.TimeSeries) []prompb.TimeSeries {
return tss
}

func collectMetrics() ([]prompb.TimeSeries, error) {
func collectMetrics(outOfOrder bool) ([]prompb.TimeSeries, error) {
metricsMux.Lock()
defer metricsMux.Unlock()
metricFamilies, err := promRegistry.Gather()
if err != nil {
return nil, err
}
return ToTimeSeriesSlice(metricFamilies), nil
tss := ToTimeSeriesSlice(metricFamilies)
if outOfOrder {
tss = shuffleTimestamps(tss)
}
return tss, nil
}
func shuffleTimestamps(tss []prompb.TimeSeries) []prompb.TimeSeries {
now := time.Now().UnixMilli()
offsets := []int64{0, -60 * 1000, -5 * 60 * 1000}

for i := range tss {
offset := offsets[i%len(offsets)]
tss[i].Samples[0].Timestamp = now + offset
}
return tss
}

// ToTimeSeriesSlice converts a slice of metricFamilies containing samples into a slice of TimeSeries
Expand Down
60 changes: 60 additions & 0 deletions metrics/write_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"testing"
"time"

"github.com/prometheus/prometheus/prompb"
)

func TestShuffleTimestamps(t *testing.T) {
now := time.Now().UnixMilli()

tss := []prompb.TimeSeries{
{Samples: []prompb.Sample{{Timestamp: now}}},
{Samples: []prompb.Sample{{Timestamp: now}}},
{Samples: []prompb.Sample{{Timestamp: now}}},
}

shuffledTSS := shuffleTimestamps(tss)

offsets := []int64{0, -60 * 1000, -5 * 60 * 1000}
for _, ts := range shuffledTSS {
timestampValid := false
for _, offset := range offsets {
expectedTimestamp := now + offset
if ts.Samples[0].Timestamp == expectedTimestamp {
timestampValid = true
break
}
}
if !timestampValid {
t.Errorf("Timestamp %v is not in the expected offsets: %v", ts.Samples[0].Timestamp, offsets)
}
}

outOfOrder := false
for i := 1; i < len(shuffledTSS); i++ {
if shuffledTSS[i].Samples[0].Timestamp < shuffledTSS[i-1].Samples[0].Timestamp {
outOfOrder = true
break
}
}

if !outOfOrder {
t.Error("Timestamps are not out of order")
}
}

0 comments on commit 8494cab

Please sign in to comment.