Skip to content

Commit

Permalink
Merge pull request #19 from Alb0t/ala-breaking-change-move-to-prom-hi…
Browse files Browse the repository at this point in the history
…stograms

Implement byte histograms
  • Loading branch information
Alb0t authored Jul 26, 2023
2 parents 2b3495a + 0996f05 commit dd5a9cd
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 77 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ debug
gorelease.sh
aerospike-ttl-exporter
testconf.yaml
__debug_bin
__debug_bin*
43 changes: 17 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,23 @@ The data currently exported by Aerospike histogram dumps is not accurate enough
Example output:
```
....
aerospike_expiration_ttl_counts_hist_bucket{namespace="myOtherNS",set="",le="1.79712e+08"} 70028
aerospike_expiration_ttl_counts_hist_bucket{namespace="myOtherNS",set="",le="1.80576e+08"} 70028
aerospike_expiration_ttl_counts_hist_bucket{namespace="myOtherNS",set="",le="+Inf"} 70028
aerospike_expiration_ttl_counts_hist_sum{namespace="myOtherNS",set=""} 3.68036698307e+11
aerospike_expiration_ttl_counts_hist_count{namespace="myOtherNS",set=""} 70028
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.3824e+07"} 145142
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.4688e+07"} 186596
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.56384e+07"} 223357
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.9008e+07"} 241662
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="+Inf"} 241699
aerospike_expiration_ttl_counts_hist_sum{namespace="myNS",set="Beans"} 3.166097393414e+12
aerospike_expiration_ttl_counts_hist_count{namespace="myNS",set="Beans"} 241699
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="boo",le="1.3824e+07"} 9056
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="boo",le="1.4688e+07"} 11760
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="boo",le="1.56384e+07"} 13648
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="boo",le="1.9008e+07"} 16000
aerospike_expiration_ttl_counts_hist_bucket{namespace="myNS",set="boo",le="+Inf"} 16000
aerospike_expiration_ttl_counts_hist_sum{namespace="myNS",set="boo"} 2.1257415038e+11
aerospike_expiration_ttl_counts_hist_count{namespace="myNS",set="boo"} 16000
aerospike_ttl_build_info{version="3.0.0"} 1
aerospike_ttl_scan_last_updated{namespace="myOtherNS",set=""} 1.690219845e+09
aerospike_ttl_scan_last_updated{namespace="myNS",set="Beans"} 1.690219844e+09
aerospike_ttl_scan_last_updated{namespace="myNS",set="boo"} 1.690219848e+09
aerospike_ttl_scan_time_seconds{namespace="myOtherNS",set=""} 1
aerospike_ttl_scan_time_seconds{namespace="myNS",set="Beans"} 6
aerospike_ttl_scan_time_seconds{namespace="myNS",set="boo"} 1
aerospike_ttl_build_info{version="3.1.0"} 1
aerospike_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.3824e+07"} 858
aerospike_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.4688e+07"} 901
aerospike_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.56384e+07"} 971
aerospike_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="1.9008e+07"} 1004
aerospike_ttl_counts_hist_bucket{namespace="myNS",set="Beans",le="+Inf"} 1004
aerospike_ttl_counts_hist_sum{namespace="myNS",set="Beans"} 9.037094916e+09
aerospike_ttl_counts_hist_count{namespace="myNS",set="Beans"} 1004
aerospike_ttl_kib_hist_bucket{namespace="myNS",set="Beans",storage_type="device",le="1.3824e+07"} 1938
aerospike_ttl_kib_hist_bucket{namespace="myNS",set="Beans",storage_type="device",le="1.4688e+07"} 2046
aerospike_ttl_kib_hist_bucket{namespace="myNS",set="Beans",storage_type="device",le="1.56384e+07"} 2196
aerospike_ttl_kib_hist_bucket{namespace="myNS",set="Beans",storage_type="device",le="1.9008e+07"} 4041
aerospike_ttl_kib_hist_bucket{namespace="myNS",set="Beans",storage_type="device",le="+Inf"} 4041
aerospike_ttl_kib_hist_sum{namespace="myNS",set="Beans",storage_type="device"} 5.244241494e+10
aerospike_ttl_kib_hist_count{namespace="myNS",set="Beans",storage_type="device"} 4041
aerospike_ttl_scan_last_updated{namespace="myNS",set="Beans"} 1.690408779e+09
aerospike_ttl_scan_time_seconds{namespace="myNS",set="Beans"} 103
```

# To use:
Expand Down
78 changes: 47 additions & 31 deletions collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"gopkg.in/yaml.v3"
)

var buildVersion = "3.0.1"
var buildVersion = "3.1.2"
var configFile = flag.String("configFile", "/etc/ttl-aerospike-exporter.yaml", "The yaml config file for the exporter")
var ns_set_to_histograms = make(map[string]map[string]*prometheus.HistogramVec)

Expand Down Expand Up @@ -43,9 +43,8 @@ var scanLastUpdated = prometheus.NewGaugeVec(
)

// these are global because im lazy
var running = false // bool to track whether a scan is running already or not.
var localIps = make(map[string]bool) // map to prevent duplicates, and a list of what our local ips are
var resultMap = make(map[string]map[uint32]int) // map of namespace:set -> { ttl, count } stored globally so we can report 0 on unseen metrics if the server suddenly doesn't have any
var running = false // bool to track whether a scan is running already or not.
var localIps = make(map[string]bool) // map to prevent duplicates, and a list of what our local ips are
var config conf

type conf struct {
Expand All @@ -66,20 +65,22 @@ type serviceConf struct {
}

type monconf struct {
Namespace string `yaml:"namespace"`
Set string `yaml:"set"`
Recordcount int `yaml:"recordCount,omitempty"`
ScanPercent float64 `yaml:"scanPercent,omitempty"`
NumberOfBucketsToExport int `yaml:"numberOfBucketsToExport,omitempty"`
BucketWidth int `yaml:"bucketWidth,omitempty"`
BucketStart int `yaml:"bucketStart,omitempty"`
StaticBucketList []float64 `yaml:"staticBucketList,omitempty"`
ReportCount int `yaml:"reportCount,omitempty"`
ScanTotalTimeout string `yaml:"scanTotalTimeout"`
ScanSocketTimeout string `yaml:"scanSocketTimeout"`
PolicyTotalTimeout string `yaml:"policyTotalTimeout"`
PolicySocketTimeout string `yaml:"policySocketTimeout"`
RecordsPerSecond int `yaml:"recordsPerSecond"`
Namespace string `yaml:"namespace"`
Set string `yaml:"set"`
Recordcount int `yaml:"recordCount,omitempty"`
ScanPercent float64 `yaml:"scanPercent,omitempty"`
NumberOfBucketsToExport int `yaml:"numberOfBucketsToExport,omitempty"`
BucketWidth int `yaml:"bucketWidth,omitempty"`
BucketStart int `yaml:"bucketStart,omitempty"`
StaticBucketList []float64 `yaml:"staticBucketList,omitempty"`
ReportCount int `yaml:"reportCount,omitempty"`
ScanTotalTimeout string `yaml:"scanTotalTimeout"`
ScanSocketTimeout string `yaml:"scanSocketTimeout"`
PolicyTotalTimeout string `yaml:"policyTotalTimeout"`
PolicySocketTimeout string `yaml:"policySocketTimeout"`
RecordsPerSecond int `yaml:"recordsPerSecond"`
KByteHistogram map[string]bool `yaml:"kbyteHistogram,omitempty"`
KByteHistogramResolution float64 `yaml:"kbyteHistogramResolution,omitempty"`
}

func (c *conf) setConf() {
Expand Down Expand Up @@ -128,23 +129,38 @@ func init() {
buckets = prometheus.LinearBuckets(bucket_start, bucket_width, number_of_buckets)
}

//Buckets: []float64{0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0}, // Custom static buckets
histograms := make(map[string]*prometheus.HistogramVec)

expirationTTLCountsHist := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "aerospike_expiration_ttl_counts_hist",
Help: "h",
Buckets: buckets,
ConstLabels: prometheus.Labels{"namespace": namespace, "set": set},
}, []string{},
)
prometheus.MustRegister(expirationTTLCountsHist)
if histogramConf.KByteHistogram["deviceSize"] || histogramConf.KByteHistogram["memorySize"] {
expirationTTLBytesHist := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "aerospike_ttl",
Name: "kib_hist",
Help: "Histogram of how many bytes fall into each ttl bucket. Memory will be the in-memory data size and does not include PI or SI.",
Buckets: buckets,
ConstLabels: prometheus.Labels{"namespace": namespace, "set": set},
}, []string{"storage_type"},
)
prometheus.MustRegister(expirationTTLBytesHist)
histograms["bytes"] = expirationTTLBytesHist
}

