diff --git a/controllers/reconcile.go b/controllers/reconcile.go index 7d1827b..17d6bf8 100644 --- a/controllers/reconcile.go +++ b/controllers/reconcile.go @@ -11,6 +11,7 @@ import ( clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1" "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token" "golang.org/x/mod/semver" "github.com/pkg/errors" @@ -475,7 +476,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cl } // clean up MicroK8s cluster secrets - for _, secretName := range []string{"kubeconfig", "ca", "jointoken"} { + for _, secretName := range []string{"kubeconfig", "ca", "jointoken", token.AuthTokenNameSuffix} { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: cluster.Namespace, @@ -594,12 +595,14 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte // The issue is that we were not removing the endpoint from dqlite when we were deleting a machine. // This would cause a situation were a joining node failed to join because the endpoint was already in the dqlite cluster. // How? The IP assigned to the joining (new) node, previously belonged to a node that was deleted, but the IP is still there in dqlite. - // If we have 2 or more machines left, get cluster agent client and delete node from dqlite - if len(machines) > 1 { + // If we have 2 machines, deleting one is not safe because it can be the leader and we're not taking care of + // leadership transfers in the cluster-agent for now. Maybe something for later (TODO) + // If we have 3 or more machines left, get cluster agent client and delete node from dqlite. + if len(machines) > 2 { portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil { - if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine, portRemap); err != nil { + if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, cluster, deleteMachine, portRemap); err != nil { logger.Error(err, "failed to remove node from dqlite: %w", "machineName", deleteMachine.Name, "nodeName", node.Name) } } else { @@ -646,7 +649,8 @@ func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Ma } // removeMicrok8sNode removes the node from -func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine, portRemap bool) error { +func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, + clusterKey client.ObjectKey, delMachine clusterv1.Machine, portRemap bool) error { dqlitePort := defaultDqlitePort if portRemap { // https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102 @@ -665,7 +669,12 @@ func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Contex return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name) } - if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil { + token, err := token.Lookup(ctx, r.Client, clusterKey) + if err != nil { + return fmt.Errorf("failed to lookup token: %w", err) + } + + if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, token, removeEp); err != nil { return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err) } diff --git a/go.mod b/go.mod index ccb6f47..68f5586 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( cloud.google.com/go/compute v1.10.0 // indirect github.com/blang/semver v3.5.1+incompatible // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go index bb1e7a5..dce3dca 100644 --- a/pkg/clusteragent/clusteragent.go +++ b/pkg/clusteragent/clusteragent.go @@ -69,10 +69,10 @@ func (c *Client) Endpoint() string { return fmt.Sprintf("https://%s:%s", c.ip, c.port) } -// Do makes a request to the given endpoint with the given method. It marshals the request and unmarshals +// do makes a request to the given endpoint with the given method. It marshals the request and unmarshals // server response body if the provided response is not nil. // The endpoint should _not_ have a leading slash. -func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error { +func (c *Client) do(ctx context.Context, method, endpoint string, request any, header map[string][]string, response any) error { url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint) requestBody, err := json.Marshal(request) @@ -85,6 +85,8 @@ func (c *Client) Do(ctx context.Context, method, endpoint string, request any, r return fmt.Errorf("failed to create request: %w", err) } + req.Header = http.Header(header) + res, err := c.client.Do(req) if err != nil { return fmt.Errorf("failed to call cluster agent: %w", err) diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go index d87df55..25dfa69 100644 --- a/pkg/clusteragent/clusteragent_test.go +++ b/pkg/clusteragent/clusteragent_test.go @@ -1,13 +1,11 @@ -package clusteragent_test +package clusteragent import ( "context" - "encoding/json" "fmt" "math/rand" "net" "net/http" - "net/http/httptest" "strings" "testing" "time" @@ -15,9 +13,9 @@ import ( . "github.com/onsi/gomega" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - - "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest" ) func TestClient(t *testing.T) { @@ -26,7 +24,7 @@ func TestClient(t *testing.T) { // Machines don't have any addresses. machines := []clusterv1.Machine{{}, {}} - _, err := clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{}) + _, err := NewClient(machines, "25000", time.Second, Options{}) g.Expect(err).To(HaveOccurred()) @@ -46,7 +44,7 @@ func TestClient(t *testing.T) { }, }, } - _, err = clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{IgnoreMachineNames: sets.NewString(ignoreName)}) + _, err = NewClient(machines, "25000", time.Second, Options{IgnoreMachineNames: sets.NewString(ignoreName)}) g.Expect(err).To(HaveOccurred()) }) @@ -103,14 +101,14 @@ func TestClient(t *testing.T) { }, } - opts := clusteragent.Options{ + opts := Options{ IgnoreMachineNames: sets.NewString(ignoreName), } // NOTE(Hue): Repeat the test to make sure the ignored machine's IP is not picked by chance (reduce flakiness). for i := 0; i < 100; i++ { machines = shuffleMachines(machines) - c, err := clusteragent.NewClient(machines, port, time.Second, opts) + c, err := NewClient(machines, port, time.Second, opts) g.Expect(err).ToNot(HaveOccurred()) @@ -130,12 +128,12 @@ func TestDo(t *testing.T) { resp := map[string]string{ "key": "value", } - servM := NewServerMock(method, path, resp) - defer servM.ts.Close() + servM := httptest.NewServerMock(method, path, resp) + defer servM.Srv.Close() - ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://")) + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://")) g.Expect(err).ToNot(HaveOccurred()) - c, err := clusteragent.NewClient([]clusterv1.Machine{ + c, err := NewClient([]clusterv1.Machine{ { Status: clusterv1.MachineStatus{ Addresses: clusterv1.MachineAddresses{ @@ -145,64 +143,18 @@ func TestDo(t *testing.T) { }, }, }, - }, port, time.Second, clusteragent.Options{}) + }, port, time.Second, Options{}) g.Expect(err).ToNot(HaveOccurred()) response := make(map[string]string) req := map[string]string{"req": "value"} path = strings.TrimPrefix(path, "/") - g.Expect(c.Do(context.Background(), method, path, req, &response)).To(Succeed()) + g.Expect(c.do(context.Background(), method, path, req, nil, &response)).To(Succeed()) g.Expect(response).To(Equal(resp)) } -type serverMock struct { - method string - path string - response any - request map[string]any - ts *httptest.Server -} - -// NewServerMock creates a test server that responds with the given response when called with the given method and path. -// Make sure to close the server after the test is done. -// Server will try to decode the request body into a map[string]any. -func NewServerMock(method string, path string, response any) *serverMock { - req := make(map[string]any) - ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != path { - http.NotFound(w, r) - return - } - if r.Method != method { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - if response != nil { - if err := json.NewEncoder(w).Encode(response); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - } - w.WriteHeader(http.StatusOK) - })) - - return &serverMock{ - method: method, - path: path, - response: response, - request: req, - ts: ts, - } -} - func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine { dest := make([]clusterv1.Machine, len(src)) perm := rand.Perm(len(src)) diff --git a/pkg/clusteragent/const.go b/pkg/clusteragent/const.go new file mode 100644 index 0000000..b997955 --- /dev/null +++ b/pkg/clusteragent/const.go @@ -0,0 +1,5 @@ +package clusteragent + +const ( + AuthTokenHeader = "capi-auth-token" +) diff --git a/pkg/clusteragent/remove_node.go b/pkg/clusteragent/remove_node.go index 20e62e8..59ca691 100644 --- a/pkg/clusteragent/remove_node.go +++ b/pkg/clusteragent/remove_node.go @@ -7,7 +7,10 @@ import ( // RemoveNodeFromDqlite calls the /v2/dqlite/remove endpoint on cluster agent to remove the given address from Dqlite. // The endpoint should be in the format of "address:port". -func (p *Client) RemoveNodeFromDqlite(ctx context.Context, removeEp string) error { - request := map[string]string{"removeEndpoint": removeEp} - return p.Do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, nil) +func (p *Client) RemoveNodeFromDqlite(ctx context.Context, token string, removeEp string) error { + request := map[string]string{"remove_endpoint": removeEp} + header := map[string][]string{ + AuthTokenHeader: {token}, + } + return p.do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, header, nil) } diff --git a/pkg/clusteragent/remove_node_test.go b/pkg/clusteragent/remove_node_test.go index c4fd74b..ef79633 100644 --- a/pkg/clusteragent/remove_node_test.go +++ b/pkg/clusteragent/remove_node_test.go @@ -9,20 +9,22 @@ import ( "time" . "github.com/onsi/gomega" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" - clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest" ) func TestRemoveFromDqlite(t *testing.T) { g := NewWithT(t) path := "/cluster/api/v2.0/dqlite/remove" + token := "myRandomToken" method := http.MethodPost - servM := NewServerMock(method, path, nil) - defer servM.ts.Close() + servM := httptest.NewServerMock(method, path, nil) + defer servM.Srv.Close() - ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://")) + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://")) g.Expect(err).ToNot(HaveOccurred()) c, err := clusteragent.NewClient([]clusterv1.Machine{ { @@ -37,6 +39,7 @@ func TestRemoveFromDqlite(t *testing.T) { }, port, time.Second, clusteragent.Options{}) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(c.RemoveNodeFromDqlite(context.Background(), "1.1.1.1:1234")).To(Succeed()) - g.Expect(servM.request).To(HaveKeyWithValue("removeEndpoint", "1.1.1.1:1234")) + g.Expect(c.RemoveNodeFromDqlite(context.Background(), token, "1.1.1.1:1234")).To(Succeed()) + g.Expect(servM.Request).To(HaveKeyWithValue("remove_endpoint", "1.1.1.1:1234")) + g.Expect(servM.Header.Get(clusteragent.AuthTokenHeader)).To(Equal(token)) } diff --git a/pkg/httptest/httptest.go b/pkg/httptest/httptest.go new file mode 100644 index 0000000..055bfe3 --- /dev/null +++ b/pkg/httptest/httptest.go @@ -0,0 +1,62 @@ +package httptest + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" +) + +type serverMock struct { + Method string + Path string + Response any + Request map[string]any + Header http.Header + Srv *httptest.Server +} + +// NewServerMock creates a test server that responds with the given response when called with the given method and path. +// Make sure to close the server after the test is done. +// Server will try to decode the request body into a map[string]any. +func NewServerMock(method string, path string, response any) *serverMock { + req := make(map[string]any) + header := make(map[string][]string) + ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != path { + http.NotFound(w, r) + return + } + if r.Method != method { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + for k, vv := range map[string][]string(r.Header) { + header[k] = vv + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if response != nil { + if err := json.NewEncoder(w).Encode(response); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + } + w.WriteHeader(http.StatusOK) + })) + + fmt.Println("header:", header) + return &serverMock{ + Method: method, + Path: path, + Response: response, + Header: header, + Request: req, + Srv: ts, + } +} diff --git a/pkg/token/token.go b/pkg/token/token.go new file mode 100644 index 0000000..5db3aec --- /dev/null +++ b/pkg/token/token.go @@ -0,0 +1,47 @@ +package token + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + AuthTokenNameSuffix = "capi-auth-token" +) + +// Lookup retrieves the token for the given cluster. +func Lookup(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (string, error) { + secret, err := getSecret(ctx, c, clusterKey) + if err != nil { + return "", fmt.Errorf("failed to get secret: %w", err) + } + + v, ok := secret.Data["token"] + if !ok { + return "", fmt.Errorf("token not found in secret") + } + + return string(v), nil +} + +// getSecret retrieves the token secret for the given cluster. +func getSecret(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (*corev1.Secret, error) { + s := &corev1.Secret{} + key := client.ObjectKey{ + Name: authTokenName(clusterKey.Name), + Namespace: clusterKey.Namespace, + } + if err := c.Get(ctx, key, s); err != nil { + return nil, fmt.Errorf("failed to get secret: %w", err) + } + + return s, nil +} + +// authTokenName returns the name of the auth-token secret, computed by convention using the name of the cluster. +func authTokenName(clusterName string) string { + return fmt.Sprintf("%s-%s", clusterName, AuthTokenNameSuffix) +} diff --git a/pkg/token/token_test.go b/pkg/token/token_test.go new file mode 100644 index 0000000..c1868b4 --- /dev/null +++ b/pkg/token/token_test.go @@ -0,0 +1,50 @@ +package token_test + +import ( + "context" + "fmt" + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token" +) + +func TestReconcile(t *testing.T) { + t.Run("LookupFailsIfNoSecret", func(t *testing.T) { + namespace := "test-namespace" + clusterName := "test-cluster" + c := fake.NewClientBuilder().Build() + + g := NewWithT(t) + + _, err := token.Lookup(context.Background(), c, client.ObjectKey{Name: clusterName, Namespace: namespace}) + g.Expect(err).To(HaveOccurred()) + }) + + t.Run("LookupSucceedsIfSecretExists", func(t *testing.T) { + namespace := "test-namespace" + clusterName := "test-cluster" + expToken := "test-token" + secret := &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", clusterName, token.AuthTokenNameSuffix), + Namespace: namespace, + }, + Data: map[string][]byte{ + "token": []byte(expToken), + }, + } + c := fake.NewClientBuilder().WithObjects(secret).Build() + + g := NewWithT(t) + + token, err := token.Lookup(context.Background(), c, client.ObjectKey{Name: clusterName, Namespace: namespace}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(token).To(Equal(expToken)) + }) +}