Skip to content

Commit

Permalink
Add M3 reporter example using YAML config file (uber-go#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington authored May 9, 2017
1 parent d0a3e63 commit df06c47
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 2 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
_obj
_test
vendor
bin

# Architecture specific extensions/prefixes
*.[568vq]
Expand Down
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,14 @@ endif
test:
go test -race -v $(PKGS)

.PHONY: examples
examples:
mkdir -p ./bin
go build -o ./bin/print_example ./example/
go build -o ./bin/m3_example ./m3/example/
go build -o ./bin/prometheus_example ./prometheus/example/
go build -o ./bin/statsd_example ./statsd/example/

.PHONY: cover
cover:
go test -cover -coverprofile cover.out -race -v $(PKGS)
Expand Down
3 changes: 3 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Print reporter example

`go run ./*.go`
8 changes: 6 additions & 2 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ testImport:
subpackages:
- assert
- require
- package: gopkg.in/validator.v2
- package: gopkg.in/yaml.v2

3 changes: 3 additions & 0 deletions m3/example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# M3 reporter example

`go run ./*.go`
8 changes: 8 additions & 0 deletions m3/example/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
m3:
hostPort: 127.0.0.1:6396
service: m3example
env: staging
tags:
cluster: local
region: us-east

110 changes: 110 additions & 0 deletions m3/example/local_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
"bytes"
"fmt"
"net"
"sync/atomic"

"github.com/uber-go/tally/m3"
customtransport "github.com/uber-go/tally/m3/customtransports"
m3thrift "github.com/uber-go/tally/m3/thrift"

"github.com/apache/thrift/lib/go/thrift"
)

type batchCallback func(batch *m3thrift.MetricBatch)

type localM3Server struct {
Service *localM3Service
Addr string
protocol m3.Protocol
processor thrift.TProcessor
conn *net.UDPConn
closed int32
}

func newLocalM3Server(
listenAddr string,
protocol m3.Protocol,
fn batchCallback,
) (*localM3Server, error) {
udpAddr, err := net.ResolveUDPAddr("udp", listenAddr)
if err != nil {
return nil, err
}

service := newLocalM3Service(fn)
processor := m3thrift.NewM3Processor(service)
conn, err := net.ListenUDP(udpAddr.Network(), udpAddr)
if err != nil {
return nil, err
}

return &localM3Server{
Service: service,
Addr: conn.LocalAddr().String(),
conn: conn,
protocol: protocol,
processor: processor,
}, nil
}

func (f *localM3Server) Serve() error {
readBuf := make([]byte, 65536)
for {
n, err := f.conn.Read(readBuf)
if err != nil {
if atomic.LoadInt32(&f.closed) == 0 {
return fmt.Errorf("failed to read: %v", err)
}
return nil
}
trans, _ := customtransport.NewTBufferedReadTransport(bytes.NewBuffer(readBuf[0:n]))
var proto thrift.TProtocol
if f.protocol == m3.Compact {
proto = thrift.NewTCompactProtocol(trans)
} else {
proto = thrift.NewTBinaryProtocolTransport(trans)
}
f.processor.Process(proto, proto)
}
}

func (f *localM3Server) Close() error {
atomic.AddInt32(&f.closed, 1)
return f.conn.Close()
}

type localM3Service struct {
fn batchCallback
}

func newLocalM3Service(fn batchCallback) *localM3Service {
return &localM3Service{fn: fn}
}

func (m *localM3Service) EmitMetricBatch(batch *m3thrift.MetricBatch) (err error) {
m.fn(batch)
return thrift.NewTTransportException(thrift.END_OF_FILE, "complete")
}
163 changes: 163 additions & 0 deletions m3/example/m3_main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package main

import (
"flag"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"time"

"github.com/uber-go/tally"
"github.com/uber-go/tally/m3"
m3thrift "github.com/uber-go/tally/m3/thrift"

validator "gopkg.in/validator.v2"
yaml "gopkg.in/yaml.v2"
)

var configFileArg = flag.String("config", "config.yaml", "YAML config file path")

type config struct {
M3 m3.Configuration `yaml:"m3"`
}

func configFromFile(file string) (*config, error) {
fd, err := os.Open(file)
if err != nil {
return nil, err
}

defer fd.Close()

data, err := ioutil.ReadAll(fd)
if err != nil {
return nil, err
}

cfg := &config{}

if err := yaml.Unmarshal(data, cfg); err != nil {
return nil, err
}
if err := validator.Validate(cfg); err != nil {
return nil, err
}

return cfg, nil
}

func main() {
flag.Parse()
configFile := *configFileArg
if configFile == "" {
flag.Usage()
return
}

cfg, err := configFromFile(configFile)
if err != nil {
log.Fatalf("failed to read config file %s: %v", configFile, err)
}

r, err := cfg.M3.NewReporter()
if err != nil {
log.Fatalf("failed to create reporter: %v", err)
}

scope, closer := tally.NewRootScope(tally.ScopeOptions{
CachedReporter: r,
}, 1*time.Second)

defer closer.Close()

counter := scope.Tagged(map[string]string{
"foo": "bar",
}).Counter("test_counter")

gauge := scope.Tagged(map[string]string{
"foo": "baz",
}).Gauge("test_gauge")

timer := scope.Tagged(map[string]string{
"foo": "qux",
}).Timer("test_timer_summary")

histogram := scope.Tagged(map[string]string{
"foo": "quk",
}).Histogram("test_histogram", tally.DefaultBuckets)

go func() {
for {
counter.Inc(1)
time.Sleep(time.Second)
}
}()

go func() {
for {
gauge.Update(rand.Float64() * 1000)
time.Sleep(time.Second)
}
}()

go func() {
for {
tsw := timer.Start()
hsw := histogram.Start()
time.Sleep(time.Duration(rand.Float64() * float64(time.Second)))
tsw.Stop()
hsw.Stop()
}
}()

srv, err := newLocalM3Server("127.0.0.1:6396", m3.Compact, func(b *m3thrift.MetricBatch) {
for _, m := range b.Metrics {
tags := make(map[string]string)
for tag := range b.CommonTags {
tags[tag.GetTagName()] = tag.GetTagValue()
}
for tag := range m.Tags {
tags[tag.GetTagName()] = tag.GetTagValue()
}
metVal := m.GetMetricValue()
switch {
case metVal != nil && metVal.Count != nil:
fmt.Printf("counter value: %d, tags: %v\n", metVal.Count.GetI64Value(), tags)
case metVal != nil && metVal.Gauge != nil && metVal.Gauge.I64Value != nil:
fmt.Printf("gauge value: %d, tags: %v\n", metVal.Gauge.GetI64Value(), tags)
case metVal != nil && metVal.Gauge != nil && metVal.Gauge.DValue != nil:
fmt.Printf("gauge value: %f, tags: %v\n", metVal.Gauge.GetDValue(), tags)
case metVal != nil && metVal.Timer != nil:
fmt.Printf("timer value: %v, tags: %v\n", time.Duration(metVal.Timer.GetI64Value()), tags)
}
}
})
if err != nil {
log.Fatalf("failed to create test listen server: %v", err)
}
if err := srv.Serve(); err != nil {
log.Fatalf("failed to serve test listen server: %v", err)
}
}
3 changes: 3 additions & 0 deletions prometheus/example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Prometheus reporter example

`go run ./*.go`
3 changes: 3 additions & 0 deletions statsd/example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Statsd reporter example

`go run ./*.go`

0 comments on commit df06c47

Please sign in to comment.