Skip to content

Commit

Permalink
Merge pull request #155 from yandex/arcadia-conflict
Browse files Browse the repository at this point in the history
Arcadia conflict
  • Loading branch information
trueival authored Sep 13, 2022
2 parents 6ac8e04 + 1ac7a8a commit 4f058fb
Show file tree
Hide file tree
Showing 90 changed files with 881 additions and 387 deletions.
3 changes: 1 addition & 2 deletions acceptance_tests/acceptance_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import (
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/onsi/gomega/gexec"
"go.uber.org/zap"

"github.com/yandex/pandora/lib/ginkgoutil"
"github.com/yandex/pandora/lib/tag"
"go.uber.org/zap"
)

var pandoraBin string
Expand Down
4 changes: 1 addition & 3 deletions acceptance_tests/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package acceptance

import (
"net/http"

"golang.org/x/net/http2"

"net/http/httptest"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"go.uber.org/atomic"
"golang.org/x/net/http2"
)

var _ = Describe("http", func() {
Expand Down
47 changes: 28 additions & 19 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,14 @@ import (
"time"

"github.com/spf13/viper"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/yandex/pandora/core/config"
"github.com/yandex/pandora/core/engine"
"github.com/yandex/pandora/lib/zaputil"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const Version = "0.3.5"
const Version = "0.4.0"
const defaultConfigFile = "load"
const stdinConfigSelector = "-"

Expand Down Expand Up @@ -129,46 +128,56 @@ func Run() {
errs := make(chan error)
go runEngine(ctx, pandora, errs)

// waiting for signal or error message from engine
awaitPandoraTermination(pandora, cancel, errs, log)
log.Info("Engine run successfully finished")
}

// helper function that awaits pandora run
func awaitPandoraTermination(pandora *engine.Engine, gracefulShutdown func(), errs chan error, log *zap.Logger) {
sigs := make(chan os.Signal, 2)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

// waiting for signal or error message from engine
select {
case sig := <-sigs:
var interruptTimeout = 3 * time.Second
switch sig {
case syscall.SIGINT:
const interruptTimeout = 5 * time.Second
log.Info("SIGINT received. Trying to stop gracefully.", zap.Duration("timeout", interruptTimeout))
cancel()
select {
case <-time.After(interruptTimeout):
log.Fatal("Interrupt timeout exceeded")
case sig := <-sigs:
log.Fatal("Another signal received. Quiting.", zap.Stringer("signal", sig))
case err := <-errs:
log.Fatal("Engine interrupted", zap.Error(err))
}
// await gun timeout but no longer than 30 sec.
interruptTimeout = 30 * time.Second
log.Info("SIGINT received. Graceful shutdown.", zap.Duration("timeout", interruptTimeout))
gracefulShutdown()
case syscall.SIGTERM:
log.Fatal("SIGTERM received. Quiting.")
log.Info("SIGTERM received. Trying to stop gracefully.", zap.Duration("timeout", interruptTimeout))
gracefulShutdown()
default:
log.Fatal("Unexpected signal received. Quiting.", zap.Stringer("signal", sig))
}

select {
case <-time.After(interruptTimeout):
log.Fatal("Interrupt timeout exceeded")
case sig := <-sigs:
log.Fatal("Another signal received. Quiting.", zap.Stringer("signal", sig))
case err := <-errs:
log.Fatal("Engine interrupted", zap.Error(err))
}

case err := <-errs:
switch err {
case nil:
log.Info("Pandora engine successfully finished it's work")
case err:
const awaitTimeout = 3 * time.Second
log.Error("Engine run failed. Awaiting started tasks.", zap.Error(err), zap.Duration("timeout", awaitTimeout))
cancel()
gracefulShutdown()
time.AfterFunc(awaitTimeout, func() {
log.Fatal("Engine tasks timeout exceeded.")
})
pandora.Wait()
log.Fatal("Engine run failed. Pandora graceful shutdown successfully finished")
}
}
log.Info("Engine run successfully finished")
}

func runEngine(ctx context.Context, engine *engine.Engine, errs chan error) {
Expand Down
3 changes: 1 addition & 2 deletions cli/expvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ package cli
import (
"time"

"go.uber.org/zap"

"github.com/yandex/pandora/core/engine"
"github.com/yandex/pandora/lib/monitoring"
"go.uber.org/zap"
)

func newEngineMetrics() engine.Metrics {
Expand Down
3 changes: 1 addition & 2 deletions components/example/example.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,9 @@ import (
"fmt"
"sync"

"go.uber.org/zap"

"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator/netsample"
"go.uber.org/zap"
)

type Ammo struct {
Expand Down
13 changes: 8 additions & 5 deletions components/grpc/ammo/grpcjson/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ import (
"bufio"
"context"

"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/yandex/pandora/components/grpc/ammo"

jsoniter "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/spf13/afero"
"github.com/yandex/pandora/components/grpc/ammo"
"github.com/yandex/pandora/lib/confutil"
"go.uber.org/zap"
)

func NewProvider(fs afero.Fs, conf Config) *Provider {
Expand Down Expand Up @@ -51,6 +50,7 @@ type Config struct {
//Maximum number of byte in an ammo. Default is bufio.MaxScanTokenSize
MaxAmmoSize int
Source Source `config:"source"`
ChosenCases []string
}

func (p *Provider) start(ctx context.Context, ammoFile afero.File) error {
Expand All @@ -72,6 +72,9 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error {
return errors.Wrapf(err, "failed to decode ammo at line: %v; data: %q", line, data)
}
}
if !confutil.IsChosenCase(a.Tag, p.Config.ChosenCases) {
continue
}
ammoNum++
select {
case p.Sink <- a:
Expand Down
3 changes: 1 addition & 2 deletions components/grpc/ammo/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (

"github.com/pkg/errors"
"github.com/spf13/afero"
"go.uber.org/atomic"

"github.com/yandex/pandora/core"
"go.uber.org/atomic"
)

func NewProvider(fs afero.Fs, fileName string, start func(ctx context.Context, file afero.File) error) Provider {
Expand Down
23 changes: 13 additions & 10 deletions components/grpc/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,20 @@ import (
"log"
"time"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/yandex/pandora/core/warmup"
"google.golang.org/grpc/credentials"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

"github.com/yandex/pandora/components/grpc/ammo"
"github.com/yandex/pandora/core"
"github.com/yandex/pandora/core/aggregator/netsample"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"github.com/yandex/pandora/core/warmup"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)

Expand All @@ -33,7 +31,8 @@ type Sample struct {
}

type grpcDialOptions struct {
Authority string `config:"authority"`
Authority string `config:"authority"`
Timeout time.Duration `config:"timeout"`
}

type GunConfig struct {
Expand Down Expand Up @@ -186,7 +185,11 @@ func makeGRPCConnect(target string, isTLS bool, dialOptions grpcDialOptions) (co
} else {
opts = append(opts, grpc.WithInsecure())
}
opts = append(opts, grpc.WithTimeout(time.Second))
timeout := time.Second
if dialOptions.Timeout != 0 {
timeout = dialOptions.Timeout
}
opts = append(opts, grpc.WithTimeout(timeout))
opts = append(opts, grpc.WithUserAgent("load test, pandora universal grpc shooter"))

if dialOptions.Authority != "" {
Expand Down
4 changes: 4 additions & 0 deletions components/phttp/ammo/simple/ammo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,8 @@ func (a *Ammo) IsValid() bool {
return !a.isInvalid
}

func (a *Ammo) Tag() string {
return a.tag
}

var _ phttp.Ammo = (*Ammo)(nil)
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
"github.com/spf13/afero"

"github.com/yandex/pandora/components/phttp/ammo/simple"
"github.com/yandex/pandora/core"
"github.com/yandex/pandora/lib/ginkgoutil"
Expand Down
9 changes: 7 additions & 2 deletions components/phttp/ammo/simple/jsonline/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
"strings"

"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/spf13/afero"
"github.com/yandex/pandora/components/phttp/ammo/simple"
"github.com/yandex/pandora/lib/confutil"
"go.uber.org/zap"
)

func NewProvider(fs afero.Fs, conf Config) *Provider {
Expand All @@ -42,10 +42,12 @@ type Config struct {
ContinueOnError bool
//Maximum number of byte in an ammo. Default is bufio.MaxScanTokenSize
MaxAmmoSize int
ChosenCases []string
}

func (p *Provider) start(ctx context.Context, ammoFile afero.File) error {
var ammoNum, passNum int

for {
passNum++
scanner := bufio.NewScanner(ammoFile)
Expand All @@ -63,6 +65,9 @@ func (p *Provider) start(ctx context.Context, ammoFile afero.File) error {
return errors.Wrapf(err, "failed to decode ammo at line: %v; data: %q", line, data)
}
}
if !confutil.IsChosenCase(a.Tag(), p.Config.ChosenCases) {
continue
}
ammoNum++
select {
case p.Sink <- a:
Expand Down
3 changes: 1 addition & 2 deletions components/phttp/ammo/simple/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (

"github.com/pkg/errors"
"github.com/spf13/afero"
"go.uber.org/atomic"

"github.com/yandex/pandora/core"
"go.uber.org/atomic"
)

func NewProvider(fs afero.Fs, fileName string, start func(ctx context.Context, file afero.File) error) Provider {
Expand Down
26 changes: 0 additions & 26 deletions components/phttp/ammo/simple/raw/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"net/http"
"strconv"
"strings"

"github.com/pkg/errors"
)

type Header struct {
Expand All @@ -29,30 +27,6 @@ func decodeRequest(reqString []byte) (req *http.Request, err error) {
if err != nil {
return
}
if req.Host != "" {
req.URL.Host = req.Host
}
req.RequestURI = ""
return
}

func decodeHTTPConfigHeaders(headers []string) (configHTTPHeaders []Header, err error) {
for _, header := range headers {
line := []byte(header)
if len(line) < 3 || line[0] != '[' || line[len(line)-1] != ']' {
return nil, errors.New("header line should be like '[key: value]")
}
line = line[1 : len(line)-1]
colonIdx := bytes.IndexByte(line, ':')
if colonIdx < 0 {
return nil, errors.New("missing colon")
}
configHTTPHeaders = append(
configHTTPHeaders,
Header{
string(bytes.TrimSpace(line[:colonIdx])),
string(bytes.TrimSpace(line[colonIdx+1:])),
})
}
return
}
16 changes: 7 additions & 9 deletions components/phttp/ammo/simple/raw/decoder_bench_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package raw

import "testing"
import (
"testing"

"github.com/yandex/pandora/components/phttp/ammo/simple"
)

var (
benchTestConfigHeaders = []string{
Expand All @@ -27,16 +31,10 @@ func BenchmarkRawDecoder(b *testing.B) {

func BenchmarkRawDecoderWithHeaders(b *testing.B) {
b.StopTimer()
decodedHTTPConfigHeaders, _ := decodeHTTPConfigHeaders(benchTestConfigHeaders)
decodedHTTPConfigHeaders, _ := simple.DecodeHTTPConfigHeaders(benchTestConfigHeaders)
b.StartTimer()
for i := 0; i < b.N; i++ {
req, _ := decodeRequest([]byte(benchTestRequest))
for _, header := range decodedHTTPConfigHeaders {
if header.key == "Host" {
req.URL.Host = header.value
} else {
req.Header.Set(header.key, header.value)
}
}
simple.UpdateRequestWithHeaders(req, decodedHTTPConfigHeaders)
}
}
Loading

0 comments on commit 4f058fb

Please sign in to comment.