diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..daf3fb8 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,2 @@ +language: go +script: go test -v ./... diff --git a/README.md b/README.md index 2a03599..02e11a5 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# go-metrics logstash +# go-metrics logstash [![Build Status](https://travis-ci.com/jonathansp/go-metrics-logstash.svg?branch=master)](https://travis-ci.com/jonathansp/go-metrics-logstash) [![GoDoc](https://godoc.org/github.com/jonathansp/go-metrics-logstash?status.svg)](http://godoc.org/github.com/jonathansp/go-metrics-logstash) This package provides a reporter for the [go-metrics](https://github.com/rcrowley/go-metrics) library that will post the metrics to logstash. This library is based on [go-metrics-datadog](https://github.com/syntaqx/go-metrics-datadog). @@ -29,9 +29,11 @@ func main() { metrics.RegisterRuntimeMemStats(registry) reporter, err := logstash.NewReporter( - registry, // go-metrics registry, or nil - "127.0.0.1:1984", // logstash UDP address, - "my-app", // reporter's name + registry, // go-metrics registry, or nil + "127.0.0.1:1984", // logstash UDP address, + map[string]interface{}{ // default values to be sent at each flush + "client": "my-app", + }, ) if err != nil { log.Fatal(err) diff --git a/integration_test.go b/integration_test.go new file mode 100644 index 0000000..f0929f6 --- /dev/null +++ b/integration_test.go @@ -0,0 +1,175 @@ +package logstash + +import ( + "net" + "strconv" + "strings" + "testing" + + metrics "github.com/rcrowley/go-metrics" + "github.com/stretchr/testify/assert" +) + +type UDPServer struct { + conn *net.UDPConn +} + +func newUDPServer(port int) (*UDPServer, error) { + serverAddr, err := net.ResolveUDPAddr("udp", ":"+strconv.Itoa(port)) + if err != nil { + return nil, err + } + conn, err := net.ListenUDP("udp", serverAddr) + if err != nil { + return nil, err + } + return &UDPServer{conn}, nil +} + +func (us *UDPServer) Read() (string, 
error) { + buffer := make([]byte, 4096) + _, _, err := us.conn.ReadFromUDP(buffer) + if err != nil { + return "", err + } + resizedStr := strings.Trim(string(buffer), "\x00") // Remove the empty chars at the end of the buffer + return resizedStr, nil +} + +func (us *UDPServer) Close() { + us.conn.Close() +} + +func TestFlushOnce(t *testing.T) { + serverAddr := "localhost:1984" + server, err := newUDPServer(1984) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + registry := metrics.NewRegistry() + reporter, err := NewReporter(registry, serverAddr, nil) + + // Insert metrics + metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(3) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(4) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(5) + sample := metrics.NewUniformSample(2) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(9) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(10) + // TODO test meter and timer + reporter.FlushOnce() + + received, err := server.Read() + if err != nil { + t.Fatal(err) + } + + expected := `{ + "test_counter.count": 8, + "test_gauge": 3, + "test_gaugeFloat64": 5, + "test_histogram.count": 2, + "test_histogram.min": 9, + "test_histogram.max": 10, + "test_histogram.mean": 9.5, + "test_histogram.stddev": 0.5, + "test_histogram.var": 0.25, + "test_histogram.p50": 9.5, + "test_histogram.p75": 10, + "test_histogram.p95": 10, + "test_histogram.p99": 10, + "test_histogram.p99_9": 10 + }` + assert.JSONEq(t, expected, received) +} + +func TestFlushOnceKeepsPreviousValues(t *testing.T) { + serverAddr := "localhost:1984" + server, err := newUDPServer(1984) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + registry := 
metrics.NewRegistry() + reporter, err := NewReporter(registry, serverAddr, nil) + + // Insert metrics + sample := metrics.NewUniformSample(3) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) + metrics.GetOrRegisterCounter("test_counter", registry).Inc(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(2) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(3) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(4) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(5) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(9) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(10) + reporter.FlushOnce() + server.Read() // Ignore current values + + metrics.GetOrRegisterCounter("test_counter", registry).Inc(4) + metrics.GetOrRegisterGauge("test_gauge", registry).Update(8) + metrics.GetOrRegisterGaugeFloat64("test_gaugeFloat64", registry).Update(9) + metrics.GetOrRegisterHistogram("test_histogram", registry, sample).Update(12) + // TODO test meter and timer + reporter.FlushOnce() + + received, err := server.Read() + if err != nil { + t.Fatal(err) + } + + expected := `{ + "test_counter.count": 12, + "test_gauge": 8, + "test_gaugeFloat64": 9, + "test_histogram.count": 3, + "test_histogram.min": 9, + "test_histogram.max": 12, + "test_histogram.mean": 10.333333333333334, + "test_histogram.stddev": 1.247219128924647, + "test_histogram.var": 1.5555555555555556, + "test_histogram.p50": 10, + "test_histogram.p75": 12, + "test_histogram.p95": 12, + "test_histogram.p99": 12, + "test_histogram.p99_9": 12 + }` + assert.JSONEq(t, expected, received) +} + +func TestFlushOnceWithDefaultValues(t *testing.T) { + serverAddr := "localhost:1984" + server, err := newUDPServer(1984) + if err != nil { + t.Fatal(err) + } + defer server.Close() + + registry := metrics.NewRegistry() + reporter, err := NewReporter(registry, serverAddr, map[string]interface{}{ + "client": 
"dummy-client", + "metric": "doc", + }) + + // Insert metrics + metrics.GetOrRegisterCounter("test_counter", registry).Inc(6) + reporter.FlushOnce() + + received, err := server.Read() + if err != nil { + t.Fatal(err) + } + + expected := `{ + "client":"dummy-client", + "metric": "doc", + "test_counter.count": 6 + }` + assert.JSONEq(t, expected, received) +} diff --git a/metrics.go b/metrics.go deleted file mode 100644 index 8f4c8ce..0000000 --- a/metrics.go +++ /dev/null @@ -1,61 +0,0 @@ -package logstash - -import ( - "encoding/json" - "errors" - "sync" -) - -// Metrics represents a metric that will be sent to logstash -type Metrics struct { - data map[string]interface{} - name string - sync.RWMutex -} - -// NewMetrics Metric{} constructor -func NewMetrics(name string) *Metrics { - return &Metrics{ - data: new(name), - } -} - -func new(name string) map[string]interface{} { - return map[string]interface{}{ - "metric": "doc", - "client": name, - "count": 1, - } -} - -func (m *Metrics) register(name string, value interface{}) error { - m.RLock() - defer m.RUnlock() - - if name == "" { - return errors.New("Invalid metric name") - } - m.data[name] = value - return nil -} - -// Gauge register a new gauge metric -func (m *Metrics) Gauge(name string, value interface{}) error { - return m.register(name+".gauge", value) -} - -// Count register a new gaugeFloat64 metric -func (m *Metrics) Count(name string, value int64) error { - return m.register(name, value) -} - -// Clear clears current buffer -func (m *Metrics) Clear() { - m.data = new(m.name) -} - -// ToJSON serializes data to json -func (m *Metrics) ToJSON() []byte { - data, _ := json.Marshal(m.data) - return data -} diff --git a/reporter.go b/reporter.go index bd8c946..05d4d32 100644 --- a/reporter.go +++ b/reporter.go @@ -1,8 +1,11 @@ package logstash import ( + "encoding/json" + "fmt" "log" "net" + "strings" "time" metrics "github.com/rcrowley/go-metrics" @@ -14,18 +17,17 @@ type Reporter struct { Registry 
metrics.Registry // Conn is a UDP connection to logstash. Conn *net.UDPConn - // Name of this reporter - Name string - Version string - - percentiles []float64 - p []string - ss map[string]int64 - udpAddr *net.UDPAddr + // DefaultValues are the values that will be sent in all submits. + DefaultValues map[string]interface{} + Version string + // Percentiles to be sent on histograms and timers + Percentiles []float64 } -// NewReporter creates a new Reporter with a pre-configured statsd client. -func NewReporter(r metrics.Registry, addr string, name string) (*Reporter, error) { +// NewReporter creates a new Reporter for the register r, with an UDP client to +// the given logstash address addr and with the given default values. If defaultValues +// is nil, only the metrics will be sent. +func NewReporter(r metrics.Registry, addr string, defaultValues map[string]interface{}) (*Reporter, error) { if r == nil { r = metrics.DefaultRegistry } @@ -40,19 +42,17 @@ func NewReporter(r metrics.Registry, addr string, name string) (*Reporter, error } return &Reporter{ - Conn: conn, - Registry: r, - Name: name, - Version: "0.1.1", - - udpAddr: udpAddr, - percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, - ss: make(map[string]int64), + Conn: conn, + Registry: r, + DefaultValues: defaultValues, + Version: "0.1.1", + + Percentiles: []float64{0.50, 0.75, 0.95, 0.99, 0.999}, }, nil } // FlushEach is a blocking exporter function which reports metrics in the registry. -// Designed to be used in a goroutine: go reporter.Flush() +// Designed to be used in a goroutine: go reporter.FlushEach() func (r *Reporter) FlushEach(interval time.Duration) { defer func() { if rec := recover(); rec != nil { @@ -69,63 +69,65 @@ func (r *Reporter) FlushEach(interval time.Duration) { // FlushOnce submits a snapshot of the registry. 
func (r *Reporter) FlushOnce() error { - m := NewMetrics(r.Name) + m := make(map[string]interface{}) + // Copy default values + for k, v := range r.DefaultValues { + m[k] = v + } r.Registry.Each(func(name string, i interface{}) { switch metric := i.(type) { case metrics.Counter: - v := metric.Count() - l := r.ss[name] - m.Count(name, v-l) - r.ss[name] = v + m[fmt.Sprintf("%s.count", name)] = metric.Count() case metrics.Gauge: - m.Gauge(name, float64(metric.Value())) + m[name] = float64(metric.Value()) case metrics.GaugeFloat64: - m.Gauge(name, metric.Value()) + m[name] = metric.Value() case metrics.Histogram: ms := metric.Snapshot() - m.Gauge(name+".count", float64(ms.Count())) - m.Gauge(name+".max", float64(ms.Max())) - m.Gauge(name+".min", float64(ms.Min())) - m.Gauge(name+".mean", ms.Mean()) - m.Gauge(name+".stddev", ms.StdDev()) - m.Gauge(name+".var", ms.Variance()) - - if len(r.percentiles) > 0 { - values := ms.Percentiles(r.percentiles) - for i, p := range r.p { - m.Gauge(name+p, values[i]) - } + m[fmt.Sprintf("%s.count", name)] = float64(ms.Count()) + m[fmt.Sprintf("%s.max", name)] = float64(ms.Max()) + m[fmt.Sprintf("%s.min", name)] = float64(ms.Min()) + m[fmt.Sprintf("%s.mean", name)] = ms.Mean() + m[fmt.Sprintf("%s.stddev", name)] = ms.StdDev() + m[fmt.Sprintf("%s.var", name)] = ms.Variance() + + for _, p := range r.Percentiles { + pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) + m[fmt.Sprintf("%s.%s", name, pStr)] = ms.Percentile(p) } case metrics.Meter: ms := metric.Snapshot() - m.Gauge(name+".count", float64(ms.Count())) - m.Gauge(name+".rate1", ms.Rate1()) - m.Gauge(name+".rate5", ms.Rate5()) - m.Gauge(name+".rate15", ms.Rate15()) - m.Gauge(name+".mean", ms.RateMean()) + m[fmt.Sprintf("%s.count", name)] = float64(ms.Count()) + m[fmt.Sprintf("%s.rate1", name)] = ms.Rate1() + m[fmt.Sprintf("%s.rate5", name)] = ms.Rate5() + m[fmt.Sprintf("%s.rate15", name)] = ms.Rate15() + m[fmt.Sprintf("%s.mean", name)] = ms.RateMean() case 
metrics.Timer: ms := metric.Snapshot() - m.Gauge(name+".count", float64(ms.Count())) - m.Gauge(name+".max", time.Duration(ms.Max()).Seconds()*1000) - m.Gauge(name+".min", time.Duration(ms.Min()).Seconds()*1000) - m.Gauge(name+".mean", time.Duration(ms.Mean()).Seconds()*1000) - m.Gauge(name+".stddev", time.Duration(ms.StdDev()).Seconds()*1000) - - if len(r.percentiles) > 0 { - values := ms.Percentiles(r.percentiles) - for i, p := range r.p { - m.Gauge(name+p, time.Duration(values[i]).Seconds()*1000) - } + m[fmt.Sprintf("%s.count", name)] = float64(ms.Count()) + m[fmt.Sprintf("%s.max", name)] = time.Duration(ms.Max()).Seconds() * 1000 + m[fmt.Sprintf("%s.min", name)] = time.Duration(ms.Min()).Seconds() * 1000 + m[fmt.Sprintf("%s.mean", name)] = time.Duration(ms.Mean()).Seconds() * 1000 + m[fmt.Sprintf("%s.stddev", name)] = time.Duration(ms.StdDev()).Seconds() * 1000 + + for _, p := range r.Percentiles { + duration := time.Duration(ms.Percentile(p)).Seconds() * 1000 + pStr := strings.Replace(fmt.Sprintf("p%g", p*100), ".", "_", -1) + m[fmt.Sprintf("%s.%s", name, pStr)] = duration } } }) - r.Conn.Write(m.ToJSON()) - m.Clear() + + data, err := json.Marshal(m) + if err != nil { + return err + } + _, err = r.Conn.Write(data) - return nil + return err }