Skip to content

Commit

Permalink
Merge pull request #401 from snowAvocado/remove-elastic-mode
Browse files Browse the repository at this point in the history
Remove Elastic Mode #359
  • Loading branch information
mostafa authored Dec 31, 2023
2 parents d1bb67f + 694d28f commit 09366a4
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 435 deletions.
4 changes: 0 additions & 4 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func TestGetProxies(t *testing.T) {
context.TODO(),
newPool,
nil,
false,
false,
config.DefaultHealthCheckPeriod,
&config.Client{
Network: config.DefaultNetwork,
Expand Down Expand Up @@ -233,8 +231,6 @@ func TestGetServers(t *testing.T) {
context.TODO(),
newPool,
nil,
true,
false,
config.DefaultHealthCheckPeriod,
&config.Client{
Network: config.DefaultNetwork,
Expand Down
471 changes: 234 additions & 237 deletions api/v1/api.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/v1/api.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions api/v1/api.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,6 @@ var runCmd = &cobra.Command{
runCtx,
pools[name],
pluginRegistry,
cfg.Elastic,
cfg.ReuseElasticClients,
cfg.HealthCheckPeriod,
clientConfig,
logger,
Expand All @@ -742,8 +740,6 @@ var runCmd = &cobra.Command{

span.AddEvent("Create proxy", trace.WithAttributes(
attribute.String("name", name),
attribute.Bool("elastic", cfg.Elastic),
attribute.Bool("reuseElasticClients", cfg.ReuseElasticClients),
attribute.String("healthCheckPeriod", cfg.HealthCheckPeriod.String()),
))

Expand Down
4 changes: 2 additions & 2 deletions cmd/testdata/gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ pools:

proxies:
default:
elastic: False
healthCheckPeriod: 60s # duration
test:
elastic: False
healthCheckPeriod: 60s # duration

servers:
default:
Expand Down
2 changes: 1 addition & 1 deletion cmd/testdata/gatewayd_tls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pools:

proxies:
default:
elastic: False
healthCheckPeriod: 60s # duration

servers:
default:
Expand Down
4 changes: 1 addition & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ func (c *Config) LoadDefaults(ctx context.Context) {
}

defaultProxy := Proxy{
Elastic: false,
ReuseElasticClients: false,
HealthCheckPeriod: DefaultHealthCheckPeriod,
HealthCheckPeriod: DefaultHealthCheckPeriod,
}

defaultServer := Server{
Expand Down
4 changes: 1 addition & 3 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ type Pool struct {
}

type Proxy struct {
Elastic bool `json:"elastic"`
ReuseElasticClients bool `json:"reuseElasticClients"`
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
}

type Server struct {
Expand Down
2 changes: 0 additions & 2 deletions gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ pools:

proxies:
default:
elastic: False
reuseElasticClients: False
healthCheckPeriod: 60s # duration

servers:
Expand Down
77 changes: 19 additions & 58 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ type Proxy struct {
scheduler *gocron.Scheduler
ctx context.Context //nolint:containedctx
pluginTimeout time.Duration
HealthCheckPeriod time.Duration

Elastic bool
ReuseElasticClients bool
HealthCheckPeriod time.Duration

// ClientConfig is used for elastic proxy and reconnection
// ClientConfig is used for reconnection
ClientConfig *config.Client
}

Expand All @@ -55,7 +52,6 @@ var _ IProxy = (*Proxy)(nil)
func NewProxy(
ctx context.Context,
connPool pool.IPool, pluginRegistry *plugin.Registry,
elastic, reuseElasticClients bool,
healthCheckPeriod time.Duration,
clientConfig *config.Client, logger zerolog.Logger,
pluginTimeout time.Duration,
Expand All @@ -71,8 +67,6 @@ func NewProxy(
scheduler: gocron.NewScheduler(time.UTC),
ctx: proxyCtx,
pluginTimeout: pluginTimeout,
Elastic: elastic,
ReuseElasticClients: reuseElasticClients,
ClientConfig: clientConfig,
HealthCheckPeriod: healthCheckPeriod,
}
Expand Down Expand Up @@ -138,8 +132,7 @@ func NewProxy(
}

// Connect maps a server connection from the available connection pool to a incoming connection.
// It returns an error if the pool is exhausted. If the pool is elastic, it creates a new client
// and maps it to the incoming connection.
// It returns an error if the pool is exhausted.
func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError {
_, span := otel.Tracer(config.TracerName).Start(pr.ctx, "Connect")
defer span.End()
Expand All @@ -156,34 +149,13 @@ func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError {

var client *Client
if pr.IsExhausted() {
// Pool is exhausted or is elastic.
if pr.Elastic {
// Create a new client.
client = NewClient(
pr.ctx, pr.ClientConfig, pr.logger,
NewRetry(
pr.ClientConfig.Retries,
config.If[time.Duration](
pr.ClientConfig.Backoff > 0,
pr.ClientConfig.Backoff,
config.DefaultBackoff,
),
pr.ClientConfig.BackoffMultiplier,
pr.ClientConfig.DisableBackoffCaps,
pr.logger,
),
)
span.AddEvent("Created a new client connection")
pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection")
} else {
span.AddEvent(gerr.ErrPoolExhausted.Error())
return gerr.ErrPoolExhausted
}
} else {
// Get the client from the pool with the given clientID.
if cl, ok := pr.availableConnections.Pop(clientID).(*Client); ok {
client = cl
}
// Pool is exhausted
span.AddEvent(gerr.ErrPoolExhausted.Error())
return gerr.ErrPoolExhausted
}
// Get the client from the pool with the given clientID.
if cl, ok := pr.availableConnections.Pop(clientID).(*Client); ok {
client = cl
}

client, err := pr.IsHealthy(client)
Expand Down Expand Up @@ -241,23 +213,17 @@ func (pr *Proxy) Disconnect(conn *ConnWrapper) *gerr.GatewayDError {
return gerr.ErrClientNotFound
}

//nolint:nestif
if client, ok := client.(*Client); ok {
if (pr.Elastic && pr.ReuseElasticClients) || !pr.Elastic {
// Recycle the server connection by reconnecting.
if err := client.Reconnect(); err != nil {
pr.logger.Error().Err(err).Msg("Failed to reconnect to the client")
span.RecordError(err)
}
// Recycle the server connection by reconnecting.
if err := client.Reconnect(); err != nil {
pr.logger.Error().Err(err).Msg("Failed to reconnect to the client")
span.RecordError(err)
}

// If the client is not in the pool, put it back.
if err := pr.availableConnections.Put(client.ID, client); err != nil {
pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool")
span.RecordError(err)
}
} else {
span.RecordError(gerr.ErrClientNotConnected)
return gerr.ErrClientNotConnected
// If the client is not in the pool, put it back.
if err := pr.availableConnections.Put(client.ID, client); err != nil {
pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool")
span.RecordError(err)
}
} else {
// This should never happen, but if it does,
Expand Down Expand Up @@ -627,11 +593,6 @@ func (pr *Proxy) IsHealthy(client *Client) (*Client, *gerr.GatewayDError) {
func (pr *Proxy) IsExhausted() bool {
_, span := otel.Tracer(config.TracerName).Start(pr.ctx, "IsExhausted")
defer span.End()

if pr.Elastic {
return false
}

return pr.availableConnections.Size() == 0 && pr.availableConnections.Cap() > 0
}

Expand Down
Loading

0 comments on commit 09366a4

Please sign in to comment.