diff --git a/.gitignore b/.gitignore index e6bb7643..e8fbd1fa 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,6 @@ _testmain.go *.log .DS_Store +node_modules/ + diff --git a/.travis.yml b/.travis.yml index 80b70845..127fee99 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ cache: directories: - vendor install: + - npm i uber-licence - make dependencies script: - make test diff --git a/Makefile b/Makefile index 67d34b38..bf7378e7 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,8 @@ export GO15VENDOREXPERIMENT=1 BENCH_FLAGS ?= -cpuprofile=cpu.pprof -memprofile=mem.pprof -benchmem PKGS ?= $(shell glide novendor) -PKG_FILES ?= *.go example/*.go +PKG_FILES ?= *.go example/*.go m3 +LINT_IGNORE = m3/thrift # The linting tools evolve with each Go version, so run them only on the latest # stable release. @@ -37,15 +38,15 @@ lint: ifdef SHOULD_LINT @rm -rf lint.log @echo "Checking formatting..." - @gofmt -d -s $(PKG_FILES) 2>&1 | tee lint.log + @gofmt -d -s $(PKG_FILES) 2>&1 | grep -v $(LINT_IGNORE) | tee lint.log @echo "Installing test dependencies for vet..." @go test -i $(PKGS) @echo "Checking vet..." - @$(foreach dir,$(PKG_FILES),go tool vet $(dir) 2>&1 | tee -a lint.log;) + @$(foreach dir,$(PKG_FILES),go tool vet $(dir) 2>&1 | grep -v $(LINT_IGNORE) | tee -a lint.log;) @echo "Checking lint..." - @$(foreach dir,$(PKGS),golint $(dir) 2>&1 | tee -a lint.log;) + @$(foreach dir,$(PKGS),golint $(dir) 2>&1 | grep -v $(LINT_IGNORE) | tee -a lint.log;) @echo "Checking for unresolved FIXMEs..." - @git grep -i fixme | grep -v -e vendor -e Makefile | tee -a lint.log + @git grep -i fixme | grep -v -e vendor -e Makefile | grep -v $(LINT_IGNORE) | tee -a lint.log @echo "Checking for license headers..." @./check_license.sh | tee -a lint.log @[ ! -s lint.log ] @@ -55,11 +56,11 @@ endif .PHONY: test test: - go test -race $(PKGS) + go test -race -v $(PKGS) .PHONY: cover cover: - go test -cover -coverprofile cover.out -race . + go test -cover -coverprofile cover.out -race -v $(PKGS) .PHONY: coveralls coveralls: diff --git a/check_license.sh b/check_license.sh index 0b79efe3..f3b66823 100755 --- a/check_license.sh +++ b/check_license.sh @@ -1,15 +1,4 @@ #!/bin/bash -text=`head -1 LICENSE` - -ERROR_COUNT=0 -while read file -do - head -1 ${file} | grep -q "${text}" - if [ $? -ne 0 ]; then - echo "$file is missing license header." - ERROR_COUNT+=1 - fi -done < <(git ls-files "\.go") - -exit $ERROR_COUNT +./node_modules/.bin/uber-licence --version || npm i uber-licence@latest +./node_modules/.bin/uber-licence --dry --file "*.go" diff --git a/example/main.go b/example/main.go index fe44a3db..b5fc6b34 100644 --- a/example/main.go +++ b/example/main.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/glide.lock b/glide.lock index 86ae315d..8132adc4 100644 --- a/glide.lock +++ b/glide.lock @@ -1,6 +1,10 @@ -hash: 6f924ebe8f8cf2fc4ef2df8dff6ec1fb0ab38ee329c28e33e3c9d9d1ad6e50e5 -updated: 2016-12-22T11:33:11.939562737-05:00 +hash: cdbc433d199ac0d184d56f461f87cb93db4f60658218f85a2b39c4614e4c1c68 +updated: 2017-02-05T22:15:53.246370902-05:00 imports: +- name: github.com/apache/thrift + version: 9549b25c77587b29be4e0b5c258221a4ed85d37a + subpackages: + - lib/go/thrift - name: github.com/beorn7/perks version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 subpackages: @@ -36,13 +40,15 @@ imports: - model - name: github.com/prometheus/procfs version: fcdb11ccb4389efb1b210b7ffb623ab71c5fdd60 +- name: github.com/uber-go/atomic + version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 testImports: - name: github.com/axw/gocov version: 54b98cfcac0c63fb3f9bd8e7ad241b724d4e985b subpackages: - gocov - name: github.com/davecgh/go-spew - version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 + version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d subpackages: - spew - name: github.com/golang/lint @@ -54,7 +60,7 @@ testImports: - name: github.com/pborman/uuid version: c55201b036063326c5b1b89ccfe45a184973d073 - name: github.com/pmezard/go-difflib - version: 792786c7400a136282c1664665ae0a8db921c6c2 + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d subpackages: - difflib - name: github.com/stretchr/testify diff --git a/glide.yaml b/glide.yaml index 88512fd1..2e43ca80 100644 --- a/glide.yaml +++ b/glide.yaml @@ -11,6 +11,12 @@ import: version: ^0.8.0 subpackages: - prometheus +- package: github.com/apache/thrift + version: 9549b25c77587b29be4e0b5c258221a4ed85d37a + subpackages: + - lib/go/thrift +- package: github.com/uber-go/atomic + version: e682c1008ac17bf26d2e4b5ad6cdd08520ed0b22 testImport: - package: github.com/axw/gocov version: 54b98cfcac0c63fb3f9bd8e7ad241b724d4e985b diff --git a/m3/config.go b/m3/config.go new file mode 100644 index 00000000..86e690cc --- /dev/null +++ b/m3/config.go @@ -0,0 +1,65 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +// Configuration is a configuration for a M3 reporter. +type Configuration struct { + // HostPort is the host and port of the M3 server. + HostPort string `yaml:"hostPort" validate:"nonzero"` + + // HostPorts are the host and port of the M3 server. + HostPorts []string `yaml:"hostPorts"` + + // Service is the service tag to that this client emits. + Service string `yaml:"service" validate:"nonzero"` + + // Env is the env tag to use that this client emits. + Env string `yaml:"env" validate:"nonzero"` + + // CommonTags are tags that are common for all metrics this client emits. + CommonTags map[string]string `yaml:"tags" ` + + // Queue is the maximum metric queue size of client. + Queue int `yaml:"queue"` + + // PacketSize is the maximum packet size for a batch of metrics. + PacketSize int32 `yaml:"packetSize"` + + // IncludeHost is whether or not to include host tag. + IncludeHost bool `yaml:"includeHost"` +} + +// NewReporter creates a new M3 reporter from this configuration. +func (c Configuration) NewReporter() (Reporter, error) { + hostPorts := c.HostPorts + if len(hostPorts) == 0 { + hostPorts = []string{c.HostPort} + } + return NewReporter(Options{ + HostPorts: hostPorts, + Service: c.Service, + Env: c.Env, + CommonTags: c.CommonTags, + MaxQueueSize: c.Queue, + MaxPacketSizeBytes: c.PacketSize, + IncludeHost: c.IncludeHost, + }) +} diff --git a/m3/config_test.go b/m3/config_test.go new file mode 100644 index 00000000..e756d227 --- /dev/null +++ b/m3/config_test.go @@ -0,0 +1,62 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "testing" + + "github.com/uber-go/tally/m3/thriftudp" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestConfigSimple(t *testing.T) { + c := Configuration{ + HostPort: "127.0.0.1:9052", + Service: "my-service", + Env: "test", + } + r, err := c.NewReporter() + require.NoError(t, err) + + reporter := r.(*reporter) + _, ok := reporter.client.Transport.(*thriftudp.TUDPTransport) + assert.True(t, ok) + assert.True(t, tagEquals(reporter.commonTags, "service", "my-service")) + assert.True(t, tagEquals(reporter.commonTags, "env", "test")) +} + +func TestConfigMulti(t *testing.T) { + c := Configuration{ + HostPorts: []string{"127.0.0.1:9052", "127.0.0.1:9062"}, + Service: "my-service", + Env: "test", + } + r, err := c.NewReporter() + require.NoError(t, err) + + reporter := r.(*reporter) + _, ok := reporter.client.Transport.(*thriftudp.TMultiUDPTransport) + assert.True(t, ok) + assert.True(t, tagEquals(reporter.commonTags, "service", "my-service")) + assert.True(t, tagEquals(reporter.commonTags, "env", "test")) +} diff --git a/m3/customtransports/buffered_read_transport.go b/m3/customtransports/buffered_read_transport.go new file mode 100644 index 00000000..95b6d8f0 --- /dev/null +++ b/m3/customtransports/buffered_read_transport.go @@ -0,0 +1,79 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package customtransport + +import ( + "bytes" + + "github.com/apache/thrift/lib/go/thrift" +) + +// TBufferedReadTransport is a thrift.TTransport that reads from a buffer +type TBufferedReadTransport struct { + readBuf *bytes.Buffer +} + +// NewTBufferedReadTransport creates a buffer backed TTransport +func NewTBufferedReadTransport(readBuf *bytes.Buffer) (*TBufferedReadTransport, error) { + return &TBufferedReadTransport{readBuf: readBuf}, nil +} + +// IsOpen does nothing as transport is not maintaining the connection +// Required to maintain thrift.TTransport interface +func (p *TBufferedReadTransport) IsOpen() bool { + return true +} + +// Open does nothing as transport is not maintaining the connection +// Required to maintain thrift.TTransport interface +func (p *TBufferedReadTransport) Open() error { + return nil +} + +// Close does nothing as transport is not maintaining the connection +// Required to maintain thrift.TTransport interface +func (p *TBufferedReadTransport) Close() error { + return nil +} + +// Read reads bytes from the local buffer and puts them in the specified buf +func (p *TBufferedReadTransport) Read(buf []byte) (int, error) { + in, err := p.readBuf.Read(buf) + return in, thrift.NewTTransportExceptionFromError(err) +} + +// RemainingBytes returns the number of bytes left to be read from the readBuf +func (p *TBufferedReadTransport) RemainingBytes() uint64 { + return uint64(p.readBuf.Len()) +} + +// Write writes bytes into the read buffer +// Required to maintain thrift.TTransport interface +func (p *TBufferedReadTransport) Write(buf []byte) (int, error) { + p.readBuf = bytes.NewBuffer(buf) + return len(buf), nil +} + +// Flush does nothing as udp server does not write responses back +// Required to maintain thrift.TTransport interface +func (p *TBufferedReadTransport) Flush() error { + return nil +} diff --git a/m3/customtransports/buffered_read_transport_test.go b/m3/customtransports/buffered_read_transport_test.go new file mode 100644 index 00000000..411f83c0 --- /dev/null +++ b/m3/customtransports/buffered_read_transport_test.go @@ -0,0 +1,74 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package customtransport + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestTBufferedReadTransport tests the TBufferedReadTransport +func TestTBufferedReadTransport(t *testing.T) { + buffer := bytes.NewBuffer([]byte("testString")) + trans, err := NewTBufferedReadTransport(buffer) + require.NotNil(t, trans) + require.Nil(t, err) + require.Equal(t, uint64(10), trans.RemainingBytes()) + + firstRead := make([]byte, 4) + n, err := trans.Read(firstRead) + require.Nil(t, err) + require.Equal(t, 4, n) + require.Equal(t, []byte("test"), firstRead) + require.Equal(t, uint64(6), trans.RemainingBytes()) + + secondRead := make([]byte, 7) + n, err = trans.Read(secondRead) + require.Equal(t, 6, n) + require.Equal(t, []byte("String"), secondRead[0:6]) + require.Equal(t, uint64(0), trans.RemainingBytes()) +} + +// TestTBufferedReadTransportEmptyFunctions tests the empty functions in TBufferedReadTransport +func TestTBufferedReadTransportEmptyFunctions(t *testing.T) { + byteArr := make([]byte, 1) + trans, err := NewTBufferedReadTransport(bytes.NewBuffer(byteArr)) + require.NotNil(t, trans) + require.Nil(t, err) + + err = trans.Open() + require.Nil(t, err) + + err = trans.Close() + require.Nil(t, err) + + err = trans.Flush() + require.Nil(t, err) + + n, err := trans.Write(byteArr) + require.Equal(t, 1, n) + require.Nil(t, err) + + isOpen := trans.IsOpen() + require.True(t, isOpen) +} diff --git a/m3/customtransports/m3_calc_transport.go b/m3/customtransports/m3_calc_transport.go new file mode 100644 index 00000000..dae2a669 --- /dev/null +++ b/m3/customtransports/m3_calc_transport.go @@ -0,0 +1,86 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package customtransport + +import ( + "sync/atomic" +) + +// TCalcTransport is a thrift TTransport that is used to calculate how many +// bytes are used when writing a thrift element. It is thread-safe +type TCalcTransport struct { + count int32 +} + +// GetCount returns the number of bytes that would be written +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) GetCount() int32 { + return atomic.LoadInt32(&p.count) +} + +// ResetCount resets the number of bytes written to 0 +func (p *TCalcTransport) ResetCount() { + atomic.StoreInt32(&p.count, 0) +} + +// Write adds the number of bytes written to the count +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) Write(buf []byte) (int, error) { + atomic.AddInt32(&p.count, int32(len(buf))) + return len(buf), nil +} + +// IsOpen does nothing as transport is not maintaining a connection +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) IsOpen() bool { + return true +} + +// Open does nothing as transport is not maintaining a connection +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) Open() error { + return nil +} + +// Close does nothing as transport is not maintaining a connection +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) Close() error { + return nil +} + +// Read does nothing as it's not required for calculations +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) Read(buf []byte) (int, error) { + return 0, nil +} + +// RemainingBytes returns the max number of bytes (same as Thrift's StreamTransport) as we +// do not know how many bytes we have left. +func (p *TCalcTransport) RemainingBytes() uint64 { + const maxSize = ^uint64(0) + return maxSize +} + +// Flush does nothing as it's not required for calculations +// Required to maintain thrift.TTransport interface +func (p *TCalcTransport) Flush() error { + return nil +} diff --git a/m3/customtransports/m3_calc_transport_test.go b/m3/customtransports/m3_calc_transport_test.go new file mode 100644 index 00000000..442a106a --- /dev/null +++ b/m3/customtransports/m3_calc_transport_test.go @@ -0,0 +1,57 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package customtransport + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTCalcTransport(t *testing.T) { + trans := &TCalcTransport{} + require.Nil(t, trans.Open()) + require.True(t, trans.IsOpen()) + require.EqualValues(t, 0, trans.GetCount()) + + testString1 := "test" + testString2 := "string" + n, err := trans.Write([]byte(testString1)) + require.Equal(t, len(testString1), n) + require.Nil(t, err) + require.EqualValues(t, len(testString1), trans.GetCount()) + n, err = trans.Write([]byte(testString2)) + require.EqualValues(t, len(testString2), n) + require.Nil(t, err) + require.EqualValues(t, len(testString1)+len(testString2), trans.GetCount()) + + n, err = trans.Read([]byte(testString1)) + require.Nil(t, err) + require.EqualValues(t, 0, n) + require.Equal(t, ^uint64(0), trans.RemainingBytes()) + + trans.ResetCount() + require.EqualValues(t, 0, trans.GetCount()) + + err = trans.Flush() + require.Nil(t, err) + require.Nil(t, trans.Close()) +} diff --git a/m3/reporter.go b/m3/reporter.go new file mode 100644 index 00000000..31a77c41 --- /dev/null +++ b/m3/reporter.go @@ -0,0 +1,445 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "errors" + "fmt" + "io" + "math" + "os" + "sync" + "time" + + "github.com/uber-go/tally" + "github.com/uber-go/tally/m3/customtransports" + m3thrift "github.com/uber-go/tally/m3/thrift" + "github.com/uber-go/tally/m3/thriftudp" + + "github.com/apache/thrift/lib/go/thrift" +) + +// Protocol describes a M3 thrift transport protocol. +type Protocol int + +// Compact and Binary represent the compact and +// binary thrift protocols respectively. +const ( + Compact Protocol = iota + Binary +) + +const ( + // ServiceTag is the name of the M3 service tag. + ServiceTag = "service" + // EnvTag is the name of the M3 env tag. + EnvTag = "env" + // HostTag is the name of the M3 host tag. + HostTag = "host" + // DefaultMaxQueueSize is the default M3 reporter queue size. + DefaultMaxQueueSize = 4096 + // DefaultMaxPacketSize is the default M3 reporter max packet size. + DefaultMaxPacketSize = int32(1440) + + emitMetricBatchOverhead = 19 +) + +// Initialize max vars in init function to avoid lint error. +var ( + maxInt64 int64 + maxFloat64 float64 +) + +func init() { + maxInt64 = math.MaxInt64 + maxFloat64 = math.MaxFloat64 +} + +type metricType int + +const ( + counterType metricType = iota + 1 + timerType + gaugeType +) + +var ( + errNoHostPorts = errors.New("at least one entry for HostPorts is required") + errCommonTagSize = errors.New("common tags serialized size exceeds packet size") +) + +// Reporter is an M3 reporter. +type Reporter interface { + tally.CachedStatsReporter + io.Closer +} + +// reporter is a metrics backend that reports metrics to a local or +// remote M3 collector, metrics are batched together and emitted +// via either thrift compact or binary protocol in batch UDP packets. +type reporter struct { + client *m3thrift.M3Client + curBatch *m3thrift.MetricBatch + curBatchLock sync.Mutex + calc *customtransport.TCalcTransport + calcProto thrift.TProtocol + calcLock sync.Mutex + commonTags map[*m3thrift.MetricTag]bool + freeBytes int32 + processors sync.WaitGroup + resourcePool *resourcePool + closeChan chan struct{} + + metCh chan sizedMetric +} + +// Options is a set of options for the M3 reporter. +type Options struct { + HostPorts []string + Service string + Env string + CommonTags map[string]string + IncludeHost bool + Protocol Protocol + MaxQueueSize int + MaxPacketSizeBytes int32 + Interval time.Duration +} + +// NewReporter creates a new M3 reporter. +func NewReporter(opts Options) (Reporter, error) { + if opts.MaxQueueSize <= 0 { + opts.MaxQueueSize = DefaultMaxQueueSize + } + if opts.MaxPacketSizeBytes <= 0 { + opts.MaxPacketSizeBytes = DefaultMaxPacketSize + } + + // Create M3 thrift client + var trans thrift.TTransport + var err error + if len(opts.HostPorts) == 0 { + err = errNoHostPorts + } else if len(opts.HostPorts) == 1 { + trans, err = thriftudp.NewTUDPClientTransport(opts.HostPorts[0], "") + } else { + trans, err = thriftudp.NewTMultiUDPClientTransport(opts.HostPorts, "") + } + if err != nil { + return nil, err + } + + var protocolFactory thrift.TProtocolFactory + if opts.Protocol == Compact { + protocolFactory = thrift.NewTCompactProtocolFactory() + } else { + protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() + } + + client := m3thrift.NewM3ClientFactory(trans, protocolFactory) + resourcePool := newResourcePool(protocolFactory) + + // Create common tags + tags := resourcePool.getTagList() + for k, v := range opts.CommonTags { + tags[createTag(resourcePool, k, v)] = true + } + if opts.CommonTags[ServiceTag] == "" { + if opts.Service == "" { + return nil, fmt.Errorf("%s common tag is required", ServiceTag) + } + tags[createTag(resourcePool, ServiceTag, opts.Service)] = true + } + if opts.CommonTags[EnvTag] == "" { + if opts.Env == "" { + return nil, fmt.Errorf("%s common tag is required", EnvTag) + } + tags[createTag(resourcePool, EnvTag, opts.Env)] = true + } + if opts.IncludeHost { + if opts.CommonTags[HostTag] == "" { + hostname, err := os.Hostname() + if err != nil { + return nil, fmt.Errorf("error resolving host tag: %v", err) + } + tags[createTag(resourcePool, HostTag, hostname)] = true + } + } + + // Calculate size of common tags + batch := resourcePool.getBatch() + batch.CommonTags = tags + batch.Metrics = []*m3thrift.Metric{} + proto := resourcePool.getProto() + batch.Write(proto) + calc := proto.Transport().(*customtransport.TCalcTransport) + numOverheadBytes := emitMetricBatchOverhead + calc.GetCount() + calc.ResetCount() + + freeBytes := opts.MaxPacketSizeBytes - numOverheadBytes + if freeBytes <= 0 { + return nil, errCommonTagSize + } + + r := &reporter{ + client: client, + curBatch: batch, + calc: calc, + calcProto: proto, + commonTags: tags, + freeBytes: freeBytes, + resourcePool: resourcePool, + metCh: make(chan sizedMetric, opts.MaxQueueSize), + } + + r.processors.Add(1) + go r.process() + + return r, nil +} + +// AllocateCounter implements tally.CachedStatsReporter. +func (r *reporter) AllocateCounter( + name string, tags map[string]string, +) tally.CachedCount { + counter := r.newMetric(name, tags, counterType) + size := r.calculateSize(counter) + return cachedMetric{counter, r, size} +} + +// AllocateGauge implements tally.CachedStatsReporter. +func (r *reporter) AllocateGauge( + name string, tags map[string]string, +) tally.CachedGauge { + gauge := r.newMetric(name, tags, gaugeType) + size := r.calculateSize(gauge) + return cachedMetric{gauge, r, size} +} + +// AllocateTimer implements tally.CachedStatsReporter. +func (r *reporter) AllocateTimer( + name string, tags map[string]string, +) tally.CachedTimer { + timer := r.newMetric(name, tags, timerType) + size := r.calculateSize(timer) + return cachedMetric{timer, r, size} +} + +func (r *reporter) newMetric( + name string, + tags map[string]string, + t metricType, +) *m3thrift.Metric { + var ( + m = r.resourcePool.getMetric() + metVal = r.resourcePool.getValue() + ) + m.Name = name + if tags != nil { + metTags := r.resourcePool.getTagList() + for k, v := range tags { + val := v + metTag := r.resourcePool.getTag() + metTag.TagName = k + metTag.TagValue = &val + metTags[metTag] = true + } + m.Tags = metTags + } + m.Timestamp = &maxInt64 + + switch t { + case counterType: + c := r.resourcePool.getCount() + c.I64Value = &maxInt64 + metVal.Count = c + case gaugeType: + g := r.resourcePool.getGauge() + g.DValue = &maxFloat64 + metVal.Gauge = g + case timerType: + t := r.resourcePool.getTimer() + t.I64Value = &maxInt64 + metVal.Timer = t + } + m.MetricValue = metVal + + return m +} + +func (r *reporter) calculateSize(m *m3thrift.Metric) int32 { + r.calcLock.Lock() + m.Write(r.calcProto) + size := r.calc.GetCount() + r.calc.ResetCount() + r.calcLock.Unlock() + return size +} + +func (r *reporter) reportCopyMetric( + m *m3thrift.Metric, + size int32, + t metricType, + iValue int64, + dValue float64, +) { + copy := r.resourcePool.getMetric() + copy.Name = m.Name + copy.Tags = m.Tags + timestampNano := time.Now().UnixNano() + copy.Timestamp = ×tampNano + copy.MetricValue = r.resourcePool.getValue() + + switch t { + case counterType: + c := r.resourcePool.getCount() + c.I64Value = &iValue + copy.MetricValue.Count = c + case gaugeType: + g := r.resourcePool.getGauge() + g.DValue = &dValue + copy.MetricValue.Gauge = g + case timerType: + t := r.resourcePool.getTimer() + t.I64Value = &iValue + copy.MetricValue.Timer = t + } + + select { + case r.metCh <- sizedMetric{copy, size}: + default: + } +} + +// Flush implements tally.CachedStatsReporter. +func (r *reporter) Flush() { + r.metCh <- sizedMetric{} +} + +// Close waits for metrics to be flushed before closing the backend. +func (r *reporter) Close() (err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("close error occurred: %v", r) + } + }() + + close(r.metCh) + r.processors.Wait() + return +} + +func (r *reporter) Capabilities() tally.Capabilities { + return r +} + +func (r *reporter) Reporting() bool { + return true +} + +func (r *reporter) Tagging() bool { + return true +} + +func (r *reporter) process() { + mets := make([]*m3thrift.Metric, 0, (r.freeBytes / 10)) + bytes := int32(0) + + for smet := range r.metCh { + if smet.m == nil { + // Explicit flush requested + if len(mets) > 0 { + mets = r.flush(mets) + bytes = 0 + } + continue + } + + if bytes+smet.size > r.freeBytes { + mets = r.flush(mets) + bytes = 0 + } + + mets = append(mets, smet.m) + bytes += smet.size + } + + if len(mets) > 0 { + // Final flush + r.flush(mets) + } + + r.processors.Done() +} + +func (r *reporter) flush( + mets []*m3thrift.Metric, +) []*m3thrift.Metric { + r.curBatchLock.Lock() + r.curBatch.Metrics = mets + r.client.EmitMetricBatch(r.curBatch) + r.curBatch.Metrics = nil + r.curBatchLock.Unlock() + + r.resourcePool.releaseShallowMetrics(mets) + + for i := range mets { + mets[i] = nil + } + return mets[:0] +} + +func createTag( + pool *resourcePool, + tagName, tagValue string, +) *m3thrift.MetricTag { + tag := pool.getTag() + tag.TagName = tagName + if tagValue != "" { + tag.TagValue = &tagValue + } + + return tag +} + +type cachedMetric struct { + metric *m3thrift.Metric + reporter *reporter + size int32 +} + +func (c cachedMetric) ReportCount(value int64) { + c.reporter.reportCopyMetric(c.metric, c.size, counterType, value, 0) +} + +func (c cachedMetric) ReportGauge(value float64) { + c.reporter.reportCopyMetric(c.metric, c.size, gaugeType, 0, value) +} + +func (c cachedMetric) ReportTimer(interval time.Duration) { + val := int64(interval) + c.reporter.reportCopyMetric(c.metric, c.size, timerType, val, 0) +} + +type sizedMetric struct { + m *m3thrift.Metric + size int32 +} diff --git a/m3/reporter_benchmark_test.go b/m3/reporter_benchmark_test.go new file mode 100644 index 00000000..89dbeffe --- /dev/null +++ b/m3/reporter_benchmark_test.go @@ -0,0 +1,61 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "testing" + + "github.com/apache/thrift/lib/go/thrift" +) + +const ( + updaters = 10 + updates = 1000 + numIds = 10 + + testID = "stats.$dc.gauges.m3+" + + "servers.my-internal-server-$dc.network.eth0_tx_colls+" + + "dc=$dc,domain=production.$zone,env=production,pipe=$pipe,service=servers,type=gauge" +) + +func BenchmarkNewMetric(b *testing.B) { + protocolFactory := thrift.NewTCompactProtocolFactory() + resourcePool := newResourcePool(protocolFactory) + benchReporter := &reporter{resourcePool: resourcePool} + + for n := 0; n < b.N; n++ { + benchReporter.newMetric("foo", nil, counterType) + } +} + +func BenchmarkCalulateSize(b *testing.B) { + protocolFactory := thrift.NewTCompactProtocolFactory() + resourcePool := newResourcePool(protocolFactory) + benchReporter := &reporter{resourcePool: resourcePool} + + val := int64(123456) + met := benchReporter.newMetric("foo", nil, counterType) + met.MetricValue.Count.I64Value = &val + + for n := 0; n < b.N; n++ { + benchReporter.calculateSize(met) + } +} diff --git a/m3/reporter_test.go b/m3/reporter_test.go new file mode 100644 index 00000000..d6cf3d70 --- /dev/null +++ b/m3/reporter_test.go @@ -0,0 +1,543 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "bytes" + "math/rand" + "net" + "os" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/uber-go/tally" + "github.com/uber-go/tally/m3/customtransports" + m3thrift "github.com/uber-go/tally/m3/thrift" + "github.com/uber-go/tally/m3/thriftudp" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + numReaders = 10 + queueSize = 1000 + includeHost = true + maxPacketSize = int32(1440) + shortInterval = 10 * time.Millisecond +) + +var localListenAddr = &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)} +var defaultCommonTags = map[string]string{"env": "test", "host": "test"} + +var protocols = []Protocol{Compact, Binary} + +// TestReporter tests the reporter works as expected with both compact and binary protocols +func TestReporter(t *testing.T) { + for _, protocol := range protocols { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, true, protocol) + go server.Serve() + defer server.Close() + + commonTags = map[string]string{ + "env": "development", + "host": hostname(), + "commonTag": "common", + "commonTag2": "tag", + "commonTag3": "val", + } + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "testService", + CommonTags: commonTags, + IncludeHost: includeHost, + Protocol: protocol, + MaxQueueSize: queueSize, + MaxPacketSizeBytes: maxPacketSize, + }) + require.NoError(t, err) + defer func() { + assert.NoError(t, r.Close()) + }() + + tags := map[string]string{"testTag": "TestValue", "testTag2": "TestValue2"} + + wg.Add(2) + + r.AllocateCounter("my-counter", tags).ReportCount(10) + r.Flush() + + r.AllocateTimer("my-timer", tags).ReportTimer(5 * time.Millisecond) + r.Flush() + + wg.Wait() + + batches := server.Service.getBatches() + require.Equal(t, 2, len(batches)) + + // Validate common tags + for _, batch := range batches { + require.NotNil(t, batch) + require.True(t, batch.IsSetCommonTags()) + require.Equal(t, len(commonTags)+1, len(batch.GetCommonTags())) + for tag := range batch.GetCommonTags() { + if tag.GetTagName() == ServiceTag { + require.Equal(t, "testService", tag.GetTagValue()) + } else { + require.Equal(t, commonTags[tag.GetTagName()], tag.GetTagValue()) + } + } + } + + //Validate metrics + emittedCounters := batches[0].GetMetrics() + require.Equal(t, 1, len(emittedCounters)) + emittedTimers := batches[1].GetMetrics() + require.Equal(t, 1, len(emittedTimers)) + + emittedCounter, emittedTimer := emittedCounters[0], emittedTimers[0] + if emittedCounter.GetName() == "my-timer" { + emittedCounter, emittedTimer = emittedTimer, emittedCounter + } + + require.Equal(t, "my-counter", emittedCounter.GetName()) + require.True(t, emittedCounter.IsSetTags()) + require.Equal(t, len(tags), len(emittedCounter.GetTags())) + for tag := range emittedCounter.GetTags() { + require.Equal(t, tags[tag.GetTagName()], tag.GetTagValue()) + } + require.True(t, emittedCounter.IsSetMetricValue()) + emittedVal := emittedCounter.GetMetricValue() + require.True(t, emittedVal.IsSetCount()) + require.False(t, emittedVal.IsSetGauge()) + require.False(t, emittedVal.IsSetTimer()) + emittedCount := emittedVal.GetCount() + require.True(t, emittedCount.IsSetI64Value()) + require.EqualValues(t, int64(10), emittedCount.GetI64Value()) + + require.True(t, emittedTimer.IsSetMetricValue()) + emittedVal = emittedTimer.GetMetricValue() + require.False(t, emittedVal.IsSetCount()) + require.False(t, emittedVal.IsSetGauge()) + require.True(t, emittedVal.IsSetTimer()) + emittedTimerVal := emittedVal.GetTimer() + require.True(t, emittedTimerVal.IsSetI64Value()) + require.EqualValues(t, int64(5*1000*1000), emittedTimerVal.GetI64Value()) + } +} + +// TestMultiReporter tests the multi Reporter works as expected +func TestMultiReporter(t *testing.T) { + dests := []string{"127.0.0.1:9052", "127.0.0.1:9053"} + commonTags := map[string]string{ + "env": "test", + "host": "test", + "commonTag": "common", + "commonTag2": "tag", + "commonTag3": "val", + } + r, err := NewReporter(Options{ + HostPorts: dests, + Service: "testService", + CommonTags: commonTags, + }) + require.NoError(t, err) + defer r.Close() + + reporter, ok := r.(*reporter) + require.True(t, ok) + multitransport, ok := reporter.client.Transport.(*thriftudp.TMultiUDPTransport) + require.NotNil(t, multitransport) + require.True(t, ok) +} + +// TestNewReporterErrors tests for Reporter creation errors +func TestNewReporterErrors(t *testing.T) { + var err error + // Test freeBytes (maxPacketSizeBytes - numOverheadBytes) is negative + _, err = NewReporter(Options{ + HostPorts: []string{"127.0.0.1"}, + Service: "testService", + MaxQueueSize: 10, + MaxPacketSizeBytes: 2 << 5, + }) + assert.Error(t, err) + // Test invalid addr + _, err = NewReporter(Options{ + HostPorts: []string{"fakeAddress"}, + Service: "testService", + }) + assert.Error(t, err) +} + +// TestReporterFinalFlush ensures the Reporter emits the last batch of metrics +// after close +func TestReporterFinalFlush(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, true, Compact) + go server.Serve() + defer server.Close() + + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "testService", + CommonTags: defaultCommonTags, + MaxQueueSize: queueSize, + MaxPacketSizeBytes: maxPacketSize, + }) + require.NoError(t, err) + + wg.Add(1) + + r.AllocateTimer("my-timer", nil).ReportTimer(10 * time.Millisecond) + r.Close() + + wg.Wait() + + require.Equal(t, 1, len(server.Service.getBatches())) + require.NotNil(t, server.Service.getBatches()[0]) + require.Equal(t, 1, len(server.Service.getBatches()[0].GetMetrics())) +} + +func TestBatchSizes(t *testing.T) { + server := newSimpleServer(t) + go server.serve() + defer server.close() + + commonTags := map[string]string{ + "env": "test", + "domain": "pod" + strconv.Itoa(rand.Intn(100)), + } + maxPacketSize := int32(1440) + r, err := NewReporter(Options{ + HostPorts: []string{server.addr()}, + Service: "testService", + CommonTags: commonTags, + MaxQueueSize: 10000, + MaxPacketSizeBytes: maxPacketSize, + }) + + require.NoError(t, err) + rand.Seed(time.Now().UnixNano()) + + var stop uint32 + go func() { + var ( + counters = make(map[string]tally.CachedCount) + gauges = make(map[string]tally.CachedGauge) + timers = make(map[string]tally.CachedTimer) + randTags = func() map[string]string { + return map[string]string{ + "t1": "val" + strconv.Itoa(rand.Intn(10000)), + } + } + ) + for atomic.LoadUint32(&stop) == 0 { + metTypeRand := rand.Intn(9) + name := "size.test.metric.name" + strconv.Itoa(rand.Intn(50)) + + if metTypeRand <= 2 { + _, ok := counters[name] + if !ok { + counters[name] = r.AllocateCounter(name, randTags()) + } + counters[name].ReportCount(rand.Int63n(10000)) + } else if metTypeRand <= 5 { + _, ok := gauges[name] + if !ok { + gauges[name] = r.AllocateGauge(name, randTags()) + } + gauges[name].ReportGauge(rand.Float64() * 10000) + } else { + _, ok := timers[name] + if !ok { + timers[name] = r.AllocateTimer(name, randTags()) + } + timers[name].ReportTimer(time.Duration(rand.Int63n(10000))) + } + } + r.Close() + }() + + for len(server.getPackets()) < 100 { + time.Sleep(shortInterval) + } + + atomic.StoreUint32(&stop, 1) + for _, packet := range server.getPackets() { + require.True(t, len(packet) < int(maxPacketSize)) + } +} + +func TestReporterSpecifyService(t *testing.T) { + commonTags := map[string]string{ + ServiceTag: "overrideService", + EnvTag: "test", + HostTag: "overrideHost", + } + r, err := NewReporter(Options{ + HostPorts: []string{"127.0.0.1:1000"}, + Service: "testService", + CommonTags: commonTags, + IncludeHost: includeHost, + MaxQueueSize: 10, MaxPacketSizeBytes: 100, + }) + require.NoError(t, err) + defer r.Close() + + reporter, ok := r.(*reporter) + require.True(t, ok) + assert.Equal(t, 3, len(reporter.commonTags)) + for tag := range reporter.commonTags { + switch tag.GetTagName() { + case ServiceTag: + assert.Equal(t, "overrideService", tag.GetTagValue()) + case EnvTag: + assert.Equal(t, "test", tag.GetTagValue()) + case HostTag: + assert.Equal(t, "overrideHost", tag.GetTagValue()) + } + } +} + +func TestIncludeHost(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, true, Compact) + go server.Serve() + defer server.Close() + + commonTags := map[string]string{"env": "test"} + r, err := NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "testService", + CommonTags: commonTags, + IncludeHost: false, + }) + require.NoError(t, err) + defer r.Close() + withoutHost, ok := r.(*reporter) + require.True(t, ok) + assert.False(t, tagIncluded(withoutHost.commonTags, "host")) + + r, err = NewReporter(Options{ + HostPorts: []string{server.Addr}, + Service: "testService", + CommonTags: commonTags, + IncludeHost: true, + }) + require.NoError(t, err) + defer r.Close() + withHost, ok := r.(*reporter) + require.True(t, ok) + assert.True(t, tagIncluded(withHost.commonTags, "host")) +} + +func TestReporterHasReportingAndTaggingCapability(t *testing.T) { + r, err := NewReporter(Options{ + HostPorts: []string{"127.0.0.1:9052"}, + Service: "testService", + CommonTags: defaultCommonTags, + }) + require.Nil(t, err) + + assert.True(t, r.Capabilities().Reporting()) + assert.True(t, r.Capabilities().Tagging()) +} + +type simpleServer struct { + conn *net.UDPConn + t *testing.T + packets [][]byte + sync.Mutex + closed int32 +} + +func newSimpleServer(t *testing.T) *simpleServer { + addr, err := net.ResolveUDPAddr("udp", ":0") + require.NoError(t, err) + + conn, err := net.ListenUDP(addr.Network(), addr) + require.NoError(t, err) + + return &simpleServer{conn: conn, t: t} +} + +func (s *simpleServer) serve() { + readBuf := make([]byte, 64000) + for atomic.LoadInt32(&s.closed) == 0 { + n, err := s.conn.Read(readBuf) + if err != nil { + if atomic.LoadInt32(&s.closed) == 0 { + s.t.Errorf("FakeM3Server failed to Read: %v", err) + } + return + } + s.Lock() + s.packets = append(s.packets, readBuf[0:n]) + s.Unlock() + readBuf = make([]byte, 64000) + } +} + +func (s *simpleServer) getPackets() [][]byte { + s.Lock() + defer s.Unlock() + copy := make([][]byte, len(s.packets)) + for i, packet := range s.packets { + copy[i] = packet + } + + return copy +} + +func (s *simpleServer) close() error { + atomic.AddInt32(&s.closed, 1) + return s.conn.Close() +} + +func (s *simpleServer) addr() string { + return s.conn.LocalAddr().String() +} + +type fakeM3Server struct { + t *testing.T + Service *fakeM3Service + Addr string + protocol Protocol + processor thrift.TProcessor + conn *net.UDPConn + closed int32 +} + +func newFakeM3Server(t *testing.T, wg *sync.WaitGroup, countBatches bool, protocol Protocol) *fakeM3Server { + service := newFakeM3Service(wg, countBatches) + processor := m3thrift.NewM3Processor(service) + conn, err := net.ListenUDP(localListenAddr.Network(), localListenAddr) + require.NoError(t, err, "ListenUDP failed") + + return &fakeM3Server{ + t: t, + Service: service, + Addr: conn.LocalAddr().String(), + conn: conn, + protocol: protocol, + processor: processor, + } +} + +func (f *fakeM3Server) Serve() { + readBuf := make([]byte, 64000) + for f.conn != nil { + n, err := f.conn.Read(readBuf) + if err != nil { + if atomic.LoadInt32(&f.closed) == 0 { + f.t.Errorf("FakeM3Server failed to Read: %v", err) + } + return + } + trans, _ := customtransport.NewTBufferedReadTransport(bytes.NewBuffer(readBuf[0:n])) + var proto thrift.TProtocol + if f.protocol == Compact { + proto = thrift.NewTCompactProtocol(trans) + } else { + proto = thrift.NewTBinaryProtocolTransport(trans) + } + f.processor.Process(proto, proto) + } +} + +func (f *fakeM3Server) Close() error { + atomic.AddInt32(&f.closed, 1) + return f.conn.Close() +} + +func newFakeM3Service(wg *sync.WaitGroup, countBatches bool) *fakeM3Service { + return &fakeM3Service{wg: wg, countBatches: countBatches} +} + +type fakeM3Service struct { + lock sync.RWMutex + batches []*m3thrift.MetricBatch + metrics []*m3thrift.Metric + wg *sync.WaitGroup + countBatches bool +} + +func (m *fakeM3Service) getBatches() []*m3thrift.MetricBatch { + m.lock.RLock() + defer m.lock.RUnlock() + return m.batches +} + +func (m *fakeM3Service) getMetrics() []*m3thrift.Metric { + m.lock.RLock() + defer m.lock.RUnlock() + return m.metrics +} + +func (m *fakeM3Service) EmitMetricBatch(batch *m3thrift.MetricBatch) (err error) { + m.lock.Lock() + m.batches = append(m.batches, batch) + if m.countBatches { + m.wg.Done() + } + + for _, metric := range batch.Metrics { + m.metrics = append(m.metrics, metric) + if !m.countBatches { + m.wg.Done() + } + } + + m.lock.Unlock() + return thrift.NewTTransportException(thrift.END_OF_FILE, "complete") +} + +func hostname() string { + host, err := os.Hostname() + if err != nil { + host = "unknown" + } + return host +} + +func tagIncluded(tags map[*m3thrift.MetricTag]bool, tagName string) bool { + for k, v := range tags { + if v && k.TagName == tagName { + return true + } + } + return false +} + +func tagEquals(tags map[*m3thrift.MetricTag]bool, tagName, tagValue string) bool { + for k, v := range tags { + if v && k.GetTagName() == tagName { + return k.GetTagValue() == tagValue + } + } + return false +} diff --git a/m3/resource_pool.go b/m3/resource_pool.go new file mode 100644 index 00000000..0d92ad96 --- /dev/null +++ b/m3/resource_pool.go @@ -0,0 +1,222 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "github.com/uber-go/tally" + "github.com/uber-go/tally/m3/customtransports" + m3thrift "github.com/uber-go/tally/m3/thrift" + + "github.com/apache/thrift/lib/go/thrift" +) + +const ( + batchPoolSize = 10 + metricPoolSize = DefaultMaxQueueSize + valuePoolSize = DefaultMaxQueueSize + timerPoolSize = DefaultMaxQueueSize + tagPoolSize = DefaultMaxQueueSize + counterPoolSize = DefaultMaxQueueSize + gaugePoolSize = DefaultMaxQueueSize + protoPoolSize = 10 +) + +type resourcePool struct { + batchPool *tally.ObjectPool + metricPool *tally.ObjectPool + tagPool *tally.ObjectPool + valuePool *tally.ObjectPool + counterPool *tally.ObjectPool + gaugePool *tally.ObjectPool + timerPool *tally.ObjectPool + protoPool *tally.ObjectPool +} + +func newResourcePool(protoFac thrift.TProtocolFactory) *resourcePool { + batchPool := tally.NewObjectPool(batchPoolSize) + batchPool.Init(func() interface{} { + return m3thrift.NewMetricBatch() + }) + + metricPool := tally.NewObjectPool(metricPoolSize) + metricPool.Init(func() interface{} { + return m3thrift.NewMetric() + }) + + tagPool := tally.NewObjectPool(tagPoolSize) + tagPool.Init(func() interface{} { + return m3thrift.NewMetricTag() + }) + + valuePool := tally.NewObjectPool(valuePoolSize) + valuePool.Init(func() interface{} { + return m3thrift.NewMetricValue() + }) + + counterPool := tally.NewObjectPool(counterPoolSize) + counterPool.Init(func() interface{} { + return m3thrift.NewCountValue() + }) + + gaugePool := tally.NewObjectPool(gaugePoolSize) + gaugePool.Init(func() interface{} { + return m3thrift.NewGaugeValue() + }) + + timerPool := tally.NewObjectPool(timerPoolSize) + timerPool.Init(func() interface{} { + return m3thrift.NewTimerValue() + }) + + protoPool := tally.NewObjectPool(protoPoolSize) + protoPool.Init(func() interface{} { + return protoFac.GetProtocol(&customtransport.TCalcTransport{}) + }) + + return &resourcePool{ + batchPool: batchPool, + metricPool: metricPool, + tagPool: tagPool, + valuePool: valuePool, + counterPool: counterPool, + gaugePool: gaugePool, + timerPool: timerPool, + protoPool: protoPool, + } +} + +func (r *resourcePool) getBatch() *m3thrift.MetricBatch { + o := r.batchPool.Get() + return o.(*m3thrift.MetricBatch) +} + +func (r *resourcePool) getMetric() *m3thrift.Metric { + o := r.metricPool.Get() + return o.(*m3thrift.Metric) +} + +func (r *resourcePool) getTagList() map[*m3thrift.MetricTag]bool { + return map[*m3thrift.MetricTag]bool{} +} + +func (r *resourcePool) getTag() *m3thrift.MetricTag { + o := r.tagPool.Get() + return o.(*m3thrift.MetricTag) +} + +func (r *resourcePool) getValue() *m3thrift.MetricValue { + o := r.valuePool.Get() + return o.(*m3thrift.MetricValue) +} + +func (r *resourcePool) getCount() *m3thrift.CountValue { + o := r.counterPool.Get() + return o.(*m3thrift.CountValue) +} + +func (r *resourcePool) getGauge() *m3thrift.GaugeValue { + o := r.gaugePool.Get() + return o.(*m3thrift.GaugeValue) +} + +func (r *resourcePool) getTimer() *m3thrift.TimerValue { + o := r.timerPool.Get() + return o.(*m3thrift.TimerValue) +} + +func (r *resourcePool) getProto() thrift.TProtocol { + o := r.protoPool.Get() + return o.(thrift.TProtocol) +} + +func (r *resourcePool) releaseProto(proto thrift.TProtocol) { + calc := proto.Transport().(*customtransport.TCalcTransport) + calc.ResetCount() + r.protoPool.Put(proto) +} + +func (r *resourcePool) releaseBatch(batch *m3thrift.MetricBatch) { + batch.CommonTags = nil + for _, metric := range batch.Metrics { + r.releaseMetric(metric) + } + batch.Metrics = nil + r.batchPool.Put(batch) +} + +func (r *resourcePool) releaseMetricValue(metVal *m3thrift.MetricValue) { + if metVal.IsSetCount() { + metVal.Count.I64Value = nil + r.counterPool.Put(metVal.Count) + metVal.Count = nil + } else if metVal.IsSetGauge() { + metVal.Gauge.I64Value = nil + metVal.Gauge.DValue = nil + r.gaugePool.Put(metVal.Gauge) + metVal.Gauge = nil + } else if metVal.IsSetTimer() { + metVal.Timer.I64Value = nil + metVal.Timer.DValue = nil + r.timerPool.Put(metVal.Timer) + metVal.Timer = nil + } + r.valuePool.Put(metVal) +} + +func (r *resourcePool) releaseMetrics(mets []*m3thrift.Metric) { + for _, m := range mets { + r.releaseMetric(m) + } +} + +func (r *resourcePool) releaseShallowMetrics(mets []*m3thrift.Metric) { + for _, m := range mets { + r.releaseShallowMetric(m) + } +} + +func (r *resourcePool) releaseMetric(metric *m3thrift.Metric) { + metric.Name = "" + // Release Tags + for tag := range metric.Tags { + tag.TagName = "" + tag.TagValue = nil + r.tagPool.Put(tag) + } + metric.Tags = nil + + r.releaseShallowMetric(metric) +} + +func (r *resourcePool) releaseShallowMetric(metric *m3thrift.Metric) { + metric.Timestamp = nil + + metVal := metric.MetricValue + if metVal == nil { + r.metricPool.Put(metric) + return + } + + r.releaseMetricValue(metVal) + metric.MetricValue = nil + + r.metricPool.Put(metric) +} diff --git a/m3/resource_pool_test.go b/m3/resource_pool_test.go new file mode 100644 index 00000000..e7ece120 --- /dev/null +++ b/m3/resource_pool_test.go @@ -0,0 +1,109 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "testing" + + m3thrift "github.com/uber-go/tally/m3/thrift" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/stretchr/testify/require" +) + +func TestM3ResourcePoolMetric(t *testing.T) { + p := newResourcePool(thrift.NewTCompactProtocolFactory()) + + var v int64 + cm := p.getMetric() + cmv := p.getValue() + cm.MetricValue = cmv + cv := p.getCount() + cmv.Count = cv + cv.I64Value = &v + cm.Tags = map[*m3thrift.MetricTag]bool{createTag(p, "t1", "v1"): true} + + gm := p.getMetric() + gmv := p.getValue() + gm.MetricValue = gmv + gv := p.getGauge() + gmv.Gauge = gv + gv.I64Value = &v + + tm := p.getMetric() + tmv := p.getValue() + tm.MetricValue = tmv + tv := p.getTimer() + tmv.Timer = tv + tv.I64Value = &v + + p.releaseMetric(tm) + p.releaseMetric(gm) + p.releaseMetric(cm) + + cm2 := p.getMetric() + gm2 := p.getMetric() + tm2 := p.getMetric() + + require.Nil(t, cm2.MetricValue) + require.Nil(t, gm2.MetricValue) + require.Nil(t, tm2.MetricValue) +} + +func TestM3ResourcePoolMetricValue(t *testing.T) { + p := newResourcePool(thrift.NewTCompactProtocolFactory()) + var v int64 + cmv := p.getValue() + cv := p.getCount() + cmv.Count = cv + cv.I64Value = &v + + gmv := p.getValue() + gv := p.getGauge() + gmv.Gauge = gv + gv.I64Value = &v + + tmv := p.getValue() + tv := p.getTimer() + tmv.Timer = tv + tv.I64Value = &v + + p.releaseMetricValue(tmv) + p.releaseMetricValue(gmv) + p.releaseMetricValue(cmv) + + cmv2 := p.getValue() + gmv2 := p.getValue() + tmv2 := p.getValue() + + require.Nil(t, cmv2.Count) + require.Nil(t, gmv2.Gauge) + require.Nil(t, tmv2.Timer) +} + +func TestM3ResourcePoolBatch(t *testing.T) { + p := newResourcePool(thrift.NewTCompactProtocolFactory()) + b := p.getBatch() + b.Metrics = append(b.Metrics, p.getMetric()) + p.releaseBatch(b) + b2 := p.getBatch() + require.Equal(t, 0, len(b2.Metrics)) +} diff --git a/m3/scope_test.go b/m3/scope_test.go new file mode 100644 index 00000000..24c5f088 --- /dev/null +++ b/m3/scope_test.go @@ -0,0 +1,192 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package m3 + +import ( + "sync" + "testing" + "time" + + "github.com/uber-go/tally" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var commonTags = map[string]string{"env": "test"} + +type doneFn func() + +func newTestReporterScope( + t *testing.T, + addr string, + scopePrefix string, + scopeTags map[string]string, +) (Reporter, tally.Scope, doneFn) { + r, err := NewReporter(Options{ + HostPorts: []string{addr}, + Service: "testService", + CommonTags: commonTags, + IncludeHost: includeHost, + MaxQueueSize: queueSize, + MaxPacketSizeBytes: maxPacketSize, + }) + require.NoError(t, err) + + scope, closer := tally.NewCachedRootScope( + scopePrefix, scopeTags, r, shortInterval, + tally.DefaultSeparator) + + return r, scope, func() { + assert.NoError(t, closer.Close()) + + // Ensure reporter is closed too + var open, readStatus bool + select { + case _, open = <-r.(*reporter).metCh: + readStatus = true + default: + } + assert.True(t, readStatus) + assert.False(t, open) + } +} + +// TestScope tests that scope works as expected +func TestScope(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, true, Compact) + go server.Serve() + defer server.Close() + + tags := map[string]string{"testTag": "TestValue", "testTag2": "TestValue2"} + + _, scope, close := newTestReporterScope(t, server.Addr, "honk", tags) + defer close() + + wg.Add(1) + + timer := scope.Timer("dazzle") + timer.Start().Stop() + + wg.Wait() + + require.Equal(t, 1, len(server.Service.getBatches())) + require.NotNil(t, server.Service.getBatches()[0]) + + emittedTimers := server.Service.getBatches()[0].GetMetrics() + require.Equal(t, 1, len(emittedTimers)) + require.Equal(t, "honk.dazzle", emittedTimers[0].GetName()) +} + +// TestScopeCounter tests that scope works as expected +func TestScopeCounter(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, true, Compact) + go server.Serve() + defer server.Close() + + tags := map[string]string{"testTag": "TestValue", "testTag2": "TestValue2"} + + _, scope, close := newTestReporterScope(t, server.Addr, "honk", tags) + defer close() + + wg.Add(1) + + counter := scope.Counter("foobar") + counter.Inc(42) + + wg.Wait() + + require.Equal(t, 1, len(server.Service.getBatches())) + require.NotNil(t, server.Service.getBatches()[0]) + + emittedTimers := server.Service.getBatches()[0].GetMetrics() + require.Equal(t, 1, len(emittedTimers)) + require.Equal(t, "honk.foobar", emittedTimers[0].GetName()) +} + +// TestScopeGauge tests that scope works as expected +func TestScopeGauge(t *testing.T) { + var wg sync.WaitGroup + server := newFakeM3Server(t, &wg, true, Compact) + go server.Serve() + defer server.Close() + + tags := map[string]string{"testTag": "TestValue", "testTag2": "TestValue2"} + + _, scope, close := newTestReporterScope(t, server.Addr, "honk", tags) + defer close() + + wg.Add(1) + + gauge := scope.Gauge("foobaz") + gauge.Update(42) + + wg.Wait() + + require.Equal(t, 1, len(server.Service.getBatches())) + require.NotNil(t, server.Service.getBatches()[0]) + + emittedTimers := server.Service.getBatches()[0].GetMetrics() + require.Equal(t, 1, len(emittedTimers)) + require.Equal(t, "honk.foobaz", emittedTimers[0].GetName()) +} + +func BenchmarkScopeReportTimer(b *testing.B) { + backend, err := NewReporter(Options{ + HostPorts: []string{"127.0.0.1:4444"}, + Service: "my-service", + MaxQueueSize: 10000, + MaxPacketSizeBytes: maxPacketSize, + }) + if err != nil { + b.Error(err.Error()) + return + } + + scope, closer := tally.NewCachedRootScope( + "bench", + nil, + backend, + 1*time.Second, + tally.DefaultSeparator, + ) + + perEndpointScope := scope.Tagged( + map[string]string{ + "endpointid": "health", + "handlerid": "health", + }, + ) + timer := perEndpointScope.Timer("inbound.latency") + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + timer.Record(500) + } + }) + + b.StopTimer() + closer.Close() + b.StartTimer() +} diff --git a/m3/thrift/Makefile b/m3/thrift/Makefile new file mode 100644 index 00000000..56023b20 --- /dev/null +++ b/m3/thrift/Makefile @@ -0,0 +1,9 @@ +thrift_version := v1.0.0 + +gen-thrift: + @thrift --gen go:thrift_import="github.com/apache/thrift/lib/go/thrift" -out . $(thrift_version)/m3.thrift + @rm -rf m3/m3-remote + @mv m3/* . + @rm -rf m3 + @echo Generated thrift go files in metrics/m3/thrift/ + diff --git a/m3/thrift/constants.go b/m3/thrift/constants.go new file mode 100644 index 00000000..08083b31 --- /dev/null +++ b/m3/thrift/constants.go @@ -0,0 +1,38 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Autogenerated by Thrift Compiler (0.9.2) @generated +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package m3 + +import ( + "bytes" + "fmt" + "github.com/apache/thrift/lib/go/thrift" +) + +// (needed to ensure safety because of naive import list construction.) +var _ = thrift.ZERO +var _ = fmt.Printf +var _ = bytes.Equal + +func init() { +} diff --git a/m3/thrift/m3.go b/m3/thrift/m3.go new file mode 100644 index 00000000..2b023523 --- /dev/null +++ b/m3/thrift/m3.go @@ -0,0 +1,265 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Autogenerated by Thrift Compiler (0.9.2) @generated +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package m3 + +import ( + "bytes" + "fmt" + "github.com/apache/thrift/lib/go/thrift" +) + +// (needed to ensure safety because of naive import list construction.) +var _ = thrift.ZERO +var _ = fmt.Printf +var _ = bytes.Equal + +type M3 interface { //M3 Metrics Service + + // Emits a batch of metrics. + // + // Parameters: + // - Batch + EmitMetricBatch(batch *MetricBatch) (err error) +} + +//M3 Metrics Service +type M3Client struct { + Transport thrift.TTransport + ProtocolFactory thrift.TProtocolFactory + InputProtocol thrift.TProtocol + OutputProtocol thrift.TProtocol + SeqId int32 +} + +func NewM3ClientFactory(t thrift.TTransport, f thrift.TProtocolFactory) *M3Client { + return &M3Client{Transport: t, + ProtocolFactory: f, + InputProtocol: f.GetProtocol(t), + OutputProtocol: f.GetProtocol(t), + SeqId: 0, + } +} + +func NewM3ClientProtocol(t thrift.TTransport, iprot thrift.TProtocol, oprot thrift.TProtocol) *M3Client { + return &M3Client{Transport: t, + ProtocolFactory: nil, + InputProtocol: iprot, + OutputProtocol: oprot, + SeqId: 0, + } +} + +// Emits a batch of metrics. +// +// Parameters: +// - Batch +func (p *M3Client) EmitMetricBatch(batch *MetricBatch) (err error) { + if err = p.sendEmitMetricBatch(batch); err != nil { + return + } + return +} + +func (p *M3Client) sendEmitMetricBatch(batch *MetricBatch) (err error) { + oprot := p.OutputProtocol + if oprot == nil { + oprot = p.ProtocolFactory.GetProtocol(p.Transport) + p.OutputProtocol = oprot + } + p.SeqId++ + if err = oprot.WriteMessageBegin("emitMetricBatch", thrift.ONEWAY, p.SeqId); err != nil { + return + } + args := EmitMetricBatchArgs{ + Batch: batch, + } + if err = args.Write(oprot); err != nil { + return + } + if err = oprot.WriteMessageEnd(); err != nil { + return + } + return oprot.Flush() +} + +type M3Processor struct { + processorMap map[string]thrift.TProcessorFunction + handler M3 +} + +func (p *M3Processor) AddToProcessorMap(key string, processor thrift.TProcessorFunction) { + p.processorMap[key] = processor +} + +func (p *M3Processor) GetProcessorFunction(key string) (processor thrift.TProcessorFunction, ok bool) { + processor, ok = p.processorMap[key] + return processor, ok +} + +func (p *M3Processor) ProcessorMap() map[string]thrift.TProcessorFunction { + return p.processorMap +} + +func NewM3Processor(handler M3) *M3Processor { + + self3 := &M3Processor{handler: handler, processorMap: make(map[string]thrift.TProcessorFunction)} + self3.processorMap["emitMetricBatch"] = &m3ProcessorEmitMetricBatch{handler: handler} + return self3 +} + +func (p *M3Processor) Process(iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + name, _, seqId, err := iprot.ReadMessageBegin() + if err != nil { + return false, err + } + if processor, ok := p.GetProcessorFunction(name); ok { + return processor.Process(seqId, iprot, oprot) + } + iprot.Skip(thrift.STRUCT) + iprot.ReadMessageEnd() + x4 := thrift.NewTApplicationException(thrift.UNKNOWN_METHOD, "Unknown function "+name) + oprot.WriteMessageBegin(name, thrift.EXCEPTION, seqId) + x4.Write(oprot) + oprot.WriteMessageEnd() + oprot.Flush() + return false, x4 + +} + +type m3ProcessorEmitMetricBatch struct { + handler M3 +} + +func (p *m3ProcessorEmitMetricBatch) Process(seqId int32, iprot, oprot thrift.TProtocol) (success bool, err thrift.TException) { + args := EmitMetricBatchArgs{} + if err = args.Read(iprot); err != nil { + iprot.ReadMessageEnd() + return false, err + } + + iprot.ReadMessageEnd() + var err2 error + if err2 = p.handler.EmitMetricBatch(args.Batch); err2 != nil { + return true, err2 + } + return true, nil +} + +// HELPER FUNCTIONS AND STRUCTURES + +type EmitMetricBatchArgs struct { + Batch *MetricBatch `thrift:"batch,1" json:"batch"` +} + +func NewEmitMetricBatchArgs() *EmitMetricBatchArgs { + return &EmitMetricBatchArgs{} +} + +var EmitMetricBatchArgs_Batch_DEFAULT *MetricBatch + +func (p *EmitMetricBatchArgs) GetBatch() *MetricBatch { + if !p.IsSetBatch() { + return EmitMetricBatchArgs_Batch_DEFAULT + } + return p.Batch +} +func (p *EmitMetricBatchArgs) IsSetBatch() bool { + return p.Batch != nil +} + +func (p *EmitMetricBatchArgs) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *EmitMetricBatchArgs) ReadField1(iprot thrift.TProtocol) error { + p.Batch = &MetricBatch{} + if err := p.Batch.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", p.Batch, err) + } + return nil +} + +func (p *EmitMetricBatchArgs) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("emitMetricBatch_args"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *EmitMetricBatchArgs) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("batch", thrift.STRUCT, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:batch: %s", p, err) + } + if err := p.Batch.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", p.Batch, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:batch: %s", p, err) + } + return err +} + +func (p *EmitMetricBatchArgs) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("EmitMetricBatchArgs(%+v)", *p) +} diff --git a/m3/thrift/ttypes.go b/m3/thrift/ttypes.go new file mode 100644 index 00000000..e876cd5e --- /dev/null +++ b/m3/thrift/ttypes.go @@ -0,0 +1,1149 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +// Autogenerated by Thrift Compiler (0.9.2) @generated +// DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + +package m3 + +import ( + "bytes" + "fmt" + "github.com/apache/thrift/lib/go/thrift" +) + +// (needed to ensure safety because of naive import list construction.) +var _ = thrift.ZERO +var _ = fmt.Printf +var _ = bytes.Equal + +var GoUnusedProtection__ int + +type MetricValue struct { + Count *CountValue `thrift:"count,1" json:"count"` + Gauge *GaugeValue `thrift:"gauge,2" json:"gauge"` + Timer *TimerValue `thrift:"timer,3" json:"timer"` +} + +func NewMetricValue() *MetricValue { + return &MetricValue{} +} + +var MetricValue_Count_DEFAULT CountValue + +func (p *MetricValue) GetCount() CountValue { + if !p.IsSetCount() { + return MetricValue_Count_DEFAULT + } + return *p.Count +} + +var MetricValue_Gauge_DEFAULT GaugeValue + +func (p *MetricValue) GetGauge() GaugeValue { + if !p.IsSetGauge() { + return MetricValue_Gauge_DEFAULT + } + return *p.Gauge +} + +var MetricValue_Timer_DEFAULT TimerValue + +func (p *MetricValue) GetTimer() TimerValue { + if !p.IsSetTimer() { + return MetricValue_Timer_DEFAULT + } + return *p.Timer +} +func (p *MetricValue) IsSetCount() bool { + return p.Count != nil +} + +func (p *MetricValue) IsSetGauge() bool { + return p.Gauge != nil +} + +func (p *MetricValue) IsSetTimer() bool { + return p.Timer != nil +} + +func (p *MetricValue) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *MetricValue) ReadField1(iprot thrift.TProtocol) error { + p.Count = &CountValue{} + if err := p.Count.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", p.Count, err) + } + return nil +} + +func (p *MetricValue) ReadField2(iprot thrift.TProtocol) error { + p.Gauge = &GaugeValue{} + if err := p.Gauge.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", p.Gauge, err) + } + return nil +} + +func (p *MetricValue) ReadField3(iprot thrift.TProtocol) error { + p.Timer = &TimerValue{} + if err := p.Timer.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", p.Timer, err) + } + return nil +} + +func (p *MetricValue) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("MetricValue"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *MetricValue) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetCount() { + if err := oprot.WriteFieldBegin("count", thrift.STRUCT, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:count: %s", p, err) + } + if err := p.Count.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", p.Count, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:count: %s", p, err) + } + } + return err +} + +func (p *MetricValue) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetGauge() { + if err := oprot.WriteFieldBegin("gauge", thrift.STRUCT, 2); err != nil { + return fmt.Errorf("%T write field begin error 2:gauge: %s", p, err) + } + if err := p.Gauge.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", p.Gauge, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 2:gauge: %s", p, err) + } + } + return err +} + +func (p *MetricValue) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetTimer() { + if err := oprot.WriteFieldBegin("timer", thrift.STRUCT, 3); err != nil { + return fmt.Errorf("%T write field begin error 3:timer: %s", p, err) + } + if err := p.Timer.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", p.Timer, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 3:timer: %s", p, err) + } + } + return err +} + +func (p *MetricValue) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("MetricValue(%+v)", *p) +} + +type CountValue struct { + I64Value *int64 `thrift:"i64Value,1" json:"i64Value"` +} + +func NewCountValue() *CountValue { + return &CountValue{} +} + +var CountValue_I64Value_DEFAULT int64 + +func (p *CountValue) GetI64Value() int64 { + if !p.IsSetI64Value() { + return CountValue_I64Value_DEFAULT + } + return *p.I64Value +} +func (p *CountValue) IsSetI64Value() bool { + return p.I64Value != nil +} + +func (p *CountValue) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *CountValue) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return fmt.Errorf("error reading field 1: %s", err) + } else { + p.I64Value = &v + } + return nil +} + +func (p *CountValue) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("CountValue"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *CountValue) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetI64Value() { + if err := oprot.WriteFieldBegin("i64Value", thrift.I64, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:i64Value: %s", p, err) + } + if err := oprot.WriteI64(int64(*p.I64Value)); err != nil { + return fmt.Errorf("%T.i64Value (1) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:i64Value: %s", p, err) + } + } + return err +} + +func (p *CountValue) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("CountValue(%+v)", *p) +} + +type GaugeValue struct { + I64Value *int64 `thrift:"i64Value,1" json:"i64Value"` + DValue *float64 `thrift:"dValue,2" json:"dValue"` +} + +func NewGaugeValue() *GaugeValue { + return &GaugeValue{} +} + +var GaugeValue_I64Value_DEFAULT int64 + +func (p *GaugeValue) GetI64Value() int64 { + if !p.IsSetI64Value() { + return GaugeValue_I64Value_DEFAULT + } + return *p.I64Value +} + +var GaugeValue_DValue_DEFAULT float64 + +func (p *GaugeValue) GetDValue() float64 { + if !p.IsSetDValue() { + return GaugeValue_DValue_DEFAULT + } + return *p.DValue +} +func (p *GaugeValue) IsSetI64Value() bool { + return p.I64Value != nil +} + +func (p *GaugeValue) IsSetDValue() bool { + return p.DValue != nil +} + +func (p *GaugeValue) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *GaugeValue) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return fmt.Errorf("error reading field 1: %s", err) + } else { + p.I64Value = &v + } + return nil +} + +func (p *GaugeValue) ReadField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadDouble(); err != nil { + return fmt.Errorf("error reading field 2: %s", err) + } else { + p.DValue = &v + } + return nil +} + +func (p *GaugeValue) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("GaugeValue"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *GaugeValue) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetI64Value() { + if err := oprot.WriteFieldBegin("i64Value", thrift.I64, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:i64Value: %s", p, err) + } + if err := oprot.WriteI64(int64(*p.I64Value)); err != nil { + return fmt.Errorf("%T.i64Value (1) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:i64Value: %s", p, err) + } + } + return err +} + +func (p *GaugeValue) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetDValue() { + if err := oprot.WriteFieldBegin("dValue", thrift.DOUBLE, 2); err != nil { + return fmt.Errorf("%T write field begin error 2:dValue: %s", p, err) + } + if err := oprot.WriteDouble(float64(*p.DValue)); err != nil { + return fmt.Errorf("%T.dValue (2) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 2:dValue: %s", p, err) + } + } + return err +} + +func (p *GaugeValue) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("GaugeValue(%+v)", *p) +} + +type TimerValue struct { + I64Value *int64 `thrift:"i64Value,1" json:"i64Value"` + DValue *float64 `thrift:"dValue,2" json:"dValue"` +} + +func NewTimerValue() *TimerValue { + return &TimerValue{} +} + +var TimerValue_I64Value_DEFAULT int64 + +func (p *TimerValue) GetI64Value() int64 { + if !p.IsSetI64Value() { + return TimerValue_I64Value_DEFAULT + } + return *p.I64Value +} + +var TimerValue_DValue_DEFAULT float64 + +func (p *TimerValue) GetDValue() float64 { + if !p.IsSetDValue() { + return TimerValue_DValue_DEFAULT + } + return *p.DValue +} +func (p *TimerValue) IsSetI64Value() bool { + return p.I64Value != nil +} + +func (p *TimerValue) IsSetDValue() bool { + return p.DValue != nil +} + +func (p *TimerValue) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *TimerValue) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return fmt.Errorf("error reading field 1: %s", err) + } else { + p.I64Value = &v + } + return nil +} + +func (p *TimerValue) ReadField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadDouble(); err != nil { + return fmt.Errorf("error reading field 2: %s", err) + } else { + p.DValue = &v + } + return nil +} + +func (p *TimerValue) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("TimerValue"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *TimerValue) writeField1(oprot thrift.TProtocol) (err error) { + if p.IsSetI64Value() { + if err := oprot.WriteFieldBegin("i64Value", thrift.I64, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:i64Value: %s", p, err) + } + if err := oprot.WriteI64(int64(*p.I64Value)); err != nil { + return fmt.Errorf("%T.i64Value (1) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:i64Value: %s", p, err) + } + } + return err +} + +func (p *TimerValue) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetDValue() { + if err := oprot.WriteFieldBegin("dValue", thrift.DOUBLE, 2); err != nil { + return fmt.Errorf("%T write field begin error 2:dValue: %s", p, err) + } + if err := oprot.WriteDouble(float64(*p.DValue)); err != nil { + return fmt.Errorf("%T.dValue (2) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 2:dValue: %s", p, err) + } + } + return err +} + +func (p *TimerValue) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("TimerValue(%+v)", *p) +} + +type MetricTag struct { + TagName string `thrift:"tagName,1" json:"tagName"` + TagValue *string `thrift:"tagValue,2" json:"tagValue"` +} + +func NewMetricTag() *MetricTag { + return &MetricTag{} +} + +func (p *MetricTag) GetTagName() string { + return p.TagName +} + +var MetricTag_TagValue_DEFAULT string + +func (p *MetricTag) GetTagValue() string { + if !p.IsSetTagValue() { + return MetricTag_TagValue_DEFAULT + } + return *p.TagValue +} +func (p *MetricTag) IsSetTagValue() bool { + return p.TagValue != nil +} + +func (p *MetricTag) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *MetricTag) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return fmt.Errorf("error reading field 1: %s", err) + } else { + p.TagName = v + } + return nil +} + +func (p *MetricTag) ReadField2(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return fmt.Errorf("error reading field 2: %s", err) + } else { + p.TagValue = &v + } + return nil +} + +func (p *MetricTag) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("MetricTag"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *MetricTag) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("tagName", thrift.STRING, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:tagName: %s", p, err) + } + if err := oprot.WriteString(string(p.TagName)); err != nil { + return fmt.Errorf("%T.tagName (1) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:tagName: %s", p, err) + } + return err +} + +func (p *MetricTag) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetTagValue() { + if err := oprot.WriteFieldBegin("tagValue", thrift.STRING, 2); err != nil { + return fmt.Errorf("%T write field begin error 2:tagValue: %s", p, err) + } + if err := oprot.WriteString(string(*p.TagValue)); err != nil { + return fmt.Errorf("%T.tagValue (2) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 2:tagValue: %s", p, err) + } + } + return err +} + +func (p *MetricTag) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("MetricTag(%+v)", *p) +} + +type Metric struct { + Name string `thrift:"name,1" json:"name"` + MetricValue *MetricValue `thrift:"metricValue,2" json:"metricValue"` + Timestamp *int64 `thrift:"timestamp,3" json:"timestamp"` + Tags map[*MetricTag]bool `thrift:"tags,4" json:"tags"` +} + +func NewMetric() *Metric { + return &Metric{} +} + +func (p *Metric) GetName() string { + return p.Name +} + +var Metric_MetricValue_DEFAULT *MetricValue + +func (p *Metric) GetMetricValue() *MetricValue { + if !p.IsSetMetricValue() { + return Metric_MetricValue_DEFAULT + } + return p.MetricValue +} + +var Metric_Timestamp_DEFAULT int64 + +func (p *Metric) GetTimestamp() int64 { + if !p.IsSetTimestamp() { + return Metric_Timestamp_DEFAULT + } + return *p.Timestamp +} + +var Metric_Tags_DEFAULT map[*MetricTag]bool + +func (p *Metric) GetTags() map[*MetricTag]bool { + return p.Tags +} +func (p *Metric) IsSetMetricValue() bool { + return p.MetricValue != nil +} + +func (p *Metric) IsSetTimestamp() bool { + return p.Timestamp != nil +} + +func (p *Metric) IsSetTags() bool { + return p.Tags != nil +} + +func (p *Metric) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + case 3: + if err := p.ReadField3(iprot); err != nil { + return err + } + case 4: + if err := p.ReadField4(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *Metric) ReadField1(iprot thrift.TProtocol) error { + if v, err := iprot.ReadString(); err != nil { + return fmt.Errorf("error reading field 1: %s", err) + } else { + p.Name = v + } + return nil +} + +func (p *Metric) ReadField2(iprot thrift.TProtocol) error { + p.MetricValue = &MetricValue{} + if err := p.MetricValue.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", p.MetricValue, err) + } + return nil +} + +func (p *Metric) ReadField3(iprot thrift.TProtocol) error { + if v, err := iprot.ReadI64(); err != nil { + return fmt.Errorf("error reading field 3: %s", err) + } else { + p.Timestamp = &v + } + return nil +} + +func (p *Metric) ReadField4(iprot thrift.TProtocol) error { + _, size, err := iprot.ReadSetBegin() + if err != nil { + return fmt.Errorf("error reading set begin: %s", err) + } + tSet := make(map[*MetricTag]bool, size) + p.Tags = tSet + for i := 0; i < size; i++ { + _elem0 := &MetricTag{} + if err := _elem0.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", _elem0, err) + } + p.Tags[_elem0] = true + } + if err := iprot.ReadSetEnd(); err != nil { + return fmt.Errorf("error reading set end: %s", err) + } + return nil +} + +func (p *Metric) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("Metric"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := p.writeField3(oprot); err != nil { + return err + } + if err := p.writeField4(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *Metric) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("name", thrift.STRING, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:name: %s", p, err) + } + if err := oprot.WriteString(string(p.Name)); err != nil { + return fmt.Errorf("%T.name (1) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:name: %s", p, err) + } + return err +} + +func (p *Metric) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetMetricValue() { + if err := oprot.WriteFieldBegin("metricValue", thrift.STRUCT, 2); err != nil { + return fmt.Errorf("%T write field begin error 2:metricValue: %s", p, err) + } + if err := p.MetricValue.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", p.MetricValue, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 2:metricValue: %s", p, err) + } + } + return err +} + +func (p *Metric) writeField3(oprot thrift.TProtocol) (err error) { + if p.IsSetTimestamp() { + if err := oprot.WriteFieldBegin("timestamp", thrift.I64, 3); err != nil { + return fmt.Errorf("%T write field begin error 3:timestamp: %s", p, err) + } + if err := oprot.WriteI64(int64(*p.Timestamp)); err != nil { + return fmt.Errorf("%T.timestamp (3) field write error: %s", p, err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 3:timestamp: %s", p, err) + } + } + return err +} + +func (p *Metric) writeField4(oprot thrift.TProtocol) (err error) { + if p.IsSetTags() { + if err := oprot.WriteFieldBegin("tags", thrift.SET, 4); err != nil { + return fmt.Errorf("%T write field begin error 4:tags: %s", p, err) + } + if err := oprot.WriteSetBegin(thrift.STRUCT, len(p.Tags)); err != nil { + return fmt.Errorf("error writing set begin: %s", err) + } + for v := range p.Tags { + if err := v.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", v, err) + } + } + if err := oprot.WriteSetEnd(); err != nil { + return fmt.Errorf("error writing set end: %s", err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 4:tags: %s", p, err) + } + } + return err +} + +func (p *Metric) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("Metric(%+v)", *p) +} + +type MetricBatch struct { + Metrics []*Metric `thrift:"metrics,1" json:"metrics"` + CommonTags map[*MetricTag]bool `thrift:"commonTags,2" json:"commonTags"` +} + +func NewMetricBatch() *MetricBatch { + return &MetricBatch{} +} + +func (p *MetricBatch) GetMetrics() []*Metric { + return p.Metrics +} + +var MetricBatch_CommonTags_DEFAULT map[*MetricTag]bool + +func (p *MetricBatch) GetCommonTags() map[*MetricTag]bool { + return p.CommonTags +} +func (p *MetricBatch) IsSetCommonTags() bool { + return p.CommonTags != nil +} + +func (p *MetricBatch) Read(iprot thrift.TProtocol) error { + if _, err := iprot.ReadStructBegin(); err != nil { + return fmt.Errorf("%T read error: %s", p, err) + } + for { + _, fieldTypeId, fieldId, err := iprot.ReadFieldBegin() + if err != nil { + return fmt.Errorf("%T field %d read error: %s", p, fieldId, err) + } + if fieldTypeId == thrift.STOP { + break + } + switch fieldId { + case 1: + if err := p.ReadField1(iprot); err != nil { + return err + } + case 2: + if err := p.ReadField2(iprot); err != nil { + return err + } + default: + if err := iprot.Skip(fieldTypeId); err != nil { + return err + } + } + if err := iprot.ReadFieldEnd(); err != nil { + return err + } + } + if err := iprot.ReadStructEnd(); err != nil { + return fmt.Errorf("%T read struct end error: %s", p, err) + } + return nil +} + +func (p *MetricBatch) ReadField1(iprot thrift.TProtocol) error { + _, size, err := iprot.ReadListBegin() + if err != nil { + return fmt.Errorf("error reading list begin: %s", err) + } + tSlice := make([]*Metric, 0, size) + p.Metrics = tSlice + for i := 0; i < size; i++ { + _elem1 := &Metric{} + if err := _elem1.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", _elem1, err) + } + p.Metrics = append(p.Metrics, _elem1) + } + if err := iprot.ReadListEnd(); err != nil { + return fmt.Errorf("error reading list end: %s", err) + } + return nil +} + +func (p *MetricBatch) ReadField2(iprot thrift.TProtocol) error { + _, size, err := iprot.ReadSetBegin() + if err != nil { + return fmt.Errorf("error reading set begin: %s", err) + } + tSet := make(map[*MetricTag]bool, size) + p.CommonTags = tSet + for i := 0; i < size; i++ { + _elem2 := &MetricTag{} + if err := _elem2.Read(iprot); err != nil { + return fmt.Errorf("%T error reading struct: %s", _elem2, err) + } + p.CommonTags[_elem2] = true + } + if err := iprot.ReadSetEnd(); err != nil { + return fmt.Errorf("error reading set end: %s", err) + } + return nil +} + +func (p *MetricBatch) Write(oprot thrift.TProtocol) error { + if err := oprot.WriteStructBegin("MetricBatch"); err != nil { + return fmt.Errorf("%T write struct begin error: %s", p, err) + } + if err := p.writeField1(oprot); err != nil { + return err + } + if err := p.writeField2(oprot); err != nil { + return err + } + if err := oprot.WriteFieldStop(); err != nil { + return fmt.Errorf("write field stop error: %s", err) + } + if err := oprot.WriteStructEnd(); err != nil { + return fmt.Errorf("write struct stop error: %s", err) + } + return nil +} + +func (p *MetricBatch) writeField1(oprot thrift.TProtocol) (err error) { + if err := oprot.WriteFieldBegin("metrics", thrift.LIST, 1); err != nil { + return fmt.Errorf("%T write field begin error 1:metrics: %s", p, err) + } + if err := oprot.WriteListBegin(thrift.STRUCT, len(p.Metrics)); err != nil { + return fmt.Errorf("error writing list begin: %s", err) + } + for _, v := range p.Metrics { + if err := v.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", v, err) + } + } + if err := oprot.WriteListEnd(); err != nil { + return fmt.Errorf("error writing list end: %s", err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 1:metrics: %s", p, err) + } + return err +} + +func (p *MetricBatch) writeField2(oprot thrift.TProtocol) (err error) { + if p.IsSetCommonTags() { + if err := oprot.WriteFieldBegin("commonTags", thrift.SET, 2); err != nil { + return fmt.Errorf("%T write field begin error 2:commonTags: %s", p, err) + } + if err := oprot.WriteSetBegin(thrift.STRUCT, len(p.CommonTags)); err != nil { + return fmt.Errorf("error writing set begin: %s", err) + } + for v := range p.CommonTags { + if err := v.Write(oprot); err != nil { + return fmt.Errorf("%T error writing struct: %s", v, err) + } + } + if err := oprot.WriteSetEnd(); err != nil { + return fmt.Errorf("error writing set end: %s", err) + } + if err := oprot.WriteFieldEnd(); err != nil { + return fmt.Errorf("%T write field end error 2:commonTags: %s", p, err) + } + } + return err +} + +func (p *MetricBatch) String() string { + if p == nil { + return "" + } + return fmt.Sprintf("MetricBatch(%+v)", *p) +} diff --git a/m3/thrift/v1.0.0/m3.thrift b/m3/thrift/v1.0.0/m3.thrift new file mode 100644 index 00000000..05509543 --- /dev/null +++ b/m3/thrift/v1.0.0/m3.thrift @@ -0,0 +1,70 @@ +/** +* Different types of values that m3 emits. Each metric +* must contain one of these values +*/ +union MetricValue { + 1: optional CountValue count + 2: optional GaugeValue gauge + 3: optional TimerValue timer +} + +/** +* Different types of count values +*/ +union CountValue { + 1: optional i64 i64Value +} + +/** +* Different types of gauge values +*/ +union GaugeValue { + 1: optional i64 i64Value + 2: optional double dValue +} + +/** +* Different types of timer values +*/ +union TimerValue { + 1: optional i64 i64Value + 2: optional double dValue +} + +/** +* Tags that can be applied to a metric +*/ +struct MetricTag { + 1: string tagName, + 2: optional string tagValue +} + +/** +* The metric that is being emitted +*/ +struct Metric { + 1: string name, + 2: optional MetricValue metricValue, + 3: optional i64 timestamp, + 4: optional set tags +} + +/** +* Structure that holds a group of metrics which share +* common properties like the cluster and service. +*/ +struct MetricBatch { + 1: list metrics + 2: optional set commonTags +} + +/** +* M3 Metrics Service +*/ +service M3 { + + /** + * Emits a batch of metrics. + */ + oneway void emitMetricBatch(1: MetricBatch batch) +} diff --git a/m3/thriftudp/multitransport.go b/m3/thriftudp/multitransport.go new file mode 100644 index 00000000..d81ae419 --- /dev/null +++ b/m3/thriftudp/multitransport.go @@ -0,0 +1,120 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package thriftudp + +import ( + "fmt" + + "github.com/apache/thrift/lib/go/thrift" +) + +// TMultiUDPTransport does multiUDP as a thrift.TTransport +type TMultiUDPTransport struct { + transports []thrift.TTransport +} + +// NewTMultiUDPClientTransport creates a set of net.UDPConn-backed TTransports for Thrift clients +// All writes are buffered and flushed in one UDP packet. If locHostPort is not "", it +// will be used as the local address for the connection +// Example: +// trans, err := thriftudp.NewTMultiUDPClientTransport([]string{"192.168.1.1:9090","192.168.1.2:9090"}, "") +func NewTMultiUDPClientTransport( + destHostPorts []string, + locHostPort string, +) (*TMultiUDPTransport, error) { + var transports []thrift.TTransport + for i := range destHostPorts { + trans, err := NewTUDPClientTransport(destHostPorts[i], locHostPort) + if err != nil { + return nil, err + } + transports = append(transports, trans) + } + + return &TMultiUDPTransport{transports: transports}, nil +} + +// Open the connections of the underlying transports +func (p *TMultiUDPTransport) Open() error { + for _, trans := range p.transports { + if err := trans.Open(); err != nil { + return err + } + } + return nil +} + +// IsOpen returns true if the connections of the underlying transports are open +func (p *TMultiUDPTransport) IsOpen() bool { + for _, trans := range p.transports { + if open := trans.IsOpen(); !open { + return false + } + } + return true +} + +// Close closes the connections of the underlying transports +func (p *TMultiUDPTransport) Close() error { + for _, trans := range p.transports { + if err := trans.Close(); err != nil { + return err + } + } + return nil +} + +// Read is not supported for multiple underlying transports +func (p *TMultiUDPTransport) Read(buf []byte) (int, error) { + // Not applicable, required by TTransport however + return 0, fmt.Errorf("not supported") +} + +// RemainingBytes is not supported for multiple underlying transports +func (p *TMultiUDPTransport) RemainingBytes() uint64 { + // Not applicable, required by TTransport however + return 0 +} + +// Write writes specified buf to the write buffer of underlying transports +func (p *TMultiUDPTransport) Write(buff []byte) (int, error) { + n := 0 + for _, trans := range p.transports { + written, err := trans.Write(buff) + if err != nil { + return n, err + } + if written > n { + n = written + } + } + return n, nil +} + +// Flush flushes the write buffer of the underlying transports +func (p *TMultiUDPTransport) Flush() error { + for _, trans := range p.transports { + if err := trans.Flush(); err != nil { + return err + } + } + return nil +} diff --git a/m3/thriftudp/multitransport_test.go b/m3/thriftudp/multitransport_test.go new file mode 100644 index 00000000..6f413822 --- /dev/null +++ b/m3/thriftudp/multitransport_test.go @@ -0,0 +1,204 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package thriftudp + +import ( + "fmt" + "testing" + + "github.com/apache/thrift/lib/go/thrift" + + "github.com/stretchr/testify/assert" +) + +func TestNewTMultiUDPClientTransport(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{"127.0.0.1:9090", "127.0.0.1:9090"}, "") + assert.NotNil(t, trans) + assert.Nil(t, err) + trans.Close() +} + +func TestNewTMultiUDPClientTransportBadAddress(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{"not an address"}, "") + assert.Nil(t, trans) + assert.NotNil(t, err) +} + +func TestTMultiUDPTransportOpen(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + trans.transports = []thrift.TTransport{newMockTransport(), newMockTransport()} + assert.Nil(t, trans.Open()) + + trans.Close() +} + +func TestTMultiUDPTransportOpenFail(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + trans.transports = []thrift.TTransport{newMockTransport(), newMockTransport()} + trans.transports[1].(*mockTransport).openError = fmt.Errorf("error") + assert.NotNil(t, trans.Open()) + trans.Close() +} + +func TestTMultiUDPTransportIsOpen(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + trans.transports = []thrift.TTransport{newMockTransport(), newMockTransport()} + trans.transports[0].(*mockTransport).currentlyOpen = true + trans.transports[1].(*mockTransport).currentlyOpen = true + assert.True(t, trans.IsOpen()) + trans.Close() +} + +func TestTMultiUDPTransportIsNotOpen(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + trans.transports = []thrift.TTransport{newMockTransport(), newMockTransport()} + assert.False(t, trans.IsOpen()) + trans.Close() +} + +func TestTMultiUDPTransportReadNotSupported(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + n, err := trans.Read([]byte{}) + assert.Equal(t, int(0), n) + assert.NotNil(t, err) + trans.Close() +} + +func TestTMultiUDPTransportRemainingBytesNotSupported(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + n := trans.RemainingBytes() + assert.Equal(t, uint64(0), n) + trans.Close() +} + +func TestTMultiUDPTransportWrite(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + trans.transports = []thrift.TTransport{newMockTransport(), newMockTransport()} + trans.transports[0].(*mockTransport).onWrite = func(buf []byte) (int, error) { + return len(buf), nil + } + trans.transports[1].(*mockTransport).onWrite = func(buf []byte) (int, error) { + return len(buf), nil + } + + b := []byte{1, 2, 3} + n, err := trans.Write(b) + assert.Equal(t, int(len(b)), n) + assert.Nil(t, err) + + trans.Flush() + trans.Close() +} + +func TestTMultiUDPTransportWriteError(t *testing.T) { + trans, err := NewTMultiUDPClientTransport([]string{}, "") + assert.Nil(t, err) + + trans.transports = []thrift.TTransport{newMockTransport(), newMockTransport()} + trans.transports[0].(*mockTransport).onWrite = func(buf []byte) (int, error) { + return 0, fmt.Errorf("error") + } + trans.transports[1].(*mockTransport).onWrite = func(buf []byte) (int, error) { + return len(buf), nil + } + + b := []byte{1, 2, 3} + n, err := trans.Write(b) + assert.Equal(t, int(0), n) + assert.NotNil(t, err) + trans.Close() +} + +type mockTransport struct { + currentlyOpen bool + onRead func(buf []byte) (int, error) + onRemainingBytes func() uint64 + onWrite func(buf []byte) (int, error) + onFlush func() error + openError error + closeError error +} + +func newMockTransport() *mockTransport { + return &mockTransport{} +} + +func (m *mockTransport) Open() error { + if m.openError != nil { + return m.openError + } + m.currentlyOpen = true + return nil +} + +func (m *mockTransport) IsOpen() bool { + return m.currentlyOpen +} + +func (m *mockTransport) Close() error { + if m.closeError != nil { + return m.closeError + } + m.currentlyOpen = false + return nil +} + +func (m *mockTransport) Read(buf []byte) (int, error) { + if m.onRead != nil { + return m.onRead(buf) + } + return 0, fmt.Errorf("no mock Read implementation") +} + +func (m *mockTransport) RemainingBytes() uint64 { + if m.onRemainingBytes != nil { + return m.onRemainingBytes() + } + return 0 +} + +func (m *mockTransport) Write(buf []byte) (int, error) { + if m.onWrite != nil { + return m.onWrite(buf) + } + return 0, fmt.Errorf("no mock Write implementation") +} + +func (m *mockTransport) Flush() error { + if m.onFlush != nil { + return m.onFlush() + } + return fmt.Errorf("no mock Flush implementation") +} diff --git a/m3/thriftudp/transport.go b/m3/thriftudp/transport.go new file mode 100644 index 00000000..1fd03eb6 --- /dev/null +++ b/m3/thriftudp/transport.go @@ -0,0 +1,152 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package thriftudp + +import ( + "bytes" + "net" + + "github.com/apache/thrift/lib/go/thrift" + "github.com/uber-go/atomic" +) + +//MaxLength of UDP packet +const MaxLength = 65000 + +// TUDPTransport does UDP as a thrift.TTransport +type TUDPTransport struct { + conn *net.UDPConn + addr net.Addr + writeBuf bytes.Buffer + closed atomic.Bool +} + +// NewTUDPClientTransport creates a net.UDPConn-backed TTransport for Thrift clients +// All writes are buffered and flushed in one UDP packet. If locHostPort is not "", it +// will be used as the local address for the connection +// Example: +// trans, err := thriftudp.NewTUDPClientTransport("192.168.1.1:9090", "") +func NewTUDPClientTransport(destHostPort string, locHostPort string) (*TUDPTransport, error) { + destAddr, err := net.ResolveUDPAddr("udp", destHostPort) + if err != nil { + return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error()) + } + + var locAddr *net.UDPAddr + if locHostPort != "" { + locAddr, err = net.ResolveUDPAddr("udp", locHostPort) + if err != nil { + return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error()) + } + } + + conn, err := net.DialUDP(destAddr.Network(), locAddr, destAddr) + if err != nil { + return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error()) + } + + return &TUDPTransport{addr: destAddr, conn: conn}, nil +} + +// NewTUDPServerTransport creates a net.UDPConn-backed TTransport for Thrift servers +// It will listen for incoming udp packets on the specified host/port +// Example: +// trans, err := thriftudp.NewTUDPClientTransport("localhost:9001") +func NewTUDPServerTransport(hostPort string) (*TUDPTransport, error) { + addr, err := net.ResolveUDPAddr("udp", hostPort) + if err != nil { + return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error()) + } + conn, err := net.ListenUDP(addr.Network(), addr) + if err != nil { + return nil, thrift.NewTTransportException(thrift.NOT_OPEN, err.Error()) + } + return &TUDPTransport{addr: conn.LocalAddr(), conn: conn}, nil +} + +// Open does nothing as connection is opened on creation +// Required to maintain thrift.TTransport interface +func (p *TUDPTransport) Open() error { + return nil +} + +// Conn retrieves the underlying net.UDPConn +func (p *TUDPTransport) Conn() *net.UDPConn { + return p.conn +} + +// IsOpen returns true if the connection is open +func (p *TUDPTransport) IsOpen() bool { + return !p.closed.Load() +} + +// Close closes the transport and the underlying connection. +// Note: the current implementation allows Close to be called multiple times without an error. +func (p *TUDPTransport) Close() error { + if closed := p.closed.Swap(true); !closed { + return p.conn.Close() + } + return nil +} + +// Addr returns the address that the transport is listening on or writing to +func (p *TUDPTransport) Addr() net.Addr { + return p.addr +} + +// Read reads one UDP packet and puts it in the specified buf +func (p *TUDPTransport) Read(buf []byte) (int, error) { + if !p.IsOpen() { + return 0, thrift.NewTTransportException(thrift.NOT_OPEN, "Connection not open") + } + n, err := p.conn.Read(buf) + return n, thrift.NewTTransportExceptionFromError(err) +} + +// RemainingBytes returns the max number of bytes (same as Thrift's StreamTransport) as we +// do not know how many bytes we have left. +func (p *TUDPTransport) RemainingBytes() uint64 { + const maxSize = ^uint64(0) + return maxSize +} + +// Write writes specified buf to the write buffer +func (p *TUDPTransport) Write(buf []byte) (int, error) { + if !p.IsOpen() { + return 0, thrift.NewTTransportException(thrift.NOT_OPEN, "Connection not open") + } + if len(p.writeBuf.Bytes())+len(buf) > MaxLength { + return 0, thrift.NewTTransportException(thrift.INVALID_DATA, "Data does not fit within one UDP packet") + } + n, err := p.writeBuf.Write(buf) + return n, thrift.NewTTransportExceptionFromError(err) +} + +// Flush flushes the write buffer as one udp packet +func (p *TUDPTransport) Flush() error { + if !p.IsOpen() { + return thrift.NewTTransportException(thrift.NOT_OPEN, "Connection not open") + } + + _, err := p.conn.Write(p.writeBuf.Bytes()) + p.writeBuf.Reset() // always reset the buffer, even in case of an error + return err +} diff --git a/m3/thriftudp/transport_test.go b/m3/thriftudp/transport_test.go new file mode 100644 index 00000000..317e0e06 --- /dev/null +++ b/m3/thriftudp/transport_test.go @@ -0,0 +1,241 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package thriftudp + +import ( + "net" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var localListenAddr = &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1)} + +func TestNewTUDPClientTransport(t *testing.T) { + _, err := NewTUDPClientTransport("fakeAddressAndPort", "") + require.NotNil(t, err) + + _, err = NewTUDPClientTransport("localhost:9090", "fakeaddressandport") + require.NotNil(t, err) + + withLocalServer(t, func(addr string) { + trans, err := NewTUDPClientTransport(addr, "") + require.Nil(t, err) + require.True(t, trans.IsOpen()) + require.NotNil(t, trans.Addr()) + + //Check address + assert.True(t, strings.HasPrefix(trans.Addr().String(), "127.0.0.1:"), "address check") + require.Equal(t, "udp", trans.Addr().Network()) + + err = trans.Open() + require.Nil(t, err) + + err = trans.Close() + require.Nil(t, err) + require.False(t, trans.IsOpen()) + }) +} + +func TestNewTUDPServerTransport(t *testing.T) { + _, err := NewTUDPServerTransport("fakeAddressAndPort") + require.NotNil(t, err) + + trans, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + require.True(t, trans.IsOpen()) + require.Equal(t, ^uint64(0), trans.RemainingBytes()) + + //Ensure a second server can't be created on the same address + trans2, err := NewTUDPServerTransport(trans.Addr().String()) + if trans2 != nil { + //close the second server if one got created + trans2.Close() + } + require.NotNil(t, err) + + err = trans.Close() + require.Nil(t, err) + require.False(t, trans.IsOpen()) +} + +func TestTUDPServerTransportIsOpen(t *testing.T) { + _, err := NewTUDPServerTransport("fakeAddressAndPort") + require.NotNil(t, err) + + trans, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + require.True(t, trans.IsOpen()) + require.Equal(t, ^uint64(0), trans.RemainingBytes()) + + wg := sync.WaitGroup{} + wg.Add(2) + go func() { + time.Sleep(2 * time.Millisecond) + err = trans.Close() + require.Nil(t, err) + wg.Done() + }() + + go func() { + for i := 0; i < 4; i++ { + time.Sleep(1 * time.Millisecond) + trans.IsOpen() + } + wg.Done() + }() + + wg.Wait() + require.False(t, trans.IsOpen()) +} + +func TestWriteRead(t *testing.T) { + server, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + defer server.Close() + + client, err := NewTUDPClientTransport(server.Addr().String(), "") + require.Nil(t, err) + defer client.Close() + + n, err := client.Write([]byte("test")) + require.Nil(t, err) + require.Equal(t, 4, n) + n, err = client.Write([]byte("string")) + require.Nil(t, err) + require.Equal(t, 6, n) + err = client.Flush() + require.Nil(t, err) + + expected := []byte("teststring") + readBuf := make([]byte, 20) + n, err = server.Read(readBuf) + require.Nil(t, err) + require.Equal(t, len(expected), n) + require.Equal(t, expected, readBuf[0:n]) +} + +func TestIndirectCloseError(t *testing.T) { + trans, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + require.True(t, trans.IsOpen()) + + //Close connection object directly + conn := trans.Conn() + require.NotNil(t, conn) + err = conn.Close() + require.NoError(t, err, "calling close directly on connection") + + err = trans.Close() + require.Error(t, err, "calling close on transport") +} + +// Note: this test is here merely to capture the existing functionality. +// It's questionable whether multiple calls to Close() should succeed or not. +func TestDoubleCloseIsOK(t *testing.T) { + trans, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + require.True(t, trans.IsOpen()) + + conn := trans.Conn() + require.NotNil(t, conn) + err = trans.Close() + require.NoError(t, err, "closing transport") + + err = trans.Close() + require.NoError(t, err, "closing transport second time") +} + +func TestConnClosedReadWrite(t *testing.T) { + trans, err := NewTUDPServerTransport(localListenAddr.String()) + require.Nil(t, err) + require.True(t, trans.IsOpen()) + trans.Close() + require.False(t, trans.IsOpen()) + + _, err = trans.Read(make([]byte, 1)) + require.NotNil(t, err) + _, err = trans.Write([]byte("test")) + require.NotNil(t, err) +} + +func TestHugeWrite(t *testing.T) { + withLocalServer(t, func(addr string) { + trans, err := NewTUDPClientTransport(addr, "") + require.Nil(t, err) + + hugeMessage := make([]byte, 40000) + _, err = trans.Write(hugeMessage) + require.Nil(t, err) + + //expect buffer to exceed max + _, err = trans.Write(hugeMessage) + require.NotNil(t, err) + }) +} + +func TestFlushErrors(t *testing.T) { + withLocalServer(t, func(addr string) { + trans, err := NewTUDPClientTransport(addr, "") + require.Nil(t, err) + + //flushing closed transport + trans.Close() + err = trans.Flush() + require.NotNil(t, err) + + //error when trying to write in flush + trans, err = NewTUDPClientTransport(addr, "") + require.Nil(t, err) + trans.conn.Close() + + trans.Write([]byte{1, 2, 3, 4}) + err = trans.Flush() + require.Error(t, trans.Flush(), "Flush with data should fail") + }) +} + +func TestResetInFlush(t *testing.T) { + conn, err := net.ListenUDP(localListenAddr.Network(), localListenAddr) + require.NoError(t, err, "ListenUDP failed") + + trans, err := NewTUDPClientTransport(conn.LocalAddr().String(), "") + require.Nil(t, err) + + trans.Write([]byte("some nonsense")) + trans.conn.Close() // close the transport's connection via back door + + err = trans.Flush() + require.NotNil(t, err, "should fail to write to closed connection") + assert.Equal(t, 0, trans.writeBuf.Len(), "should reset the buffer") +} + +func withLocalServer(t *testing.T, f func(addr string)) { + conn, err := net.ListenUDP(localListenAddr.Network(), localListenAddr) + require.NoError(t, err, "ListenUDP failed") + + f(conn.LocalAddr().String()) + require.NoError(t, conn.Close(), "Close failed") +} diff --git a/reporter.go b/reporter.go index 8cdfded7..ba255316 100644 --- a/reporter.go +++ b/reporter.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/scope.go b/scope.go index 846fa657..61863bc5 100644 --- a/scope.go +++ b/scope.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -137,7 +137,7 @@ func newRootScope( registry: &scopeRegistry{ subscopes: make(map[string]*scope), }, - quit: make(chan struct{}), + quit: make(chan struct{}, 1), counters: make(map[string]*counter), gauges: make(map[string]*gauge), diff --git a/scope_benchmark_test.go b/scope_benchmark_test.go index e643e19e..fa774262 100644 --- a/scope_benchmark_test.go +++ b/scope_benchmark_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package tally import "testing" diff --git a/scope_test.go b/scope_test.go index 32eba5a4..c4e64716 100644 --- a/scope_test.go +++ b/scope_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/stats.go b/stats.go index 7bfc4b8f..b1b69258 100644 --- a/stats.go +++ b/stats.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/stats_benchmark_test.go b/stats_benchmark_test.go index bdc9f624..edd1d2fd 100644 --- a/stats_benchmark_test.go +++ b/stats_benchmark_test.go @@ -1,3 +1,23 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package tally import "testing" diff --git a/stats_test.go b/stats_test.go index c1bbd17e..c7892b3b 100644 --- a/stats_test.go +++ b/stats_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/statsd/options.go b/statsd/options.go index 45b9656f..fc5ca6a0 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/statsd/reporter.go b/statsd/reporter.go index 4710e2c3..e5ec3a71 100644 --- a/statsd/reporter.go +++ b/statsd/reporter.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/statsd/reporter_test.go b/statsd/reporter_test.go index adffbdca..15ec78e6 100644 --- a/statsd/reporter_test.go +++ b/statsd/reporter_test.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal diff --git a/types.go b/types.go index 0aa74164..dadd51bf 100644 --- a/types.go +++ b/types.go @@ -1,4 +1,4 @@ -// Copyright (c) 2016 Uber Technologies, Inc. +// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal