From 0f35a9f046131edfd64710073f795b4358ea4da8 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Fri, 17 Mar 2023 17:25:19 -0500 Subject: [PATCH 01/17] checkpoint, update registration to use catalog api --- internal/consul/registration.go | 111 ++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 41 deletions(-) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index 72ba9400..8583ccf2 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -19,9 +19,9 @@ import ( const ( serviceCheckName = "consul-api-gateway Gateway Listener" - serviceCheckInterval = "10s" - serviceCheckTTL = "20s" - serviceDeregistrationTime = "1m" + serviceCheckInterval = time.Second * 10 + serviceCheckTTL = time.Second * 20 + serviceDeregistrationTime = time.Minute ) // ServiceRegistry handles the logic for registering a consul-api-gateway service in Consul. @@ -76,31 +76,53 @@ func (s *ServiceRegistry) WithTries(tries uint64) *ServiceRegistry { // Register registers a Gateway service with Consul. func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error { - serviceChecks := api.AgentServiceChecks{{ - Name: fmt.Sprintf("%s - Ready", serviceCheckName), - TCP: fmt.Sprintf("%s:%d", s.host, 20000), - Interval: serviceCheckInterval, - DeregisterCriticalServiceAfter: serviceDeregistrationTime, + //serviceCheck := api.AgentServiceCheck{ + // Name: fmt.Sprintf("%s - Ready", serviceCheckName), + // TCP: fmt.Sprintf("%s:%d", s.host, 20000), + // Interval: serviceCheckInterval, + // DeregisterCriticalServiceAfter: serviceDeregistrationTime, + //} + //if ttl { + // serviceCheck = api.AgentServiceCheck{ + // CheckID: s.id, + // Name: fmt.Sprintf("%s - Health", s.name), + // TTL: serviceCheckTTL, + // DeregisterCriticalServiceAfter: serviceDeregistrationTime, + // } + //} + + serviceChecks := api.HealthChecks{{ + Name: fmt.Sprintf("%s - Ready", serviceCheckName), + Definition: api.HealthCheckDefinition{ + TCP: fmt.Sprintf("%s:%d", s.host, 20000), + IntervalDuration: serviceCheckInterval, + DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime, + }, }} if ttl { - serviceChecks = api.AgentServiceChecks{{ - CheckID: s.id, - Name: fmt.Sprintf("%s - Health", s.name), - TTL: serviceCheckTTL, - DeregisterCriticalServiceAfter: serviceDeregistrationTime, + serviceChecks = api.HealthChecks{{ + CheckID: s.id, + Name: fmt.Sprintf("%s - Health", s.name), + Definition: api.HealthCheckDefinition{ + TCP: fmt.Sprintf("%s:%d", s.host, 20000), + TimeoutDuration: serviceCheckTTL, + DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime, + }, }} } - return s.register(ctx, &api.AgentServiceRegistration{ - Kind: api.ServiceKind(api.IngressGateway), - ID: s.id, - Name: s.name, - Namespace: s.namespace, - Partition: s.partition, - Address: s.host, - Tags: s.tags, - Meta: map[string]string{ - "external-source": "consul-api-gateway", + return s.register(ctx, &api.CatalogRegistration{ + Service: &api.AgentService{ + Kind: api.ServiceKind(api.IngressGateway), + ID: s.id, + Service: s.name, + Namespace: s.namespace, + Partition: s.partition, + Address: s.host, + Tags: s.tags, + Meta: map[string]string{ + "external-source": "consul-api-gateway", + }, }, Checks: serviceChecks, }, ttl) @@ -108,19 +130,24 @@ func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error { // Register registers a service with Consul. func (s *ServiceRegistry) Register(ctx context.Context) error { - return s.register(ctx, &api.AgentServiceRegistration{ - Kind: api.ServiceKindTypical, - ID: s.id, - Name: s.name, - Namespace: s.namespace, - Partition: s.partition, - Address: s.host, - Tags: s.tags, - Checks: api.AgentServiceChecks{{ - CheckID: s.id, - Name: fmt.Sprintf("%s - Health", s.name), - TTL: serviceCheckTTL, - DeregisterCriticalServiceAfter: serviceDeregistrationTime, + return s.register(ctx, &api.CatalogRegistration{ + Service: &api.AgentService{ + Kind: api.ServiceKindTypical, + ID: s.id, + Service: s.name, + Namespace: s.namespace, + Partition: s.partition, + Address: s.host, + Tags: s.tags, + }, + + Checks: api.HealthChecks{{ + CheckID: s.id, + Name: fmt.Sprintf("%s - Health", s.name), + Definition: api.HealthCheckDefinition{ + TimeoutDuration: serviceCheckTTL, + DeregisterCriticalServiceAfterDuration: serviceDeregistrationTime, + }, }}, }, true) } @@ -130,7 +157,7 @@ func (s *ServiceRegistry) updateTTL(ctx context.Context) error { return s.client.Agent().UpdateTTLOpts(s.id, "service healthy", "pass", opts.WithContext(ctx)) } -func (s *ServiceRegistry) register(ctx context.Context, registration *api.AgentServiceRegistration, ttl bool) error { +func (s *ServiceRegistry) register(ctx context.Context, registration *api.CatalogRegistration, ttl bool) error { if s.cancel != nil { return nil } @@ -168,7 +195,7 @@ func (s *ServiceRegistry) register(ctx context.Context, registration *api.AgentS return nil } -func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.AgentServiceRegistration) { +func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.CatalogRegistration) { _, _, err := s.client.Agent().Service(s.id, &api.QueryOptions{ Namespace: s.namespace, }) @@ -192,7 +219,7 @@ func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration * s.logger.Error("error fetching service", "error", err) } -func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *api.AgentServiceRegistration) error { +func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *api.CatalogRegistration) error { return backoff.Retry(func() error { err := s.registerService(ctx, registration) if err != nil { @@ -202,8 +229,10 @@ func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *a }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.tries), ctx)) } -func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.AgentServiceRegistration) error { - return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx)) +func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.CatalogRegistration) error { + _, err := s.client.Catalog().Register(registration, nil) + return err + //return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx)) } // Deregister de-registers a service from Consul. From 0e2b33955a4c05e4b417284617655df59f2908c2 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Fri, 17 Mar 2023 18:15:33 -0500 Subject: [PATCH 02/17] fix unit test assertions --- .changelog/533.txt | 3 +++ internal/consul/registration.go | 19 +++---------------- internal/consul/registration_test.go | 16 ++++++++-------- 3 files changed, 14 insertions(+), 24 deletions(-) create mode 100644 .changelog/533.txt diff --git a/.changelog/533.txt b/.changelog/533.txt new file mode 100644 index 00000000..dc61ff8c --- /dev/null +++ b/.changelog/533.txt @@ -0,0 +1,3 @@ +```release-note:bug +Use Consul Catalog API instead of Agent API to register APIGateway to prevent GRPC rate limits +``` diff --git a/internal/consul/registration.go b/internal/consul/registration.go index 8583ccf2..f56d3e10 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -76,21 +76,6 @@ func (s *ServiceRegistry) WithTries(tries uint64) *ServiceRegistry { // Register registers a Gateway service with Consul. func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error { - //serviceCheck := api.AgentServiceCheck{ - // Name: fmt.Sprintf("%s - Ready", serviceCheckName), - // TCP: fmt.Sprintf("%s:%d", s.host, 20000), - // Interval: serviceCheckInterval, - // DeregisterCriticalServiceAfter: serviceDeregistrationTime, - //} - //if ttl { - // serviceCheck = api.AgentServiceCheck{ - // CheckID: s.id, - // Name: fmt.Sprintf("%s - Health", s.name), - // TTL: serviceCheckTTL, - // DeregisterCriticalServiceAfter: serviceDeregistrationTime, - // } - //} - serviceChecks := api.HealthChecks{{ Name: fmt.Sprintf("%s - Ready", serviceCheckName), Definition: api.HealthCheckDefinition{ @@ -112,6 +97,7 @@ func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error { } return s.register(ctx, &api.CatalogRegistration{ + ID: s.id, Service: &api.AgentService{ Kind: api.ServiceKind(api.IngressGateway), ID: s.id, @@ -230,7 +216,8 @@ func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *a } func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.CatalogRegistration) error { - _, err := s.client.Catalog().Register(registration, nil) + writeOptions := &api.WriteOptions{} + _, err := s.client.Catalog().Register(registration, writeOptions.WithContext(ctx)) return err //return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx)) } diff --git a/internal/consul/registration_test.go b/internal/consul/registration_test.go index ef6b63ac..3db549ac 100644 --- a/internal/consul/registration_test.go +++ b/internal/consul/registration_test.go @@ -7,7 +7,7 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "net/http/httptest" "net/url" @@ -73,11 +73,11 @@ func TestRegister(t *testing.T) { require.NoError(t, err) require.Equal(t, id, registry.ID()) require.Equal(t, id, server.lastRegistrationRequest.ID) - require.Equal(t, service, server.lastRegistrationRequest.Name) - require.Equal(t, namespace, server.lastRegistrationRequest.Namespace) - require.Equal(t, test.host, server.lastRegistrationRequest.Address) + require.Equal(t, service, server.lastRegistrationRequest.Service.Service) + require.Equal(t, namespace, server.lastRegistrationRequest.Service.Namespace) + require.Equal(t, test.host, server.lastRegistrationRequest.Service.Address) require.Len(t, server.lastRegistrationRequest.Checks, 1) - require.Equal(t, fmt.Sprintf("%s:20000", test.host), server.lastRegistrationRequest.Checks[0].TCP) + require.Equal(t, fmt.Sprintf("%s:20000", test.host), server.lastRegistrationRequest.Checks[0].Definition.TCP) }) } } @@ -128,7 +128,7 @@ func TestDeregister(t *testing.T) { type registryServer struct { consul *api.Client - lastRegistrationRequest api.AgentServiceRegistration + lastRegistrationRequest api.CatalogRegistration deregistered bool } @@ -137,7 +137,7 @@ func runRegistryServer(t *testing.T, failures uint64, id string) *registryServer server := ®istryServer{} - registerPath := "/v1/agent/service/register" + registerPath := "/v1/catalog/register" deregisterPath := fmt.Sprintf("/v1/agent/service/deregister/%s", id) // Start the fake Consul server. @@ -148,7 +148,7 @@ func runRegistryServer(t *testing.T, failures uint64, id string) *registryServer return } if r != nil && r.URL.Path == registerPath && r.Method == "PUT" { - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { t.Errorf("error reading request body: %v", err) w.WriteHeader(http.StatusInternalServerError) From 533b0970e3e9c0683a425c9be6ec58a2d01c2158 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Fri, 17 Mar 2023 18:29:15 -0500 Subject: [PATCH 03/17] update register path --- internal/commands/exec/exec_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/commands/exec/exec_test.go b/internal/commands/exec/exec_test.go index e26120fa..df47c911 100644 --- a/internal/commands/exec/exec_test.go +++ b/internal/commands/exec/exec_test.go @@ -371,7 +371,7 @@ func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer loginPath := "/v1/acl/login" logoutPath := "/v1/acl/logout" - registerPath := "/v1/agent/service/register" + registerPath := "/v1/catalog/register" deregisterPath := "/v1/agent/service/deregister" leafPath := "/v1/agent/connect/ca/leaf" rootPath := "/v1/agent/connect/ca/roots" From a79cd3fef6216cf32fa4f0bae02814a44c92d22b Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 22 Mar 2023 17:44:13 -0500 Subject: [PATCH 04/17] update registration to include node --- internal/consul/registration.go | 42 ++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index f56d3e10..aa5ea95a 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -43,10 +43,31 @@ type ServiceRegistry struct { backoffInterval time.Duration reregistrationInterval time.Duration updateTTLInterval time.Duration + address string +} + +// NewServiceRegistry creates a new service registry instance +func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, namespace, host, address string) *ServiceRegistry { + return newServiceRegistry(logger, client, service, namespace, host, address) } // NewServiceRegistry creates a new service registry instance func NewServiceRegistry(logger hclog.Logger, client Client, service, namespace, partition, host string) *ServiceRegistry { + address := "" + //TODO this is probably wrong, should this be the consul-http-addr flag value + nodes, _, err := client.Catalog().Nodes(nil) + if err != nil { + address = "" + } else { + for _, n := range nodes { + address = n.Address + } + } + return newServiceRegistry(logger, client, service, namespace, partition, host, address) +} + +// NewServiceRegistry creates a new service registry instance +func newServiceRegistry(logger hclog.Logger, client Client, service, namespace, partition, host, address string) *ServiceRegistry { return &ServiceRegistry{ logger: logger, client: client, @@ -59,6 +80,7 @@ func NewServiceRegistry(logger hclog.Logger, client Client, service, namespace, backoffInterval: defaultBackoffInterval, reregistrationInterval: 30 * time.Second, updateTTLInterval: 10 * time.Second, + address: address, } } @@ -96,8 +118,12 @@ func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error { }} } + //node := api.Catalog + return s.register(ctx, &api.CatalogRegistration{ - ID: s.id, + ID: s.id, + Node: s.name, + Address: s.host, Service: &api.AgentService{ Kind: api.ServiceKind(api.IngressGateway), ID: s.id, @@ -117,6 +143,9 @@ func (s *ServiceRegistry) RegisterGateway(ctx context.Context, ttl bool) error { // Register registers a service with Consul. func (s *ServiceRegistry) Register(ctx context.Context) error { return s.register(ctx, &api.CatalogRegistration{ + ID: s.id, + Node: s.name, + Address: s.host, Service: &api.AgentService{ Kind: api.ServiceKindTypical, ID: s.id, @@ -216,8 +245,10 @@ func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *a } func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.CatalogRegistration) error { + writeOptions := &api.WriteOptions{} _, err := s.client.Catalog().Register(registration, writeOptions.WithContext(ctx)) + return err //return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx)) } @@ -239,9 +270,14 @@ func (s *ServiceRegistry) Deregister(ctx context.Context) error { } func (s *ServiceRegistry) deregister(ctx context.Context) error { - return s.client.Agent().ServiceDeregisterOpts(s.id, (&api.QueryOptions{ + writeOptions := &api.WriteOptions{} + _, err := s.client.Catalog().Deregister(&api.CatalogDeregistration{ + Node: s.id, + Address: s.address, + ServiceID: s.id, Namespace: s.namespace, - }).WithContext(ctx)) + }, writeOptions.WithContext(ctx)) + return err } func (s *ServiceRegistry) ID() string { From cec0e96a5e286d2a00e12705df4a896da7a6ce82 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 22 Mar 2023 18:02:35 -0500 Subject: [PATCH 05/17] merge --- internal/consul/registration.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index aa5ea95a..8927cb48 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "github.com/hashicorp/go-hclog" "net/http" "time" @@ -14,7 +15,6 @@ import ( "github.com/google/uuid" "github.com/hashicorp/consul/api" - "github.com/hashicorp/go-hclog" ) const ( @@ -47,8 +47,8 @@ type ServiceRegistry struct { } // NewServiceRegistry creates a new service registry instance -func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, namespace, host, address string) *ServiceRegistry { - return newServiceRegistry(logger, client, service, namespace, host, address) +func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, namespace, host, partition, address string) *ServiceRegistry { + return newServiceRegistry(logger, client, service, namespace, partition, host, address) } // NewServiceRegistry creates a new service registry instance From 4bd4e600f5ba2de33e0f0cb4257632736ca2cd92 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 22 Mar 2023 18:06:37 -0500 Subject: [PATCH 06/17] update path for deregistration --- internal/commands/exec/exec_test.go | 2 +- internal/consul/registration_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/commands/exec/exec_test.go b/internal/commands/exec/exec_test.go index df47c911..aa57614d 100644 --- a/internal/commands/exec/exec_test.go +++ b/internal/commands/exec/exec_test.go @@ -372,7 +372,7 @@ func runMockConsulServer(t *testing.T, opts mockConsulOptions) *mockConsulServer loginPath := "/v1/acl/login" logoutPath := "/v1/acl/logout" registerPath := "/v1/catalog/register" - deregisterPath := "/v1/agent/service/deregister" + deregisterPath := "/v1/catalog/deregister" leafPath := "/v1/agent/connect/ca/leaf" rootPath := "/v1/agent/connect/ca/roots" diff --git a/internal/consul/registration_test.go b/internal/consul/registration_test.go index 3db549ac..619f956d 100644 --- a/internal/consul/registration_test.go +++ b/internal/consul/registration_test.go @@ -138,7 +138,7 @@ func runRegistryServer(t *testing.T, failures uint64, id string) *registryServer server := ®istryServer{} registerPath := "/v1/catalog/register" - deregisterPath := fmt.Sprintf("/v1/agent/service/deregister/%s", id) + deregisterPath := "/v1/catalog/deregister" // Start the fake Consul server. consulServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { From 0ae5282097d79727076cf5c6e8b503bfec869097 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Thu, 23 Mar 2023 10:27:31 -0500 Subject: [PATCH 07/17] clean generate --- internal/consul/registration.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index 8927cb48..780110f5 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -7,10 +7,11 @@ import ( "context" "errors" "fmt" - "github.com/hashicorp/go-hclog" "net/http" "time" + "github.com/hashicorp/go-hclog" + "github.com/cenkalti/backoff" "github.com/google/uuid" From e6ba0914b720e78047353f889c35a0a9b3fb6836 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Thu, 23 Mar 2023 10:32:16 -0500 Subject: [PATCH 08/17] added partition to deregistration --- internal/consul/registration.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index 780110f5..fe9d7f16 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -277,6 +277,7 @@ func (s *ServiceRegistry) deregister(ctx context.Context) error { Address: s.address, ServiceID: s.id, Namespace: s.namespace, + Partition: s.partition, }, writeOptions.WithContext(ctx)) return err } From 9802d6a879983a3b93f07713affbff347c5c7830 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Thu, 23 Mar 2023 13:36:04 -0500 Subject: [PATCH 09/17] test without service id --- internal/consul/registration.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index fe9d7f16..3d1c74c6 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -273,9 +273,9 @@ func (s *ServiceRegistry) Deregister(ctx context.Context) error { func (s *ServiceRegistry) deregister(ctx context.Context) error { writeOptions := &api.WriteOptions{} _, err := s.client.Catalog().Deregister(&api.CatalogDeregistration{ - Node: s.id, - Address: s.address, - ServiceID: s.id, + Node: s.id, + Address: s.address, + //ServiceID: s.id, Namespace: s.namespace, Partition: s.partition, }, writeOptions.WithContext(ctx)) From 93555ae11629d0b3fbca934229543c80039443b1 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Wed, 5 Apr 2023 14:36:48 -0400 Subject: [PATCH 10/17] fixup arg order for NewServiceRegistryWithAddress --- internal/consul/registration.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index 3d1c74c6..8e247764 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -48,7 +48,7 @@ type ServiceRegistry struct { } // NewServiceRegistry creates a new service registry instance -func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, namespace, host, partition, address string) *ServiceRegistry { +func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, namespace, partition, host, address string) *ServiceRegistry { return newServiceRegistry(logger, client, service, namespace, partition, host, address) } From f4a3d91b36fb8454c1d0019929a8c4beb7f00a7c Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Wed, 5 Apr 2023 17:01:14 -0400 Subject: [PATCH 11/17] fix variable naming from attempts/tries to retries --- internal/consul/auth.go | 10 ++--- internal/consul/auth_test.go | 22 +++++----- internal/consul/common.go | 2 +- internal/consul/registration.go | 39 ++++++++-------- internal/consul/registration_test.go | 66 ++++++++++++++-------------- 5 files changed, 70 insertions(+), 69 deletions(-) diff --git a/internal/consul/auth.go b/internal/consul/auth.go index aed50979..aed0b9c3 100644 --- a/internal/consul/auth.go +++ b/internal/consul/auth.go @@ -25,7 +25,7 @@ type Authenticator struct { method string namespace string - tries uint64 + retries uint64 backoffInterval time.Duration } @@ -36,13 +36,13 @@ func NewAuthenticator(logger hclog.Logger, consul *api.Client, method, namespace logger: logger, method: method, namespace: namespace, - tries: defaultMaxAttempts, + retries: defaultMaxRetries, backoffInterval: defaultBackoffInterval, } } -func (a *Authenticator) WithTries(tries uint64) *Authenticator { - a.tries = tries +func (a *Authenticator) WithRetries(retries uint64) *Authenticator { + a.retries = retries return a } @@ -58,7 +58,7 @@ func (a *Authenticator) Authenticate(ctx context.Context, service, bearerToken s a.logger.Error("error authenticating", "error", err) } return err - }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(a.backoffInterval), a.tries), ctx)) + }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(a.backoffInterval), a.retries), ctx)) return token, err } diff --git a/internal/consul/auth_test.go b/internal/consul/auth_test.go index 99f6c104..6bd88b61 100644 --- a/internal/consul/auth_test.go +++ b/internal/consul/auth_test.go @@ -30,7 +30,7 @@ func TestAuthenticate(t *testing.T) { service string expectedMeta string failures uint64 - maxAttempts uint64 + maxRetries uint64 fail bool }{{ name: "success-no-namespace", @@ -46,25 +46,25 @@ func TestAuthenticate(t *testing.T) { service: "consul-api-gateway-test", expectedMeta: "consul-api-gateway-test", failures: 3, - maxAttempts: 3, + maxRetries: 3, }, { - name: "retry-failure", - service: "consul-api-gateway-test", - failures: 3, - maxAttempts: 2, - fail: true, + name: "retry-failure", + service: "consul-api-gateway-test", + failures: 3, + maxRetries: 2, + fail: true, }} { t.Run(test.name, func(t *testing.T) { server := runACLServer(t, test.failures) method := gwTesting.RandomString() token := gwTesting.RandomString() - maxAttempts := defaultMaxAttempts - if test.maxAttempts > 0 { - maxAttempts = test.maxAttempts + maxRetries := defaultMaxRetries + if test.maxRetries > 0 { + maxRetries = test.maxRetries } - auth := NewAuthenticator(hclog.NewNullLogger(), server.consul, method, test.namespace).WithTries(maxAttempts) + auth := NewAuthenticator(hclog.NewNullLogger(), server.consul, method, test.namespace).WithRetries(maxRetries) auth.backoffInterval = 0 consulToken, err := auth.Authenticate(context.Background(), test.service, token) diff --git a/internal/consul/common.go b/internal/consul/common.go index 3c40b1b6..1d9d1c37 100644 --- a/internal/consul/common.go +++ b/internal/consul/common.go @@ -8,6 +8,6 @@ import ( ) const ( - defaultMaxAttempts = uint64(30) + defaultMaxRetries = uint64(30) defaultBackoffInterval = 1 * time.Second ) diff --git a/internal/consul/registration.go b/internal/consul/registration.go index 8e247764..a3155f0e 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -40,7 +40,7 @@ type ServiceRegistry struct { tags []string cancel context.CancelFunc - tries uint64 + retries uint64 backoffInterval time.Duration reregistrationInterval time.Duration updateTTLInterval time.Duration @@ -54,16 +54,16 @@ func NewServiceRegistryWithAddress(logger hclog.Logger, client Client, service, // NewServiceRegistry creates a new service registry instance func NewServiceRegistry(logger hclog.Logger, client Client, service, namespace, partition, host string) *ServiceRegistry { + // TODO: this is probably wrong, should this be the consul-http-addr flag value address := "" - //TODO this is probably wrong, should this be the consul-http-addr flag value - nodes, _, err := client.Catalog().Nodes(nil) - if err != nil { - address = "" - } else { - for _, n := range nodes { - address = n.Address - } - } + + // FIXME: this setup call is currently tracked against the max retries allowed + // by the mock Consul server + // nodes, _, err := client.Catalog().Nodes(nil) + // for _, n := range nodes { + // address = n.Address + // } + return newServiceRegistry(logger, client, service, namespace, partition, host, address) } @@ -77,7 +77,7 @@ func newServiceRegistry(logger hclog.Logger, client Client, service, namespace, namespace: namespace, partition: partition, host: host, - tries: defaultMaxAttempts, + retries: defaultMaxRetries, backoffInterval: defaultBackoffInterval, reregistrationInterval: 30 * time.Second, updateTTLInterval: 10 * time.Second, @@ -91,9 +91,9 @@ func (s *ServiceRegistry) WithTags(tags []string) *ServiceRegistry { return s } -// WithTries tells the service registry to retry on any remote operations. -func (s *ServiceRegistry) WithTries(tries uint64) *ServiceRegistry { - s.tries = tries +// WithRetries tells the service registry to retry on any remote operations. +func (s *ServiceRegistry) WithRetries(retries uint64) *ServiceRegistry { + s.retries = retries return s } @@ -212,9 +212,12 @@ func (s *ServiceRegistry) register(ctx context.Context, registration *api.Catalo } func (s *ServiceRegistry) ensureRegistration(ctx context.Context, registration *api.CatalogRegistration) { - _, _, err := s.client.Agent().Service(s.id, &api.QueryOptions{ + // TODO: will this actually return an error for the catalog API, or just an + // empty list? + _, _, err := s.client.Catalog().Service(s.id, "", &api.QueryOptions{ Namespace: s.namespace, }) + if err == nil { return } @@ -242,16 +245,14 @@ func (s *ServiceRegistry) retryRegistration(ctx context.Context, registration *a s.logger.Error("error registering service", "error", err) } return err - }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.tries), ctx)) + }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.retries), ctx)) } func (s *ServiceRegistry) registerService(ctx context.Context, registration *api.CatalogRegistration) error { - writeOptions := &api.WriteOptions{} _, err := s.client.Catalog().Register(registration, writeOptions.WithContext(ctx)) return err - //return s.client.Agent().ServiceRegisterOpts(registration, (&api.ServiceRegisterOpts{}).WithContext(ctx)) } // Deregister de-registers a service from Consul. @@ -267,7 +268,7 @@ func (s *ServiceRegistry) Deregister(ctx context.Context) error { s.logger.Error("error deregistering service", "error", err) } return err - }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.tries), ctx)) + }, backoff.WithContext(backoff.WithMaxRetries(backoff.NewConstantBackOff(s.backoffInterval), s.retries), ctx)) } func (s *ServiceRegistry) deregister(ctx context.Context) error { diff --git a/internal/consul/registration_test.go b/internal/consul/registration_test.go index 619f956d..bb0dafd5 100644 --- a/internal/consul/registration_test.go +++ b/internal/consul/registration_test.go @@ -27,38 +27,38 @@ func TestRegister(t *testing.T) { ctx := context.Background() for _, test := range []struct { - name string - host string - failures uint64 - maxAttempts uint64 - fail bool + name string + host string + failures uint64 + maxRetries uint64 + fail bool }{{ name: "basic-test", host: "localhost", }, { - name: "test-retries", - host: "localhost", - failures: 3, - maxAttempts: 3, + name: "retry-success", + host: "localhost", + failures: 3, + maxRetries: 3, }, { - name: "test-retries-fail", - host: "localhost", - failures: 3, - maxAttempts: 2, - fail: true, + name: "retry-failure", + host: "localhost", + failures: 3, + maxRetries: 2, + fail: true, }} { t.Run(test.name, func(t *testing.T) { id := uuid.New().String() service := gwTesting.RandomString() namespace := gwTesting.RandomString() - maxAttempts := defaultMaxAttempts - if test.maxAttempts > 0 { - maxAttempts = test.maxAttempts + maxRetries := defaultMaxRetries + if test.maxRetries > 0 { + maxRetries = test.maxRetries } server := runRegistryServer(t, test.failures, id) - registry := NewServiceRegistry(hclog.NewNullLogger(), NewTestClient(server.consul), service, namespace, "", test.host).WithTries(maxAttempts) + registry := NewServiceRegistry(hclog.NewNullLogger(), NewTestClient(server.consul), service, namespace, "", test.host).WithRetries(maxRetries) registry.backoffInterval = 0 registry.id = id @@ -85,33 +85,33 @@ func TestRegister(t *testing.T) { func TestDeregister(t *testing.T) { t.Parallel() for _, test := range []struct { - name string - failures uint64 - maxAttempts uint64 - fail bool + name string + failures uint64 + maxRetries uint64 + fail bool }{{ name: "basic-test", }, { - name: "test-retries", - failures: 3, - maxAttempts: 3, + name: "retry-success", + failures: 3, + maxRetries: 3, }, { - name: "test-retries-fail", - failures: 3, - maxAttempts: 2, - fail: true, + name: "retry-fail", + failures: 3, + maxRetries: 2, + fail: true, }} { t.Run(test.name, func(t *testing.T) { id := uuid.New().String() service := gwTesting.RandomString() - maxAttempts := defaultMaxAttempts - if test.maxAttempts > 0 { - maxAttempts = test.maxAttempts + maxRetries := defaultMaxRetries + if test.maxRetries > 0 { + maxRetries = test.maxRetries } server := runRegistryServer(t, test.failures, id) - registry := NewServiceRegistry(hclog.NewNullLogger(), NewTestClient(server.consul), service, "", "", "").WithTries(maxAttempts) + registry := NewServiceRegistry(hclog.NewNullLogger(), NewTestClient(server.consul), service, "", "", "").WithRetries(maxRetries) registry.backoffInterval = 0 registry.id = id err := registry.Deregister(context.Background()) From 5413e694015d23dfa661972fe2b52d799e1f1c70 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Wed, 5 Apr 2023 17:39:15 -0400 Subject: [PATCH 12/17] fixup: missed one WithTries --- internal/commands/exec/exec.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/commands/exec/exec.go b/internal/commands/exec/exec.go index 4ee78743..7c9b18ac 100644 --- a/internal/commands/exec/exec.go +++ b/internal/commands/exec/exec.go @@ -106,7 +106,7 @@ func RunExec(config ExecConfig) (ret int) { config.GatewayConfig.Host, ) if config.isTest { - registry = registry.WithTries(1) + registry = registry.WithRetries(1) } config.Logger.Trace("registering service") From d8405de59828f4eb99f85258ecc38cca783379bc Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Wed, 5 Apr 2023 17:53:17 -0400 Subject: [PATCH 13/17] scripts: fix unbound DOCKER_HOST_ROUTE env var when running e2e make task on macOS --- scripts/e2e_local.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/e2e_local.sh b/scripts/e2e_local.sh index 10e8e41e..66365451 100755 --- a/scripts/e2e_local.sh +++ b/scripts/e2e_local.sh @@ -17,6 +17,7 @@ check_env_vars() { fi # if running on linux the DOCKER_HOST_ROUTE should be set to the docker IP address + DOCKER_HOST_ROUTE="" if [[ "$(uname -s)" == "Linux" ]]; then DOCKER_HOST_ROUTE="172.17.0.1" fi From 022f7fbe2acc2399f0a3c42b1ac279ff8c8140f5 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Fri, 7 Apr 2023 16:07:20 -0400 Subject: [PATCH 14/17] fixup catalog deregistration --- internal/commands/server/k8s_e2e_test.go | 5 +++++ internal/consul/registration.go | 11 ++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index a24f4d9f..ed45585e 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -378,8 +378,13 @@ func TestGatewayBasic(t *testing.T) { Namespace: e2e.ConsulNamespace(ctx), }) if err != nil { + fmt.Printf("ERROR: %#v", err) return false } + fmt.Printf("SERVICES: %d\n", len(services)) + for _, service := range services { + fmt.Printf("%#v\n", service) + } return len(services) == 0 }, checkTimeout, checkInterval, "consul service not deregistered in the allotted time") diff --git a/internal/consul/registration.go b/internal/consul/registration.go index a3155f0e..6266a038 100644 --- a/internal/consul/registration.go +++ b/internal/consul/registration.go @@ -273,13 +273,14 @@ func (s *ServiceRegistry) Deregister(ctx context.Context) error { func (s *ServiceRegistry) deregister(ctx context.Context) error { writeOptions := &api.WriteOptions{} - _, err := s.client.Catalog().Deregister(&api.CatalogDeregistration{ - Node: s.id, - Address: s.address, - //ServiceID: s.id, + dereg := api.CatalogDeregistration{ + Node: s.name, + ServiceID: s.id, Namespace: s.namespace, Partition: s.partition, - }, writeOptions.WithContext(ctx)) + } + fmt.Printf("DEREGISTERING:\n%#v\n", dereg) + _, err := s.client.Catalog().Deregister(&dereg, writeOptions.WithContext(ctx)) return err } From 33451f334a764d239a5420b901d9e6af3ea41c75 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Mon, 10 Apr 2023 10:43:28 -0400 Subject: [PATCH 15/17] reduce e2e assertion timeouts from 5mins to 90seconds --- internal/commands/server/k8s_e2e_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index ed45585e..f8240073 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -432,7 +432,7 @@ func TestGatewayBasic(t *testing.T) { require.Eventually(t, func() bool { entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) return err == nil && len(entries) == 1 && entries[0].GetName() == gatewayName - }, 5*time.Minute, checkInterval, "ingress-gateway config-entry not created in allotted time") + }, 90*time.Second, checkInterval, "ingress-gateway config-entry not created in allotted time") // De-register Consul service _, err := client.Catalog().Deregister(&api.CatalogDeregistration{ @@ -444,7 +444,7 @@ func TestGatewayBasic(t *testing.T) { require.Eventually(t, func() bool { services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) return err == nil && len(services) == 0 - }, 5*time.Minute, checkInterval, "service still returned after de-registering") + }, 90*time.Second, checkInterval, "service still returned after de-registering") // Delete ingress-gateway config-entry _, err = client.ConfigEntries().Delete(api.IngressGateway, gatewayName, &api.WriteOptions{Namespace: e2e.ConsulNamespace(ctx)}) @@ -452,17 +452,17 @@ func TestGatewayBasic(t *testing.T) { require.Eventually(t, func() bool { entries, _, err := client.ConfigEntries().List(api.IngressGateway, queryNamespace) return err == nil && len(entries) == 0 - }, 5*time.Minute, checkInterval, "ingress-gateway config entry still returned after deleting") + }, 90*time.Second, checkInterval, "ingress-gateway config entry still returned after deleting") // Check to make sure the controller recreates the service and config-entry in the background. assert.Eventually(t, func() bool { services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) return err == nil && len(services) == 1 - }, 5*time.Minute, checkInterval, "service not recreated after delete in allotted time") + }, 90*time.Second, checkInterval, "service not recreated after delete in allotted time") assert.Eventually(t, func() bool { entry, _, err := client.ConfigEntries().Get(api.IngressGateway, gatewayName, queryNamespace) return err == nil && entry != nil - }, 5*time.Minute, checkInterval, "ingress-gateway config-entry not recreated after delete in allotted time") + }, 90*time.Second, checkInterval, "ingress-gateway config-entry not recreated after delete in allotted time") // Clean up require.NoError(t, resources.Delete(ctx, gw)) From 01bb036c329c83ee5f14abec10a71e5053c62a50 Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Mon, 10 Apr 2023 11:06:00 -0400 Subject: [PATCH 16/17] e2e: remove assertion checking agent-based anti-entropy behavior --- internal/commands/server/k8s_e2e_test.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index f8240073..ff5c6aa8 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -454,16 +454,6 @@ func TestGatewayBasic(t *testing.T) { return err == nil && len(entries) == 0 }, 90*time.Second, checkInterval, "ingress-gateway config entry still returned after deleting") - // Check to make sure the controller recreates the service and config-entry in the background. - assert.Eventually(t, func() bool { - services, _, err := client.Catalog().Service(gatewayName, "", queryNamespace) - return err == nil && len(services) == 1 - }, 90*time.Second, checkInterval, "service not recreated after delete in allotted time") - assert.Eventually(t, func() bool { - entry, _, err := client.ConfigEntries().Get(api.IngressGateway, gatewayName, queryNamespace) - return err == nil && entry != nil - }, 90*time.Second, checkInterval, "ingress-gateway config-entry not recreated after delete in allotted time") - // Clean up require.NoError(t, resources.Delete(ctx, gw)) assert.Eventually(t, func() bool { From 0b4ed7a309fc9cf4175c2b967534d08357af1bee Mon Sep 17 00:00:00 2001 From: Mike Morris Date: Wed, 26 Apr 2023 20:48:02 -0400 Subject: [PATCH 17/17] WIP debugging --- internal/commands/server/k8s_e2e_test.go | 12 ++++++++++++ internal/k8s/service/resolver.go | 6 ++++++ 2 files changed, 18 insertions(+) diff --git a/internal/commands/server/k8s_e2e_test.go b/internal/commands/server/k8s_e2e_test.go index ff5c6aa8..44f6fda7 100644 --- a/internal/commands/server/k8s_e2e_test.go +++ b/internal/commands/server/k8s_e2e_test.go @@ -624,6 +624,18 @@ func TestHTTPRouteFlattening(t *testing.T) { err = resources.Create(ctx, route) require.NoError(t, err) + // Verify HTTPRoute has updated its status + check := createConditionsCheck([]meta.Condition{ + { + Type: rstatus.RouteConditionAccepted, Status: "True", Reason: rstatus.RouteConditionReasonAccepted, + }, + { + Type: rstatus.RouteConditionResolvedRefs, Status: "True", Reason: rstatus.RouteConditionReasonResolvedRefs, + }, + }) + require.Eventually(t, httpRouteStatusCheck(ctx, resources, gatewayName, routeOneName, namespace, check), checkTimeout, checkInterval, "route status not set in allotted time") + require.Eventually(t, httpRouteStatusCheck(ctx, resources, gatewayName, routeTwoName, namespace, check), checkTimeout, checkInterval, "route status not set in allotted time") + checkRoute(t, checkPort, "/v2/test", httpResponse{ StatusCode: http.StatusOK, Body: serviceTwo.Name, diff --git a/internal/k8s/service/resolver.go b/internal/k8s/service/resolver.go index 9a1da8ba..2de99e97 100644 --- a/internal/k8s/service/resolver.go +++ b/internal/k8s/service/resolver.go @@ -327,7 +327,9 @@ func (r *backendResolver) findGlobalCatalogService(service *corev1.Service) (*Re } filter := fmt.Sprintf(`Meta[%q] == %q and Meta[%q] == %q and Kind != "connect-proxy"`, MetaKeyKubeServiceName, service.Name, MetaKeyKubeNS, service.Namespace) + fmt.Printf("NODES(%d): %#v\n", len(nodes), nodes) for _, node := range nodes { + fmt.Printf("NODE: %#v\n", node) for _, namespace := range namespaces { nodeWithServices, _, err := r.consul.Catalog().Node(node.Node, &api.QueryOptions{ Filter: filter, @@ -337,6 +339,9 @@ func (r *backendResolver) findGlobalCatalogService(service *corev1.Service) (*Re r.logger.Trace("error retrieving node services", "error", err, "node", node.Node) return nil, err } + for _, service := range nodeWithServices.Services { + fmt.Printf("SERVICES: %#v\n", service) + } if len(nodeWithServices.Services) == 0 { continue } @@ -346,6 +351,7 @@ func (r *backendResolver) findGlobalCatalogService(service *corev1.Service) (*Re return nil, err } if resolved != nil { + fmt.Printf("RESOLVED: %#v\n", resolved) return resolved, nil } }