From df06c472a2df4f8a1934c8c82aa0d1ee9582078d Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 9 May 2017 18:38:22 -0400 Subject: [PATCH] Add M3 reporter example using YAML config file (#43) --- .gitignore | 1 + Makefile | 8 ++ example/README.md | 3 + glide.lock | 8 +- glide.yaml | 3 + m3/example/README.md | 3 + m3/example/config.yaml | 8 ++ m3/example/local_server.go | 110 +++++++++++++++++++++++ m3/example/m3_main.go | 163 +++++++++++++++++++++++++++++++++++ prometheus/example/README.md | 3 + statsd/example/README.md | 3 + 11 files changed, 311 insertions(+), 2 deletions(-) create mode 100644 example/README.md create mode 100644 m3/example/README.md create mode 100644 m3/example/config.yaml create mode 100644 m3/example/local_server.go create mode 100644 m3/example/m3_main.go create mode 100644 prometheus/example/README.md create mode 100644 statsd/example/README.md diff --git a/.gitignore b/.gitignore index e8fbd1fa..27ccac90 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ _obj _test vendor +bin # Architecture specific extensions/prefixes *.[568vq] diff --git a/Makefile b/Makefile index 6670a707..9e81ca21 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,14 @@ endif test: go test -race -v $(PKGS) +.PHONY: examples +examples: + mkdir -p ./bin + go build -o ./bin/print_example ./example/ + go build -o ./bin/m3_example ./m3/example/ + go build -o ./bin/prometheus_example ./prometheus/example/ + go build -o ./bin/statsd_example ./statsd/example/ + .PHONY: cover cover: go test -cover -coverprofile cover.out -race -v $(PKGS) diff --git a/example/README.md b/example/README.md new file mode 100644 index 00000000..fd195b0e --- /dev/null +++ b/example/README.md @@ -0,0 +1,3 @@ +# Print reporter example + +`go run ./*.go` diff --git a/glide.lock b/glide.lock index 8132adc4..2045d77c 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: cdbc433d199ac0d184d56f461f87cb93db4f60658218f85a2b39c4614e4c1c68 -updated: 2017-02-05T22:15:53.246370902-05:00 +hash: 307f62540a5487b27efc13eb3e2c4a67bf9b50b91d5857067beca6df0f3cd8cc +updated: 2017-05-09T18:18:15.264370538-04:00 imports: - name: github.com/apache/thrift version: 9549b25c77587b29be4e0b5c258221a4ed85d37a @@ -42,6 +42,10 @@ imports: version: fcdb11ccb4389efb1b210b7ffb623ab71c5fdd60 - name: github.com/uber-go/atomic version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 +- name: gopkg.in/validator.v2 + version: 3e4f037f12a1221a0864cf0dd2e81c452ab22448 +- name: gopkg.in/yaml.v2 + version: a83829b6f1293c91addabc89d0571c246397bbf4 testImports: - name: github.com/axw/gocov version: 54b98cfcac0c63fb3f9bd8e7ad241b724d4e985b diff --git a/glide.yaml b/glide.yaml index 2e43ca80..f5d36d31 100644 --- a/glide.yaml +++ b/glide.yaml @@ -39,3 +39,6 @@ testImport: subpackages: - assert - require +- package: gopkg.in/validator.v2 +- package: gopkg.in/yaml.v2 + diff --git a/m3/example/README.md b/m3/example/README.md new file mode 100644 index 00000000..5ad3d235 --- /dev/null +++ b/m3/example/README.md @@ -0,0 +1,3 @@ +# M3 reporter example + +`go run ./*.go` diff --git a/m3/example/config.yaml b/m3/example/config.yaml new file mode 100644 index 00000000..bda1a08d --- /dev/null +++ b/m3/example/config.yaml @@ -0,0 +1,8 @@ +m3: + hostPort: 127.0.0.1:6396 + service: m3example + env: staging + tags: + cluster: local + region: us-east + diff --git a/m3/example/local_server.go b/m3/example/local_server.go new file mode 100644 index 00000000..1ebcc820 --- /dev/null +++ b/m3/example/local_server.go @@ -0,0 +1,110 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "bytes" + "fmt" + "net" + "sync/atomic" + + "github.com/uber-go/tally/m3" + customtransport "github.com/uber-go/tally/m3/customtransports" + m3thrift "github.com/uber-go/tally/m3/thrift" + + "github.com/apache/thrift/lib/go/thrift" +) + +type batchCallback func(batch *m3thrift.MetricBatch) + +type localM3Server struct { + Service *localM3Service + Addr string + protocol m3.Protocol + processor thrift.TProcessor + conn *net.UDPConn + closed int32 +} + +func newLocalM3Server( + listenAddr string, + protocol m3.Protocol, + fn batchCallback, +) (*localM3Server, error) { + udpAddr, err := net.ResolveUDPAddr("udp", listenAddr) + if err != nil { + return nil, err + } + + service := newLocalM3Service(fn) + processor := m3thrift.NewM3Processor(service) + conn, err := net.ListenUDP(udpAddr.Network(), udpAddr) + if err != nil { + return nil, err + } + + return &localM3Server{ + Service: service, + Addr: conn.LocalAddr().String(), + conn: conn, + protocol: protocol, + processor: processor, + }, nil +} + +func (f *localM3Server) Serve() error { + readBuf := make([]byte, 65536) + for { + n, err := f.conn.Read(readBuf) + if err != nil { + if atomic.LoadInt32(&f.closed) == 0 { + return fmt.Errorf("failed to read: %v", err) + } + return nil + } + trans, _ := customtransport.NewTBufferedReadTransport(bytes.NewBuffer(readBuf[0:n])) + var proto thrift.TProtocol + if f.protocol == m3.Compact { + proto = thrift.NewTCompactProtocol(trans) + } else { + proto = thrift.NewTBinaryProtocolTransport(trans) + } + f.processor.Process(proto, proto) + } +} + +func (f *localM3Server) Close() error { + atomic.AddInt32(&f.closed, 1) + return f.conn.Close() +} + +type localM3Service struct { + fn batchCallback +} + +func newLocalM3Service(fn batchCallback) *localM3Service { + return &localM3Service{fn: fn} +} + +func (m *localM3Service) EmitMetricBatch(batch *m3thrift.MetricBatch) (err error) { + m.fn(batch) + return thrift.NewTTransportException(thrift.END_OF_FILE, "complete") +} diff --git a/m3/example/m3_main.go b/m3/example/m3_main.go new file mode 100644 index 00000000..de9d740e --- /dev/null +++ b/m3/example/m3_main.go @@ -0,0 +1,163 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package main + +import ( + "flag" + "fmt" + "io/ioutil" + "log" + "math/rand" + "os" + "time" + + "github.com/uber-go/tally" + "github.com/uber-go/tally/m3" + m3thrift "github.com/uber-go/tally/m3/thrift" + + validator "gopkg.in/validator.v2" + yaml "gopkg.in/yaml.v2" +) + +var configFileArg = flag.String("config", "config.yaml", "YAML config file path") + +type config struct { + M3 m3.Configuration `yaml:"m3"` +} + +func configFromFile(file string) (*config, error) { + fd, err := os.Open(file) + if err != nil { + return nil, err + } + + defer fd.Close() + + data, err := ioutil.ReadAll(fd) + if err != nil { + return nil, err + } + + cfg := &config{} + + if err := yaml.Unmarshal(data, cfg); err != nil { + return nil, err + } + if err := validator.Validate(cfg); err != nil { + return nil, err + } + + return cfg, nil +} + +func main() { + flag.Parse() + configFile := *configFileArg + if configFile == "" { + flag.Usage() + return + } + + cfg, err := configFromFile(configFile) + if err != nil { + log.Fatalf("failed to read config file %s: %v", configFile, err) + } + + r, err := cfg.M3.NewReporter() + if err != nil { + log.Fatalf("failed to create reporter: %v", err) + } + + scope, closer := tally.NewRootScope(tally.ScopeOptions{ + CachedReporter: r, + }, 1*time.Second) + + defer closer.Close() + + counter := scope.Tagged(map[string]string{ + "foo": "bar", + }).Counter("test_counter") + + gauge := scope.Tagged(map[string]string{ + "foo": "baz", + }).Gauge("test_gauge") + + timer := scope.Tagged(map[string]string{ + "foo": "qux", + }).Timer("test_timer_summary") + + histogram := scope.Tagged(map[string]string{ + "foo": "quk", + }).Histogram("test_histogram", tally.DefaultBuckets) + + go func() { + for { + counter.Inc(1) + time.Sleep(time.Second) + } + }() + + go func() { + for { + gauge.Update(rand.Float64() * 1000) + time.Sleep(time.Second) + } + }() + + go func() { + for { + tsw := timer.Start() + hsw := histogram.Start() + time.Sleep(time.Duration(rand.Float64() * float64(time.Second))) + tsw.Stop() + hsw.Stop() + } + }() + + srv, err := newLocalM3Server("127.0.0.1:6396", m3.Compact, func(b *m3thrift.MetricBatch) { + for _, m := range b.Metrics { + tags := make(map[string]string) + for tag := range b.CommonTags { + tags[tag.GetTagName()] = tag.GetTagValue() + } + for tag := range m.Tags { + tags[tag.GetTagName()] = tag.GetTagValue() + } + metVal := m.GetMetricValue() + switch { + case metVal != nil && metVal.Count != nil: + fmt.Printf("counter value: %d, tags: %v\n", metVal.Count.GetI64Value(), tags) + case metVal != nil && metVal.Gauge != nil && metVal.Gauge.I64Value != nil: + fmt.Printf("gauge value: %d, tags: %v\n", metVal.Gauge.GetI64Value(), tags) + case metVal != nil && metVal.Gauge != nil && metVal.Gauge.DValue != nil: + fmt.Printf("gauge value: %f, tags: %v\n", metVal.Gauge.GetDValue(), tags) + case metVal != nil && metVal.Timer != nil: + fmt.Printf("timer value: %v, tags: %v\n", time.Duration(metVal.Timer.GetI64Value()), tags) + } + } + }) + if err != nil { + log.Fatalf("failed to create test listen server: %v", err) + } + if err := srv.Serve(); err != nil { + log.Fatalf("failed to serve test listen server: %v", err) + } +} diff --git a/prometheus/example/README.md b/prometheus/example/README.md new file mode 100644 index 00000000..d4a399aa --- /dev/null +++ b/prometheus/example/README.md @@ -0,0 +1,3 @@ +# Prometheus reporter example + +`go run ./*.go` diff --git a/statsd/example/README.md b/statsd/example/README.md new file mode 100644 index 00000000..8a5557dc --- /dev/null +++ b/statsd/example/README.md @@ -0,0 +1,3 @@ +# Statsd reporter example + +`go run ./*.go`