Skip to content

Commit

Permalink
Merge pull request #146 from yandex/arcadia
Browse files Browse the repository at this point in the history
pool reflection once for grpc gun
  • Loading branch information
ligreen authored Jan 25, 2022
2 parents e7713bc + a83475b commit 7e36fd3
Show file tree
Hide file tree
Showing 20 changed files with 274 additions and 61 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Run the binary with your config (see config examples at [examples](https://githu
pandora myconfig.yaml
```

Or use Pandora with [Yandex.Tank](http://yandextank.readthedocs.org/en/latest/configuration.html#pandora) and
Or use Pandora with [Yandex.Tank](https://yandextank.readthedocs.io/en/latest/core_and_modules.html#pandora) and
[Overload](https://overload.yandex.net).

### Documentation
Expand Down
2 changes: 1 addition & 1 deletion cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ const Version = "0.3.5"
const defaultConfigFile = "load"
const stdinConfigSelector = "-"

var useStdinConfig = false
var configSearchDirs = []string{"./", "./config", "/etc/pandora"}

type cliConfig struct {
Expand Down Expand Up @@ -189,6 +188,7 @@ func readConfig() *cliConfig {

v := newViper()

var useStdinConfig = false
args := flag.Args()
if len(args) > 0 {
switch {
Expand Down
81 changes: 56 additions & 25 deletions components/grpc/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package grpc
import (
"context"
"encoding/json"
"fmt"
"log"
"time"

"github.com/jhump/protoreflect/grpcreflect"
"github.com/yandex/pandora/core/warmup"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator/netsample"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"github.com/jhump/protoreflect/grpcreflect"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)

Expand All @@ -34,7 +37,8 @@ type Sample struct {
}

type GunConfig struct {
Target string `validate:"required"`
Target string `validate:"required"`
Timeout time.Duration `config:"timeout"` // grpc request timeout
}

type Gun struct {
Expand All @@ -48,52 +52,72 @@ type Gun struct {
services map[string]desc.MethodDescriptor
}

func NewGun(conf GunConfig) *Gun {
return &Gun{conf: conf}
}

func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error {
func (g *Gun) WarmUp(opts *warmup.Options) (interface{}, error) {
conn, err := grpc.Dial(
g.conf.Target,
grpc.WithInsecure(),
grpc.WithTimeout(time.Second),
grpc.WithUserAgent("load test, pandora universal grpc shooter"))
if err != nil {
log.Fatalf("FATAL: grpc.Dial failed\n %s\n", err)
}
g.client = conn
g.aggr = aggr
g.GunDeps = deps
g.stub = grpcdynamic.NewStub(conn)

log := deps.Log

if ent := log.Check(zap.DebugLevel, "Gun bind"); ent != nil {
// Enable debug level logging during shooting. Creating log entries isn't free.
g.DebugLog = true
return nil, fmt.Errorf("failed to connect to target: %w", err)
}
defer conn.Close()

meta := make(metadata.MD)
refCtx := metadata.NewOutgoingContext(context.Background(), meta)
refClient := grpcreflect.NewClient(refCtx, reflectpb.NewServerReflectionClient(conn))
listServices, err := refClient.ListServices()
if err != nil {
log.Fatal("Fatal: failed to get services list\n %s\n", zap.Error(err))
opts.Log.Fatal("Fatal: failed to get services list\n %s\n", zap.Error(err))
}
g.services = make(map[string]desc.MethodDescriptor)
services := make(map[string]desc.MethodDescriptor)
for _, s := range listServices {
service, err := refClient.ResolveService(s)
if err != nil {
if grpcreflect.IsElementNotFoundError(err) {
continue
}
log.Fatal("FATAL ResolveService: %s", zap.Error(err))
opts.Log.Fatal("FATAL ResolveService: %s", zap.Error(err))
}
listMethods := service.GetMethods()
for _, m := range listMethods {
g.services[m.GetFullyQualifiedName()] = *m
services[m.GetFullyQualifiedName()] = *m
}
}
return services, nil
}

func (g *Gun) AcceptWarmUpResult(i interface{}) error {
services, ok := i.(map[string]desc.MethodDescriptor)
if !ok {
return fmt.Errorf("grpc WarmUp result should be services: map[string]desc.MethodDescriptor")
}
g.services = services
return nil
}

func NewGun(conf GunConfig) *Gun {
return &Gun{conf: conf}
}

func (g *Gun) Bind(aggr core.Aggregator, deps core.GunDeps) error {
conn, err := grpc.Dial(
g.conf.Target,
grpc.WithInsecure(),
grpc.WithTimeout(time.Second),
grpc.WithUserAgent("load test, pandora universal grpc shooter"))
if err != nil {
log.Fatalf("FATAL: grpc.Dial failed\n %s\n", err)
}
g.client = conn
g.aggr = aggr
g.GunDeps = deps
g.stub = grpcdynamic.NewStub(conn)

if ent := deps.Log.Check(zap.DebugLevel, "Gun bind"); ent != nil {
// Enable debug level logging during shooting. Creating log entries isn't free.
g.DebugLog = true
}

return nil
}
Expand Down Expand Up @@ -140,7 +164,14 @@ func (g *Gun) shoot(ammo *Ammo) {
}
}

ctx := metadata.NewOutgoingContext(context.Background(), meta)
timeout := time.Second * 15
if g.conf.Timeout != 0 {
timeout = time.Second * g.conf.Timeout
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, meta)
out, err := g.stub.InvokeRpc(ctx, &method, message)
code = convertGrpcStatus(err)

Expand Down
12 changes: 12 additions & 0 deletions components/grpc/core_tests/core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package core

import (
"testing"

"github.com/yandex/pandora/components/grpc"
"github.com/yandex/pandora/core/warmup"
)

func TestGrpcGunImplementsWarmedUp(t *testing.T) {
_ = warmup.WarmedUp(&grpc.Gun{})
}
72 changes: 70 additions & 2 deletions components/phttp/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@
package phttp

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httputil"
"net/url"

"github.com/pkg/errors"
Expand All @@ -25,6 +28,7 @@ const (

type BaseGunConfig struct {
AutoTag AutoTagConfig `config:"auto-tag"`
AnswLog AnswLogConfig `config:"answlog"`
}

// AutoTagConfig configure automatic tags generation based on ammo URI. First AutoTag URI path elements becomes tag.
Expand All @@ -35,13 +39,25 @@ type AutoTagConfig struct {
NoTagOnly bool `config:"no-tag-only"` // When true, autotagged only ammo that has no tag before.
}

type AnswLogConfig struct {
Enabled bool `config:"enabled"`
Path string `config:"path"`
Filter string `config:"filter" valid:"oneof=all warning error"`
}

func DefaultBaseGunConfig() BaseGunConfig {
return BaseGunConfig{
AutoTagConfig{
Enabled: false,
URIElements: 2,
NoTagOnly: true,
}}
},
AnswLogConfig{
Enabled: false,
Path: "answ.log",
Filter: "error",
},
}
}

type BaseGun struct {
Expand All @@ -51,6 +67,7 @@ type BaseGun struct {
Connect func(ctx context.Context) error // Optional hook.
OnClose func() error // Optional. Called on Close().
Aggregator netsample.Aggregator // Lazy set via BindResultTo.
AnswLog *zap.Logger
core.GunDeps
}

Expand Down Expand Up @@ -78,6 +95,7 @@ func (b *BaseGun) Bind(aggregator netsample.Aggregator, deps core.GunDeps) error

// Shoot is thread safe iff Do and Connect hooks are thread safe.
func (b *BaseGun) Shoot(ammo Ammo) {
var bodyBytes []byte
if b.Aggregator == nil {
zap.L().Panic("must bind before shoot")
}
Expand All @@ -100,13 +118,15 @@ func (b *BaseGun) Shoot(ammo Ammo) {
if b.DebugLog {
b.Log.Debug("Prepared ammo to shoot", zap.Stringer("url", req.URL))
}

if b.Config.AutoTag.Enabled && (!b.Config.AutoTag.NoTagOnly || sample.Tags() == "") {
sample.AddTag(autotag(b.Config.AutoTag.URIElements, req.URL))
}
if sample.Tags() == "" {
sample.AddTag(EmptyTag)
}
if b.Config.AnswLog.Enabled {
bodyBytes = GetBody(req)
}

var err error
defer func() {
Expand All @@ -127,6 +147,22 @@ func (b *BaseGun) Shoot(ammo Ammo) {
if b.DebugLog {
b.verboseLogging(res)
}
if b.Config.AnswLog.Enabled {
switch b.Config.AnswLog.Filter {
case "all":
b.answLogging(req, bodyBytes, res)

case "warning":
if res.StatusCode >= 400 {
b.answLogging(req, bodyBytes, res)
}

case "error":
if res.StatusCode >= 500 {
b.answLogging(req, bodyBytes, res)
}
}
}

sample.SetProtoCode(res.StatusCode)
defer res.Body.Close()
Expand Down Expand Up @@ -177,6 +213,27 @@ func (b *BaseGun) verboseLogging(res *http.Response) {
)
}

func (b *BaseGun) answLogging(req *http.Request, bodyBytes []byte, res *http.Response) {
isBody := false
if bodyBytes != nil {
req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
isBody = true
}
dump, err := httputil.DumpRequestOut(req, isBody)
if err != nil {
zap.L().Error("Error dumping request: %s", zap.Error(err))
}
msg := fmt.Sprintf("REQUEST:\n%s\n\n", string(dump))
b.AnswLog.Debug(msg)

dump, err = httputil.DumpResponse(res, true)
if err != nil {
zap.L().Error("Error dumping response: %s", zap.Error(err))
}
msg = fmt.Sprintf("RESPONSE:\n%s", string(dump))
b.AnswLog.Debug(msg)
}

func autotag(depth int, URL *url.URL) string {
path := URL.Path
var ind int
Expand All @@ -190,3 +247,14 @@ func autotag(depth int, URL *url.URL) string {
}
return path[:ind]
}

func GetBody(req *http.Request) []byte {
if req.Body != nil && req.Body != http.NoBody {
bodyBytes, _ := ioutil.ReadAll(req.Body)
req.Body = ioutil.NopCloser(bytes.NewBuffer(bodyBytes))
return bodyBytes
}

return nil

}
2 changes: 2 additions & 0 deletions components/phttp/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"

ammomock "github.com/yandex/pandora/components/phttp/mocks"
"github.com/yandex/pandora/core"
Expand Down Expand Up @@ -109,6 +110,7 @@ var _ = Describe("BaseGun", func() {
Context("Do ok", func() {
BeforeEach(func() {
body = ioutil.NopCloser(strings.NewReader("aaaaaaa"))
base.AnswLog = zap.NewNop()
base.Do = func(doReq *http.Request) (*http.Response, error) {
Expect(doReq).To(Equal(req))
return res, nil
Expand Down
4 changes: 3 additions & 1 deletion components/phttp/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/pkg/errors"
"github.com/yandex/pandora/lib/netutil"
"go.uber.org/zap"
)

type ConnectGunConfig struct {
Expand All @@ -26,7 +27,7 @@ type ConnectGunConfig struct {
BaseGunConfig `config:",squash"`
}

func NewConnectGun(conf ConnectGunConfig) *ConnectGun {
func NewConnectGun(conf ConnectGunConfig, answLog *zap.Logger) *ConnectGun {
scheme := "http"
if conf.SSL {
scheme = "https"
Expand All @@ -41,6 +42,7 @@ func NewConnectGun(conf ConnectGunConfig) *ConnectGun {
client.CloseIdleConnections()
return nil
},
AnswLog: answLog,
},
scheme: scheme,
client: client,
Expand Down
4 changes: 3 additions & 1 deletion components/phttp/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/zap"

"github.com/yandex/pandora/core/aggregator/netsample"
)
Expand Down Expand Up @@ -85,9 +86,10 @@ var _ = Describe("connect", func() {
proxy := httptest.NewServer(tunnelHandler(origin.URL))
defer proxy.Close()

log := zap.NewNop()
conf := DefaultConnectGunConfig()
conf.Target = proxy.Listener.Addr().String()
connectGun := NewConnectGun(conf)
connectGun := NewConnectGun(conf, log)

results := &netsample.TestAggregator{}
_ = connectGun.Bind(results, testDeps())
Expand Down
Loading

0 comments on commit 7e36fd3

Please sign in to comment.