diff --git a/acceptance_tests/acceptance_suite_test.go b/acceptance_tests/acceptance_suite_test.go index 6b14bb506..6f4ac8764 100644 --- a/acceptance_tests/acceptance_suite_test.go +++ b/acceptance_tests/acceptance_suite_test.go @@ -14,10 +14,9 @@ import ( . "github.com/onsi/gomega" "github.com/onsi/gomega/gbytes" "github.com/onsi/gomega/gexec" - "go.uber.org/zap" - "github.com/yandex/pandora/lib/ginkgoutil" "github.com/yandex/pandora/lib/tag" + "go.uber.org/zap" ) var pandoraBin string diff --git a/acceptance_tests/http_test.go b/acceptance_tests/http_test.go index ab71fd37d..fea9dc757 100644 --- a/acceptance_tests/http_test.go +++ b/acceptance_tests/http_test.go @@ -2,14 +2,12 @@ package acceptance import ( "net/http" - - "golang.org/x/net/http2" - "net/http/httptest" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "go.uber.org/atomic" + "golang.org/x/net/http2" ) var _ = Describe("http", func() { diff --git a/cli/cli.go b/cli/cli.go index f2659c30a..2310e55d2 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -17,15 +17,14 @@ import ( "time" "github.com/spf13/viper" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "github.com/yandex/pandora/core/config" "github.com/yandex/pandora/core/engine" "github.com/yandex/pandora/lib/zaputil" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) -const Version = "0.3.5" +const Version = "0.4.0" const defaultConfigFile = "load" const stdinConfigSelector = "-" @@ -129,30 +128,41 @@ func Run() { errs := make(chan error) go runEngine(ctx, pandora, errs) + // waiting for signal or error message from engine + awaitPandoraTermination(pandora, cancel, errs, log) + log.Info("Engine run successfully finished") +} + +// helper function that awaits pandora run +func awaitPandoraTermination(pandora *engine.Engine, gracefulShutdown func(), errs chan error, log *zap.Logger) { sigs := make(chan os.Signal, 2) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - // waiting for signal or error message from engine select { case sig := <-sigs: + var interruptTimeout = 3 * time.Second switch sig { case syscall.SIGINT: - const interruptTimeout = 5 * time.Second - log.Info("SIGINT received. Trying to stop gracefully.", zap.Duration("timeout", interruptTimeout)) - cancel() - select { - case <-time.After(interruptTimeout): - log.Fatal("Interrupt timeout exceeded") - case sig := <-sigs: - log.Fatal("Another signal received. Quiting.", zap.Stringer("signal", sig)) - case err := <-errs: - log.Fatal("Engine interrupted", zap.Error(err)) - } + // await gun timeout but no longer than 30 sec. + interruptTimeout = 30 * time.Second + log.Info("SIGINT received. Graceful shutdown.", zap.Duration("timeout", interruptTimeout)) + gracefulShutdown() case syscall.SIGTERM: - log.Fatal("SIGTERM received. Quiting.") + log.Info("SIGTERM received. Trying to stop gracefully.", zap.Duration("timeout", interruptTimeout)) + gracefulShutdown() default: log.Fatal("Unexpected signal received. Quiting.", zap.Stringer("signal", sig)) } + + select { + case <-time.After(interruptTimeout): + log.Fatal("Interrupt timeout exceeded") + case sig := <-sigs: + log.Fatal("Another signal received. Quiting.", zap.Stringer("signal", sig)) + case err := <-errs: + log.Fatal("Engine interrupted", zap.Error(err)) + } + case err := <-errs: switch err { case nil: @@ -160,7 +170,7 @@ func Run() { case err: const awaitTimeout = 3 * time.Second log.Error("Engine run failed. Awaiting started tasks.", zap.Error(err), zap.Duration("timeout", awaitTimeout)) - cancel() + gracefulShutdown() time.AfterFunc(awaitTimeout, func() { log.Fatal("Engine tasks timeout exceeded.") }) @@ -168,7 +178,6 @@ func Run() { log.Fatal("Engine run failed. Pandora graceful shutdown successfully finished") } } - log.Info("Engine run successfully finished") } func runEngine(ctx context.Context, engine *engine.Engine, errs chan error) { diff --git a/cli/expvar.go b/cli/expvar.go index ed8f436d9..054ffe9f1 100644 --- a/cli/expvar.go +++ b/cli/expvar.go @@ -3,10 +3,9 @@ package cli import ( "time" - "go.uber.org/zap" - "github.com/yandex/pandora/core/engine" "github.com/yandex/pandora/lib/monitoring" + "go.uber.org/zap" ) func newEngineMetrics() engine.Metrics { diff --git a/components/example/example.go b/components/example/example.go index f6dab2222..c5515f787 100644 --- a/components/example/example.go +++ b/components/example/example.go @@ -10,10 +10,9 @@ import ( "fmt" "sync" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" + "go.uber.org/zap" ) type Ammo struct { diff --git a/components/grpc/ammo/grpcjson/provider.go b/components/grpc/ammo/grpcjson/provider.go index 79c3e5dcd..067012fec 100644 --- a/components/grpc/ammo/grpcjson/provider.go +++ b/components/grpc/ammo/grpcjson/provider.go @@ -9,13 +9,12 @@ import ( "bufio" "context" - "github.com/pkg/errors" - "go.uber.org/zap" - - "github.com/yandex/pandora/components/grpc/ammo" - jsoniter "github.com/json-iterator/go" + "github.com/pkg/errors" "github.com/spf13/afero" + "github.com/yandex/pandora/components/grpc/ammo" + "github.com/yandex/pandora/lib/confutil" + "go.uber.org/zap" ) func NewProvider(fs afero.Fs, conf Config) *Provider { @@ -51,6 +50,7 @@ type Config struct { //Maximum number of byte in an ammo. Default is bufio.MaxScanTokenSize MaxAmmoSize int Source Source `config:"source"` + ChosenCases []string } func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { @@ -72,6 +72,9 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { return errors.Wrapf(err, "failed to decode ammo at line: %v; data: %q", line, data) } } + if !confutil.IsChosenCase(a.Tag, p.Config.ChosenCases) { + continue + } ammoNum++ select { case p.Sink <- a: diff --git a/components/grpc/ammo/provider.go b/components/grpc/ammo/provider.go index e390b1947..120a1258c 100644 --- a/components/grpc/ammo/provider.go +++ b/components/grpc/ammo/provider.go @@ -11,9 +11,8 @@ import ( "github.com/pkg/errors" "github.com/spf13/afero" - "go.uber.org/atomic" - "github.com/yandex/pandora/core" + "go.uber.org/atomic" ) func NewProvider(fs afero.Fs, fileName string, start func(ctx context.Context, file afero.File) error) Provider { diff --git a/components/grpc/core.go b/components/grpc/core.go index 5fa5a9340..ab7bf0beb 100644 --- a/components/grpc/core.go +++ b/components/grpc/core.go @@ -8,22 +8,20 @@ import ( "log" "time" + "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/dynamic" + "github.com/jhump/protoreflect/dynamic/grpcdynamic" "github.com/jhump/protoreflect/grpcreflect" - "github.com/yandex/pandora/core/warmup" - "google.golang.org/grpc/credentials" - reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" - "github.com/yandex/pandora/components/grpc/ammo" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" - - "github.com/jhump/protoreflect/desc" - "github.com/jhump/protoreflect/dynamic" - "github.com/jhump/protoreflect/dynamic/grpcdynamic" + "github.com/yandex/pandora/core/warmup" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" + reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/grpc/status" ) @@ -33,7 +31,8 @@ type Sample struct { } type grpcDialOptions struct { - Authority string `config:"authority"` + Authority string `config:"authority"` + Timeout time.Duration `config:"timeout"` } type GunConfig struct { @@ -186,7 +185,11 @@ func makeGRPCConnect(target string, isTLS bool, dialOptions grpcDialOptions) (co } else { opts = append(opts, grpc.WithInsecure()) } - opts = append(opts, grpc.WithTimeout(time.Second)) + timeout := time.Second + if dialOptions.Timeout != 0 { + timeout = dialOptions.Timeout + } + opts = append(opts, grpc.WithTimeout(timeout)) opts = append(opts, grpc.WithUserAgent("load test, pandora universal grpc shooter")) if dialOptions.Authority != "" { diff --git a/components/phttp/ammo/simple/ammo.go b/components/phttp/ammo/simple/ammo.go index 6db888e39..5fdc5e756 100644 --- a/components/phttp/ammo/simple/ammo.go +++ b/components/phttp/ammo/simple/ammo.go @@ -51,4 +51,8 @@ func (a *Ammo) IsValid() bool { return !a.isInvalid } +func (a *Ammo) Tag() string { + return a.tag +} + var _ phttp.Ammo = (*Ammo)(nil) diff --git a/components/phttp/ammo/simple/jsonline/jsonline_suite_test.go b/components/phttp/ammo/simple/jsonline/jsonline_suite_test.go index d149cd1b7..bcdc60ec4 100644 --- a/components/phttp/ammo/simple/jsonline/jsonline_suite_test.go +++ b/components/phttp/ammo/simple/jsonline/jsonline_suite_test.go @@ -17,7 +17,6 @@ import ( . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" "github.com/spf13/afero" - "github.com/yandex/pandora/components/phttp/ammo/simple" "github.com/yandex/pandora/core" "github.com/yandex/pandora/lib/ginkgoutil" diff --git a/components/phttp/ammo/simple/jsonline/provider.go b/components/phttp/ammo/simple/jsonline/provider.go index 0a999a8b5..25c5f540f 100644 --- a/components/phttp/ammo/simple/jsonline/provider.go +++ b/components/phttp/ammo/simple/jsonline/provider.go @@ -12,10 +12,10 @@ import ( "strings" "github.com/pkg/errors" - "go.uber.org/zap" - "github.com/spf13/afero" "github.com/yandex/pandora/components/phttp/ammo/simple" + "github.com/yandex/pandora/lib/confutil" + "go.uber.org/zap" ) func NewProvider(fs afero.Fs, conf Config) *Provider { @@ -42,10 +42,12 @@ type Config struct { ContinueOnError bool //Maximum number of byte in an ammo. Default is bufio.MaxScanTokenSize MaxAmmoSize int + ChosenCases []string } func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { var ammoNum, passNum int + for { passNum++ scanner := bufio.NewScanner(ammoFile) @@ -63,6 +65,9 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { return errors.Wrapf(err, "failed to decode ammo at line: %v; data: %q", line, data) } } + if !confutil.IsChosenCase(a.Tag(), p.Config.ChosenCases) { + continue + } ammoNum++ select { case p.Sink <- a: diff --git a/components/phttp/ammo/simple/provider.go b/components/phttp/ammo/simple/provider.go index 117fafa4f..70ebda16f 100644 --- a/components/phttp/ammo/simple/provider.go +++ b/components/phttp/ammo/simple/provider.go @@ -11,9 +11,8 @@ import ( "github.com/pkg/errors" "github.com/spf13/afero" - "go.uber.org/atomic" - "github.com/yandex/pandora/core" + "go.uber.org/atomic" ) func NewProvider(fs afero.Fs, fileName string, start func(ctx context.Context, file afero.File) error) Provider { diff --git a/components/phttp/ammo/simple/raw/decoder.go b/components/phttp/ammo/simple/raw/decoder.go index b6b7e63e5..e34481d68 100644 --- a/components/phttp/ammo/simple/raw/decoder.go +++ b/components/phttp/ammo/simple/raw/decoder.go @@ -6,8 +6,6 @@ import ( "net/http" "strconv" "strings" - - "github.com/pkg/errors" ) type Header struct { @@ -29,30 +27,6 @@ func decodeRequest(reqString []byte) (req *http.Request, err error) { if err != nil { return } - if req.Host != "" { - req.URL.Host = req.Host - } req.RequestURI = "" return } - -func decodeHTTPConfigHeaders(headers []string) (configHTTPHeaders []Header, err error) { - for _, header := range headers { - line := []byte(header) - if len(line) < 3 || line[0] != '[' || line[len(line)-1] != ']' { - return nil, errors.New("header line should be like '[key: value]") - } - line = line[1 : len(line)-1] - colonIdx := bytes.IndexByte(line, ':') - if colonIdx < 0 { - return nil, errors.New("missing colon") - } - configHTTPHeaders = append( - configHTTPHeaders, - Header{ - string(bytes.TrimSpace(line[:colonIdx])), - string(bytes.TrimSpace(line[colonIdx+1:])), - }) - } - return -} diff --git a/components/phttp/ammo/simple/raw/decoder_bench_test.go b/components/phttp/ammo/simple/raw/decoder_bench_test.go index 6b6745572..adf049f4d 100644 --- a/components/phttp/ammo/simple/raw/decoder_bench_test.go +++ b/components/phttp/ammo/simple/raw/decoder_bench_test.go @@ -1,6 +1,10 @@ package raw -import "testing" +import ( + "testing" + + "github.com/yandex/pandora/components/phttp/ammo/simple" +) var ( benchTestConfigHeaders = []string{ @@ -27,16 +31,10 @@ func BenchmarkRawDecoder(b *testing.B) { func BenchmarkRawDecoderWithHeaders(b *testing.B) { b.StopTimer() - decodedHTTPConfigHeaders, _ := decodeHTTPConfigHeaders(benchTestConfigHeaders) + decodedHTTPConfigHeaders, _ := simple.DecodeHTTPConfigHeaders(benchTestConfigHeaders) b.StartTimer() for i := 0; i < b.N; i++ { req, _ := decodeRequest([]byte(benchTestRequest)) - for _, header := range decodedHTTPConfigHeaders { - if header.key == "Host" { - req.URL.Host = header.value - } else { - req.Header.Set(header.key, header.value) - } - } + simple.UpdateRequestWithHeaders(req, decodedHTTPConfigHeaders) } } diff --git a/components/phttp/ammo/simple/raw/decoder_test.go b/components/phttp/ammo/simple/raw/decoder_test.go index 5f8fb7673..696522f20 100644 --- a/components/phttp/ammo/simple/raw/decoder_test.go +++ b/components/phttp/ammo/simple/raw/decoder_test.go @@ -83,34 +83,5 @@ var _ = Describe("Decoder", func() { req, err := decodeRequest([]byte(raw)) Expect(err).To(BeNil()) Expect(req.Host).To(Equal("hostname.tld")) - Expect(req.URL.Host).To(Equal("hostname.tld")) - }) - It("should replace header Host from config", func() { - const host = "hostname.tld" - const newhost = "newhostname.tld" - - raw := "GET / HTTP/1.1\r\n" + - "Host: " + host + "\r\n" + - "Content-Length: 0\r\n" + - "\r\n" - configHeaders := []string{ - "[Host: " + newhost + "]", - "[SomeTestKey: sometestvalue]", - } - req, err := decodeRequest([]byte(raw)) - Expect(err).To(BeNil()) - Expect(req.Host).To(Equal(host)) - Expect(req.URL.Host).To(Equal(host)) - decodedConfigHeaders, _ := decodeHTTPConfigHeaders(configHeaders) - for _, header := range decodedConfigHeaders { - // special behavior for `Host` header - if header.key == "Host" { - req.URL.Host = header.value - } else { - req.Header.Set(header.key, header.value) - } - } - Expect(req.URL.Host).To(Equal(newhost)) - Expect(req.Header.Get("SomeTestKey")).To(Equal("sometestvalue")) }) }) diff --git a/components/phttp/ammo/simple/raw/provider.go b/components/phttp/ammo/simple/raw/provider.go index 428f42a95..25100bb4e 100644 --- a/components/phttp/ammo/simple/raw/provider.go +++ b/components/phttp/ammo/simple/raw/provider.go @@ -7,9 +7,9 @@ import ( "github.com/pkg/errors" "github.com/spf13/afero" - "go.uber.org/zap" - "github.com/yandex/pandora/components/phttp/ammo/simple" + "github.com/yandex/pandora/lib/confutil" + "go.uber.org/zap" ) /* @@ -54,7 +54,8 @@ type Config struct { // Redefine HTTP headers Headers []string // Passes limits ammo file passes. Unlimited if zero. - Passes int `validate:"min=0"` + Passes int `validate:"min=0"` + ChosenCases []string } func NewProvider(fs afero.Fs, conf Config) *Provider { @@ -76,7 +77,7 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { var passNum int var ammoNum int // parse and prepare Headers from config - decodedConfigHeaders, err := decodeHTTPConfigHeaders(p.Config.Headers) + decodedConfigHeaders, err := simple.DecodeHTTPConfigHeaders(p.Config.Headers) if err != nil { return err } @@ -101,6 +102,9 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { if reqSize == 0 { break // start over from the beginning of file if ammo size is 0 } + if !confutil.IsChosenCase(tag, p.Config.ChosenCases) { + continue + } buff := make([]byte, reqSize) if n, err := io.ReadFull(reader, buff); err != nil { return errors.Wrapf(err, "failed to read ammo at position: %v; tried to read: %v; have read: %v", filePosition(ammoFile), reqSize, n) @@ -110,15 +114,8 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { return errors.Wrapf(err, "failed to decode ammo at position: %v; data: %q", filePosition(ammoFile), buff) } - // redefine request Headers from config - for _, header := range decodedConfigHeaders { - // special behavior for `Host` header - if header.key == "Host" { - req.URL.Host = header.value - } else { - req.Header.Set(header.key, header.value) - } - } + // add new Headers to request from config + simple.UpdateRequestWithHeaders(req, decodedConfigHeaders) sh := p.Pool.Get().(*simple.Ammo) sh.Reset(req, tag) diff --git a/components/phttp/ammo/simple/raw/provider_test.go b/components/phttp/ammo/simple/raw/provider_test.go index 547068f39..d73931d38 100644 --- a/components/phttp/ammo/simple/raw/provider_test.go +++ b/components/phttp/ammo/simple/raw/provider_test.go @@ -13,7 +13,6 @@ import ( . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" "github.com/spf13/afero" - "github.com/yandex/pandora/components/phttp/ammo/simple" "github.com/yandex/pandora/core" ) diff --git a/components/phttp/ammo/simple/request.go b/components/phttp/ammo/simple/request.go new file mode 100644 index 000000000..08e2f1f90 --- /dev/null +++ b/components/phttp/ammo/simple/request.go @@ -0,0 +1,56 @@ +// Copyright (c) 2017 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package simple + +import ( + "bytes" + "net/http" + + "github.com/pkg/errors" +) + +type Header struct { + key string + value string +} + +func DecodeHTTPConfigHeaders(headers []string) (configHTTPHeaders []Header, err error) { + for _, header := range headers { + line := []byte(header) + if len(line) < 3 || line[0] != '[' || line[len(line)-1] != ']' { + return nil, errors.New("header line should be like '[key: value]") + } + line = line[1 : len(line)-1] + colonIdx := bytes.IndexByte(line, ':') + if colonIdx < 0 { + return nil, errors.New("missing colon") + } + configHTTPHeaders = append( + configHTTPHeaders, + Header{ + string(bytes.TrimSpace(line[:colonIdx])), + string(bytes.TrimSpace(line[colonIdx+1:])), + }) + } + return +} + +func UpdateRequestWithHeaders(req *http.Request, headers []Header) { + origHeaders := req.Header.Clone() + for _, header := range headers { + if origHeaders.Get(header.key) != "" { + continue + } + // special behavior for `Host` header + if header.key == "Host" { + if req.Host == "" { + req.Host = header.value + } + } else { + req.Header.Add(header.key, header.value) + } + } +} diff --git a/components/phttp/ammo/simple/uri/decoder.go b/components/phttp/ammo/simple/uri/decoder.go index 4a66b2a2e..30aa6c51e 100644 --- a/components/phttp/ammo/simple/uri/decoder.go +++ b/components/phttp/ammo/simple/uri/decoder.go @@ -13,8 +13,8 @@ import ( "sync" "github.com/pkg/errors" - "github.com/yandex/pandora/components/phttp/ammo/simple" + "github.com/yandex/pandora/lib/confutil" ) type decoder struct { @@ -24,20 +24,17 @@ type decoder struct { ammoNum int header http.Header - configHeaders []ConfigHeader -} - -type ConfigHeader struct { - key string - value string + configHeaders []simple.Header + chosenCases []string } -func newDecoder(ctx context.Context, sink chan<- *simple.Ammo, pool *sync.Pool) *decoder { +func newDecoder(ctx context.Context, sink chan<- *simple.Ammo, pool *sync.Pool, chosenCases []string) *decoder { return &decoder{ - sink: sink, - header: http.Header{}, - pool: pool, - ctx: ctx, + sink: sink, + header: http.Header{}, + pool: pool, + ctx: ctx, + chosenCases: chosenCases, } } @@ -62,6 +59,9 @@ func (d *decoder) decodeURI(line []byte) error { if len(parts) > 1 { tag = parts[1] } + if !confutil.IsChosenCase(tag, d.chosenCases) { + return nil + } req, err := http.NewRequest("GET", string(url), nil) if err != nil { return errors.Wrap(err, "uri decode") @@ -70,20 +70,13 @@ func (d *decoder) decodeURI(line []byte) error { // http.Request.Write sends Host header based on req.URL.Host if k == "Host" { req.Host = v[0] - req.URL.Host = v[0] } else { req.Header[k] = v } } - // redefine request Headers from config - for _, configHeader := range d.configHeaders { - if configHeader.key == "Host" { - req.Host = configHeader.value - req.URL.Host = configHeader.value - } else { - req.Header.Set(configHeader.key, configHeader.value) - } - } + + // add new Headers to request from config + simple.UpdateRequestWithHeaders(req, d.configHeaders) sh := d.pool.Get().(*simple.Ammo) sh.Reset(req, tag) @@ -119,23 +112,3 @@ func (d *decoder) ResetHeader() { delete(d.header, k) } } - -func decodeHTTPConfigHeaders(headers []string) (configHTTPHeaders []ConfigHeader, err error) { - for _, header := range headers { - line := []byte(header) - if len(line) < 3 || line[0] != '[' || line[len(line)-1] != ']' { - return nil, errors.New("header line should be like '[key: value]") - } - line = line[1 : len(line)-1] - colonIdx := bytes.IndexByte(line, ':') - if colonIdx < 0 { - return nil, errors.New("missing colon") - } - preparedHeader := ConfigHeader{ - string(bytes.TrimSpace(line[:colonIdx])), - string(bytes.TrimSpace(line[colonIdx+1:])), - } - configHTTPHeaders = append(configHTTPHeaders, preparedHeader) - } - return -} diff --git a/components/phttp/ammo/simple/uri/decoder_test.go b/components/phttp/ammo/simple/uri/decoder_test.go index 4a35ebcdd..c3b731218 100644 --- a/components/phttp/ammo/simple/uri/decoder_test.go +++ b/components/phttp/ammo/simple/uri/decoder_test.go @@ -9,7 +9,6 @@ import ( . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" - "github.com/yandex/pandora/components/phttp/ammo/simple" "github.com/yandex/pandora/core" ) @@ -22,7 +21,7 @@ func newAmmoPool() *sync.Pool { var _ = Describe("Decoder", func() { It("uri decode ctx cancel", func() { ctx, cancel := context.WithCancel(context.Background()) - decoder := newDecoder(ctx, make(chan *simple.Ammo), newAmmoPool()) + decoder := newDecoder(ctx, make(chan *simple.Ammo), newAmmoPool(), nil) cancel() err := decoder.Decode([]byte("/some/path")) Expect(err).To(Equal(context.Canceled)) @@ -33,7 +32,7 @@ var _ = Describe("Decoder", func() { ) BeforeEach(func() { ammoCh = make(chan *simple.Ammo, 10) - decoder = newDecoder(context.Background(), ammoCh, newAmmoPool()) + decoder = newDecoder(context.Background(), ammoCh, newAmmoPool(), nil) }) DescribeTable("invalid input", func(line string) { @@ -69,7 +68,6 @@ var _ = Describe("Decoder", func() { req, sample := sh.Request() Expect(*req.URL).To(MatchFields(IgnoreExtras, Fields{ "Path": Equal(line), - "Host": Equal(host), "Scheme": BeEmpty(), })) Expect(req.Host).To(Equal(host)) @@ -95,7 +93,6 @@ var _ = Describe("Decoder", func() { req, sample := sh.Request() Expect(*req.URL).To(MatchFields(IgnoreExtras, Fields{ "Path": Equal("/some/path"), - "Host": Equal(host), "Scheme": BeEmpty(), })) Expect(req.Host).To(Equal(host)) @@ -142,18 +139,6 @@ var _ = Describe("Decoder", func() { "C": []string{""}, })) }) - It("overwrite by config", func() { - decodedConfigHeaders, _ := decodeHTTPConfigHeaders([]string{ - "[Host: youhost.tld]", - "[SomeHeader: somevalue]", - }) - decoder.configHeaders = decodedConfigHeaders - cfgHeaders := []ConfigHeader{ - {"Host", "youhost.tld"}, - {"SomeHeader", "somevalue"}, - } - Expect(decoder.configHeaders).To(Equal(cfgHeaders)) - }) }) It("Reset", func() { decoder.header.Set("a", "b") diff --git a/components/phttp/ammo/simple/uri/provider.go b/components/phttp/ammo/simple/uri/provider.go index 219e40f5d..a763d5c9f 100644 --- a/components/phttp/ammo/simple/uri/provider.go +++ b/components/phttp/ammo/simple/uri/provider.go @@ -12,20 +12,20 @@ import ( "github.com/pkg/errors" "github.com/spf13/afero" - "go.uber.org/zap" - "github.com/yandex/pandora/components/phttp/ammo/simple" + "go.uber.org/zap" ) type Config struct { File string // Limit limits total num of ammo. Unlimited if zero. Limit int `validate:"min=0"` - // Redefine HTTP headers + // Additional HTTP headers Headers []string // Passes limits ammo file passes. Unlimited if zero. - Passes int `validate:"min=0"` - Uris []string + Passes int `validate:"min=0"` + Uris []string + ChosenCases []string } // TODO: pass logger and metricsRegistry @@ -74,9 +74,9 @@ type Provider struct { } func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { - p.decoder = newDecoder(ctx, p.Sink, &p.Pool) + p.decoder = newDecoder(ctx, p.Sink, &p.Pool, p.Config.ChosenCases) // parse and prepare Headers from config - decodedConfigHeaders, err := decodeHTTPConfigHeaders(p.Config.Headers) + decodedConfigHeaders, err := simple.DecodeHTTPConfigHeaders(p.Config.Headers) if err != nil { return err } diff --git a/components/phttp/ammo/simple/uri/provider_test.go b/components/phttp/ammo/simple/uri/provider_test.go index a0d05411b..ec2245ec3 100644 --- a/components/phttp/ammo/simple/uri/provider_test.go +++ b/components/phttp/ammo/simple/uri/provider_test.go @@ -9,9 +9,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" - "github.com/spf13/afero" - "github.com/pkg/errors" + "github.com/spf13/afero" "github.com/yandex/pandora/components/phttp/ammo/simple" "github.com/yandex/pandora/core" ) @@ -147,7 +146,6 @@ var _ = Describe("provider decode", func() { "Host": Equal(expectedData.host), "URL": PointTo(MatchFields(IgnoreExtras, Fields{ "Scheme": BeEmpty(), - "Host": Equal(expectedData.host), "Path": Equal(expectedData.path), })), "Header": Equal(expectedData.header), diff --git a/components/phttp/ammo/simple/uripost/decoder.go b/components/phttp/ammo/simple/uripost/decoder.go index 20f26f2cb..504eb0a21 100644 --- a/components/phttp/ammo/simple/uripost/decoder.go +++ b/components/phttp/ammo/simple/uripost/decoder.go @@ -8,11 +8,6 @@ import ( "github.com/pkg/errors" ) -type Header struct { - key string - value string -} - func decodeHeader(line []byte) (key string, val string, err error) { if len(line) < 3 || line[0] != '[' || line[len(line)-1] != ']' { return key, val, errors.New("header line should be like '[key: value]'") @@ -49,24 +44,3 @@ func decodeURI(uriString []byte) (bodySize int, uri string, tag string, err erro return } - -func decodeHTTPConfigHeaders(headers []string) (configHTTPHeaders []Header, err error) { - for _, header := range headers { - line := []byte(header) - if len(line) < 3 || line[0] != '[' || line[len(line)-1] != ']' { - return nil, errors.New("header line should be like '[key: value]") - } - line = line[1 : len(line)-1] - colonIdx := bytes.IndexByte(line, ':') - if colonIdx < 0 { - return nil, errors.New("missing colon") - } - configHTTPHeaders = append( - configHTTPHeaders, - Header{ - string(bytes.TrimSpace(line[:colonIdx])), - string(bytes.TrimSpace(line[colonIdx+1:])), - }) - } - return -} diff --git a/components/phttp/ammo/simple/uripost/provider.go b/components/phttp/ammo/simple/uripost/provider.go index 6be768a1a..28c5b8440 100644 --- a/components/phttp/ammo/simple/uripost/provider.go +++ b/components/phttp/ammo/simple/uripost/provider.go @@ -10,9 +10,9 @@ import ( "github.com/pkg/errors" "github.com/spf13/afero" - "go.uber.org/zap" - "github.com/yandex/pandora/components/phttp/ammo/simple" + "github.com/yandex/pandora/lib/confutil" + "go.uber.org/zap" ) func filePosition(file afero.File) (position int64) { @@ -24,10 +24,11 @@ type Config struct { File string `validate:"required"` // Limit limits total num of ammo. Unlimited if zero. Limit int `validate:"min=0"` - // Redefine HTTP headers + // Additional HTTP headers Headers []string // Passes limits ammo file passes. Unlimited if zero. - Passes int `validate:"min=0"` + Passes int `validate:"min=0"` + ChosenCases []string } func NewProvider(fs afero.Fs, conf Config) *Provider { @@ -56,7 +57,7 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { header := make(http.Header) // parse and prepare Headers from config - decodedConfigHeaders, err := decodeHTTPConfigHeaders(p.Config.Headers) + decodedConfigHeaders, err := simple.DecodeHTTPConfigHeaders(p.Config.Headers) if err != nil { return err } @@ -91,6 +92,9 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { if bodySize == 0 { break // start over from the beginning of file if ammo size is 0 } + if !confutil.IsChosenCase(tag, p.Config.ChosenCases) { + continue + } buff := make([]byte, bodySize) if n, err := io.ReadFull(reader, buff); err != nil { return errors.Wrapf(err, "failed to read ammo at position: %v; tried to read: %v; have read: %v", filePosition(ammoFile), bodySize, n) @@ -104,21 +108,13 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error { // http.Request.Write sends Host header based on req.URL.Host if k == "Host" { req.Host = v[0] - req.URL.Host = v[0] } else { req.Header[k] = v } } - // redefine request Headers from config - for _, header := range decodedConfigHeaders { - // special behavior for `Host` header - if header.key == "Host" { - req.URL.Host = header.value - } else { - req.Header.Set(header.key, header.value) - } - } + // add new Headers to request from config + simple.UpdateRequestWithHeaders(req, decodedConfigHeaders) sh := p.Pool.Get().(*simple.Ammo) sh.Reset(req, tag) diff --git a/components/phttp/base.go b/components/phttp/base.go index 023443fb9..985985762 100644 --- a/components/phttp/base.go +++ b/components/phttp/base.go @@ -16,10 +16,9 @@ import ( "net/url" "github.com/pkg/errors" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" + "go.uber.org/zap" ) const ( diff --git a/components/phttp/base_test.go b/components/phttp/base_test.go index d8aa9eb69..8834f9c5b 100644 --- a/components/phttp/base_test.go +++ b/components/phttp/base_test.go @@ -18,13 +18,12 @@ import ( . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" - "go.uber.org/zap" - ammomock "github.com/yandex/pandora/components/phttp/mocks" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" "github.com/yandex/pandora/core/coretest" "github.com/yandex/pandora/lib/ginkgoutil" + "go.uber.org/zap" ) func testDeps() core.GunDeps { diff --git a/components/phttp/client.go b/components/phttp/client.go index d8246d1e1..10502a8ee 100644 --- a/components/phttp/client.go +++ b/components/phttp/client.go @@ -13,11 +13,10 @@ import ( "time" "github.com/pkg/errors" - "go.uber.org/zap" - "golang.org/x/net/http2" - "github.com/yandex/pandora/core/config" "github.com/yandex/pandora/lib/netutil" + "go.uber.org/zap" + "golang.org/x/net/http2" ) //go:generate mockery -name=Client -case=underscore -inpkg -testonly @@ -172,3 +171,11 @@ func checkHTTP2(state *tls.ConnectionState) error { } return nil } + +func getHostWithoutPort(target string) string { + host, _, err := net.SplitHostPort(target) + if err != nil { + host = target + } + return host +} diff --git a/components/phttp/connect_test.go b/components/phttp/connect_test.go index 0a0ff3c13..cf8abc521 100644 --- a/components/phttp/connect_test.go +++ b/components/phttp/connect_test.go @@ -15,9 +15,8 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "go.uber.org/zap" - "github.com/yandex/pandora/core/aggregator/netsample" + "go.uber.org/zap" ) var _ = Describe("connect", func() { diff --git a/components/phttp/http.go b/components/phttp/http.go index 0e8caf93c..df1451345 100644 --- a/components/phttp/http.go +++ b/components/phttp/http.go @@ -28,14 +28,14 @@ type HTTP2GunConfig struct { Client ClientConfig `config:",squash"` } -func NewHTTPGun(conf HTTPGunConfig, answLog *zap.Logger) *HTTPGun { +func NewHTTPGun(conf HTTPGunConfig, answLog *zap.Logger, targetResolved string) *HTTPGun { transport := NewTransport(conf.Client.Transport, NewDialer(conf.Client.Dialer).DialContext) client := newClient(transport, conf.Client.Redirect) - return NewClientGun(client, conf.Gun, answLog) + return NewClientGun(client, conf.Gun, answLog, targetResolved) } // NewHTTP2Gun return simple HTTP/2 gun that can shoot sequentially through one connection. -func NewHTTP2Gun(conf HTTP2GunConfig, answLog *zap.Logger) (*HTTPGun, error) { +func NewHTTP2Gun(conf HTTP2GunConfig, answLog *zap.Logger, targetResolved string) (*HTTPGun, error) { if !conf.Gun.SSL { // Open issue on github if you really need this feature. return nil, errors.New("HTTP/2.0 over TCP is not supported. Please leave SSL option true by default.") @@ -44,10 +44,10 @@ func NewHTTP2Gun(conf HTTP2GunConfig, answLog *zap.Logger) (*HTTPGun, error) { client := newClient(transport, conf.Client.Redirect) // Will panic and cancel shooting whet target doesn't support HTTP/2. client = &panicOnHTTP1Client{client} - return NewClientGun(client, conf.Gun, answLog), nil + return NewClientGun(client, conf.Gun, answLog, targetResolved), nil } -func NewClientGun(client Client, conf ClientGunConfig, answLog *zap.Logger) *HTTPGun { +func NewClientGun(client Client, conf ClientGunConfig, answLog *zap.Logger, targetResolved string) *HTTPGun { scheme := "http" if conf.SSL { scheme = "https" @@ -63,25 +63,30 @@ func NewClientGun(client Client, conf ClientGunConfig, answLog *zap.Logger) *HTT }, AnswLog: answLog, }, - scheme: scheme, - target: conf.Target, - client: client, + scheme: scheme, + hostname: getHostWithoutPort(conf.Target), + targetResolved: targetResolved, + client: client, } return &g } type HTTPGun struct { BaseGun - scheme string - target string - client Client + scheme string + hostname string + targetResolved string + client Client } var _ Gun = (*HTTPGun)(nil) func (g *HTTPGun) Do(req *http.Request) (*http.Response, error) { - req.Host = req.URL.Host - req.URL.Host = g.target + if req.Host == "" { + req.Host = g.hostname + } + + req.URL.Host = g.targetResolved req.URL.Scheme = g.scheme return g.client.Do(req) } diff --git a/components/phttp/http_test.go b/components/phttp/http_test.go index 7a378c979..be5cb5c7d 100644 --- a/components/phttp/http_test.go +++ b/components/phttp/http_test.go @@ -14,13 +14,12 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" . "github.com/onsi/gomega/gstruct" - "go.uber.org/atomic" - "go.uber.org/zap" - "golang.org/x/net/http2" - ammomock "github.com/yandex/pandora/components/phttp/mocks" "github.com/yandex/pandora/core/aggregator/netsample" "github.com/yandex/pandora/core/config" + "go.uber.org/atomic" + "go.uber.org/zap" + "golang.org/x/net/http2" ) var _ = Describe("BaseGun", func() { @@ -47,9 +46,10 @@ var _ = Describe("BaseGun", func() { defer server.Close() log := zap.NewNop() conf := DefaultHTTPGunConfig() - conf.Gun.Target = strings.TrimPrefix(server.URL, "http://") + conf.Gun.Target = host + ":80" + targetResolved := strings.TrimPrefix(server.URL, "http://") results := &netsample.TestAggregator{} - httpGun := NewHTTPGun(conf, log) + httpGun := NewHTTPGun(conf, log, targetResolved) _ = httpGun.Bind(results, testDeps()) am := newAmmoReq(expectedReq) @@ -59,9 +59,9 @@ var _ = Describe("BaseGun", func() { Expect(*actualReq).To(MatchFields(IgnoreExtras, Fields{ "Method": Equal("GET"), "Proto": Equal("HTTP/1.1"), - "Host": Equal(host), // Not server host, but host from ammo. + "Host": Equal(host), // Server host "URL": PointTo(MatchFields(IgnoreExtras, Fields{ - "Host": BeEmpty(), // Server request. + "Host": BeEmpty(), // Set in Do(). "Path": Equal(path), })), })) @@ -99,7 +99,7 @@ var _ = Describe("HTTP", func() { conf := DefaultHTTPGunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Gun.SSL = https - gun := NewHTTPGun(conf, log) + gun := NewHTTPGun(conf, log, conf.Gun.Target) var aggr netsample.TestAggregator _ = gun.Bind(&aggr, testDeps()) gun.Shoot(newAmmoURL("/")) @@ -125,7 +125,7 @@ var _ = Describe("HTTP", func() { conf := DefaultHTTPGunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Client.Redirect = redirect - gun := NewHTTPGun(conf, log) + gun := NewHTTPGun(conf, log, conf.Gun.Target) var aggr netsample.TestAggregator _ = gun.Bind(&aggr, testDeps()) gun.Shoot(newAmmoURL("/redirect")) @@ -163,7 +163,7 @@ var _ = Describe("HTTP", func() { conf := DefaultHTTPGunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Gun.SSL = true - gun := NewHTTPGun(conf, log) + gun := NewHTTPGun(conf, log, conf.Gun.Target) var results netsample.TestAggregator _ = gun.Bind(&results, testDeps()) gun.Shoot(newAmmoURL("/")) @@ -187,7 +187,7 @@ var _ = Describe("HTTP/2", func() { log := zap.NewNop() conf := DefaultHTTP2GunConfig() conf.Gun.Target = server.Listener.Addr().String() - gun, _ := NewHTTP2Gun(conf, log) + gun, _ := NewHTTP2Gun(conf, log, conf.Gun.Target) var results netsample.TestAggregator _ = gun.Bind(&results, testDeps()) gun.Shoot(newAmmoURL("/")) @@ -202,7 +202,7 @@ var _ = Describe("HTTP/2", func() { log := zap.NewNop() conf := DefaultHTTP2GunConfig() conf.Gun.Target = server.Listener.Addr().String() - gun, _ := NewHTTP2Gun(conf, log) + gun, _ := NewHTTP2Gun(conf, log, conf.Gun.Target) var results netsample.TestAggregator _ = gun.Bind(&results, testDeps()) var r interface{} @@ -225,7 +225,7 @@ var _ = Describe("HTTP/2", func() { conf := DefaultHTTP2GunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Gun.SSL = false - _, err := NewHTTP2Gun(conf, log) + _, err := NewHTTP2Gun(conf, log, conf.Gun.Target) Expect(err).To(HaveOccurred()) }) diff --git a/components/phttp/import/import.go b/components/phttp/import/import.go index 9d366b612..3441b80f1 100644 --- a/components/phttp/import/import.go +++ b/components/phttp/import/import.go @@ -9,8 +9,6 @@ import ( "net" "github.com/spf13/afero" - "go.uber.org/zap" - "github.com/yandex/pandora/components/phttp" "github.com/yandex/pandora/components/phttp/ammo/simple/jsonline" "github.com/yandex/pandora/components/phttp/ammo/simple/raw" @@ -20,6 +18,7 @@ import ( "github.com/yandex/pandora/core/register" "github.com/yandex/pandora/lib/answlog" "github.com/yandex/pandora/lib/netutil" + "go.uber.org/zap" ) func Import(fs afero.Fs) { @@ -41,22 +40,22 @@ func Import(fs afero.Fs) { }) register.Gun("http", func(conf phttp.HTTPGunConfig) func() core.Gun { - _ = preResolveTargetAddr(&conf.Client, &conf.Gun.Target) + targetResolved, _ := preResolveTargetAddr(&conf.Client, conf.Gun.Target) answLog := answlog.Init(conf.Gun.Base.AnswLog.Path) - return func() core.Gun { return phttp.WrapGun(phttp.NewHTTPGun(conf, answLog)) } + return func() core.Gun { return phttp.WrapGun(phttp.NewHTTPGun(conf, answLog, targetResolved)) } }, phttp.DefaultHTTPGunConfig) register.Gun("http2", func(conf phttp.HTTP2GunConfig) func() (core.Gun, error) { - _ = preResolveTargetAddr(&conf.Client, &conf.Gun.Target) + targetResolved, _ := preResolveTargetAddr(&conf.Client, conf.Gun.Target) answLog := answlog.Init(conf.Gun.Base.AnswLog.Path) return func() (core.Gun, error) { - gun, err := phttp.NewHTTP2Gun(conf, answLog) + gun, err := phttp.NewHTTP2Gun(conf, answLog, targetResolved) return phttp.WrapGun(gun), err } }, phttp.DefaultHTTP2GunConfig) register.Gun("connect", func(conf phttp.ConnectGunConfig) func() core.Gun { - _ = preResolveTargetAddr(&conf.Client, &conf.Target) + conf.Target, _ = preResolveTargetAddr(&conf.Client, conf.Target) answLog := answlog.Init(conf.BaseGunConfig.AnswLog.Path) return func() core.Gun { return phttp.WrapGun(phttp.NewConnectGun(conf, answLog)) @@ -70,22 +69,24 @@ func Import(fs afero.Fs) { // If we can resolve accessible target addr - use it as target, not use caching. // Otherwise just use DNS cache - we should not fail shooting, we should try to // connect on every shoot. DNS cache will save resolved addr after first successful connect. -func preResolveTargetAddr(clientConf *phttp.ClientConfig, target *string) (err error) { +func preResolveTargetAddr(clientConf *phttp.ClientConfig, target string) (targetAddr string, err error) { + targetAddr = target + if !clientConf.Dialer.DNSCache { return } - if endpointIsResolved(*target) { + if endpointIsResolved(target) { clientConf.Dialer.DNSCache = false return } - resolved, err := netutil.LookupReachable(*target) + resolved, err := netutil.LookupReachable(target, clientConf.Dialer.Timeout) if err != nil { zap.L().Warn("DNS target pre resolve failed", - zap.String("target", *target), zap.Error(err)) + zap.String("target", target), zap.Error(err)) return } clientConf.Dialer.DNSCache = false - *target = resolved + targetAddr = resolved return } diff --git a/components/phttp/import/import_suite_test.go b/components/phttp/import/import_suite_test.go index 6b4f5bc49..fd427c25a 100644 --- a/components/phttp/import/import_suite_test.go +++ b/components/phttp/import/import_suite_test.go @@ -8,7 +8,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/spf13/afero" - . "github.com/yandex/pandora/components/phttp" "github.com/yandex/pandora/lib/ginkgoutil" ) @@ -40,7 +39,7 @@ var _ = Describe("preResolveTargetAddr", func() { target := "localhost:" + port expectedResolved := "127.0.0.1:" + port - err = preResolveTargetAddr(conf, &target) + target, err = preResolveTargetAddr(conf, target) Expect(err).NotTo(HaveOccurred()) Expect(conf.Dialer.DNSCache).To(BeFalse()) @@ -53,7 +52,7 @@ var _ = Describe("preResolveTargetAddr", func() { const addr = "127.0.0.1:80" target := addr - err := preResolveTargetAddr(conf, &target) + target, err := preResolveTargetAddr(conf, target) Expect(err).NotTo(HaveOccurred()) Expect(conf.Dialer.DNSCache).To(BeFalse()) Expect(target).To(Equal(addr)) @@ -65,7 +64,7 @@ var _ = Describe("preResolveTargetAddr", func() { const addr = "localhost:54321" target := addr - err := preResolveTargetAddr(conf, &target) + target, err := preResolveTargetAddr(conf, target) Expect(err).To(HaveOccurred()) Expect(conf.Dialer.DNSCache).To(BeTrue()) Expect(target).To(Equal(addr)) diff --git a/core/aggregator/encoder.go b/core/aggregator/encoder.go index 8faf680e8..a13ae9df4 100644 --- a/core/aggregator/encoder.go +++ b/core/aggregator/encoder.go @@ -11,7 +11,6 @@ import ( "time" "github.com/pkg/errors" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coreutil" "github.com/yandex/pandora/lib/errutil" diff --git a/core/aggregator/encoder_test.go b/core/aggregator/encoder_test.go index ad8a3aadf..302f2df6a 100644 --- a/core/aggregator/encoder_test.go +++ b/core/aggregator/encoder_test.go @@ -17,13 +17,12 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "github.com/yandex/pandora/core" aggregatemock "github.com/yandex/pandora/core/aggregator/mocks" coremock "github.com/yandex/pandora/core/mocks" iomock "github.com/yandex/pandora/lib/ioutil2/mocks" "github.com/yandex/pandora/lib/testutil" + "go.uber.org/zap" ) type EncoderAggregatorTester struct { diff --git a/core/aggregator/jsonlines.go b/core/aggregator/jsonlines.go index 065a721cd..2d19b9067 100644 --- a/core/aggregator/jsonlines.go +++ b/core/aggregator/jsonlines.go @@ -10,11 +10,10 @@ import ( "io" jsoniter "github.com/json-iterator/go" - "github.com/yandex/pandora/lib/ioutil2" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/config" "github.com/yandex/pandora/core/coreutil" + "github.com/yandex/pandora/lib/ioutil2" ) type JSONLineAggregatorConfig struct { diff --git a/core/aggregator/jsonlines_test.go b/core/aggregator/jsonlines_test.go index 3c2549f5d..0ca60fbf9 100644 --- a/core/aggregator/jsonlines_test.go +++ b/core/aggregator/jsonlines_test.go @@ -12,10 +12,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/datasink" + "go.uber.org/zap" ) type jsonTestData struct { diff --git a/core/aggregator/log.go b/core/aggregator/log.go index cf40e5ca3..681dca3ee 100644 --- a/core/aggregator/log.go +++ b/core/aggregator/log.go @@ -8,9 +8,8 @@ package aggregator import ( "context" - "go.uber.org/zap" - "github.com/yandex/pandora/core" + "go.uber.org/zap" ) func NewLog() core.Aggregator { diff --git a/core/aggregator/mocks/sample_encode_closer.go b/core/aggregator/mocks/sample_encode_closer.go index 5e152cc15..0e409af97 100644 --- a/core/aggregator/mocks/sample_encode_closer.go +++ b/core/aggregator/mocks/sample_encode_closer.go @@ -3,7 +3,6 @@ package aggregatemock import ( mock "github.com/stretchr/testify/mock" - core "github.com/yandex/pandora/core" ) diff --git a/core/aggregator/netsample/phout.go b/core/aggregator/netsample/phout.go index 7a770982c..a72733cf6 100644 --- a/core/aggregator/netsample/phout.go +++ b/core/aggregator/netsample/phout.go @@ -11,7 +11,6 @@ import ( "github.com/c2h5oh/datasize" "github.com/pkg/errors" "github.com/spf13/afero" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coreutil" ) diff --git a/core/aggregator/netsample/phout_test.go b/core/aggregator/netsample/phout_test.go index 95e734758..33f6a7bea 100644 --- a/core/aggregator/netsample/phout_test.go +++ b/core/aggregator/netsample/phout_test.go @@ -8,7 +8,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/spf13/afero" - "github.com/yandex/pandora/core" ) diff --git a/core/aggregator/netsample/sample.go b/core/aggregator/netsample/sample.go index 90d790612..af76ae4c8 100644 --- a/core/aggregator/netsample/sample.go +++ b/core/aggregator/netsample/sample.go @@ -7,6 +7,7 @@ package netsample import ( "net" + "net/url" "os" "sync" "syscall" @@ -16,7 +17,9 @@ import ( ) const ( - ProtoCodeError = 999 + ProtoCodeError = 999 + DiscardedShootCodeError = 777 + DiscardedShootTag = "discarded" ) const ( @@ -117,6 +120,10 @@ func (s *Sample) String() string { } func getErrno(err error) int { + // + if e, ok := err.(net.Error); ok && e.Timeout() { + return 110 // Handle client Timeout as if it connection timeout + } // stackerr.Error and etc. type hasUnderlying interface { Underlying() error @@ -135,6 +142,8 @@ func getErrno(err error) int { err = typed.Err case *os.SyscallError: err = typed.Err + case *url.Error: + err = typed.Err case syscall.Errno: return int(typed) default: @@ -143,3 +152,13 @@ func getErrno(err error) int { } } } + +func DiscardedShootSample() *Sample { + sample := &Sample{ + timeStamp: time.Now(), + tags: DiscardedShootTag, + } + sample.SetUserNet(DiscardedShootCodeError) + + return sample +} diff --git a/core/aggregator/netsample/sample_test.go b/core/aggregator/netsample/sample_test.go index b084a510b..d8f302c8e 100644 --- a/core/aggregator/netsample/sample_test.go +++ b/core/aggregator/netsample/sample_test.go @@ -79,8 +79,8 @@ func TestCustomSets(t *testing.T) { tag, int(userDuration.Nanoseconds()/1000), // keyRTTMicro int(latency.Nanoseconds()/1000), // keyLatencyMicro - reqBytes, // keyRequestBytes - respBytes, // keyResponseBytes + reqBytes, // keyRequestBytes + respBytes, // keyResponseBytes 110, 0, ) diff --git a/core/aggregator/reporter.go b/core/aggregator/reporter.go index 5a3cfed1b..c7809e897 100644 --- a/core/aggregator/reporter.go +++ b/core/aggregator/reporter.go @@ -8,11 +8,10 @@ package aggregator import ( "fmt" - "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coreutil" + "go.uber.org/atomic" + "go.uber.org/zap" ) type ReporterConfig struct { diff --git a/core/aggregator/reporter_test.go b/core/aggregator/reporter_test.go index 880ee14da..a1f950fec 100644 --- a/core/aggregator/reporter_test.go +++ b/core/aggregator/reporter_test.go @@ -10,11 +10,10 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" - coremock "github.com/yandex/pandora/core/mocks" "github.com/yandex/pandora/lib/testutil" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) func TestReporter_DroppedErr(t *testing.T) { diff --git a/core/config/config.go b/core/config/config.go index 429cbbaf8..5652abc91 100644 --- a/core/config/config.go +++ b/core/config/config.go @@ -84,6 +84,7 @@ func AddKindHook(hook KindHook) (_ struct{}) { func DefaultHooks() []mapstructure.DecodeHookFunc { return []mapstructure.DecodeHookFunc{ + VariableInjectHook, DebugHook, TextUnmarshallerHook, mapstructure.StringToTimeDurationHookFunc(), diff --git a/core/config/config_test.go b/core/config/config_test.go index a602c6bc1..33086e662 100644 --- a/core/config/config_test.go +++ b/core/config/config_test.go @@ -6,12 +6,14 @@ package config import ( + "net" "testing" "time" "github.com/facebookgo/stack" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/yandex/pandora/lib/confutil" ) type M map[string]interface{} @@ -231,3 +233,31 @@ func TestNextSquash(t *testing.T) { require.NoError(t, err) assert.Equal(t, "baz", data.Level1.Level2.Foo) } + +func TestConfigEnvVarReplacement(t *testing.T) { + confutil.RegisterTagResolver("", confutil.EnvTagResolver) + confutil.RegisterTagResolver("ENV", confutil.EnvTagResolver) + + t.Setenv("ENV_VAR_1", "value1") + t.Setenv("VAR_2", "value2") + t.Setenv("INT_VAR_3", "15") + t.Setenv("IP_SEQ", "1.2") + var l1 struct { + Val1 string + Val2 string + Val3 int + Val4 net.IP + } + + err := Decode(M{ + "val1": "aa-${ENV_VAR_1}", + "val2": "${ENV:VAR_2}", + "val3": "${INT_VAR_3}", + "val4": "1.1.${ENV:IP_SEQ}", + }, &l1) + assert.NoError(t, err) + assert.Equal(t, "aa-value1", l1.Val1) + assert.Equal(t, "value2", l1.Val2) + assert.Equal(t, 15, l1.Val3) + assert.Equal(t, net.IPv4(1, 1, 1, 2), l1.Val4) +} diff --git a/core/config/hooks.go b/core/config/hooks.go index 2b4df157a..0fe69096b 100644 --- a/core/config/hooks.go +++ b/core/config/hooks.go @@ -17,9 +17,9 @@ import ( "github.com/c2h5oh/datasize" "github.com/facebookgo/stack" "github.com/pkg/errors" - "go.uber.org/zap" - + "github.com/yandex/pandora/lib/confutil" "github.com/yandex/pandora/lib/tag" + "go.uber.org/zap" ) var InvalidURLError = errors.New("string is not valid URL") @@ -138,3 +138,22 @@ func DebugHook(f reflect.Type, t reflect.Type, data interface{}) (p interface{}, ) return } + +// VariableInjectHook injects values into ${VAR_NAME} placeholders +func VariableInjectHook(f reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + if f.Kind() != reflect.String { + return data, nil + } + + str := data.(string) + res, err := confutil.ResolveCustomTags(str, t) + if err == confutil.ErrNoTagsFound { + return data, nil + } + + if err != nil { + return data, err + } + + return res, nil +} diff --git a/core/coretest/sink.go b/core/coretest/sink.go index d54c33167..191c44111 100644 --- a/core/coretest/sink.go +++ b/core/coretest/sink.go @@ -14,7 +14,6 @@ import ( "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/yandex/pandora/core" ) diff --git a/core/coretest/source.go b/core/coretest/source.go index ae5ac8f4c..9aa94bdb7 100644 --- a/core/coretest/source.go +++ b/core/coretest/source.go @@ -14,7 +14,6 @@ import ( "github.com/spf13/afero" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/lib/testutil" ) diff --git a/core/coreutil/buffer_size_config_test.go b/core/coreutil/buffer_size_config_test.go index 13223be73..aeb8fbd13 100644 --- a/core/coreutil/buffer_size_config_test.go +++ b/core/coreutil/buffer_size_config_test.go @@ -9,7 +9,7 @@ import ( "testing" "github.com/c2h5oh/datasize" - "github.com/magiconair/properties/assert" + "github.com/stretchr/testify/assert" ) func TestBufferSizeConfig_BufferSizeOrDefault(t *testing.T) { diff --git a/core/coreutil/schedule_test.go b/core/coreutil/schedule_test.go index 93d84f3c5..a4a3613b1 100644 --- a/core/coreutil/schedule_test.go +++ b/core/coreutil/schedule_test.go @@ -10,7 +10,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/yandex/pandora/core/schedule" ) diff --git a/core/coreutil/waiter.go b/core/coreutil/waiter.go index 06cd1c3fb..88d3ce3dc 100644 --- a/core/coreutil/waiter.go +++ b/core/coreutil/waiter.go @@ -14,8 +14,9 @@ import ( // Waiter goroutine unsafe wrapper for efficient waiting schedule. type Waiter struct { - sched core.Schedule - ctx context.Context + sched core.Schedule + ctx context.Context + slowDownItems int // Lazy initialized. timer *time.Timer @@ -33,23 +34,28 @@ func (w *Waiter) Wait() (ok bool) { // Check, that context is not done. Very quick: 5 ns for op, due to benchmark. select { case <-w.ctx.Done(): + w.slowDownItems = 0 return false default: } next, ok := w.sched.Next() if !ok { + w.slowDownItems = 0 return false } // Get current time lazily. // For once schedule, for example, we need to get it only once. if next.Before(w.lastNow) { + w.slowDownItems++ return true } w.lastNow = time.Now() waitFor := next.Sub(w.lastNow) if waitFor <= 0 { + w.slowDownItems++ return true } + w.slowDownItems = 0 // Lazy init. We don't need timer for unlimited and once schedule. if w.timer == nil { w.timer = time.NewTimer(waitFor) @@ -64,6 +70,16 @@ func (w *Waiter) Wait() (ok bool) { } } +// IsSlowDown returns true, if schedule contains 2 elements before current time. +func (w *Waiter) IsSlowDown() (ok bool) { + select { + case <-w.ctx.Done(): + return false + default: + return w.slowDownItems >= 2 + } +} + // IsFinished is quick check, that wait context is not canceled and there are some tokens left in // schedule. func (w *Waiter) IsFinished() (ok bool) { diff --git a/core/coreutil/waiter_test.go b/core/coreutil/waiter_test.go index 4cb0309e8..d5629ecc2 100644 --- a/core/coreutil/waiter_test.go +++ b/core/coreutil/waiter_test.go @@ -11,7 +11,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/yandex/pandora/core/schedule" ) diff --git a/core/datasink/file.go b/core/datasink/file.go index 48189f707..610cff137 100644 --- a/core/datasink/file.go +++ b/core/datasink/file.go @@ -10,7 +10,6 @@ import ( "os" "github.com/spf13/afero" - "github.com/yandex/pandora/core" ) diff --git a/core/datasink/file_test.go b/core/datasink/file_test.go index 018ba9720..df6ef0dd9 100644 --- a/core/datasink/file_test.go +++ b/core/datasink/file_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/spf13/afero" - "github.com/yandex/pandora/core/coretest" ) diff --git a/core/datasource/file.go b/core/datasource/file.go index 0ebb346ef..f5a885521 100644 --- a/core/datasource/file.go +++ b/core/datasource/file.go @@ -10,7 +10,6 @@ import ( "os" "github.com/spf13/afero" - "github.com/yandex/pandora/core" ) diff --git a/core/engine/engine.go b/core/engine/engine.go index 957b5c749..a5ac5ed6e 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -10,15 +10,13 @@ import ( "fmt" "sync" - "github.com/yandex/pandora/core/warmup" - "github.com/pkg/errors" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coreutil" + "github.com/yandex/pandora/core/warmup" "github.com/yandex/pandora/lib/errutil" "github.com/yandex/pandora/lib/monitoring" + "go.uber.org/zap" ) type Config struct { @@ -33,6 +31,7 @@ type InstancePoolConfig struct { RPSPerInstance bool `config:"rps-per-instance"` NewRPSSchedule func() (core.Schedule, error) `config:"rps" validate:"required"` StartupSchedule core.Schedule `config:"startup" validate:"required"` + DiscardOverflow bool `config:"discard_overflow"` } // TODO(skipor): use something github.com/rcrowley/go-metrics based. @@ -364,10 +363,9 @@ func (p *instancePool) startInstances( newInstanceSchedule func() (core.Schedule, error), runRes chan<- instanceRunResult) (started int, err error) { deps := instanceDeps{ - p.Aggregator, newInstanceSchedule, p.NewGun, - instanceSharedDeps{p.Provider, p.metrics, p.gunWarmUpResult}, + instanceSharedDeps{p.Provider, p.metrics, p.gunWarmUpResult, p.Aggregator, p.DiscardOverflow}, } waiter := coreutil.NewWaiter(p.StartupSchedule, startCtx) diff --git a/core/engine/engine_test.go b/core/engine/engine_test.go index 062018a27..5729b7456 100644 --- a/core/engine/engine_test.go +++ b/core/engine/engine_test.go @@ -9,8 +9,6 @@ import ( . "github.com/onsi/gomega" "github.com/pkg/errors" "github.com/stretchr/testify/mock" - "go.uber.org/atomic" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator" "github.com/yandex/pandora/core/config" @@ -18,6 +16,7 @@ import ( "github.com/yandex/pandora/core/provider" "github.com/yandex/pandora/core/schedule" "github.com/yandex/pandora/lib/ginkgoutil" + "go.uber.org/atomic" ) var _ = Describe("config validation", func() { diff --git a/core/engine/instance.go b/core/engine/instance.go index 42f8c882b..fdcd0b6a1 100644 --- a/core/engine/instance.go +++ b/core/engine/instance.go @@ -10,14 +10,13 @@ import ( "fmt" "io" - "github.com/yandex/pandora/core/warmup" - "github.com/pkg/errors" - "go.uber.org/zap" - "github.com/yandex/pandora/core" + "github.com/yandex/pandora/core/aggregator/netsample" "github.com/yandex/pandora/core/coreutil" + "github.com/yandex/pandora/core/warmup" "github.com/yandex/pandora/lib/tag" + "go.uber.org/zap" ) type instance struct { @@ -53,7 +52,6 @@ func newInstance(ctx context.Context, log *zap.Logger, poolID string, id int, de } type instanceDeps struct { - aggregator core.Aggregator newSchedule func() (core.Schedule, error) newGun func() (core.Gun, error) instanceSharedDeps @@ -63,6 +61,8 @@ type instanceSharedDeps struct { provider core.Provider Metrics gunWarmUpResult interface{} + aggregator core.Aggregator + discardOverflow bool } // Run blocks until ammo finish, error or context cancel. @@ -90,24 +90,34 @@ func (i *instance) shoot(ctx context.Context) (err error) { // Checking, that schedule is not finished, required, to not consume extra ammo, // on finish in case of per instance schedule. for !waiter.IsFinished() { - ammo, ok := i.provider.Acquire() - if !ok { - i.log.Debug("Out of ammo") - return outOfAmmoErr - } - if tag.Debug { - i.log.Debug("Ammo acquired", zap.Any("ammo", ammo)) - } - if !waiter.Wait() { - break - } - i.Metrics.Request.Add(1) - if tag.Debug { - i.log.Debug("Shooting", zap.Any("ammo", ammo)) + err := func() error { + ammo, ok := i.provider.Acquire() + if !ok { + i.log.Debug("Out of ammo") + return outOfAmmoErr + } + defer i.provider.Release(ammo) + if tag.Debug { + i.log.Debug("Ammo acquired", zap.Any("ammo", ammo)) + } + if !waiter.Wait() { + return nil + } + if !i.discardOverflow || !waiter.IsSlowDown() { + i.Metrics.Request.Add(1) + if tag.Debug { + i.log.Debug("Shooting", zap.Any("ammo", ammo)) + } + i.gun.Shoot(ammo) + i.Metrics.Response.Add(1) + } else { + i.aggregator.Report(netsample.DiscardedShootSample()) + } + return nil + }() + if err != nil { + return err } - i.gun.Shoot(ammo) - i.Metrics.Response.Add(1) - i.provider.Release(ammo) } return ctx.Err() } diff --git a/core/engine/instance_test.go b/core/engine/instance_test.go index e64d8fbf3..029d93aa6 100644 --- a/core/engine/instance_test.go +++ b/core/engine/instance_test.go @@ -8,7 +8,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" - "github.com/yandex/pandora/core" coremock "github.com/yandex/pandora/core/mocks" "github.com/yandex/pandora/core/schedule" @@ -48,13 +47,15 @@ var _ = Describe("Instance", func() { JustBeforeEach(func() { deps := instanceDeps{ - aggregator, + newSchedule, newGun, instanceSharedDeps{ provider, metrics, nil, + aggregator, + false, }, } ins, insCreateErr = newInstance(ctx, ginkgoutil.NewLogger(), "pool_0", 0, deps) diff --git a/core/import/import.go b/core/import/import.go index cbf1a440c..6d436094a 100644 --- a/core/import/import.go +++ b/core/import/import.go @@ -9,8 +9,6 @@ import ( "reflect" "github.com/spf13/afero" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator" "github.com/yandex/pandora/core/aggregator/netsample" @@ -22,7 +20,9 @@ import ( "github.com/yandex/pandora/core/provider" "github.com/yandex/pandora/core/register" "github.com/yandex/pandora/core/schedule" + "github.com/yandex/pandora/lib/confutil" "github.com/yandex/pandora/lib/tag" + "go.uber.org/zap" ) const ( @@ -88,11 +88,15 @@ func Import(fs afero.Fs) { register.Limiter("once", schedule.NewOnceConf) register.Limiter("unlimited", schedule.NewUnlimitedConf) register.Limiter("step", schedule.NewStepConf) + register.Limiter("instance_step", schedule.NewInstanceStepConf) register.Limiter(compositeScheduleKey, schedule.NewCompositeConf) config.AddTypeHook(sinkStringHook) config.AddTypeHook(scheduleSliceToCompositeConfigHook) + confutil.RegisterTagResolver("", confutil.EnvTagResolver) + confutil.RegisterTagResolver("ENV", confutil.EnvTagResolver) + // Required for decoding plugins. Need to be added after Composite Schedule hacky hook. pluginconfig.AddHooks() } diff --git a/core/import/import_suite_test.go b/core/import/import_suite_test.go index 2d296839c..a83785439 100644 --- a/core/import/import_suite_test.go +++ b/core/import/import_suite_test.go @@ -10,14 +10,13 @@ import ( . "github.com/onsi/gomega" "github.com/spf13/afero" "github.com/stretchr/testify/require" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/config" "github.com/yandex/pandora/core/coretest" "github.com/yandex/pandora/core/plugin" "github.com/yandex/pandora/lib/ginkgoutil" "github.com/yandex/pandora/lib/testutil" + "go.uber.org/zap" ) func TestImport(t *testing.T) { diff --git a/core/plugin/pluginconfig/hooks.go b/core/plugin/pluginconfig/hooks.go index fe669bb49..4c3e44106 100644 --- a/core/plugin/pluginconfig/hooks.go +++ b/core/plugin/pluginconfig/hooks.go @@ -14,11 +14,10 @@ import ( "strings" "github.com/pkg/errors" - "go.uber.org/zap" - "github.com/yandex/pandora/core/config" "github.com/yandex/pandora/core/plugin" "github.com/yandex/pandora/lib/tag" + "go.uber.org/zap" ) func AddHooks() { diff --git a/core/plugin/ptest_test.go b/core/plugin/ptest_test.go index cfae16f84..69850d285 100644 --- a/core/plugin/ptest_test.go +++ b/core/plugin/ptest_test.go @@ -10,7 +10,6 @@ import ( . "github.com/onsi/gomega" "github.com/pkg/errors" - "github.com/yandex/pandora/core/config" ) diff --git a/core/provider/chunk_decoder.go b/core/provider/chunk_decoder.go index 0904b4497..2fd851ca7 100644 --- a/core/provider/chunk_decoder.go +++ b/core/provider/chunk_decoder.go @@ -10,7 +10,6 @@ import ( "fmt" "github.com/pkg/errors" - "github.com/yandex/pandora/core" ) diff --git a/core/provider/decoder.go b/core/provider/decoder.go index 99beede72..01a923c38 100644 --- a/core/provider/decoder.go +++ b/core/provider/decoder.go @@ -11,11 +11,10 @@ import ( "io" "github.com/pkg/errors" - "go.uber.org/zap" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/lib/errutil" "github.com/yandex/pandora/lib/ioutil2" + "go.uber.org/zap" ) type NewAmmoDecoder func(deps core.ProviderDeps, source io.Reader) (AmmoDecoder, error) diff --git a/core/provider/json.go b/core/provider/json.go index 586935c58..820a991f0 100644 --- a/core/provider/json.go +++ b/core/provider/json.go @@ -9,7 +9,6 @@ import ( "io" jsoniter "github.com/json-iterator/go" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coreutil" "github.com/yandex/pandora/lib/ioutil2" diff --git a/core/provider/json_test.go b/core/provider/json_test.go index 1bb44f222..1690d0086 100644 --- a/core/provider/json_test.go +++ b/core/provider/json_test.go @@ -13,7 +13,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/datasource" ) diff --git a/core/provider/num_test.go b/core/provider/num_test.go index a7722cb67..caf7242f6 100644 --- a/core/provider/num_test.go +++ b/core/provider/num_test.go @@ -5,7 +5,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/yandex/pandora/core" ) diff --git a/core/schedule/composite_test.go b/core/schedule/composite_test.go index e4e0dc86c..67c053837 100644 --- a/core/schedule/composite_test.go +++ b/core/schedule/composite_test.go @@ -11,7 +11,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coretest" "go.uber.org/atomic" diff --git a/core/schedule/do_at.go b/core/schedule/do_at.go index e33b8ae23..dddcaf696 100644 --- a/core/schedule/do_at.go +++ b/core/schedule/do_at.go @@ -8,9 +8,8 @@ package schedule import ( "time" - "go.uber.org/atomic" - "github.com/yandex/pandora/core" + "go.uber.org/atomic" ) // DoAt returns when i'th operation should be performed, assuming that schedule diff --git a/core/schedule/instance_step.go b/core/schedule/instance_step.go new file mode 100644 index 000000000..9724854ef --- /dev/null +++ b/core/schedule/instance_step.go @@ -0,0 +1,35 @@ +// Copyright (c) 2017 Yandex LLC. All rights reserved. +// Use of this source code is governed by a MPL 2.0 +// license that can be found in the LICENSE file. +// Author: Vladimir Skipor + +package schedule + +import ( + "time" + + "github.com/yandex/pandora/core" +) + +func NewInstanceStep(from, to int64, step int64, stepDuration time.Duration) core.Schedule { + var nexts []core.Schedule + nexts = append(nexts, NewOnce(from)) + + for i := from + step; i <= to; i += step { + nexts = append(nexts, NewConst(0, stepDuration)) + nexts = append(nexts, NewOnce(step)) + } + + return NewCompositeConf(CompositeConf{nexts}) +} + +type InstanceStepConfig struct { + From int64 `validate:"min=0"` + To int64 `validate:"min=0"` + Step int64 `validate:"min=1"` + StepDuration time.Duration `validate:"min-time=1ms"` +} + +func NewInstanceStepConf(conf InstanceStepConfig) core.Schedule { + return NewInstanceStep(conf.From, conf.To, conf.Step, conf.StepDuration) +} diff --git a/core/schedule/schedule_suite_test.go b/core/schedule/schedule_suite_test.go index b63469a7a..aea7fe558 100644 --- a/core/schedule/schedule_suite_test.go +++ b/core/schedule/schedule_suite_test.go @@ -7,7 +7,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" - "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/coretest" "github.com/yandex/pandora/lib/ginkgoutil" @@ -207,6 +206,21 @@ var _ = Describe("step", func() { }) +var _ = Describe("instance_step", func() { + It("", func() { + conf := InstanceStepConfig{ + From: 1, + To: 3, + Step: 1, + StepDuration: 2 * time.Second, + } + testee := NewInstanceStepConf(conf) + Expect(testee.Left()).To(Equal(3)) + + }) + +}) + func BenchmarkLineSchedule(b *testing.B) { schedule := NewLine(0, float64(b.N), 2*time.Second) benchmarkScheduleNext(b, schedule) diff --git a/core/schedule/start_sync.go b/core/schedule/start_sync.go index 4b2b5215f..43f35682b 100644 --- a/core/schedule/start_sync.go +++ b/core/schedule/start_sync.go @@ -6,9 +6,9 @@ package schedule import ( - "go.uber.org/atomic" - "sync" + + "go.uber.org/atomic" ) // StartSync is util to make schedule start goroutine safe. diff --git a/docs/advanced.rst b/docs/advanced.rst index 03d2d0a52..25e843242 100644 --- a/docs/advanced.rst +++ b/docs/advanced.rst @@ -18,7 +18,7 @@ Pay attention to special header `Host` defined ``outside`` of Headers dictionary Ammofile sample: :: - {"uri": "/", "method": "GET", "headers": {"Accept": "*/*", "Accept-Encoding": "gzip, deflate", "User-Agent": "Pandora"}, "host": "example.com"} + {"tag": "tag1", "uri": "/", "method": "GET", "headers": {"Accept": "*/*", "Accept-Encoding": "gzip, deflate", "User-Agent": "Pandora"}, "host": "example.com"} Config sample: @@ -70,7 +70,7 @@ Config sample: type: raw # ammo format file: ./ammofile # ammo file path -You can redefine any headers using special config option `headers`. Format: list of strings. +You can define common headers using special config option `headers`. Headers in ammo file have priority. Format: list of strings. Example: @@ -111,7 +111,7 @@ Config sample: file: ./ammofile # ammo file path -You can redefine any headers using special config option `headers`. Format: list of strings. +You can define common headers using special config option `headers`. Headers in ammo file have priority. Format: list of strings. Example: @@ -124,3 +124,32 @@ Example: headers: - "[Host: yourhost.tld]" - "[User-Agent: some user agent]" + +Ammo filters +------------ + +Each http ammo provider lets you choose specific ammo for your test from ammo file with `chosencases` setting: +.. code-block:: yaml + + pools: + - ammo: + type: uri # ammo format + chosencases: ["tag1", "tag2"] # use only "tag1" and "tag2" ammo for this test + file: ./ammofile # ammo file path + +Tags are defined in ammo files as shown below: + +http/json: +:: + {"tag": "tag1", "uri": "/", + +raw (request-style): +:: + 73 tag1 + GET / HTTP/1.0 + +uri-style: +:: + /?drg tag1 + / + /buy tag2 \ No newline at end of file diff --git a/examples/custom_pandora/custom_main.go b/examples/custom_pandora/custom_main.go index b5304501d..c25569f85 100644 --- a/examples/custom_pandora/custom_main.go +++ b/examples/custom_pandora/custom_main.go @@ -10,13 +10,12 @@ import ( "time" "github.com/spf13/afero" - "go.uber.org/zap" - "github.com/yandex/pandora/cli" phttp "github.com/yandex/pandora/components/phttp/import" "github.com/yandex/pandora/core" coreimport "github.com/yandex/pandora/core/import" "github.com/yandex/pandora/core/register" + "go.uber.org/zap" ) type Ammo struct { diff --git a/go.mod b/go.mod index 565b6cadc..89c3b8493 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,6 @@ require ( github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874 github.com/jhump/protoreflect v1.10.1 github.com/json-iterator/go v0.0.0-20180214060632-e7c7f3b33712 - github.com/magiconair/properties v1.7.6 github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047 github.com/onsi/ginkgo v1.4.0 github.com/onsi/gomega v1.3.0 diff --git a/go.sum b/go.sum index 53cccadaa..f9b39fb6e 100644 --- a/go.sum +++ b/go.sum @@ -75,8 +75,6 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/magiconair/properties v1.7.6 h1:U+1DqNen04MdEPgFiIwdOUiqZ8qPa37xgogX/sd3+54= -github.com/magiconair/properties v1.7.6/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047 h1:zCoDWFD5nrJJVjbXiDZcVhOBSzKn3o9LgRLLMRNuru8= github.com/mitchellh/mapstructure v0.0.0-20180203102830-a4e142e9c047/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso= diff --git a/lib/confutil/chosen_cases_filter.go b/lib/confutil/chosen_cases_filter.go new file mode 100644 index 000000000..bd1619a5c --- /dev/null +++ b/lib/confutil/chosen_cases_filter.go @@ -0,0 +1,15 @@ +package confutil + +// Creates filter that returns true if ammo tag is in chosenCases. If no chosenCases provided - returns true +func IsChosenCase(checkCase string, chosenCases []string) bool { + if len(chosenCases) == 0 { + return true + } + + for _, c := range chosenCases { + if c == checkCase { + return true + } + } + return false +} diff --git a/lib/confutil/chosen_cases_filter_test.go b/lib/confutil/chosen_cases_filter_test.go new file mode 100644 index 000000000..c73a42c69 --- /dev/null +++ b/lib/confutil/chosen_cases_filter_test.go @@ -0,0 +1,25 @@ +package confutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestChosenCases(t *testing.T) { + type testCase struct { + ammoTag string + expected bool + } + + cases := []string{"tag1", "tag3"} + + tests := []testCase{ + {"tag1", true}, + {"tag2", false}, + } + + for _, tc := range tests { + assert.Equal(t, tc.expected, IsChosenCase(tc.ammoTag, cases)) + } +} diff --git a/lib/confutil/custom_tag_resolver.go b/lib/confutil/custom_tag_resolver.go new file mode 100644 index 000000000..6fd2be5a0 --- /dev/null +++ b/lib/confutil/custom_tag_resolver.go @@ -0,0 +1,178 @@ +package confutil + +import ( + "errors" + "fmt" + "reflect" + "regexp" + "strconv" + "strings" +) + +var ( + notoken = "" + ErrNoTagsFound = errors.New("no tags found") + ErrUnsupportedKind = errors.New("unsupported kind") + ErrCantCastVariableToTargetType = errors.New("can't cast variable") + ErrResolverNotRegistered = errors.New("unknown tag type") +) + +type TagResolver func(string) (string, error) + +type tagEntry struct { + tagType string + string string + varname string +} + +var resolvers map[string]TagResolver = make(map[string]TagResolver) + +// Register custom tag resolver for config variables +func RegisterTagResolver(tagType string, resolver TagResolver) { + tagType = strings.ToLower(tagType) + // silent overwrite existing resolver + resolvers[tagType] = resolver +} + +func getTagResolver(tagType string) (TagResolver, error) { + tagType = strings.ToLower(tagType) + r, ok := resolvers[tagType] + if !ok { + return nil, ErrResolverNotRegistered + } + return r, nil +} + +// Resolve config variables in format ${tagType:variable} +func ResolveCustomTags(s string, targetType reflect.Type) (interface{}, error) { + tokens, err := findTags(s) + if err != nil { + return nil, err + } + + if len(tokens) == 0 { + return s, ErrNoTagsFound + } + + res := s + for _, t := range tokens { + resolver, err := getTagResolver(t.tagType) + if err == ErrResolverNotRegistered { + continue + } else if err != nil { + return nil, err + } + + resolved, err := resolver(t.varname) + if err != nil { + return nil, err + } + res = strings.ReplaceAll(res, t.string, resolved) + } + + // cast to target kind only if the whole value is a tag + // otherwise let other hooks process result + if len(tokens) == 1 && strings.TrimSpace(s) == tokens[0].string { + return cast(res, targetType) + } else { + return res, nil + } +} + +func findTags(s string) ([]*tagEntry, error) { + tagRegexp := regexp.MustCompile(`\$\{(?:([^}]+?):)?([^{}]+?)\}`) + tokensFound := tagRegexp.FindAllStringSubmatch(s, -1) + result := make([]*tagEntry, 0, len(tokensFound)) + + for _, token := range tokensFound { + tag := &tagEntry{ + tagType: strings.TrimSpace(token[1]), + varname: strings.TrimSpace(token[2]), + string: token[0], + } + result = append(result, tag) + } + + return result, nil +} + +func cast(v string, t reflect.Type) (interface{}, error) { + switch t.Kind() { + case reflect.Bool: + return castBool(v) + case reflect.Int, + reflect.Int8, + reflect.Int16, + reflect.Int32, + reflect.Int64, + reflect.Uint, + reflect.Uint8, + reflect.Uint16, + reflect.Uint32, + reflect.Uint64: + return castInt(v, t) + case reflect.Float32, + reflect.Float64: + return castFloat(v, t) + case reflect.String: + return v, nil + } + return nil, ErrUnsupportedKind +} + +func castBool(v string) (interface{}, error) { + res, err := strconv.ParseBool(v) + if err != nil { + return false, fmt.Errorf("'%s' cast to bool failed: %w", v, ErrCantCastVariableToTargetType) + } + + return res, nil +} + +func castInt(v string, t reflect.Type) (interface{}, error) { + intV, err := strconv.ParseInt(v, 0, t.Bits()) + if err != nil { + return nil, fmt.Errorf("'%s' cast to %s failed: %w", v, t, ErrCantCastVariableToTargetType) + } + + switch t.Kind() { + case reflect.Int: + return int(intV), nil + case reflect.Int8: + return int8(intV), nil + case reflect.Int16: + return int16(intV), nil + case reflect.Int32: + return int32(intV), nil + case reflect.Int64: + return int64(intV), nil + case reflect.Uint: + return uint(intV), nil + case reflect.Uint8: + return uint8(intV), nil + case reflect.Uint16: + return uint16(intV), nil + case reflect.Uint32: + return uint32(intV), nil + case reflect.Uint64: + return uint64(intV), nil + } + + return nil, ErrUnsupportedKind +} + +func castFloat(v string, t reflect.Type) (interface{}, error) { + intV, err := strconv.ParseFloat(v, 64) + if err != nil { + return 0.0, fmt.Errorf("'%s' cast to %s failed: %w", v, t, ErrCantCastVariableToTargetType) + } + + switch t.Kind() { + case reflect.Float32: + return float32(intV), nil + case reflect.Float64: + return float64(intV), nil + } + + return nil, ErrUnsupportedKind +} diff --git a/lib/confutil/custom_tag_resolver_test.go b/lib/confutil/custom_tag_resolver_test.go new file mode 100644 index 000000000..5a887d580 --- /dev/null +++ b/lib/confutil/custom_tag_resolver_test.go @@ -0,0 +1,122 @@ +package confutil + +import ( + "fmt" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestStringToExpectedCast(t *testing.T) { + type testCase struct { + val string + expected interface{} + err error + } + + tests := []testCase{ + {"True", true, nil}, + {"T", true, nil}, + {"t", true, nil}, + {"TRUE", true, nil}, + {"true", true, nil}, + {"1", true, nil}, + {"False", false, nil}, + {"false", false, nil}, + {"0", false, nil}, + {"f", false, nil}, + {"", false, ErrCantCastVariableToTargetType}, + + {"11", uint(11), nil}, + {"10", uint8(10), nil}, + {"10", uint16(10), nil}, + {"10", uint32(10), nil}, + {"10", uint64(10), nil}, + {"11", int(11), nil}, + {"10", int8(10), nil}, + {"10", int16(10), nil}, + {"10", int32(10), nil}, + {"10", int64(10), nil}, + {"", int(0), ErrCantCastVariableToTargetType}, + {"asdf", int(0), ErrCantCastVariableToTargetType}, + {" -14", int(0), ErrCantCastVariableToTargetType}, + + {"-10", float32(-10), nil}, + {"10.23", float32(10.23), nil}, + {"-10", float64(-10), nil}, + {"10.23", float64(10.23), nil}, + {"", float64(0), ErrCantCastVariableToTargetType}, + {"asdf", float64(0), ErrCantCastVariableToTargetType}, + {" -14", float64(0), ErrCantCastVariableToTargetType}, + + {"10", "10", nil}, + {"value-port", "value-port", nil}, + {"", "", nil}, + } + + for _, test := range tests { + expectedType := reflect.TypeOf(test.expected) + t.Run(fmt.Sprintf("Test string to %s cast", expectedType), func(t *testing.T) { + actual, err := cast(test.val, expectedType) + if test.err == nil { + assert.NoError(t, err) + assert.Exactly(t, test.expected, actual) + } else { + assert.ErrorIs(t, err, test.err) + } + + }) + } +} + +func TestFindTokens(t *testing.T) { + type testCase struct { + val string + expected []*tagEntry + err error + } + + tests := []testCase{ + { + "${token}", + []*tagEntry{{string: "${token}", tagType: "", varname: "token"}}, + nil, + }, + { + "${token}-${ second\t}", + []*tagEntry{ + {string: "${token}", tagType: "", varname: "token"}, + {string: "${ second\t}", tagType: "", varname: "second"}, + }, + nil, + }, + { + "asdf${ee:token}aa", + []*tagEntry{ + {string: "${ee:token}", tagType: "ee", varname: "token"}, + }, + nil, + }, + { + "asdf${ee: to:ken}aa-${ e2 :to }ken}", + []*tagEntry{ + {string: "${ee: to:ken}", tagType: "ee", varname: "to:ken"}, + {string: "${ e2 :to }", tagType: "e2", varname: "to"}, + }, + nil, + }, + } + + for _, test := range tests { + t.Run(fmt.Sprintf("Test findTokens in %s", test.val), func(t *testing.T) { + actual, err := findTags(test.val) + if test.err == nil { + assert.NoError(t, err) + assert.EqualValues(t, test.expected, actual) + } else { + assert.ErrorIs(t, err, test.err) + } + }) + } +} diff --git a/lib/confutil/env_var_resolver.go b/lib/confutil/env_var_resolver.go new file mode 100644 index 000000000..a3cb79072 --- /dev/null +++ b/lib/confutil/env_var_resolver.go @@ -0,0 +1,24 @@ +package confutil + +import ( + "errors" + "fmt" + "os" +) + +var ErrEnvVariableNotProvided error = errors.New("env variable not set") + +// Resolve custom tag token with env variable value +var EnvTagResolver TagResolver = envTokenResolver + +func envTokenResolver(in string) (string, error) { + // TODO: windows os is case-insensitive for env variables, + // so it may requre to load all vars and lookup for env var manually + + val, ok := os.LookupEnv(in) + if !ok { + return "", fmt.Errorf("%s: %w", in, ErrEnvVariableNotProvided) + } + + return val, nil +} diff --git a/lib/confutil/env_var_resolver_test.go b/lib/confutil/env_var_resolver_test.go new file mode 100644 index 000000000..0d452032b --- /dev/null +++ b/lib/confutil/env_var_resolver_test.go @@ -0,0 +1,37 @@ +package confutil + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEnvVarResolver(t *testing.T) { + type testCase struct { + varname string + val string + err error + } + + tests := []testCase{ + {"SOME_BOOL", "True", nil}, + {"INT_VALUE", "10", nil}, + {"V_NAME", "10", nil}, + } + + for _, test := range tests { + t.Setenv(test.varname, test.val) + } + + tests = append(tests, testCase{"NOT_EXISTS", "", ErrEnvVariableNotProvided}) + + for _, test := range tests { + actual, err := envTokenResolver(test.varname) + if test.err != nil { + assert.ErrorIs(t, err, test.err) + } else { + assert.NoError(t, err) + assert.Exactly(t, test.val, actual) + } + } +} diff --git a/lib/netutil/dial.go b/lib/netutil/dial.go index cc7b7527b..e03de0a9d 100644 --- a/lib/netutil/dial.go +++ b/lib/netutil/dial.go @@ -9,6 +9,7 @@ import ( "context" "net" "sync" + "time" "github.com/pkg/errors" ) @@ -56,8 +57,8 @@ var DefaultDNSCache = &SimpleDNSCache{} // This method has much more overhead, but get guaranteed reachable resolved addr. // Example: host is resolved to IPv4 and IPv6, but IPv4 is not working on machine. // LookupReachable will return IPv6 in that case. -func LookupReachable(addr string) (string, error) { - d := net.Dialer{DualStack: true} +func LookupReachable(addr string, timeout time.Duration) (string, error) { + d := net.Dialer{DualStack: true, Timeout: timeout} conn, err := d.Dial("tcp", addr) if err != nil { return "", err diff --git a/lib/netutil/netutil_suite_test.go b/lib/netutil/netutil_suite_test.go index 8103e13d0..a397d4121 100644 --- a/lib/netutil/netutil_suite_test.go +++ b/lib/netutil/netutil_suite_test.go @@ -5,10 +5,10 @@ import ( "net" "strconv" "testing" + "time" "github.com/onsi/ginkgo" "github.com/onsi/gomega" - "github.com/pkg/errors" "github.com/yandex/pandora/lib/ginkgoutil" netmock "github.com/yandex/pandora/lib/netutil/mocks" @@ -29,7 +29,7 @@ var _ = ginkgo.Describe("DNS", func() { addr := "localhost:" + port expectedResolved := "127.0.0.1:" + port - resolved, err := LookupReachable(addr) + resolved, err := LookupReachable(addr, time.Second) gomega.Expect(err).NotTo(gomega.HaveOccurred()) gomega.Expect(resolved).To(gomega.Equal(expectedResolved)) }) diff --git a/lib/tag/debug.go b/lib/tag/debug.go index aaa7c857c..ee36c25b0 100644 --- a/lib/tag/debug.go +++ b/lib/tag/debug.go @@ -3,6 +3,7 @@ // license that can be found in the LICENSE file. // Author: Vladimir Skipor +//go:build debug // +build debug package tag diff --git a/lib/tag/race.go b/lib/tag/race.go index a19547375..0cdaf0afb 100644 --- a/lib/tag/race.go +++ b/lib/tag/race.go @@ -3,6 +3,7 @@ // license that can be found in the LICENSE file. // Author: Vladimir Skipor +//go:build race // +build race package tag diff --git a/main.go b/main.go index bcad8b157..dfd8cba40 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,6 @@ package main import ( "github.com/spf13/afero" - "github.com/yandex/pandora/cli" example "github.com/yandex/pandora/components/example/import" grpc "github.com/yandex/pandora/components/grpc/import" diff --git a/test-sync.txt b/test-sync.txt new file mode 100644 index 000000000..7c9415122 --- /dev/null +++ b/test-sync.txt @@ -0,0 +1 @@ +test-sync-with-arcadia