Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Circuit breaking all the way #372

Merged
merged 15 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestGetProxies(t *testing.T) {
Network: config.DefaultNetwork,
Address: config.DefaultAddress,
}
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{})
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
newPool := pool.NewPool(context.TODO(), 1)
assert.Nil(t, newPool.Put(client.ID, client))

Expand Down Expand Up @@ -225,7 +225,7 @@ func TestGetServers(t *testing.T) {
Network: config.DefaultNetwork,
Address: config.DefaultAddress,
}
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{})
client := network.NewClient(context.TODO(), clientConfig, zerolog.Logger{}, nil)
newPool := pool.NewPool(context.TODO(), 1)
assert.Nil(t, newPool.Put(client.ID, client))

Expand Down
47 changes: 42 additions & 5 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ var runCmd = &cobra.Command{
)

// Load plugins and register their hooks.
pluginRegistry.LoadPlugins(runCtx, conf.Plugin.Plugins)
pluginRegistry.LoadPlugins(runCtx, conf.Plugin.Plugins, conf.Plugin.StartTimeout)

// Start the metrics merger if enabled.
var metricsMerger *metrics.Merger
Expand Down Expand Up @@ -295,7 +295,7 @@ var runCmd = &cobra.Command{
logger.Info().Str("name", pluginId.Name).Msg("Reloading crashed plugin")
pluginConfig := conf.Plugin.GetPlugins(pluginId.Name)
if pluginConfig != nil {
pluginRegistry.LoadPlugins(runCtx, pluginConfig)
pluginRegistry.LoadPlugins(runCtx, pluginConfig, conf.Plugin.StartTimeout)
}
} else {
logger.Trace().Str("name", pluginId.Name).Msg("Successfully pinged plugin")
Expand Down Expand Up @@ -415,7 +415,15 @@ var runCmd = &cobra.Command{

// Check if the metrics server is already running before registering the handler.
if _, err = http.Get(address); err != nil { //nolint:gosec
mux.Handle(metricsConfig.Path, gziphandler.GzipHandler(handler))
// The timeout handler responds with 503 Service Unavailable if the nested handlers run longer than the configured timeout.
mux.Handle(
metricsConfig.Path,
http.TimeoutHandler(
gziphandler.GzipHandler(handler),
metricsConfig.GetTimeout(),
"The request timed out while fetching the metrics",
),
)
} else {
logger.Warn().Msg("Metrics server is already running, consider changing the port")
span.RecordError(err)
Expand All @@ -426,9 +434,16 @@ var runCmd = &cobra.Command{
Addr: metricsConfig.Address,
Handler: mux,
ReadHeaderTimeout: metricsConfig.GetReadHeaderTimeout(),
ReadTimeout: metricsConfig.GetTimeout(),
WriteTimeout: metricsConfig.GetTimeout(),
IdleTimeout: metricsConfig.GetTimeout(),
}

logger.Info().Str("address", address).Msg("Metrics are exposed")
logger.Info().Fields(map[string]interface{}{
"address": address,
"timeout": metricsConfig.GetTimeout().String(),
"readHeaderTimeout": metricsConfig.GetReadHeaderTimeout().String(),
}).Msg("Metrics are exposed")

if metricsConfig.CertFile != "" && metricsConfig.KeyFile != "" {
// Set up TLS.
Expand Down Expand Up @@ -507,11 +522,21 @@ var runCmd = &cobra.Command{
clients[name].ReceiveTimeout = clients[name].GetReceiveTimeout()
clients[name].SendDeadline = clients[name].GetSendDeadline()
clients[name].ReceiveChunkSize = clients[name].GetReceiveChunkSize()
clients[name].DialTimeout = clients[name].GetDialTimeout()

// Add clients to the pool.
for i := 0; i < cfg.GetSize(); i++ {
clientConfig := clients[name]
client := network.NewClient(runCtx, clientConfig, logger)
client := network.NewClient(
runCtx, clientConfig, logger,
network.NewRetry(
clientConfig.Retries,
clientConfig.GetBackoff(),
clientConfig.BackoffMultiplier,
clientConfig.DisableBackoffCaps,
loggers[name],
),
)

if client != nil {
eventOptions := trace.WithAttributes(
Expand All @@ -522,10 +547,15 @@ var runCmd = &cobra.Command{
attribute.String("receiveDeadline", client.ReceiveDeadline.String()),
attribute.String("receiveTimeout", client.ReceiveTimeout.String()),
attribute.String("sendDeadline", client.SendDeadline.String()),
attribute.String("dialTimeout", client.DialTimeout.String()),
attribute.Bool("tcpKeepAlive", client.TCPKeepAlive),
attribute.String("tcpKeepAlivePeriod", client.TCPKeepAlivePeriod.String()),
attribute.String("localAddress", client.LocalAddr()),
attribute.String("remoteAddress", client.RemoteAddr()),
attribute.Int("retries", clientConfig.Retries),
attribute.String("backoff", clientConfig.GetBackoff().String()),
attribute.Float64("backoffMultiplier", clientConfig.BackoffMultiplier),
attribute.Bool("disableBackoffCaps", clientConfig.DisableBackoffCaps),
)
if client.ID != "" {
eventOptions = trace.WithAttributes(
Expand All @@ -547,8 +577,15 @@ var runCmd = &cobra.Command{
"receiveDeadline": client.ReceiveDeadline.String(),
"receiveTimeout": client.ReceiveTimeout.String(),
"sendDeadline": client.SendDeadline.String(),
"dialTimeout": client.DialTimeout.String(),
"tcpKeepAlive": client.TCPKeepAlive,
"tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
"localAddress": client.LocalAddr(),
"remoteAddress": client.RemoteAddr(),
"retries": clientConfig.Retries,
"backoff": clientConfig.GetBackoff().String(),
"backoffMultiplier": clientConfig.BackoffMultiplier,
"disableBackoffCaps": clientConfig.DisableBackoffCaps,
}
_, err := pluginRegistry.Run(
pluginTimeoutCtx, clientCfg, v1.HookName_HOOK_NAME_ON_NEW_CLIENT)
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ func (c *Config) LoadDefaults(ctx context.Context) {
ReceiveDeadline: DefaultReceiveDeadline,
ReceiveTimeout: DefaultReceiveTimeout,
SendDeadline: DefaultSendDeadline,
DialTimeout: DefaultDialTimeout,
Retries: DefaultRetries,
Backoff: DefaultBackoff,
BackoffMultiplier: DefaultBackoffMultiplier,
DisableBackoffCaps: DefaultDisableBackoffCaps,
}

defaultPool := Pool{
Expand Down Expand Up @@ -210,6 +215,7 @@ func (c *Config) LoadDefaults(ctx context.Context) {
HealthCheckPeriod: DefaultPluginHealthCheckPeriod,
ReloadOnCrash: true,
Timeout: DefaultPluginTimeout,
StartTimeout: DefaultPluginStartTimeout,
}

if c.GlobalKoanf != nil {
Expand Down
6 changes: 6 additions & 0 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ const (
DefaultMetricsMergerPeriod = 5 * time.Second
DefaultPluginHealthCheckPeriod = 5 * time.Second
DefaultPluginTimeout = 30 * time.Second
DefaultPluginStartTimeout = 1 * time.Minute

// Client constants.
DefaultNetwork = "tcp"
Expand All @@ -102,6 +103,11 @@ const (
DefaultTCPKeepAlivePeriod = 30 * time.Second
DefaultTCPKeepAlive = false
DefaultReceiveTimeout = 0
DefaultDialTimeout = 60 * time.Second
DefaultRetries = 3
DefaultBackoff = 1 * time.Second
DefaultBackoffMultiplier = 2.0
DefaultDisableBackoffCaps = false

// Pool constants.
EmptyPoolCapacity = 0
Expand Down
33 changes: 26 additions & 7 deletions config/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,31 +106,31 @@ func (p PluginConfig) GetTerminationPolicy() TerminationPolicy {

// GetTCPKeepAlivePeriod returns the TCP keep alive period from config file or default value.
func (c Client) GetTCPKeepAlivePeriod() time.Duration {
if c.TCPKeepAlivePeriod <= 0 {
if c.TCPKeepAlivePeriod < 0 {
return DefaultTCPKeepAlivePeriod
}
return c.TCPKeepAlivePeriod
}

// GetReceiveDeadline returns the receive deadline from config file or default value.
func (c Client) GetReceiveDeadline() time.Duration {
if c.ReceiveDeadline <= 0 {
if c.ReceiveDeadline < 0 {
return DefaultReceiveDeadline
}
return c.ReceiveDeadline
}

// GetReceiveTimeout returns the receive timeout from config file or default value.
func (c Client) GetReceiveTimeout() time.Duration {
if c.ReceiveTimeout <= 0 {
if c.ReceiveTimeout < 0 {
return DefaultReceiveTimeout
}
return c.ReceiveTimeout
}

// GetSendDeadline returns the send deadline from config file or default value.
func (c Client) GetSendDeadline() time.Duration {
if c.SendDeadline <= 0 {
if c.SendDeadline < 0 {
return DefaultSendDeadline
}
return c.SendDeadline
Expand All @@ -144,6 +144,22 @@ func (c Client) GetReceiveChunkSize() int {
return c.ReceiveChunkSize
}

// GetDialTimeout returns the dial timeout configured for the client.
// A negative value falls back to DefaultDialTimeout; zero is returned unchanged.
func (c Client) GetDialTimeout() time.Duration {
if c.DialTimeout < 0 {
return DefaultDialTimeout
}
return c.DialTimeout
}

// GetBackoff returns the backoff duration configured for the client.
// A negative value falls back to DefaultBackoff; zero is returned unchanged.
func (c Client) GetBackoff() time.Duration {
if c.Backoff < 0 {
return DefaultBackoff
}
return c.Backoff
}

// GetHealthCheckPeriod returns the health check period from config file or default value.
func (pr Proxy) GetHealthCheckPeriod() time.Duration {
if pr.HealthCheckPeriod <= 0 {
Expand All @@ -154,7 +170,7 @@ func (pr Proxy) GetHealthCheckPeriod() time.Duration {

// GetTickInterval returns the tick interval from config file or default value.
func (s Server) GetTickInterval() time.Duration {
if s.TickInterval <= 0 {
if s.TickInterval < 0 {
return DefaultTickInterval
}
return s.TickInterval
Expand Down Expand Up @@ -247,20 +263,23 @@ func GetDefaultConfigFilePath(filename string) string {
return filepath.Join("./", filename)
}

// GetReadHeaderTimeout returns the read header timeout from config file or default value.
func (m Metrics) GetReadHeaderTimeout() time.Duration {
if m.ReadHeaderTimeout <= 0 {
if m.ReadHeaderTimeout < 0 {
return DefaultReadHeaderTimeout
}
return m.ReadHeaderTimeout
}

// GetTimeout returns the metrics server timeout from config file or default value.
func (m Metrics) GetTimeout() time.Duration {
if m.Timeout <= 0 {
if m.Timeout < 0 {
return DefaultMetricsServerTimeout
}
return m.Timeout
}

// Filter returns a filtered global config based on the group name.
func (gc GlobalConfig) Filter(groupName string) *GlobalConfig {
if _, ok := gc.Servers[groupName]; !ok {
return nil
Expand Down
8 changes: 4 additions & 4 deletions config/getters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestGetTerminationPolicy(t *testing.T) {
// TestGetTCPKeepAlivePeriod tests the GetTCPKeepAlivePeriod function.
func TestGetTCPKeepAlivePeriod(t *testing.T) {
client := Client{}
assert.Equal(t, DefaultTCPKeepAlivePeriod, client.GetTCPKeepAlivePeriod())
assert.Equal(t, client.GetTCPKeepAlivePeriod(), time.Duration(0))
}

// TestGetReceiveDeadline tests the GetReceiveDeadline function.
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestGetHealthCheckPeriod(t *testing.T) {
// TestGetTickInterval tests the GetTickInterval function.
func TestGetTickInterval(t *testing.T) {
server := Server{}
assert.Equal(t, DefaultTickInterval, server.GetTickInterval())
assert.Equal(t, server.GetTickInterval(), time.Duration(0))
}

// TestGetSize tests the GetSize function.
Expand Down Expand Up @@ -120,13 +120,13 @@ func TestGetDefaultConfigFilePath(t *testing.T) {
// TestGetReadHeaderTimeout tests the GetReadHeaderTimeout function.
func TestGetReadHeaderTimeout(t *testing.T) {
metrics := Metrics{}
assert.Equal(t, DefaultReadHeaderTimeout, metrics.GetReadHeaderTimeout())
assert.Equal(t, metrics.GetReadHeaderTimeout(), time.Duration(0))
}

// TestGetTimeout tests the GetTimeout function of the metrics server.
func TestGetTimeout(t *testing.T) {
metrics := Metrics{}
assert.Equal(t, DefaultMetricsServerTimeout, metrics.GetTimeout())
assert.Equal(t, metrics.GetTimeout(), time.Duration(0))
}

// TestFilter tests the Filter function.
Expand Down
6 changes: 6 additions & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type PluginConfig struct {
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
ReloadOnCrash bool `json:"reloadOnCrash"`
Timeout time.Duration `json:"timeout" jsonschema:"oneof_type=string;integer"`
StartTimeout time.Duration `json:"startTimeout" jsonschema:"oneof_type=string;integer"`
Plugins []Plugin `json:"plugins"`
}

Expand All @@ -36,6 +37,11 @@ type Client struct {
ReceiveDeadline time.Duration `json:"receiveDeadline" jsonschema:"oneof_type=string;integer"`
ReceiveTimeout time.Duration `json:"receiveTimeout" jsonschema:"oneof_type=string;integer"`
SendDeadline time.Duration `json:"sendDeadline" jsonschema:"oneof_type=string;integer"`
DialTimeout time.Duration `json:"dialTimeout" jsonschema:"oneof_type=string;integer"`
Retries int `json:"retries"`
Backoff time.Duration `json:"backoff" jsonschema:"oneof_type=string;integer"`
BackoffMultiplier float64 `json:"backoffMultiplier"`
DisableBackoffCaps bool `json:"disableBackoffCaps"`
}

type Logger struct {
Expand Down
9 changes: 8 additions & 1 deletion gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ loggers:
noColor: False
timeFormat: "unix" # unixms, unixmicro and unixnano
consoleTimeFormat: "RFC3339" # Go time format string
# If output is file, the following fields are used.
# If the output contains "file", the following fields are used:
fileName: "gatewayd.log"
maxSize: 500 # MB
# If maxBackups and maxAge are both 0, no old log files will be deleted.
maxBackups: 5
maxAge: 30 # days
compress: True
Expand Down Expand Up @@ -39,6 +40,12 @@ clients:
receiveDeadline: 0s # duration, 0ms/0s means no deadline
receiveTimeout: 0s # duration, 0ms/0s means no timeout
sendDeadline: 0s # duration, 0ms/0s means no deadline
dialTimeout: 60s # duration
# Retry configuration
retries: 3 # 0 means no retry
backoff: 1s # duration
backoffMultiplier: 2.0 # 0 means no backoff
disableBackoffCaps: false

pools:
default:
Expand Down
3 changes: 3 additions & 0 deletions gatewayd_plugins.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ reloadOnCrash: True
# The timeout controls how long to wait for a plugin to respond to a request before timing out.
timeout: 30s

# The start timeout controls how long to wait for a plugin to start before timing out.
startTimeout: 1m

# The plugin configuration is a list of plugins to load. Each plugin is defined by a name,
# a path to the plugin's executable, and a list of arguments to pass to the plugin. The
# plugin's executable is expected to be a Go plugin that implements the GatewayD plugin
Expand Down
2 changes: 1 addition & 1 deletion metrics/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (m *Merger) Start() {
m.scheduler.StartAsync()
m.Logger.Info().Fields(
map[string]interface{}{
"startDelay": startDelay,
"startDelay": startDelay.Format(time.RFC3339),
"metricsMergerPeriod": m.MetricsMergerPeriod.String(),
},
).Msg("Started the metrics merger scheduler")
Expand Down
Loading