Skip to content
This repository has been archived by the owner on Mar 19, 2024. It is now read-only.

WIP GRPC rate limiting #533

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/533.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
Use Consul Catalog API instead of Agent API to register APIGateway to prevent GRPC rate limits
```
Comment on lines +1 to +3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
```release-note:bug
Use Consul Catalog API instead of Agent API to register APIGateway to prevent GRPC rate limits
```
```release-note:bug
Use Consul Catalog API instead of Agent API to register gateways in agentless deployments, to avoid gRPC rate limiting for subsequent API calls handled by Consul servers other than the one with which the gateway was registered

2 changes: 1 addition & 1 deletion internal/commands/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func RunExec(config ExecConfig) (ret int) {
config.GatewayConfig.Host,
)
if config.isTest {
registry = registry.WithTries(1)
registry = registry.WithRetries(1)
}

config.Logger.Trace("registering service")
Expand Down
4 changes: 2 additions & 2 deletions internal/commands/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,8 @@ func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer

loginPath := "/v1/acl/login"
logoutPath := "/v1/acl/logout"
registerPath := "/v1/agent/service/register"
deregisterPath := "/v1/agent/service/deregister"
registerPath := "/v1/catalog/register"
deregisterPath := "/v1/catalog/deregister"
leafPath := "/v1/agent/connect/ca/leaf"
rootPath := "/v1/agent/connect/ca/roots"

Expand Down
10 changes: 5 additions & 5 deletions internal/consul/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Authenticator struct {

method string
namespace string
tries uint64
retries uint64
backoffInterval time.Duration
}

Expand All @@ -36,13 +36,13 @@ func NewAuthenticator(logger hclog.Logger, consul *api.Client, method, namespace
logger: logger,
method: method,
namespace: namespace,
tries: defaultMaxAttempts,
retries: defaultMaxRetries,
backoffInterval: defaultBackoffInterval,
}
}

func (a *Authenticator) WithTries(tries uint64) *Authenticator {
a.tries = tries
func (a *Authenticator) WithRetries(retries uint64) *Authenticator {
a.retries = retries
return a
}

Expand All @@ -58,7 +58,7 @@ func (a *Authenticator) Authenticate(ctx context.Context, service, bearerToken s
a.logger.Error("error authenticating", "error", err)
}
return err
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(a.backoffInterval), a.tries), ctx))
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(a.backoffInterval), a.retries), ctx))
return token, err
}

Expand Down
22 changes: 11 additions & 11 deletions internal/consul/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestAuthenticate(t *testing.T) {
service string
expectedMeta string
failures uint64
maxAttempts uint64
maxRetries uint64
fail bool
}{{
name: "success-no-namespace",
Expand All @@ -46,25 +46,25 @@ func TestAuthenticate(t *testing.T) {
service: "consul-api-gateway-test",
expectedMeta: "consul-api-gateway-test",
failures: 3,
maxAttempts: 3,
maxRetries: 3,
}, {
name: "retry-failure",
service: "consul-api-gateway-test",
failures: 3,
maxAttempts: 2,
fail: true,
name: "retry-failure",
service: "consul-api-gateway-test",
failures: 3,
maxRetries: 2,
fail: true,
}} {
t.Run(test.name, func(t *testing.T) {
server := runACLServer(t, test.failures)
method := gwTesting.RandomString()
token := gwTesting.RandomString()

maxAttempts := defaultMaxAttempts
if test.maxAttempts > 0 {
maxAttempts = test.maxAttempts
maxRetries := defaultMaxRetries
if test.maxRetries > 0 {
maxRetries = test.maxRetries
}

auth := NewAuthenticator(hclog.NewNullLogger(), server.consul, method, test.namespace).WithTries(maxAttempts)
auth := NewAuthenticator(hclog.NewNullLogger(), server.consul, method, test.namespace).WithRetries(maxRetries)
auth.backoffInterval = 0
consulToken, err := auth.Authenticate(context.Background(), test.service, token)

Expand Down
2 changes: 1 addition & 1 deletion internal/consul/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ import (
)

const (
defaultMaxAttempts = uint64(30)
defaultMaxRetries = uint64(30)
defaultBackoffInterval = 1 * time.Second
)
159 changes: 107 additions & 52 deletions internal/consul/registration.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import (
"net/http"
"time"

"github.com/hashicorp/go-hclog"

"github.com/cenkalti/backoff"
"github.com/google/uuid"

"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
)

const (
serviceCheckName = "consul-api-gateway Gateway Listener"
serviceCheckInterval = "10s"
serviceCheckTTL = "20s"
serviceDeregistrationTime = "1m"
serviceCheckInterval = time.Second * 10
serviceCheckTTL = time.Second * 20
serviceDeregistrationTime = time.Minute
)

// ServiceRegistry handles the logic for registering a consul-api-gateway service in Consul.
Expand All @@ -39,14 +40,35 @@ type ServiceRegistry struct {
tags []string

cancel context.CancelFunc
tries uint64
retries uint64
backoffInterval time.Duration
reregistrationInterval time.Duration
updateTTLInterval time.Duration
address string
}

// NewServiceRegistryWithAddress creates a new service registry instance that
// uses the caller-supplied address.
func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, namespace, partition, host, address string) *ServiceRegistry {
return newServiceRegistry(logger, client, service, namespace, partition, host, address)
}

// NewServiceRegistry creates a new service registry instance with an empty
// address; use NewServiceRegistryWithAddress to supply one explicitly.
func NewServiceRegistry(logger hclog.Logger, client Client, service, namespace, partition, host string) *ServiceRegistry {
// TODO: this is probably wrong; should this be the consul-http-addr flag value?
address := ""

// FIXME: this setup call is currently tracked against the max retries allowed
// by the mock Consul server, which is presumably why the node-address lookup
// below is commented out.
// nodes, _, err := client.Catalog().Nodes(nil)
// for _, n := range nodes {
// address = n.Address
// }

return newServiceRegistry(logger, client, service, namespace, partition, host, address)
}

// newServiceRegistry builds a ServiceRegistry from the given parameters; both exported constructors delegate to it.
func newServiceRegistry(logger hclog.Logger, client Client, service, namespace, partition, host, address string) *ServiceRegistry {
return &ServiceRegistry{
logger: logger,
client: client,
Expand All @@ -55,10 +77,11 @@ func NewServiceRegistry(logger hclog.Logger, client Client, service, namespace,
namespace: namespace,
partition: partition,
host: host,
tries: defaultMaxAttempts,
retries: defaultMaxRetries,
backoffInterval: defaultBackoffInterval,
reregistrationInterval: 30 * time.Second,
updateTTLInterval: 10 * time.Second,
address: address,
}
}

Expand All @@ -68,59 +91,79 @@ func (s *ServiceRegistry) WithTags(tags []string) *ServiceRegistry {
return s
}

// WithTries tells the service registry to retry on any remote operations.
func (s *ServiceRegistry) WithTries(tries uint64) *ServiceRegistry {
s.tries = tries
// WithRetries tells the service registry to retry on any remote operations.
func (s *ServiceRegistry) WithRetries(retries uint64) *ServiceRegistry {
s.retries = retries
return s
}

// RegisterGateway registers a Gateway service with Consul.
func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error {
serviceChecks := api.AgentServiceChecks{{
Name: fmt.Sprintf("%s - Ready", serviceCheckName),
TCP: fmt.Sprintf("%s:%d", s.host, 20000),
Interval: serviceCheckInterval,
DeregisterCriticalServiceAfter: serviceDeregistrationTime,
serviceChecks := api.HealthChecks{{
Name: fmt.Sprintf("%s - Ready", serviceCheckName),
Definition: api.HealthCheckDefinition{
TCP: fmt.Sprintf("%s:%d", s.host, 20000),
IntervalDuration: serviceCheckInterval,
DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime,
},
}}
if ttl {
serviceChecks = api.AgentServiceChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
TTL: serviceCheckTTL,
DeregisterCriticalServiceAfter: serviceDeregistrationTime,
serviceChecks = api.HealthChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
Definition: api.HealthCheckDefinition{
TCP: fmt.Sprintf("%s:%d", s.host, 20000),
TimeoutDuration: serviceCheckTTL,
DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime,
},
}}
}

return s.register(ctx, &api.AgentServiceRegistration{
Kind: api.ServiceKind(api.IngressGateway),
ID: s.id,
Name: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
Meta: map[string]string{
"external-source": "consul-api-gateway",
//node := api.Catalog

return s.register(ctx, &api.CatalogRegistration{
ID: s.id,
Node: s.name,
Address: s.host,
Service: &api.AgentService{
Kind: api.ServiceKind(api.IngressGateway),
ID: s.id,
Service: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
Meta: map[string]string{
"external-source": "consul-api-gateway",
},
},
Checks: serviceChecks,
}, ttl)
}

// Register registers a service with Consul.
func (s *ServiceRegistry) Register(ctx context.Context) error {
return s.register(ctx, &api.AgentServiceRegistration{
Kind: api.ServiceKindTypical,
ID: s.id,
Name: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
Checks: api.AgentServiceChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
TTL: serviceCheckTTL,
DeregisterCriticalServiceAfter: serviceDeregistrationTime,
return s.register(ctx, &api.CatalogRegistration{
ID: s.id,
Node: s.name,
Address: s.host,
Service: &api.AgentService{
Kind: api.ServiceKindTypical,
ID: s.id,
Service: s.name,
Namespace: s.namespace,
Partition: s.partition,
Address: s.host,
Tags: s.tags,
},

Checks: api.HealthChecks{{
CheckID: s.id,
Name: fmt.Sprintf("%s - Health", s.name),
Definition: api.HealthCheckDefinition{
TimeoutDuration: serviceCheckTTL,
DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime,
},
}},
}, true)
}
Expand All @@ -130,7 +173,7 @@ func (s *ServiceRegistry) updateTTL(ctx context.Context) error {
return s.client.Agent().UpdateTTLOpts(s.id, "service healthy", "pass", opts.WithContext(ctx))
}

func (s *ServiceRegistry) register(ctx context.Context, registration *api.AgentServiceRegistration, ttl bool) error {
func (s *ServiceRegistry) register(ctx context.Context, registration *api.CatalogRegistration, ttl bool) error {
if s.cancel != nil {
return nil
}
Expand Down Expand Up @@ -168,10 +211,13 @@ func (s *ServiceRegistry) register(ctx context.Context, registration *api.AgentS
return nil
}

func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.AgentServiceRegistration) {
_, _, err := s.client.Agent().Service(s.id, &api.QueryOptions{
func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.CatalogRegistration) {
// TODO: will this actually return an error for the catalog API, or just an
// empty list?
_, _, err := s.client.Catalog().Service(s.id, "", &api.QueryOptions{
Namespace: s.namespace,
})

if err == nil {
return
}
Expand All @@ -192,18 +238,21 @@ func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *
s.logger.Error("error fetching service", "error", err)
}

func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *api.AgentServiceRegistration) error {
func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *api.CatalogRegistration) error {
return backoff.Retry(func() error {
err := s.registerService(ctx, registration)
if err != nil {
s.logger.Error("error registering service", "error", err)
}
return err
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.tries), ctx))
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.retries), ctx))
}

func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.AgentServiceRegistration) error {
return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx))
func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.CatalogRegistration) error {
writeOptions := &api.WriteOptions{}
_, err := s.client.Catalog().Register(registration, writeOptions.WithContext(ctx))

return err
}

// Deregister de-registers a service from Consul.
Expand All @@ -219,13 +268,19 @@ func (s *ServiceRegistry) Deregister(ctx context.Context) error {
s.logger.Error("error deregistering service", "error", err)
}
return err
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.tries), ctx))
}, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.retries), ctx))
}

func (s *ServiceRegistry) deregister(ctx context.Context) error {
return s.client.Agent().ServiceDeregisterOpts(s.id, (&api.QueryOptions{
writeOptions := &api.WriteOptions{}
_, err := s.client.Catalog().Deregister(&api.CatalogDeregistration{
Node: s.id,
Address: s.address,
//ServiceID: s.id,
Namespace: s.namespace,
}).WithContext(ctx))
Partition: s.partition,
}, writeOptions.WithContext(ctx))
return err
}

func (s *ServiceRegistry) ID() string {
Expand Down
Loading