histograms := make(map[string]*prometheus.HistogramVec)
histograms["counts"] = expirationTTLCountsHist
if true {
expirationTTLCountsHist := prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "aerospike_ttl",
Name: "counts_hist",
Help: "Histogram of how many records fall into each ttl bucket.",
Buckets: buckets,
ConstLabels: prometheus.Labels{"namespace": namespace, "set": set},
}, []string{},
)
prometheus.MustRegister(expirationTTLCountsHist)
histograms["counts"] = expirationTTLCountsHist
}

// Add the HistogramVec to the inner map
ns_set_to_histograms[namespace+"_"+set] = histograms
ns_set_to_histograms[namespace+":"+set] = histograms

//now we can call something like ns_set_to_histograms[mynamespace_myset].Observe in the future.
}
Expand Down
10 changes: 10 additions & 0 deletions conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,13 @@ monitor:
policyTotalTimeout: 20m # https://golang.org/pkg/time/#ParseDuration
policySocketTimeout: 20m # https://golang.org/pkg/time/#ParseDuration
RecordsPerSecond: 100 # not sure if this works on older versions, but it does right now as of v5. 0 means no limit.
## KiB exports
# note on 'resolution' here: this can have drastic performance implications
# This directly affects how many times we call histogram.Observe.
# ex. If you have a ~128,000 byte size record and resolution is set to 0.001 we will call observe 128,000 times.
# this does mean we lose some resolution on how large the records are, because they'll be rounded down by 'n' bytes.
# value is in KiB, recommend starting at something like 0.334 so our resolution will be around 334 bytes
kbyteHistogramResolution: 0.334
kbyteHistogram:
deviceSize: true
memorySize: false
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ require (
// github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.11.0 // indirect
github.com/prometheus/procfs v0.11.1 // indirect
github.com/sirupsen/logrus v1.9.3
github.com/yuin/gopher-lua v1.1.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1
Expand Down
5 changes: 3 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.11.0 h1:5EAgkfkMl659uZPbe9AS2N68a7Cc1TJbPEuGzFuRbyk=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwaUuI=
github.com/prometheus/procfs v0.11.1/go.mod h1:eesXgaPo1q7lBpVMoMy0ZOFTth9hBn4W/y0/p/ScXhY=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
Expand Down Expand Up @@ -469,6 +469,7 @@ golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down
2 changes: 0 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package main

import (
"net/http"
"runtime"

"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
)

func main() {
runtime.GOMAXPROCS(1)
//This section will start the HTTP server and expose
//any metrics on the /metrics endpoint.
http.Handle("/metrics", promhttp.Handler())
Expand Down
86 changes: 73 additions & 13 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var cp = as.NewClientPolicy()
var err error
var buf bytes.Buffer
var backoff = 1.0
var measureOps []*as.Operation
var opPolicy *as.WritePolicy

const NON_EXPIRABLE_TTL_VALUE = 4294967295

Expand Down Expand Up @@ -73,11 +75,12 @@ func aeroInit() error {
if client != nil && client.IsConnected() {
logrus.Warn("Client was connected but aeroinit called. Reopening connection")
client.Close()

}
// TODO: make these configurable.
// cp.ConnectionQueueSize = 20
// cp.MinConnectionsPerNode = 10
// cp.ConnectionQueueSize = 3
// cp.MinConnectionsPerNode = 1
// cp.TendInterval = 3
cp.IdleTimeout = 55 * time.Second
//function to define policies and connect to aerospike.
logrus.Info("Connecting to ", config.Service.AerospikeAddr, "...")
Expand All @@ -96,7 +99,6 @@ func aeroInit() error {
return err
}
logrus.Info("Connected:", client.IsConnected())
//time.Sleep(15 * time.Second)
scanpol.IncludeBinData = false
return nil
}
Expand Down Expand Up @@ -260,6 +262,45 @@ func runner() {
}
}

// initRecSizeVars builds the reusable pieces of the record-size "operate"
// call: two expression-read operations (device size and memory size) and the
// write policy the client driver requires for Operate. Kept out of the scan
// loop because none of this changes per record.
func initRecSizeVars() ([]*as.Operation, *as.WritePolicy) {
	policy := as.NewWritePolicy(0, 0)
	policy.Expiration = as.TTLDontUpdate // never touch the record's TTL; the call stays a no-op
	policy.MaxRetries = 10
	policy.SleepBetweenRetries = 334 // 334ms between retries
	policy.TotalTimeout = 0          // rely on the socket timeout instead

	// Both operations are read-only expression ops, so no write ever happens;
	// the write policy is only demanded by the client driver's Operate API.
	ops := []*as.Operation{
		as.ExpReadOp("devsize", as.ExpDeviceSize(), as.ExpReadFlagDefault),
		as.ExpReadOp("memsize", as.ExpMemorySize(), as.ExpReadFlagDefault),
	}
	return ops, policy
}

// measureRecordSize runs the metadata-only expression-read operations against
// a single record and returns its device and memory footprint in KiB.
// Memory size is the in-memory data size and does not include PI or SI.
func measureRecordSize(client *as.Client, key *as.Key, operations []*as.Operation, policy *as.WritePolicy) (float64, float64, error) {
	record, err := client.Operate(policy, key, operations...)
	if err != nil {
		// Propagate instead of log.Fatal: one failed record must not kill the
		// exporter mid-scan; the caller logs the error and keeps scanning.
		return 0, 0, err
	}

	// Failed assertions leave the size at 0 (its zero value); log and carry on.
	memsize, mok := record.Bins["memsize"].(int)
	if !mok {
		logrus.Error("Could not convert 'memsize' to int")
	}

	devsize, dok := record.Bins["devsize"].(int)
	if !dok {
		logrus.Error("Could not convert 'devsize' to int")
	}

	// Convert bytes to KiB in floating point. The previous integer division
	// (devsize / 1024) truncated to whole KiB, losing sub-KiB resolution that
	// matters because kbyteHistogramResolution is fractional (e.g. 0.334).
	return float64(devsize) / 1024, float64(memsize) / 1024, nil
}

// simple function to take a human duration input like 1m20s and return a time.Duration output
func parseDur(dur string) time.Duration {
parsedDur, err := time.ParseDuration(dur)
Expand Down Expand Up @@ -315,7 +356,11 @@ func updateStats(namespace string, set string, namespaceSet string, element monc
recs, _ := client.ScanNode(scanpol, localNode, namespace, set)
total := 0
totalInspected := 0
resultMap[namespaceSet] = make(map[uint32]int)

// if we intend to export mem/device size histograms, we'll need these vars
if element.KByteHistogram["memorySize"] || element.KByteHistogram["deviceSize"] {
measureOps, opPolicy = initRecSizeVars()
}
for rec := range recs.Results() {
if config.Service.Verbose {
if total%element.ReportCount == 0 { // this is after the scan is done. may not be valuable other than for debugging.
Expand All @@ -331,7 +376,30 @@ func updateStats(namespace string, set string, namespaceSet string, element monc
} else {
total++
expireTime := rec.Record.Expiration
resultMap[namespaceSet][expireTime]++
ns_set_to_histograms[namespaceSet]["counts"].WithLabelValues().Observe(float64(expireTime))

// handle byte histogram
// need to do an extra operation here unfortunately
// This should result in a no-op using "Operation" with "Expression" to return metadata only.
// should not incur IO expense.
if element.KByteHistogram["memorySize"] || element.KByteHistogram["deviceSize"] {
devsize, memsize, err := measureRecordSize(client, rec.Record.Key, measureOps, opPolicy)
if err != nil {
logrus.Errorf("Failure fetching record size. Err: %v", err)
}
if element.KByteHistogram["deviceSize"] {
// if this is 0, we wont even create the histogram. neat. hopefully that doesnt confuse people in the future
for i := 0.0; i < devsize; i += element.KByteHistogramResolution {
ns_set_to_histograms[namespaceSet]["bytes"].WithLabelValues("device").Observe(float64(expireTime))
}
}
if element.KByteHistogram["memorySize"] {
// same here if memsize is 0, we wont get a histogram.
for i := 0.0; i < memsize; i += element.KByteHistogramResolution {
ns_set_to_histograms[namespaceSet]["bytes"].WithLabelValues("memory").Observe(float64(expireTime))
}
}
}
}
} else {
logrus.Error("Error while inspecting scan results: ", rec.Err)
Expand All @@ -344,14 +412,6 @@ func updateStats(namespace string, set string, namespaceSet string, element monc
break
}
}

for key := range resultMap[namespaceSet] {
num_records := float64(resultMap[namespaceSet][key])
for i := 0.0; i < num_records; i++ {
ns_set_to_histograms[namespace+"_"+set]["counts"].WithLabelValues().Observe(float64(key))
}

}
logrus.WithFields(logrus.Fields{
"total(records exported)": total,
"totalInspected": totalInspected,
Expand Down

0 comments on commit dd5a9cd

Please sign in to comment.