Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify http and grpc proxy to receive a listener rather than creating it #321

Merged
merged 2 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions cmd/agent/commands/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,22 @@ func BuildGrpcCmd(env runtime.Environment, config *agent.Config) *cobra.Command
return fmt.Errorf("upstream host cannot be localhost when running in transparent mode")
}

agent, err := agent.Start(env, config)
if err != nil {
return fmt.Errorf("initializing agent: %w", err)
}

defer agent.Stop()

listenAddress := net.JoinHostPort("", fmt.Sprint(port))
upstreamAddress := net.JoinHostPort(upstreamHost, fmt.Sprint(targetPort))

proxyConfig := grpc.ProxyConfig{
ListenAddress: listenAddress,
UpstreamAddress: upstreamAddress,
listener, err := net.Listen("tcp", listenAddress)
if err != nil {
return fmt.Errorf("setting up listener at %q: %w", listenAddress, err)
}

proxy, err := grpc.NewProxy(proxyConfig, disruption)
proxy, err := grpc.NewProxy(listener, upstreamAddress, disruption)
if err != nil {
return err
}
Expand Down Expand Up @@ -80,8 +87,6 @@ func BuildGrpcCmd(env runtime.Environment, config *agent.Config) *cobra.Command
return err
}

agent := agent.BuildAgent(env, config)

return agent.ApplyDisruption(cmd.Context(), disruptor, duration)
},
}
Expand Down
17 changes: 11 additions & 6 deletions cmd/agent/commands/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,22 @@ func BuildHTTPCmd(env runtime.Environment, config *agent.Config) *cobra.Command
return fmt.Errorf("upstream host cannot be localhost when running in transparent mode")
}

agent, err := agent.Start(env, config)
if err != nil {
return fmt.Errorf("initializing agent: %w", err)
}

defer agent.Stop()

listenAddress := net.JoinHostPort("", fmt.Sprint(port))
upstreamAddress := "http://" + net.JoinHostPort(upstreamHost, fmt.Sprint(targetPort))

proxyConfig := http.ProxyConfig{
ListenAddress: listenAddress,
UpstreamAddress: upstreamAddress,
listener, err := net.Listen("tcp", listenAddress)
if err != nil {
return fmt.Errorf("setting up listener at %q: %w", listenAddress, err)
}

proxy, err := http.NewProxy(proxyConfig, disruption)
proxy, err := http.NewProxy(listener, upstreamAddress, disruption)
if err != nil {
return err
}
Expand Down Expand Up @@ -79,8 +86,6 @@ func BuildHTTPCmd(env runtime.Environment, config *agent.Config) *cobra.Command
return err
}

agent := agent.BuildAgent(env, config)

