diff --git a/README.md b/README.md index ede99d84d..948a82dab 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Run the binary with your config (see config examples at [examples](https://githu pandora myconfig.yaml ``` -Or use Pandora with [Yandex.Tank](http://yandextank.readthedocs.org/en/latest/configuration.html#pandora) and +Or use Pandora with [Yandex.Tank](https://yandextank.readthedocs.io/en/latest/core_and_modules.html#pandora) and [Overload](https://overload.yandex.net). ### Documentation diff --git a/cli/cli.go b/cli/cli.go index 66a21e4b6..f2659c30a 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -29,7 +29,6 @@ const Version = "0.3.5" const defaultConfigFile = "load" const stdinConfigSelector = "-" -var useStdinConfig = false var configSearchDirs = []string{"./", "./config", "/etc/pandora"} type cliConfig struct { @@ -189,6 +188,7 @@ func readConfig() *cliConfig { v := newViper() + var useStdinConfig = false args := flag.Args() if len(args) > 0 { switch { diff --git a/components/grpc/core.go b/components/grpc/core.go index 097ada8cf..62414437f 100644 --- a/components/grpc/core.go +++ b/components/grpc/core.go @@ -3,21 +3,24 @@ package grpc import ( "context" "encoding/json" + "fmt" "log" "time" + "github.com/jhump/protoreflect/grpcreflect" + "github.com/yandex/pandora/core/warmup" + reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/aggregator/netsample" "github.com/jhump/protoreflect/desc" "github.com/jhump/protoreflect/dynamic" "github.com/jhump/protoreflect/dynamic/grpcdynamic" - "github.com/jhump/protoreflect/grpcreflect" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" - reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/grpc/status" ) @@ -34,7 +37,8 @@ type Sample struct { } type GunConfig struct { - Target string `validate:"required"` + Target string `validate:"required"` + Timeout time.Duration `config:"timeout"` // grpc request timeout } type Gun struct { @@ -48,52 +52,72 @@ type Gun struct { services map[string]desc.MethodDescriptor } -func NewGun(conf GunConfig) *Gun { - return &Gun{conf: conf} -} - -func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error { +func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) { conn, err := grpc.Dial( g.conf.Target, grpc.WithInsecure(), grpc.WithTimeout(time.Second), grpc.WithUserAgent("load test, pandora universal grpc shooter")) if err != nil { - log.Fatalf("FATAL: grpc.Dial failed\n %s\n", err) - } - g.client = conn - g.aggr = aggr - g.GunDeps = deps - g.stub = grpcdynamic.NewStub(conn) - - log := deps.Log - - if ent := log.Check(zap.DebugLevel, "Gun bind"); ent != nil { - // Enable debug level logging during shooting. Creating log entries isn't free. - g.DebugLog = true + return nil, fmt.Errorf("failed to connect to target: %w", err) } + defer conn.Close() meta := make(metadata.MD) refCtx := metadata.NewOutgoingContext(context.Background(), meta) refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(conn)) listServices, err := refClient.ListServices() if err != nil { - log.Fatal("Fatal: failed to get services list\n %s\n", zap.Error(err)) + opts.Log.Fatal("Fatal: failed to get services list\n %s\n", zap.Error(err)) } - g.services = make(map[string]desc.MethodDescriptor) + services := make(map[string]desc.MethodDescriptor) for _, s := range listServices { service, err := refClient.ResolveService(s) if err != nil { if grpcreflect.IsElementNotFoundError(err) { continue } - log.Fatal("FATAL ResolveService: %s", zap.Error(err)) + opts.Log.Fatal("FATAL ResolveService: %s", zap.Error(err)) } listMethods := service.GetMethods() for _, m := range listMethods { - g.services[m.GetFullyQualifiedName()] = *m + services[m.GetFullyQualifiedName()] = *m } } + return services, nil +} + +func (g *Gun) AcceptWarmUpResult(i interface{}) error { + services, ok := i.(map[string]desc.MethodDescriptor) + if !ok { + return fmt.Errorf("grpc WarmUp result should be services: map[string]desc.MethodDescriptor") + } + g.services = services + return nil +} + +func NewGun(conf GunConfig) *Gun { + return &Gun{conf: conf} +} + +func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error { + conn, err := grpc.Dial( + g.conf.Target, + grpc.WithInsecure(), + grpc.WithTimeout(time.Second), + grpc.WithUserAgent("load test, pandora universal grpc shooter")) + if err != nil { + log.Fatalf("FATAL: grpc.Dial failed\n %s\n", err) + } + g.client = conn + g.aggr = aggr + g.GunDeps = deps + g.stub = grpcdynamic.NewStub(conn) + + if ent := deps.Log.Check(zap.DebugLevel, "Gun bind"); ent != nil { + // Enable debug level logging during shooting. Creating log entries isn't free. + g.DebugLog = true + } return nil } @@ -140,7 +164,14 @@ func (g *Gun) shoot(ammo *Ammo) { } } - ctx := metadata.NewOutgoingContext(context.Background(), meta) + timeout := time.Second * 15 + if g.conf.Timeout != 0 { + timeout = time.Second * g.conf.Timeout + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ctx = metadata.NewOutgoingContext(ctx, meta) out, err := g.stub.InvokeRpc(ctx, &method, message) code = convertGrpcStatus(err) diff --git a/components/grpc/core_tests/core_test.go b/components/grpc/core_tests/core_test.go new file mode 100644 index 000000000..e355c4adb --- /dev/null +++ b/components/grpc/core_tests/core_test.go @@ -0,0 +1,12 @@ +package core + +import ( + "testing" + + "github.com/yandex/pandora/components/grpc" + "github.com/yandex/pandora/core/warmup" +) + +func TestGrpcGunImplementsWarmedUp(t *testing.T) { + _ = warmup.WarmedUp(&grpc.Gun{}) +} diff --git a/components/phttp/base.go b/components/phttp/base.go index 1409327ca..023443fb9 100644 --- a/components/phttp/base.go +++ b/components/phttp/base.go @@ -6,10 +6,13 @@ package phttp import ( + "bytes" "context" + "fmt" "io" "io/ioutil" "net/http" + "net/http/httputil" "net/url" "github.com/pkg/errors" @@ -25,6 +28,7 @@ const ( type BaseGunConfig struct { AutoTag AutoTagConfig `config:"auto-tag"` + AnswLog AnswLogConfig `config:"answlog"` } // AutoTagConfig configure automatic tags generation based on ammo URI. First AutoTag URI path elements becomes tag. @@ -35,13 +39,25 @@ type AutoTagConfig struct { NoTagOnly bool `config:"no-tag-only"` // When true, autotagged only ammo that has no tag before. } +type AnswLogConfig struct { + Enabled bool `config:"enabled"` + Path string `config:"path"` + Filter string `config:"filter" valid:"oneof=all warning error"` +} + func DefaultBaseGunConfig() BaseGunConfig { return BaseGunConfig{ AutoTagConfig{ Enabled: false, URIElements: 2, NoTagOnly: true, - }} + }, + AnswLogConfig{ + Enabled: false, + Path: "answ.log", + Filter: "error", + }, + } } type BaseGun struct { @@ -51,6 +67,7 @@ type BaseGun struct { Connect func(ctx context.Context) error // Optional hook. OnClose func() error // Optional. Called on Close(). Aggregator netsample.Aggregator // Lazy set via BindResultTo. + AnswLog *zap.Logger core.GunDeps } @@ -78,6 +95,7 @@ func (b *BaseGun) Bind(aggregator netsample.Aggregator, deps core.GunDeps) error // Shoot is thread safe iff Do and Connect hooks are thread safe. func (b *BaseGun) Shoot(ammo Ammo) { + var bodyBytes []byte if b.Aggregator == nil { zap.L().Panic("must bind before shoot") } @@ -100,13 +118,15 @@ func (b *BaseGun) Shoot(ammo Ammo) { if b.DebugLog { b.Log.Debug("Prepared ammo to shoot", zap.Stringer("url", req.URL)) } - if b.Config.AutoTag.Enabled && (!b.Config.AutoTag.NoTagOnly || sample.Tags() == "") { sample.AddTag(autotag(b.Config.AutoTag.URIElements, req.URL)) } if sample.Tags() == "" { sample.AddTag(EmptyTag) } + if b.Config.AnswLog.Enabled { + bodyBytes = GetBody(req) + } var err error defer func() { @@ -127,6 +147,22 @@ func (b *BaseGun) Shoot(ammo Ammo) { if b.DebugLog { b.verboseLogging(res) } + if b.Config.AnswLog.Enabled { + switch b.Config.AnswLog.Filter { + case "all": + b.answLogging(req, bodyBytes, res) + + case "warning": + if res.StatusCode >= 400 { + b.answLogging(req, bodyBytes, res) + } + + case "error": + if res.StatusCode >= 500 { + b.answLogging(req, bodyBytes, res) + } + } + } sample.SetProtoCode(res.StatusCode) defer res.Body.Close() @@ -177,6 +213,27 @@ func (b *BaseGun) verboseLogging(res *http.Response) { ) } +func (b *BaseGun) answLogging(req *http.Request, bodyBytes []byte, res *http.Response) { + isBody := false + if bodyBytes != nil { + req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) + isBody = true + } + dump, err := httputil.DumpRequestOut(req, isBody) + if err != nil { + zap.L().Error("Error dumping request: %s", zap.Error(err)) + } + msg := fmt.Sprintf("REQUEST:\n%s\n\n", string(dump)) + b.AnswLog.Debug(msg) + + dump, err = httputil.DumpResponse(res, true) + if err != nil { + zap.L().Error("Error dumping response: %s", zap.Error(err)) + } + msg = fmt.Sprintf("RESPONSE:\n%s", string(dump)) + b.AnswLog.Debug(msg) +} + func autotag(depth int, URL *url.URL) string { path := URL.Path var ind int @@ -190,3 +247,14 @@ func autotag(depth int, URL *url.URL) string { } return path[:ind] } + +func GetBody(req *http.Request) []byte { + if req.Body != nil && req.Body != http.NoBody { + bodyBytes, _ := ioutil.ReadAll(req.Body) + req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes)) + return bodyBytes + } + + return nil + +} diff --git a/components/phttp/base_test.go b/components/phttp/base_test.go index 7effd2ddd..d8aa9eb69 100644 --- a/components/phttp/base_test.go +++ b/components/phttp/base_test.go @@ -18,6 +18,7 @@ import ( . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" "github.com/stretchr/testify/mock" + "go.uber.org/zap" ammomock "github.com/yandex/pandora/components/phttp/mocks" "github.com/yandex/pandora/core" @@ -109,6 +110,7 @@ var _ = Describe("BaseGun", func() { Context("Do ok", func() { BeforeEach(func() { body = ioutil.NopCloser(strings.NewReader("aaaaaaa")) + base.AnswLog = zap.NewNop() base.Do = func(doReq *http.Request) (*http.Response, error) { Expect(doReq).To(Equal(req)) return res, nil diff --git a/components/phttp/connect.go b/components/phttp/connect.go index bf8340cdb..110ac58a3 100644 --- a/components/phttp/connect.go +++ b/components/phttp/connect.go @@ -16,6 +16,7 @@ import ( "github.com/pkg/errors" "github.com/yandex/pandora/lib/netutil" + "go.uber.org/zap" ) type ConnectGunConfig struct { @@ -26,7 +27,7 @@ type ConnectGunConfig struct { BaseGunConfig `config:",squash"` } -func NewConnectGun(conf ConnectGunConfig) *ConnectGun { +func NewConnectGun(conf ConnectGunConfig, answLog *zap.Logger) *ConnectGun { scheme := "http" if conf.SSL { scheme = "https" @@ -41,6 +42,7 @@ func NewConnectGun(conf ConnectGunConfig) *ConnectGun { client.CloseIdleConnections() return nil }, + AnswLog: answLog, }, scheme: scheme, client: client, diff --git a/components/phttp/connect_test.go b/components/phttp/connect_test.go index 7492fa7ca..0a0ff3c13 100644 --- a/components/phttp/connect_test.go +++ b/components/phttp/connect_test.go @@ -15,6 +15,7 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + "go.uber.org/zap" "github.com/yandex/pandora/core/aggregator/netsample" ) @@ -85,9 +86,10 @@ var _ = Describe("connect", func() { proxy := httptest.NewServer(tunnelHandler(origin.URL)) defer proxy.Close() + log := zap.NewNop() conf := DefaultConnectGunConfig() conf.Target = proxy.Listener.Addr().String() - connectGun := NewConnectGun(conf) + connectGun := NewConnectGun(conf, log) results := &netsample.TestAggregator{} _ = connectGun.Bind(results, testDeps()) diff --git a/components/phttp/http.go b/components/phttp/http.go index c057b49f6..0e8caf93c 100644 --- a/components/phttp/http.go +++ b/components/phttp/http.go @@ -9,6 +9,7 @@ import ( "net/http" "github.com/pkg/errors" + "go.uber.org/zap" ) type ClientGunConfig struct { @@ -27,14 +28,14 @@ type HTTP2GunConfig struct { Client ClientConfig `config:",squash"` } -func NewHTTPGun(conf HTTPGunConfig) *HTTPGun { +func NewHTTPGun(conf HTTPGunConfig, answLog *zap.Logger) *HTTPGun { transport := NewTransport(conf.Client.Transport, NewDialer(conf.Client.Dialer).DialContext) client := newClient(transport, conf.Client.Redirect) - return NewClientGun(client, conf.Gun) + return NewClientGun(client, conf.Gun, answLog) } // NewHTTP2Gun return simple HTTP/2 gun that can shoot sequentially through one connection. -func NewHTTP2Gun(conf HTTP2GunConfig) (*HTTPGun, error) { +func NewHTTP2Gun(conf HTTP2GunConfig, answLog *zap.Logger) (*HTTPGun, error) { if !conf.Gun.SSL { // Open issue on github if you really need this feature. return nil, errors.New("HTTP/2.0 over TCP is not supported. Please leave SSL option true by default.") @@ -43,10 +44,10 @@ func NewHTTP2Gun(conf HTTP2GunConfig) (*HTTPGun, error) { client := newClient(transport, conf.Client.Redirect) // Will panic and cancel shooting whet target doesn't support HTTP/2. client = &panicOnHTTP1Client{client} - return NewClientGun(client, conf.Gun), nil + return NewClientGun(client, conf.Gun, answLog), nil } -func NewClientGun(client Client, conf ClientGunConfig) *HTTPGun { +func NewClientGun(client Client, conf ClientGunConfig, answLog *zap.Logger) *HTTPGun { scheme := "http" if conf.SSL { scheme = "https" @@ -60,6 +61,7 @@ func NewClientGun(client Client, conf ClientGunConfig) *HTTPGun { client.CloseIdleConnections() return nil }, + AnswLog: answLog, }, scheme: scheme, target: conf.Target, diff --git a/components/phttp/http_test.go b/components/phttp/http_test.go index f9a18d51b..7a378c979 100644 --- a/components/phttp/http_test.go +++ b/components/phttp/http_test.go @@ -45,10 +45,11 @@ var _ = Describe("BaseGun", func() { actualReq = req })) defer server.Close() + log := zap.NewNop() conf := DefaultHTTPGunConfig() conf.Gun.Target = strings.TrimPrefix(server.URL, "http://") results := &netsample.TestAggregator{} - httpGun := NewHTTPGun(conf) + httpGun := NewHTTPGun(conf, log) _ = httpGun.Bind(results, testDeps()) am := newAmmoReq(expectedReq) @@ -94,10 +95,11 @@ var _ = Describe("HTTP", func() { server.Start() } defer server.Close() + log := zap.NewNop() conf := DefaultHTTPGunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Gun.SSL = https - gun := NewHTTPGun(conf) + gun := NewHTTPGun(conf, log) var aggr netsample.TestAggregator _ = gun.Bind(&aggr, testDeps()) gun.Shoot(newAmmoURL("/")) @@ -119,10 +121,11 @@ var _ = Describe("HTTP", func() { } })) defer server.Close() + log := zap.NewNop() conf := DefaultHTTPGunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Client.Redirect = redirect - gun := NewHTTPGun(conf) + gun := NewHTTPGun(conf, log) var aggr netsample.TestAggregator _ = gun.Bind(&aggr, testDeps()) gun.Shoot(newAmmoURL("/redirect")) @@ -156,10 +159,11 @@ var _ = Describe("HTTP", func() { Expect(err).NotTo(HaveOccurred()) Expect(res.StatusCode).To(Equal(http.StatusForbidden)) + log := zap.NewNop() conf := DefaultHTTPGunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Gun.SSL = true - gun := NewHTTPGun(conf) + gun := NewHTTPGun(conf, log) var results netsample.TestAggregator _ = gun.Bind(&results, testDeps()) gun.Shoot(newAmmoURL("/")) @@ -180,9 +184,10 @@ var _ = Describe("HTTP/2", func() { } })) defer server.Close() + log := zap.NewNop() conf := DefaultHTTP2GunConfig() conf.Gun.Target = server.Listener.Addr().String() - gun, _ := NewHTTP2Gun(conf) + gun, _ := NewHTTP2Gun(conf, log) var results netsample.TestAggregator _ = gun.Bind(&results, testDeps()) gun.Shoot(newAmmoURL("/")) @@ -194,9 +199,10 @@ var _ = Describe("HTTP/2", func() { zap.S().Info("Served") })) defer server.Close() + log := zap.NewNop() conf := DefaultHTTP2GunConfig() conf.Gun.Target = server.Listener.Addr().String() - gun, _ := NewHTTP2Gun(conf) + gun, _ := NewHTTP2Gun(conf, log) var results netsample.TestAggregator _ = gun.Bind(&results, testDeps()) var r interface{} @@ -215,10 +221,11 @@ var _ = Describe("HTTP/2", func() { zap.S().Info("Served") })) defer server.Close() + log := zap.NewNop() conf := DefaultHTTP2GunConfig() conf.Gun.Target = server.Listener.Addr().String() conf.Gun.SSL = false - _, err := NewHTTP2Gun(conf) + _, err := NewHTTP2Gun(conf, log) Expect(err).To(HaveOccurred()) }) diff --git a/components/phttp/import/import.go b/components/phttp/import/import.go index 016eb47f5..9d366b612 100644 --- a/components/phttp/import/import.go +++ b/components/phttp/import/import.go @@ -18,10 +18,12 @@ import ( "github.com/yandex/pandora/components/phttp/ammo/simple/uripost" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/register" + "github.com/yandex/pandora/lib/answlog" "github.com/yandex/pandora/lib/netutil" ) func Import(fs afero.Fs) { + register.Provider("http/json", func(conf jsonline.Config) core.Provider { return jsonline.NewProvider(fs, conf) }) @@ -40,21 +42,24 @@ func Import(fs afero.Fs) { register.Gun("http", func(conf phttp.HTTPGunConfig) func() core.Gun { _ = preResolveTargetAddr(&conf.Client, &conf.Gun.Target) - return func() core.Gun { return phttp.WrapGun(phttp.NewHTTPGun(conf)) } + answLog := answlog.Init(conf.Gun.Base.AnswLog.Path) + return func() core.Gun { return phttp.WrapGun(phttp.NewHTTPGun(conf, answLog)) } }, phttp.DefaultHTTPGunConfig) register.Gun("http2", func(conf phttp.HTTP2GunConfig) func() (core.Gun, error) { _ = preResolveTargetAddr(&conf.Client, &conf.Gun.Target) + answLog := answlog.Init(conf.Gun.Base.AnswLog.Path) return func() (core.Gun, error) { - gun, err := phttp.NewHTTP2Gun(conf) + gun, err := phttp.NewHTTP2Gun(conf, answLog) return phttp.WrapGun(gun), err } }, phttp.DefaultHTTP2GunConfig) register.Gun("connect", func(conf phttp.ConnectGunConfig) func() core.Gun { _ = preResolveTargetAddr(&conf.Client, &conf.Target) + answLog := answlog.Init(conf.BaseGunConfig.AnswLog.Path) return func() core.Gun { - return phttp.WrapGun(phttp.NewConnectGun(conf)) + return phttp.WrapGun(phttp.NewConnectGun(conf, answLog)) } }, phttp.DefaultConnectGunConfig) } diff --git a/core/core.go b/core/core.go index 6de0bf12a..51ac7a7c2 100644 --- a/core/core.go +++ b/core/core.go @@ -63,7 +63,8 @@ type Provider interface { // WARN: another fields could be added in next MINOR versions. // That is NOT considered as a breaking compatibility change. type ProviderDeps struct { - Log *zap.Logger + Log *zap.Logger + PoolID string } //go:generate mockery -name=Gun -case=underscore -outpkg=coremock @@ -105,6 +106,8 @@ type GunDeps struct { // There is a race between Instances for Ammo Acquire, so it's not guaranteed, that // Instance with lower InstanceId gets it's Ammo earlier. InstanceID int + PoolID string + // TODO(skipor): https://github.com/yandex/pandora/issues/71 // Pass parallelism value. InstanceId MUST be -1 if parallelism > 1. } diff --git a/core/engine/engine.go b/core/engine/engine.go index 5979d52a1..502d18ba8 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -10,6 +10,8 @@ import ( "fmt" "sync" + "github.com/yandex/pandora/core/warmup" + "github.com/pkg/errors" "go.uber.org/zap" @@ -112,7 +114,7 @@ func (e *Engine) Wait() { func newPool(log *zap.Logger, m Metrics, onWaitDone func(), conf InstancePoolConfig) *instancePool { log = log.With(zap.String("pool", conf.ID)) - return &instancePool{log, m, onWaitDone, conf} + return &instancePool{log, m, onWaitDone, conf, nil} } type instancePool struct { @@ -120,6 +122,7 @@ type instancePool struct { metrics Metrics onWaitDone func() InstancePoolConfig + gunWarmUpResult interface{} } // Run start instance pool. Run blocks until fail happen, or all instances finish. @@ -141,6 +144,11 @@ func (p *instancePool) Run(ctx context.Context) error { cancel() }() + if err := p.warmUpGun(ctx); err != nil { + p.onWaitDone() + return err + } + rh, err := p.runAsync(ctx) if err != nil { return err @@ -162,6 +170,23 @@ func (p *instancePool) Run(ctx context.Context) error { } } +func (p *instancePool) warmUpGun(ctx context.Context) error { + dummyGun, err := p.NewGun() + if err != nil { + return fmt.Errorf("can't initiate a gun: %w", err) + } + if gunWithWarmUp, ok := dummyGun.(warmup.WarmedUp); ok { + p.gunWarmUpResult, err = gunWithWarmUp.WarmUp(&warmup.Options{ + Log: p.log, + Ctx: ctx, + }) + if err != nil { + return fmt.Errorf("gun warm up failed: %w", err) + } + } + return nil +} + type poolAsyncRunHandle struct { runCtx context.Context runCancel context.CancelFunc @@ -195,7 +220,7 @@ func (p *instancePool) runAsync(runCtx context.Context) (*poolAsyncRunHandle, er runRes = make(chan instanceRunResult, runResultBufSize) ) go func() { - deps := core.ProviderDeps{Log: p.log} + deps := core.ProviderDeps{Log: p.log, PoolID: p.ID} providerErr <- p.Provider.Run(runCtx, deps) }() go func() { @@ -340,7 +365,7 @@ func (p *instancePool) startInstances( p.Aggregator, newInstanceSchedule, p.NewGun, - instanceSharedDeps{p.Provider, p.metrics}, + instanceSharedDeps{p.Provider, p.metrics, p.gunWarmUpResult}, } waiter := coreutil.NewWaiter(p.StartupSchedule, startCtx) @@ -351,7 +376,7 @@ func (p *instancePool) startInstances( err = startCtx.Err() return } - firstInstance, err := newInstance(runCtx, p.log, 0, deps) + firstInstance, err := newInstance(runCtx, p.log, p.ID, 0, deps) if err != nil { return } @@ -364,7 +389,7 @@ func (p *instancePool) startInstances( for ; waiter.Wait(); started++ { id := started go func() { - runRes <- instanceRunResult{id, runNewInstance(runCtx, p.log, id, deps)} + runRes <- instanceRunResult{id, runNewInstance(runCtx, p.log, p.ID, id, deps)} }() } err = startCtx.Err() @@ -399,8 +424,8 @@ func (p *instancePool) buildNewInstanceSchedule(startCtx context.Context, cancel return } -func runNewInstance(ctx context.Context, log *zap.Logger, id int, deps instanceDeps) error { - instance, err := newInstance(ctx, log, id, deps) +func runNewInstance(ctx context.Context, log *zap.Logger, poolID string, id int, deps instanceDeps) error { + instance, err := newInstance(ctx, log, poolID, id, deps) if err != nil { return err } diff --git a/core/engine/instance.go b/core/engine/instance.go index 2bb4a6b90..0fe81d261 100644 --- a/core/engine/instance.go +++ b/core/engine/instance.go @@ -7,8 +7,11 @@ package engine import ( "context" + "fmt" "io" + "github.com/yandex/pandora/core/warmup" + "github.com/pkg/errors" "go.uber.org/zap" @@ -25,9 +28,9 @@ type instance struct { instanceSharedDeps } -func newInstance(ctx context.Context, log *zap.Logger, id int, deps instanceDeps) (*instance, error) { +func newInstance(ctx context.Context, log *zap.Logger, poolID string, id int, deps instanceDeps) (*instance, error) { log = log.With(zap.Int("instance", id)) - gunDeps := core.GunDeps{Ctx: ctx, Log: log, InstanceID: id} + gunDeps := core.GunDeps{Ctx: ctx, Log: log, PoolID: poolID, InstanceID: id} sched, err := deps.newSchedule() if err != nil { return nil, err @@ -36,6 +39,11 @@ func newInstance(ctx context.Context, log *zap.Logger, id int, deps instanceDeps if err != nil { return nil, err } + if warmedUp, ok := gun.(warmup.WarmedUp); ok { + if err := warmedUp.AcceptWarmUpResult(deps.gunWarmUpResult); err != nil { + return nil, fmt.Errorf("gun failed to accept warmup result: %w", err) + } + } err = gun.Bind(deps.aggregator, gunDeps) if err != nil { return nil, err @@ -54,6 +62,7 @@ type instanceDeps struct { type instanceSharedDeps struct { provider core.Provider Metrics + gunWarmUpResult interface{} } // Run blocks until ammo finish, error or context cancel. diff --git a/core/engine/instance_test.go b/core/engine/instance_test.go index e13287b1c..e64d8fbf3 100644 --- a/core/engine/instance_test.go +++ b/core/engine/instance_test.go @@ -54,9 +54,10 @@ var _ = Describe("Instance", func() { instanceSharedDeps{ provider, metrics, + nil, }, } - ins, insCreateErr = newInstance(ctx, ginkgoutil.NewLogger(), 0, deps) + ins, insCreateErr = newInstance(ctx, ginkgoutil.NewLogger(), "pool_0", 0, deps) }) AfterEach(func() { diff --git a/core/warmup/interface.go b/core/warmup/interface.go new file mode 100644 index 000000000..6438074bf --- /dev/null +++ b/core/warmup/interface.go @@ -0,0 +1,6 @@ +package warmup + +type WarmedUp interface { + WarmUp(*Options) (interface{}, error) + AcceptWarmUpResult(interface{}) error +} diff --git a/core/warmup/options.go b/core/warmup/options.go new file mode 100644 index 000000000..a09d8224a --- /dev/null +++ b/core/warmup/options.go @@ -0,0 +1,12 @@ +package warmup + +import ( + "context" + + "go.uber.org/zap" +) + +type Options struct { + Log *zap.Logger + Ctx context.Context +} diff --git a/docs/custom.rst b/docs/custom.rst index c72c1a537..31a78f8cf 100644 --- a/docs/custom.rst +++ b/docs/custom.rst @@ -361,7 +361,7 @@ Websockets } else { code = 200 } - defer func() { + func() { sample.SetProtoCode(code) g.aggr.Report(sample) }() diff --git a/docs/tutorial.rst b/docs/tutorial.rst index e5db5e0b0..d4aff36b8 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -17,7 +17,7 @@ Pandora supports config files in `YAML`_ format. Create a new file named ``load. type: http # gun type target: example.com:80 # gun target ammo: - type: uri # ammo format + type: uri # ammo format file: ./ammo.uri # ammo File result: type: phout # report format (phout is compatible with Yandex.Tank) @@ -77,5 +77,5 @@ References .. target-notes:: .. _`Overload`: https://overload.yandex.net -.. _`Yandex.Tank`: http://yandextank.readthedocs.org/en/latest/configuration.html#pandora -.. _`YAML`: https://en.wikipedia.org/wiki/YAML \ No newline at end of file +.. _`Yandex.Tank`: https://yandextank.readthedocs.io/en/latest/core_and_modules.html#pandora +.. _`YAML`: https://en.wikipedia.org/wiki/YAML diff --git a/lib/answlog/logger.go b/lib/answlog/logger.go new file mode 100644 index 000000000..28720a1c5 --- /dev/null +++ b/lib/answlog/logger.go @@ -0,0 +1,26 @@ +package answlog + +import ( + "os" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func Init(path string) *zap.Logger { + writerSyncer := getAnswWriter(path) + encoder := zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()) + core := zapcore.NewCore(encoder, writerSyncer, zapcore.DebugLevel) + + Log := zap.New(core) + defer Log.Sync() + return Log +} + +func getAnswWriter(path string) zapcore.WriteSyncer { + if path == "" { + path = "./answ.log" + } + file, _ := os.Create(path) + return zapcore.AddSync(file) +}