From 12dbd2c8de2f9d77af5b456dbdb2d69eab66cf96 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Thu, 28 Jun 2018 18:26:15 -0300 Subject: [PATCH 01/21] refactor: remove redundant code --- metrics.go | 5 ----- reporter.go | 1 - 2 files changed, 6 deletions(-) diff --git a/metrics.go b/metrics.go index 8f4c8ce..6ab2d2b 100644 --- a/metrics.go +++ b/metrics.go @@ -49,11 +49,6 @@ func (m *Metrics) Count(name string, value int64) error { return m.register(name, value) } -// Clear clears current buffer -func (m *Metrics) Clear() { - m.data = new(m.name) -} - // ToJSON serializes data to json func (m *Metrics) ToJSON() []byte { data, _ := json.Marshal(m.data) diff --git a/reporter.go b/reporter.go index bd8c946..0ad20b8 100644 --- a/reporter.go +++ b/reporter.go @@ -126,6 +126,5 @@ func (r *Reporter) FlushOnce() error { } }) r.Conn.Write(m.ToJSON()) - m.Clear() return nil } From 58e1f789c0e76e132f2f286b6a51f2704ef6ac05 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Thu, 28 Jun 2018 19:23:14 -0300 Subject: [PATCH 02/21] fix: make count cumulative --- reporter.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/reporter.go b/reporter.go index 0ad20b8..3baf4a9 100644 --- a/reporter.go +++ b/reporter.go @@ -20,7 +20,6 @@ type Reporter struct { percentiles []float64 p []string - ss map[string]int64 udpAddr *net.UDPAddr } @@ -47,7 +46,6 @@ func NewReporter(r metrics.Registry, addr string, name string) (*Reporter, error udpAddr: udpAddr, percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, - ss: make(map[string]int64), }, nil } @@ -74,10 +72,7 @@ func (r *Reporter) FlushOnce() error { r.Registry.Each(func(name string, i interface{}) { switch metric := i.(type) { case metrics.Counter: - v := metric.Count() - l := r.ss[name] - m.Count(name, v-l) - r.ss[name] = v + m.Count(name, metric.Count()) case metrics.Gauge: m.Gauge(name, float64(metric.Value())) From 4c9bf5a80540f53017b72d592168fb327c8531db Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Thu, 28 Jun 2018 19:30:45 -0300 Subject: [PATCH 03/21] fix: remove 'gauge' suffix --- metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metrics.go b/metrics.go index 6ab2d2b..cd67706 100644 --- a/metrics.go +++ b/metrics.go @@ -41,7 +41,7 @@ func (m *Metrics) register(name string, value interface{}) error { // Gauge register a new gauge metric func (m *Metrics) Gauge(name string, value interface{}) error { - return m.register(name+".gauge", value) + return m.register(name, value) } // Count register a new gaugeFloat64 metric From e463d4d76e8c80fd9b5b6ca7c0a999a9a26f2730 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Thu, 28 Jun 2018 19:32:46 -0300 Subject: [PATCH 04/21] refactor: use a generic register function --- metrics.go | 13 ++----------- reporter.go | 42 +++++++++++++++++++++--------------------- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/metrics.go b/metrics.go index cd67706..fba5536 100644 --- a/metrics.go +++ b/metrics.go @@ -28,7 +28,8 @@ func new(name string) map[string]interface{} { } } -func (m *Metrics) register(name string, value interface{}) error { +// Register registers a new metric +func (m *Metrics) Register(name string, value interface{}) error { m.RLock() defer m.RUnlock() @@ -39,16 +40,6 @@ func (m *Metrics) register(name string, value interface{}) error { return nil } -// Gauge register a new gauge metric -func (m *Metrics) Gauge(name string, value interface{}) error { - return m.register(name, value) -} - -// Count register a new gaugeFloat64 metric -func (m *Metrics) Count(name string, value int64) error { - return m.register(name, value) -} - // ToJSON serializes data to json func (m *Metrics) ToJSON() []byte { data, _ := json.Marshal(m.data) diff --git a/reporter.go b/reporter.go index 3baf4a9..cd4e87d 100644 --- a/reporter.go +++ b/reporter.go @@ -72,50 +72,50 @@ func (r *Reporter) FlushOnce() error { r.Registry.Each(func(name string, i interface{}) { switch metric := i.(type) { case metrics.Counter: - m.Count(name, metric.Count()) + m.Register(name, metric.Count()) case metrics.Gauge: - m.Gauge(name, float64(metric.Value())) + m.Register(name, float64(metric.Value())) case metrics.GaugeFloat64: - m.Gauge(name, metric.Value()) + m.Register(name, metric.Value()) case metrics.Histogram: ms := metric.Snapshot() - m.Gauge(name+".count", float64(ms.Count())) - m.Gauge(name+".max", float64(ms.Max())) - m.Gauge(name+".min", float64(ms.Min())) - m.Gauge(name+".mean", ms.Mean()) - m.Gauge(name+".stddev", ms.StdDev()) - m.Gauge(name+".var", ms.Variance()) + m.Register(name+".count", float64(ms.Count())) + m.Register(name+".max", float64(ms.Max())) + m.Register(name+".min", float64(ms.Min())) + m.Register(name+".mean", ms.Mean()) + m.Register(name+".stddev", ms.StdDev()) + m.Register(name+".var", ms.Variance()) if len(r.percentiles) > 0 { values := ms.Percentiles(r.percentiles) for i, p := range r.p { - m.Gauge(name+p, values[i]) + m.Register(name+p, values[i]) } } case metrics.Meter: ms := metric.Snapshot() - m.Gauge(name+".count", float64(ms.Count())) - m.Gauge(name+".rate1", ms.Rate1()) - m.Gauge(name+".rate5", ms.Rate5()) - m.Gauge(name+".rate15", ms.Rate15()) - m.Gauge(name+".mean", ms.RateMean()) + m.Register(name+".count", float64(ms.Count())) + m.Register(name+".rate1", ms.Rate1()) + m.Register(name+".rate5", ms.Rate5()) + m.Register(name+".rate15", ms.Rate15()) + m.Register(name+".mean", ms.RateMean()) case metrics.Timer: ms := metric.Snapshot() - m.Gauge(name+".count", float64(ms.Count())) - m.Gauge(name+".max", time.Duration(ms.Max()).Seconds()*1000) - m.Gauge(name+".min", time.Duration(ms.Min()).Seconds()*1000) - m.Gauge(name+".mean", time.Duration(ms.Mean()).Seconds()*1000) - m.Gauge(name+".stddev", time.Duration(ms.StdDev()).Seconds()*1000) + m.Register(name+".count", float64(ms.Count())) + m.Register(name+".max", time.Duration(ms.Max()).Seconds()*1000) + m.Register(name+".min", time.Duration(ms.Min()).Seconds()*1000) + m.Register(name+".mean", time.Duration(ms.Mean()).Seconds()*1000) + m.Register(name+".stddev", time.Duration(ms.StdDev()).Seconds()*1000) if len(r.percentiles) > 0 { values := ms.Percentiles(r.percentiles) for i, p := range r.p { - m.Gauge(name+p, time.Duration(values[i]).Seconds()*1000) + m.Register(name+p, time.Duration(values[i]).Seconds()*1000) } } } From 4f973286784d391992158b258cbd38bc54973514 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Thu, 28 Jun 2018 19:53:55 -0300 Subject: [PATCH 05/21] refactor: use sprintf to create metric names --- reporter.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/reporter.go b/reporter.go index cd4e87d..fe23233 100644 --- a/reporter.go +++ b/reporter.go @@ -1,6 +1,7 @@ package logstash import ( + "fmt" "log" "net" "time" @@ -82,12 +83,12 @@ func (r *Reporter) FlushOnce() error { case metrics.Histogram: ms := metric.Snapshot() - m.Register(name+".count", float64(ms.Count())) - m.Register(name+".max", float64(ms.Max())) - m.Register(name+".min", float64(ms.Min())) - m.Register(name+".mean", ms.Mean()) - m.Register(name+".stddev", ms.StdDev()) - m.Register(name+".var", ms.Variance()) + m.Register(fmt.Sprintf("%s.count", name), float64(ms.Count())) + m.Register(fmt.Sprintf("%s.max", name), float64(ms.Max())) + m.Register(fmt.Sprintf("%s.min", name), float64(ms.Min())) + m.Register(fmt.Sprintf("%s.mean", name), ms.Mean()) + m.Register(fmt.Sprintf("%s.stddev", name), ms.StdDev()) + m.Register(fmt.Sprintf("%s.var", name), ms.Variance()) if len(r.percentiles) > 0 { values := ms.Percentiles(r.percentiles) @@ -98,19 +99,19 @@ func (r *Reporter) FlushOnce() error { case metrics.Meter: ms := metric.Snapshot() - m.Register(name+".count", float64(ms.Count())) - m.Register(name+".rate1", ms.Rate1()) - m.Register(name+".rate5", ms.Rate5()) - m.Register(name+".rate15", ms.Rate15()) - m.Register(name+".mean", ms.RateMean()) + m.Register(fmt.Sprintf("%s.count", name), float64(ms.Count())) + m.Register(fmt.Sprintf("%s.rate1", name), ms.Rate1()) + m.Register(fmt.Sprintf("%s.rate5", name), ms.Rate5()) + m.Register(fmt.Sprintf("%s.rate15", name), ms.Rate15()) + m.Register(fmt.Sprintf("%s.mean", name), ms.RateMean()) case metrics.Timer: ms := metric.Snapshot() - m.Register(name+".count", float64(ms.Count())) - m.Register(name+".max", time.Duration(ms.Max()).Seconds()*1000) - m.Register(name+".min", time.Duration(ms.Min()).Seconds()*1000) - m.Register(name+".mean", time.Duration(ms.Mean()).Seconds()*1000) - m.Register(name+".stddev", time.Duration(ms.StdDev()).Seconds()*1000) + m.Register(fmt.Sprintf("%s.count", name), float64(ms.Count())) + m.Register(fmt.Sprintf("%s.max", name), time.Duration(ms.Max()).Seconds()*1000) + m.Register(fmt.Sprintf("%s.min", name), time.Duration(ms.Min()).Seconds()*1000) + m.Register(fmt.Sprintf("%s.mean", name), time.Duration(ms.Mean()).Seconds()*1000) + m.Register(fmt.Sprintf("%s.stddev", name), time.Duration(ms.StdDev()).Seconds()*1000) if len(r.percentiles) > 0 { values := ms.Percentiles(r.percentiles) From 0d1811103b08f26417ca70872deedda3530ea719 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Thu, 28 Jun 2018 20:07:29 -0300 Subject: [PATCH 06/21] fix: remove unset field from reporter --- reporter.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/reporter.go b/reporter.go index fe23233..3266fbc 100644 --- a/reporter.go +++ b/reporter.go @@ -20,7 +20,6 @@ type Reporter struct { Version string percentiles []float64 - p []string udpAddr *net.UDPAddr } @@ -90,11 +89,8 @@ func (r *Reporter) FlushOnce() error { m.Register(fmt.Sprintf("%s.stddev", name), ms.StdDev()) m.Register(fmt.Sprintf("%s.var", name), ms.Variance()) - if len(r.percentiles) > 0 { - values := ms.Percentiles(r.percentiles) - for i, p := range r.p { - m.Register(name+p, values[i]) - } + for _, p := range r.percentiles { + m.Register(fmt.Sprintf("%s.p%g", name, p*100), ms.Percentile(p)) } case metrics.Meter: @@ -113,11 +109,9 @@ func (r *Reporter) FlushOnce() error { m.Register(fmt.Sprintf("%s.mean", name), time.Duration(ms.Mean()).Seconds()*1000) m.Register(fmt.Sprintf("%s.stddev", name), time.Duration(ms.StdDev()).Seconds()*1000) - if len(r.percentiles) > 0 { - values := ms.Percentiles(r.percentiles) - for i, p := range r.p { - m.Register(name+p, time.Duration(values[i]).Seconds()*1000) - } + for _, p := range r.percentiles { + duration := time.Duration(ms.Percentile(p)).Seconds() * 1000 + m.Register(fmt.Sprintf("%s.p%g", name, p*100), duration) } } }) From 9e203dbb190db2f910328a626cd9af421a6a0c97 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Fri, 29 Jun 2018 11:30:16 -0300 Subject: [PATCH 07/21] fix: add 'count' suffix to counter --- reporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reporter.go b/reporter.go index 3266fbc..b7e1e39 100644 --- a/reporter.go +++ b/reporter.go @@ -72,7 +72,7 @@ func (r *Reporter) FlushOnce() error { r.Registry.Each(func(name string, i interface{}) { switch metric := i.(type) { case metrics.Counter: - m.Register(name, metric.Count()) + m.Register(fmt.Sprintf("%s.count", name), metric.Count()) case metrics.Gauge: m.Register(name, float64(metric.Value())) From 6f59ed6c8b898fc7e21c1aa88a245e838b1bfd9a Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Fri, 29 Jun 2018 11:31:00 -0300 Subject: [PATCH 08/21] test: ensure reporter is sending a correct json --- reporter_test.go | 150 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 reporter_test.go diff --git a/reporter_test.go b/reporter_test.go new file mode 100644 index 0000000..baae218 --- /dev/null +++ b/reporter_test.go @@ -0,0 +1,150 @@ +package logstash + +import ( + "net" + "strconv" + "strings" + "testing" + + metrics "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/assert" +) + +type UDPServer struct { + conn *net.UDPConn +} + +func newUDPServer(port int) (*UDPServer, error) { + serverAddr, err := net.ResolveUDPAddr("udp", ":"+strconv.Itoa(port)) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", serverAddr) + if err != nil { + return nil, err + } + return &UDPServer{conn}, nil +} + +func (us *UDPServer) Read() (string, error) { + buffer := make([]byte, 4096) + _, _, err := us.conn.ReadFromUDP(buffer) + if err != nil { + return "", err + } + resizedStr := strings.Trim(string(buffer), "\x00") // Remove the empty chars at the end of the buffer + return resizedStr, nil +} + +func (us *UDPServer) Close() { + us.conn.Close() +} + +func TestFlushOnce(t *testing.T) { + serverAddr := "localhost:1984" + server, err := newUDPServer(1984) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + registry := metrics.NewRegistry() + reporter, err := NewReporter(registry, serverAddr, "dummy-client") + + // Insert metrics + metrics.GetOrRegisterCounter("test.counter", registry).Inc(6) + metrics.GetOrRegisterCounter("test.counter", registry).Inc(2) + metrics.GetOrRegisterGauge("test.gauge", registry).Update(2) + metrics.GetOrRegisterGauge("test.gauge", registry).Update(3) + metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(4) + metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(5) + sample := metrics.NewUniformSample(2) + metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(9) + metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(10) + // TODO test meter and timer + reporter.FlushOnce() + + received, err := server.Read() + if err != nil { + t.Fatal(err) + } + + expected := `{ + "metric": "doc", + "client":"dummy-client", + "count": 1, + "test.counter.count": 8, + "test.gauge": 3, + "test.gaugeFloat64": 5, + "test.histogram.count": 2, + "test.histogram.min": 9, + "test.histogram.max": 10, + "test.histogram.mean": 9.5, + "test.histogram.stddev": 0.5, + "test.histogram.var": 0.25, + "test.histogram.p50": 9.5, + "test.histogram.p75": 10, + "test.histogram.p95": 10, + "test.histogram.p99": 10, + "test.histogram.p99.9": 10 + }` + assert.JSONEq(t, expected, received) +} + +func TestFlushOnceKeepsPreviousValues(t *testing.T) { + serverAddr := "localhost:1984" + server, err := newUDPServer(1984) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + registry := metrics.NewRegistry() + reporter, err := NewReporter(registry, serverAddr, "dummy-client") + + // Insert metrics + sample := metrics.NewUniformSample(3) + metrics.GetOrRegisterCounter("test.counter", registry).Inc(6) + metrics.GetOrRegisterCounter("test.counter", registry).Inc(2) + metrics.GetOrRegisterGauge("test.gauge", registry).Update(2) + metrics.GetOrRegisterGauge("test.gauge", registry).Update(3) + metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(4) + metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(5) + metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(9) + metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(10) + reporter.FlushOnce() + server.Read() // Ignore current values + + metrics.GetOrRegisterCounter("test.counter", registry).Inc(4) + metrics.GetOrRegisterGauge("test.gauge", registry).Update(8) + metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(9) + metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(12) + // TODO test meter and timer + reporter.FlushOnce() + + received, err := server.Read() + if err != nil { + t.Fatal(err) + } + + expected := `{ + "metric": "doc", + "client":"dummy-client", + "count": 1, + "test.counter.count": 12, + "test.gauge": 8, + "test.gaugeFloat64": 9, + "test.histogram.count": 3, + "test.histogram.min": 9, + "test.histogram.max": 12, + "test.histogram.mean": 10.333333333333334, + "test.histogram.stddev": 1.247219128924647, + "test.histogram.var": 1.5555555555555556, + "test.histogram.p50": 10, + "test.histogram.p75": 12, + "test.histogram.p95": 12, + "test.histogram.p99": 12, + "test.histogram.p99.9": 12 + }` + assert.JSONEq(t, expected, received) +} From 3bc7cebce82363fa7aaa0e82d19acd0e8ac6f928 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 11:19:08 -0300 Subject: [PATCH 09/21] fix: avoid isolated numbers after dot this causes elasticsearch to think the number is a field name, which is invalid --- reporter.go | 4 +++- reporter_test.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/reporter.go b/reporter.go index b7e1e39..27a21ad 100644 --- a/reporter.go +++ b/reporter.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "net" + "strings" "time" metrics "github.com/rcrowley/go-metrics" @@ -90,7 +91,8 @@ func (r *Reporter) FlushOnce() error { m.Register(fmt.Sprintf("%s.var", name), ms.Variance()) for _, p := range r.percentiles { - m.Register(fmt.Sprintf("%s.p%g", name, p*100), ms.Percentile(p)) + pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) + m.Register(fmt.Sprintf("%s.%s", name, pStr), ms.Percentile(p)) } case metrics.Meter: diff --git a/reporter_test.go b/reporter_test.go index baae218..a2e3d8c 100644 --- a/reporter_test.go +++ b/reporter_test.go @@ -86,7 +86,7 @@ func TestFlushOnce(t *testing.T) { "test.histogram.p75": 10, "test.histogram.p95": 10, "test.histogram.p99": 10, - "test.histogram.p99.9": 10 + "test.histogram.p99_9": 10 }` assert.JSONEq(t, expected, received) } @@ -144,7 +144,7 @@ func TestFlushOnceKeepsPreviousValues(t *testing.T) { "test.histogram.p75": 12, "test.histogram.p95": 12, "test.histogram.p99": 12, - "test.histogram.p99.9": 12 + "test.histogram.p99_9": 12 }` assert.JSONEq(t, expected, received) } From 04a3ecdb4269920c7e56eba4fdc5df6f7545da64 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 11:24:37 -0300 Subject: [PATCH 10/21] refactor: use elasticsearch's naming conventions in tests --- reporter_test.go | 96 ++++++++++++++++++++++++------------------------ 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/reporter_test.go b/reporter_test.go index a2e3d8c..3e74df6 100644 --- a/reporter_test.go +++ b/reporter_test.go @@ -52,15 +52,15 @@ func TestFlushOnce(t *testing.T) { reporter, err := NewReporter(registry, serverAddr, "dummy-client") // Insert metrics - metrics.GetOrRegisterCounter("test.counter", registry).Inc(6) - metrics.GetOrRegisterCounter("test.counter", registry).Inc(2) - metrics.GetOrRegisterGauge("test.gauge", registry).Update(2) - metrics.GetOrRegisterGauge("test.gauge", registry).Update(3) - metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(4) - metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(5) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(3) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(4) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(5) sample := metrics.NewUniformSample(2) - metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(9) - metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(10) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(9) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(10) // TODO test meter and timer reporter.FlushOnce() @@ -73,20 +73,20 @@ func TestFlushOnce(t *testing.T) { "metric": "doc", "client":"dummy-client", "count": 1, - "test.counter.count": 8, - "test.gauge": 3, - "test.gaugeFloat64": 5, - "test.histogram.count": 2, - "test.histogram.min": 9, - "test.histogram.max": 10, - "test.histogram.mean": 9.5, - "test.histogram.stddev": 0.5, - "test.histogram.var": 0.25, - "test.histogram.p50": 9.5, - "test.histogram.p75": 10, - "test.histogram.p95": 10, - "test.histogram.p99": 10, - "test.histogram.p99_9": 10 + "test_counter.count": 8, + "test_gauge": 3, + "test_gaugeFloat64": 5, + "test_histogram.count": 2, + "test_histogram.min": 9, + "test_histogram.max": 10, + "test_histogram.mean": 9.5, + "test_histogram.stddev": 0.5, + "test_histogram.var": 0.25, + "test_histogram.p50": 9.5, + "test_histogram.p75": 10, + "test_histogram.p95": 10, + "test_histogram.p99": 10, + "test_histogram.p99_9": 10 }` assert.JSONEq(t, expected, received) } @@ -104,21 +104,21 @@ func TestFlushOnceKeepsPreviousValues(t *testing.T) { // Insert metrics sample := metrics.NewUniformSample(3) - metrics.GetOrRegisterCounter("test.counter", registry).Inc(6) - metrics.GetOrRegisterCounter("test.counter", registry).Inc(2) - metrics.GetOrRegisterGauge("test.gauge", registry).Update(2) - metrics.GetOrRegisterGauge("test.gauge", registry).Update(3) - metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(4) - metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(5) - metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(9) - metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(10) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(3) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(4) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(5) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(9) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(10) reporter.FlushOnce() server.Read() // Ignore current values - metrics.GetOrRegisterCounter("test.counter", registry).Inc(4) - metrics.GetOrRegisterGauge("test.gauge", registry).Update(8) - metrics.GetOrRegisterGaugeFloat64("test.gaugeFloat64", registry).Update(9) - metrics.GetOrRegisterHistogram("test.histogram", registry, sample).Update(12) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(4) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(8) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(9) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(12) // TODO test meter and timer reporter.FlushOnce() @@ -131,20 +131,20 @@ func TestFlushOnceKeepsPreviousValues(t *testing.T) { "metric": "doc", "client":"dummy-client", "count": 1, - "test.counter.count": 12, - "test.gauge": 8, - "test.gaugeFloat64": 9, - "test.histogram.count": 3, - "test.histogram.min": 9, - "test.histogram.max": 12, - "test.histogram.mean": 10.333333333333334, - "test.histogram.stddev": 1.247219128924647, - "test.histogram.var": 1.5555555555555556, - "test.histogram.p50": 10, - "test.histogram.p75": 12, - "test.histogram.p95": 12, - "test.histogram.p99": 12, - "test.histogram.p99_9": 12 + "test_counter.count": 12, + "test_gauge": 8, + "test_gaugeFloat64": 9, + "test_histogram.count": 3, + "test_histogram.min": 9, + "test_histogram.max": 12, + "test_histogram.mean": 10.333333333333334, + "test_histogram.stddev": 1.247219128924647, + "test_histogram.var": 1.5555555555555556, + "test_histogram.p50": 10, + "test_histogram.p75": 12, + "test_histogram.p95": 12, + "test_histogram.p99": 12, + "test_histogram.p99_9": 12 }` assert.JSONEq(t, expected, received) } From f58a0d00e77d91621920f1c5a3feb7d6ebab1118 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 11:29:48 -0300 Subject: [PATCH 11/21] refactor: rename test to better represent what it does --- reporter_test.go => integration_test.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename reporter_test.go => integration_test.go (100%) diff --git a/reporter_test.go b/integration_test.go similarity index 100% rename from reporter_test.go rename to integration_test.go From c7d0b02d763db9f16dc1a892f7607ffe6fa8239f Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 11:35:12 -0300 Subject: [PATCH 12/21] fix: avoid isolated number after dot for timer the last commit fixing this only fixed for histogram --- reporter.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/reporter.go b/reporter.go index 27a21ad..7880072 100644 --- a/reporter.go +++ b/reporter.go @@ -113,7 +113,8 @@ func (r *Reporter) FlushOnce() error { for _, p := range r.percentiles { duration := time.Duration(ms.Percentile(p)).Seconds() * 1000 - m.Register(fmt.Sprintf("%s.p%g", name, p*100), duration) + pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) + m.Register(fmt.Sprintf("%s.%s", name, pStr), duration) } } }) From 3ada002c722c1299ee6fa56776834f942241669c Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 14:53:55 -0300 Subject: [PATCH 13/21] chore: add ci for tests --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..daf3fb8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,2 @@ +language: go +script: go test -v ./... From e407a87469e70fb3acbbb99d9fbfd8363472c09d Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 16:09:15 -0300 Subject: [PATCH 14/21] refactor: move the map management to the reporter --- metrics.go | 47 -------------------------------------------- reporter.go | 56 +++++++++++++++++++++++++++++++---------------------- 2 files changed, 33 insertions(+), 70 deletions(-) delete mode 100644 metrics.go diff --git a/metrics.go b/metrics.go deleted file mode 100644 index fba5536..0000000 --- a/metrics.go +++ /dev/null @@ -1,47 +0,0 @@ -package logstash - -import ( - "encoding/json" - "errors" - "sync" -) - -// Metrics represents a metric that will be sent to logstash -type Metrics struct { - data map[string]interface{} - name string - sync.RWMutex -} - -// NewMetrics Metric{} constructor -func NewMetrics(name string) *Metrics { - return &Metrics{ - data: new(name), - } -} - -func new(name string) map[string]interface{} { - return map[string]interface{}{ - "metric": "doc", - "client": name, - "count": 1, - } -} - -// Register registers a new metric -func (m *Metrics) Register(name string, value interface{}) error { - m.RLock() - defer m.RUnlock() - - if name == "" { - return errors.New("Invalid metric name") - } - m.data[name] = value - return nil -} - -// ToJSON serializes data to json -func (m *Metrics) ToJSON() []byte { - data, _ := json.Marshal(m.data) - return data -} diff --git a/reporter.go b/reporter.go index 7880072..63aa594 100644 --- a/reporter.go +++ b/reporter.go @@ -1,6 +1,7 @@ package logstash import ( + "encoding/json" "fmt" "log" "net" @@ -68,56 +69,65 @@ func (r *Reporter) FlushEach(interval time.Duration) { // FlushOnce submits a snapshot of the registry. func (r *Reporter) FlushOnce() error { - m := NewMetrics(r.Name) + m := map[string]interface{}{ + "metric": "doc", + "client": r.Name, + "count": 1, + } r.Registry.Each(func(name string, i interface{}) { switch metric := i.(type) { case metrics.Counter: - m.Register(fmt.Sprintf("%s.count", name), metric.Count()) + m[fmt.Sprintf("%s.count", name)] = metric.Count() case metrics.Gauge: - m.Register(name, float64(metric.Value())) + m[name] = float64(metric.Value()) case metrics.GaugeFloat64: - m.Register(name, metric.Value()) + m[name] = metric.Value() case metrics.Histogram: ms := metric.Snapshot() - m.Register(fmt.Sprintf("%s.count", name), float64(ms.Count())) - m.Register(fmt.Sprintf("%s.max", name), float64(ms.Max())) - m.Register(fmt.Sprintf("%s.min", name), float64(ms.Min())) - m.Register(fmt.Sprintf("%s.mean", name), ms.Mean()) - m.Register(fmt.Sprintf("%s.stddev", name), ms.StdDev()) - m.Register(fmt.Sprintf("%s.var", name), ms.Variance()) + m[fmt.Sprintf("%s.count", name)] = float64(ms.Count()) + m[fmt.Sprintf("%s.max", name)] = float64(ms.Max()) + m[fmt.Sprintf("%s.min", name)] = float64(ms.Min()) + m[fmt.Sprintf("%s.mean", name)] = ms.Mean() + m[fmt.Sprintf("%s.stddev", name)] = ms.StdDev() + m[fmt.Sprintf("%s.var", name)] = ms.Variance() for _, p := range r.percentiles { pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) - m.Register(fmt.Sprintf("%s.%s", name, pStr), ms.Percentile(p)) + m[fmt.Sprintf("%s.%s", name, pStr)] = ms.Percentile(p) } case metrics.Meter: ms := metric.Snapshot() - m.Register(fmt.Sprintf("%s.count", name), float64(ms.Count())) - m.Register(fmt.Sprintf("%s.rate1", name), ms.Rate1()) - m.Register(fmt.Sprintf("%s.rate5", name), ms.Rate5()) - m.Register(fmt.Sprintf("%s.rate15", name), ms.Rate15()) - m.Register(fmt.Sprintf("%s.mean", name), ms.RateMean()) + m[fmt.Sprintf("%s.count", name)] = float64(ms.Count()) + m[fmt.Sprintf("%s.rate1", name)] = ms.Rate1() + m[fmt.Sprintf("%s.rate5", name)] = ms.Rate5() + m[fmt.Sprintf("%s.rate15", name)] = ms.Rate15() + m[fmt.Sprintf("%s.mean", name)] = ms.RateMean() case metrics.Timer: ms := metric.Snapshot() - m.Register(fmt.Sprintf("%s.count", name), float64(ms.Count())) - m.Register(fmt.Sprintf("%s.max", name), time.Duration(ms.Max()).Seconds()*1000) - m.Register(fmt.Sprintf("%s.min", name), time.Duration(ms.Min()).Seconds()*1000) - m.Register(fmt.Sprintf("%s.mean", name), time.Duration(ms.Mean()).Seconds()*1000) - m.Register(fmt.Sprintf("%s.stddev", name), time.Duration(ms.StdDev()).Seconds()*1000) + m[fmt.Sprintf("%s.count", name)] = float64(ms.Count()) + m[fmt.Sprintf("%s.max", name)] = time.Duration(ms.Max()).Seconds() * 1000 + m[fmt.Sprintf("%s.min", name)] = time.Duration(ms.Min()).Seconds() * 1000 + m[fmt.Sprintf("%s.mean", name)] = time.Duration(ms.Mean()).Seconds() * 1000 + m[fmt.Sprintf("%s.stddev", name)] = time.Duration(ms.StdDev()).Seconds() * 1000 for _, p := range r.percentiles { duration := time.Duration(ms.Percentile(p)).Seconds() * 1000 pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) - m.Register(fmt.Sprintf("%s.%s", name, pStr), duration) + m[fmt.Sprintf("%s.%s", name, pStr)] = duration } } }) - r.Conn.Write(m.ToJSON()) + + data, err := json.Marshal(m) + if err != nil { + return err + } + r.Conn.Write(data) return nil } From c734ed5366890f6d0f45c6d764fcdd3ebd6d9961 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 16:11:24 -0300 Subject: [PATCH 15/21] feat: receive default values by parameter --- integration_test.go | 41 +++++++++++++++++++++++++++++++++-------- reporter.go | 26 +++++++++++++------------- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/integration_test.go b/integration_test.go index 3e74df6..f0929f6 100644 --- a/integration_test.go +++ b/integration_test.go @@ -49,7 +49,7 @@ func TestFlushOnce(t *testing.T) { defer server.Close() registry := metrics.NewRegistry() - reporter, err := NewReporter(registry, serverAddr, "dummy-client") + reporter, err := NewReporter(registry, serverAddr, nil) // Insert metrics metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) @@ -70,9 +70,6 @@ func TestFlushOnce(t *testing.T) { } expected := `{ - "metric": "doc", - "client":"dummy-client", - "count": 1, "test_counter.count": 8, "test_gauge": 3, "test_gaugeFloat64": 5, @@ -100,7 +97,7 @@ func TestFlushOnceKeepsPreviousValues(t *testing.T) { defer server.Close() registry := metrics.NewRegistry() - reporter, err := NewReporter(registry, serverAddr, "dummy-client") + reporter, err := NewReporter(registry, serverAddr, nil) // Insert metrics sample := metrics.NewUniformSample(3) @@ -128,9 +125,6 @@ func TestFlushOnceKeepsPreviousValues(t *testing.T) { } expected := `{ - "metric": "doc", - "client":"dummy-client", - "count": 1, "test_counter.count": 12, "test_gauge": 8, "test_gaugeFloat64": 9, @@ -148,3 +142,34 @@ func TestFlushOnceKeepsPreviousValues(t *testing.T) { }` assert.JSONEq(t, expected, received) } + +func TestFlushOnceWithDefaultValues(t *testing.T) { + serverAddr := "localhost:1984" + server, err := newUDPServer(1984) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + registry := metrics.NewRegistry() + reporter, err := NewReporter(registry, serverAddr, map[string]interface{}{ + "client": "dummy-client", + "metric": "doc", + }) + + // Insert metrics + metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) + reporter.FlushOnce() + + received, err := server.Read() + if err != nil { + t.Fatal(err) + } + + expected := `{ + "client":"dummy-client", + "metric": "doc", + "test_counter.count": 6 + }` + assert.JSONEq(t, expected, received) +} diff --git a/reporter.go b/reporter.go index 63aa594..8b7f62b 100644 --- a/reporter.go +++ b/reporter.go @@ -17,16 +17,16 @@ type Reporter struct { Registry metrics.Registry // Conn is a UDP connection to logstash. Conn *net.UDPConn - // Name of this reporter - Name string - Version string + // DefaultValues are the values that will be sent in all submits. + DefaultValues map[string]interface{} + Version string percentiles []float64 udpAddr *net.UDPAddr } -// NewReporter creates a new Reporter with a pre-configured statsd client. -func NewReporter(r metrics.Registry, addr string, name string) (*Reporter, error) { +// NewReporter creates a new Reporter with an UDP client to the given logstash address. +func NewReporter(r metrics.Registry, addr string, defaultValues map[string]interface{}) (*Reporter, error) { if r == nil { r = metrics.DefaultRegistry } @@ -41,10 +41,10 @@ func NewReporter(r metrics.Registry, addr string, name string) (*Reporter, error } return &Reporter{ - Conn: conn, - Registry: r, - Name: name, - Version: "0.1.1", + Conn: conn, + Registry: r, + DefaultValues: defaultValues, + Version: "0.1.1", udpAddr: udpAddr, percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, @@ -69,10 +69,10 @@ func (r *Reporter) FlushEach(interval time.Duration) { // FlushOnce submits a snapshot of the registry. func (r *Reporter) FlushOnce() error { - m := map[string]interface{}{ - "metric": "doc", - "client": r.Name, - "count": 1, + m := make(map[string]interface{}) + // Copy default values + for k, v := range r.DefaultValues { + m[k] = v } r.Registry.Each(func(name string, i interface{}) { From a7a1e47ef37a27cfbc1c2889c4cbfd0311337824 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 16:11:24 -0300 Subject: [PATCH 16/21] docs: update readme's example with new parameter --- README.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 2a03599..5ca805a 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,11 @@ func main() { metrics.RegisterRuntimeMemStats(registry) reporter, err := logstash.NewReporter( - registry, // go-metrics registry, or nil - "127.0.0.1:1984", // logstash UDP address, - "my-app", // reporter's name + registry, // go-metrics registry, or nil + "127.0.0.1:1984", // logstash UDP address, + map[string]interface{}{ // default values to be sent at each flush + "client": "my-app", + } ) if err != nil { log.Fatal(err) From dfe609116bfd5840be79adbfb1c97912fdf6ba29 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 17:35:07 -0300 Subject: [PATCH 17/21] docs: add ci and godocs badges --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5ca805a..02e11a5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# go-metrics logstash +# go-metrics logstash [![Build Status](https://travis-ci.com/jonathansp/go-metrics-logstash.svg?branch=master)](https://travis-ci.com/jonathansp/go-metrics-logstash) [![GoDoc](https://godoc.org/github.com/jonathansp/go-metrics-logstash?status.svg)](http://godoc.org/github.com/jonathansp/go-metrics-logstash) This package provides a reporter for the [go-metrics](https://github.com/rcrowley/go-metrics) library that will post the metrics to logstash. This library is based on [go-metrics-datadog](https://github.com/syntaqx/go-metrics-datadog). From 0383f11628c92ab12f89840a234a1e260b74468d Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 17:36:49 -0300 Subject: [PATCH 18/21] refactor: remove unused field --- reporter.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/reporter.go b/reporter.go index 8b7f62b..5118c85 100644 --- a/reporter.go +++ b/reporter.go @@ -22,7 +22,6 @@ type Reporter struct { Version string percentiles []float64 - udpAddr *net.UDPAddr } // NewReporter creates a new Reporter with an UDP client to the given logstash address. @@ -46,7 +45,6 @@ func NewReporter(r metrics.Registry, addr string, defaultValues map[string]inter DefaultValues: defaultValues, Version: "0.1.1", - udpAddr: udpAddr, percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, }, nil } From a8c11a5552b7881b31ec1141824d15e987117963 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 17:44:51 -0300 Subject: [PATCH 19/21] docs: add more details to NewReporter --- reporter.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/reporter.go b/reporter.go index 5118c85..d088947 100644 --- a/reporter.go +++ b/reporter.go @@ -24,7 +24,9 @@ type Reporter struct { percentiles []float64 } -// NewReporter creates a new Reporter with an UDP client to the given logstash address. +// NewReporter creates a new Reporter for the register r, with an UDP client to +// the given logstash address addr and with the given default values. If defaultValues +// is nil, only the metrics will be sent. func NewReporter(r metrics.Registry, addr string, defaultValues map[string]interface{}) (*Reporter, error) { if r == nil { r = metrics.DefaultRegistry From 98f1aee596cbb92fc3e5f5424f06388ce6193901 Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 17:45:04 -0300 Subject: [PATCH 20/21] docs: fix method name --- reporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reporter.go b/reporter.go index d088947..fd1f645 100644 --- a/reporter.go +++ b/reporter.go @@ -52,7 +52,7 @@ func NewReporter(r metrics.Registry, addr string, defaultValues map[string]inter } // FlushEach is a blocking exporter function which reports metrics in the registry. -// Designed to be used in a goroutine: go reporter.Flush() +// Designed to be used in a goroutine: go reporter.FlushEach() func (r *Reporter) FlushEach(interval time.Duration) { defer func() { if rec := recover(); rec != nil { From f7af88eadf07cf12be479467f658a4386432334d Mon Sep 17 00:00:00 2001 From: Igor Cescon de Moura Date: Tue, 3 Jul 2018 18:52:28 -0300 Subject: [PATCH 21/21] feat: expose percentiles --- reporter.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/reporter.go b/reporter.go index fd1f645..05d4d32 100644 --- a/reporter.go +++ b/reporter.go @@ -20,8 +20,8 @@ type Reporter struct { // DefaultValues are the values that will be sent in all submits. DefaultValues map[string]interface{} Version string - - percentiles []float64 + // Percentiles to be sent on histograms and timers + Percentiles []float64 } // NewReporter creates a new Reporter for the register r, with an UDP client to @@ -47,7 +47,7 @@ func NewReporter(r metrics.Registry, addr string, defaultValues map[string]inter DefaultValues: defaultValues, Version: "0.1.1", - percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, + Percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, }, nil } @@ -95,7 +95,7 @@ func (r *Reporter) FlushOnce() error { m[fmt.Sprintf("%s.stddev", name)] = ms.StdDev() m[fmt.Sprintf("%s.var", name)] = ms.Variance() - for _, p := range r.percentiles { + for _, p := range r.Percentiles { pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) m[fmt.Sprintf("%s.%s", name, pStr)] = ms.Percentile(p) } @@ -116,7 +116,7 @@ func (r *Reporter) FlushOnce() error { m[fmt.Sprintf("%s.mean", name)] = time.Duration(ms.Mean()).Seconds() * 1000 m[fmt.Sprintf("%s.stddev", name)] = time.Duration(ms.StdDev()).Seconds() * 1000 - for _, p := range r.percentiles { + for _, p := range r.Percentiles { duration := time.Duration(ms.Percentile(p)).Seconds() * 1000 pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) m[fmt.Sprintf("%s.%s", name, pStr)] = duration