return agent.ApplyDisruption(cmd.Context(), disruptor, duration)
},
}
Expand Down
63 changes: 39 additions & 24 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package agent
import (
"context"
"fmt"
"io"
"os"
"syscall"
"time"

Expand All @@ -19,48 +21,51 @@ type Config struct {

// Agent maintains the state required for executing an agent command
type Agent struct {
env runtime.Environment
config *Config
env runtime.Environment
sc <-chan os.Signal
profileCloser io.Closer
}

// BuildAgent builds a instance of an agent
func BuildAgent(env runtime.Environment, config *Config) *Agent {
return &Agent{
env: env,
config: config,
// Start creates and starts a new instance of an agent.
// Returned agent is guaranteed to be unique in the environment it is running, and will handle signals sent to the
// process.
// Callers must Stop the returned agent at the end of its lifecycle.
func Start(env runtime.Environment, config *Config) (*Agent, error) {
a := &Agent{
env: env,
}

if err := a.start(config); err != nil {
a.Stop() // Stop any initialized component if initialization failed.
return nil, err
}

return a, nil
}

// ApplyDisruption applies a disruption to the target
func (r *Agent) ApplyDisruption(ctx context.Context, disruptor protocol.Disruptor, duration time.Duration) error {
sc := r.env.Signal().Notify(syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
defer func() {
r.env.Signal().Reset()
}()
func (a *Agent) start(config *Config) error {
a.sc = a.env.Signal().Notify(syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

acquired, err := r.env.Lock().Acquire()
acquired, err := a.env.Lock().Acquire()
if err != nil {
return fmt.Errorf("could not acquire process lock: %w", err)
}

if !acquired {
return fmt.Errorf("another instance of the agent is already running")
}

defer func() {
_ = r.env.Lock().Release()
}()

// start profiler
profiler, err := r.env.Profiler().Start(ctx, *r.config.Profiler)
a.profileCloser, err = a.env.Profiler().Start(*config.Profiler)
if err != nil {
return fmt.Errorf("could not create profiler %w", err)
}

// ensure the profiler is closed even if there's an error executing the command
defer func() {
_ = profiler.Close()
}()
return nil
}

// ApplyDisruption applies a disruption to the target
func (a *Agent) ApplyDisruption(ctx context.Context, disruptor protocol.Disruptor, duration time.Duration) error {
// set context for command
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -83,7 +88,17 @@ func (r *Agent) ApplyDisruption(ctx context.Context, disruptor protocol.Disrupto
return ctx.Err()
case err := <-cc:
return err
case s := <-sc:
case s := <-a.sc:
return fmt.Errorf("received signal %q", s)
}
}

// Stop stops a running agent: It releases
func (a *Agent) Stop() {
a.env.Signal().Reset()
_ = a.env.Lock().Release()

if a.profileCloser != nil {
_ = a.profileCloser.Close()
}
}
18 changes: 14 additions & 4 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ func Test_CancelContext(t *testing.T) {
t.Parallel()
env := runtime.NewFakeRuntime(tc.args, tc.vars)

agent := BuildAgent(env, tc.config)
agent, err := Start(env, tc.config)
if err != nil {
t.Fatalf("starting agent: %v", err)
}

defer agent.Stop()

ctx, cancel := context.WithCancel(context.Background())
go func() {
Expand All @@ -72,7 +77,7 @@ func Test_CancelContext(t *testing.T) {
}()

disruptor := &FakeProtocolDisruptor{}
err := agent.ApplyDisruption(ctx, disruptor, tc.delay)
err = agent.ApplyDisruption(ctx, disruptor, tc.delay)
if !errors.Is(err, tc.expected) {
t.Errorf("expected %v got %v", tc.err, err)
}
Expand Down Expand Up @@ -126,7 +131,12 @@ func Test_Signals(t *testing.T) {
t.Parallel()
env := runtime.NewFakeRuntime(tc.args, tc.vars)

agent := BuildAgent(env, tc.config)
agent, err := Start(env, tc.config)
if err != nil {
t.Fatalf("starting agent: %v", err)
}

defer agent.Stop()

go func() {
time.Sleep(1 * time.Second)
Expand All @@ -136,7 +146,7 @@ func Test_Signals(t *testing.T) {
}()

disruptor := &FakeProtocolDisruptor{}
err := agent.ApplyDisruption(context.TODO(), disruptor, tc.delay)
err = agent.ApplyDisruption(context.TODO(), disruptor, tc.delay)
if tc.expectErr && err == nil {
t.Errorf("should had failed")
return
Expand Down
95 changes: 36 additions & 59 deletions pkg/agent/protocol/grpc/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,6 @@ import (
"google.golang.org/grpc"
)

// ProxyConfig configures the Proxy options
type ProxyConfig struct {
// network used for communication (valid values are "unix" and "tcp")
Network string
// Address to listen for incoming requests
ListenAddress string
// Address where to redirect requests
UpstreamAddress string
}

// Disruption specifies disruptions in grpc requests
type Disruption struct {
// Average delay introduced to requests
Expand All @@ -41,27 +31,15 @@ type Disruption struct {

// Proxy defines the parameters used by the proxy for processing grpc requests and its execution state
type proxy struct {
config ProxyConfig
disruption Disruption
srv *grpc.Server
metrics *protocol.MetricMap
listener net.Listener
srv *grpc.Server
cancel func()
metrics *protocol.MetricMap
}

// NewProxy return a new Proxy
func NewProxy(c ProxyConfig, d Disruption) (protocol.Proxy, error) {
if c.Network == "" {
c.Network = "tcp"
}

if c.Network != "tcp" && c.Network != "unix" {
return nil, fmt.Errorf("only 'tcp' and 'unix' (for sockets) networks are supported")
}

if c.ListenAddress == "" {
return nil, fmt.Errorf("proxy's listening address must be provided")
}

if c.UpstreamAddress == "" {
func NewProxy(listener net.Listener, upstreamAddress string, d Disruption) (protocol.Proxy, error) {
if upstreamAddress == "" {
return nil, fmt.Errorf("proxy's forwarding address must be provided")
}

Expand All @@ -77,40 +55,40 @@ func NewProxy(c ProxyConfig, d Disruption) (protocol.Proxy, error) {
return nil, fmt.Errorf("status code cannot be 0 (OK)")
}

return &proxy{
disruption: d,
config: c,
metrics: protocol.NewMetricMap(
protocol.MetricRequests,
protocol.MetricRequestsExcluded,
protocol.MetricRequestsDisrupted,
),
}, nil
}

// Start starts the execution of the proxy
func (p *proxy) Start() error {
// should receive the context as a parameter of the Start function
ctx, cancel := context.WithCancel(context.Background())
conn, err := grpc.DialContext(
context.Background(),
p.config.UpstreamAddress,
ctx,
upstreamAddress,
grpc.WithInsecure(),
roobre marked this conversation as resolved.
Show resolved Hide resolved
)
if err != nil {
return fmt.Errorf("error dialing %s: %w", p.config.UpstreamAddress, err)
cancel()
return nil, fmt.Errorf("error dialing %s: %w", upstreamAddress, err)
}
handler := NewHandler(p.disruption, conn, p.metrics)

p.srv = grpc.NewServer(
metrics := protocol.NewMetricMap(
protocol.MetricRequests,
protocol.MetricRequestsExcluded,
protocol.MetricRequestsDisrupted,
)

handler := NewHandler(d, conn, metrics)

srv := grpc.NewServer(
grpc.UnknownServiceHandler(handler),
)

listener, err := net.Listen(p.config.Network, p.config.ListenAddress)
if err != nil {
return fmt.Errorf("error listening to %s: %w", p.config.ListenAddress, err)
}
return &proxy{
listener: listener,
srv: srv,
cancel: cancel,
metrics: metrics,
}, nil
}

err = p.srv.Serve(listener)
// Start starts the execution of the proxy
func (p *proxy) Start() error {
err := p.srv.Serve(p.listener)
if err != nil {
return fmt.Errorf("proxy terminated with error: %w", err)
}
Expand All @@ -120,23 +98,22 @@ func (p *proxy) Start() error {

// Stop stops the execution of the proxy
func (p *proxy) Stop() error {
if p.srv != nil {
p.srv.GracefulStop()
}
p.cancel()
p.srv.GracefulStop()

return nil
}

// Metrics returns runtime metrics for the proxy.
// TODO: Add metrics.
func (p *proxy) Metrics() map[string]uint {
return p.metrics.Map()
}

// Force stops the proxy without waiting for connections to drain
// In grpc this action is a nop
func (p *proxy) Force() error {
if p.srv != nil {
p.srv.Stop()
}
p.cancel()
p.srv.Stop()

return nil
}
Loading
Loading