From 5ed7ddfad9f0492bee3f28c5173416a2d08fb3b1 Mon Sep 17 00:00:00 2001 From: Homayoon Alimohammadi Date: Wed, 2 Oct 2024 16:08:43 +0400 Subject: [PATCH 1/3] Call node removal endpoint on machine deletion (#66) --- controllers/reconcile.go | 77 ++++++++++ pkg/clusteragent/clusteragent.go | 115 ++++++++++++++ pkg/clusteragent/clusteragent_test.go | 213 ++++++++++++++++++++++++++ pkg/clusteragent/remove_node.go | 13 ++ pkg/clusteragent/remove_node_test.go | 42 +++++ 5 files changed, 460 insertions(+) create mode 100644 pkg/clusteragent/clusteragent.go create mode 100644 pkg/clusteragent/clusteragent_test.go create mode 100644 pkg/clusteragent/remove_node.go create mode 100644 pkg/clusteragent/remove_node_test.go diff --git a/controllers/reconcile.go b/controllers/reconcile.go index b6cb197..7d1827b 100644 --- a/controllers/reconcile.go +++ b/controllers/reconcile.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "math/rand" + "net" "sort" "strings" "time" clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" "golang.org/x/mod/semver" "github.com/pkg/errors" @@ -16,6 +18,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/storage/names" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/controllers/external" @@ -28,6 +31,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +const ( + defaultClusterAgentPort string = "25000" + defaultDqlitePort string = "19001" + defaultClusterAgentClientTimeout time.Duration = 10 * time.Second +) + type errServiceUnhealthy struct { service string reason string @@ -578,6 +587,26 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte node := deleteMachine.Status.NodeRef logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name) + + logger.Info("deleting node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name) + + // NOTE(Hue): We do this step as a best effort since this whole logic is implemented to prevent a not-yet-reported bug. + // The issue is that we were not removing the endpoint from dqlite when we were deleting a machine. + // This would cause a situation were a joining node failed to join because the endpoint was already in the dqlite cluster. + // How? The IP assigned to the joining (new) node, previously belonged to a node that was deleted, but the IP is still there in dqlite. + // If we have 2 or more machines left, get cluster agent client and delete node from dqlite + if len(machines) > 1 { + portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap + + if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil { + if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine, portRemap); err != nil { + logger.Error(err, "failed to remove node from dqlite: %w", "machineName", deleteMachine.Name, "nodeName", node.Name) + } + } else { + logger.Error(err, "failed to get cluster agent client") + } + } + logger.Info("deleting machine") err = r.Client.Delete(ctx, &deleteMachine) @@ -595,6 +624,54 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte return ctrl.Result{Requeue: true}, nil } +func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine, portRemap bool) (*clusteragent.Client, error) { + opts := clusteragent.Options{ + // NOTE(hue): We want to pick a random machine's IP to call POST /dqlite/remove on its cluster agent endpoint. + // This machine should preferably not be the itself, although this is not forced by Microk8s. + IgnoreMachineNames: sets.NewString(delMachine.Name), + } + + port := defaultClusterAgentPort + if portRemap { + // https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102 + port = "30000" + } + + clusterAgentClient, err := clusteragent.NewClient(machines, port, defaultClusterAgentClientTimeout, opts) + if err != nil { + return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err) + } + + return clusterAgentClient, nil +} + +// removeMicrok8sNode removes the node from +func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine, portRemap bool) error { + dqlitePort := defaultDqlitePort + if portRemap { + // https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102 + dqlitePort = "2379" + } + + var removeEp string + for _, addr := range delMachine.Status.Addresses { + if net.ParseIP(addr.Address) != nil { + removeEp = fmt.Sprintf("%s:%s", addr.Address, dqlitePort) + break + } + } + + if removeEp == "" { + return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name) + } + + if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil { + return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err) + } + + return nil +} + func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) { nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v") diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go new file mode 100644 index 0000000..61b3df4 --- /dev/null +++ b/pkg/clusteragent/clusteragent.go @@ -0,0 +1,115 @@ +package clusteragent + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +) + +// Options should be used when initializing a new client. +type Options struct { + // IgnoreMachineNames is a set of ignored machine names that we don't want to pick their IP for the cluster agent endpoint. + IgnoreMachineNames sets.String + // InsecureSkipVerify skips the verification of the server's certificate chain and host name. + // This is mostly used for testing purposes. + InsecureSkipVerify bool +} + +type Client struct { + ip, port string + client *http.Client +} + +// NewClient picks an IP from one of the given machines and creates a new client for the cluster agent +// with that IP. +func NewClient(machines []clusterv1.Machine, port string, timeout time.Duration, opts Options) (*Client, error) { + var ip string + for _, m := range machines { + if opts.IgnoreMachineNames.Has(m.Name) { + continue + } + + for _, addr := range m.Status.Addresses { + if net.ParseIP(addr.Address) != nil { + ip = addr.Address + break + } + } + break + } + + if ip == "" { + return nil, errors.New("failed to find an IP for cluster agent") + } + + transport := &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: opts.InsecureSkipVerify, + }, + } + + return &Client{ + ip: ip, + port: port, + client: &http.Client{ + Timeout: timeout, + Transport: transport, + }, + }, nil +} + +func (c *Client) Endpoint() string { + return fmt.Sprintf("https://%s:%s", c.ip, c.port) +} + +// Do makes a request to the given endpoint with the given method. It marshals the request and unmarshals +// server response body if the provided response is not nil. +// The endpoint should _not_ have a leading slash. +func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error { + url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint) + + requestBody, err := json.Marshal(request) + if err != nil { + return fmt.Errorf("failed to prepare worker info request: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + res, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to call cluster agent: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + // NOTE(hue): Marshal and print any response that we got since it might contain valuable information + // on why the request failed. + // Ignore JSON errors to prevent unnecessarily complicated error handling. + anyResp := make(map[string]any) + _ = json.NewDecoder(res.Body).Decode(&anyResp) + b, _ := json.Marshal(anyResp) + resStr := string(b) + + return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr) + } + + if response != nil { + if err := json.NewDecoder(res.Body).Decode(response); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + } + + return nil +} diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go new file mode 100644 index 0000000..adfa196 --- /dev/null +++ b/pkg/clusteragent/clusteragent_test.go @@ -0,0 +1,213 @@ +package clusteragent_test + +import ( + "context" + "encoding/json" + "fmt" + "math/rand" + "net" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + . "github.com/onsi/gomega" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +) + +func TestClient(t *testing.T) { + t.Run("CanNotFindAddress", func(t *testing.T) { + g := NewWithT(t) + + // Machines don't have any addresses. + machines := []clusterv1.Machine{{}, {}} + _, err := clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{}) + + g.Expect(err).To(HaveOccurred()) + + // The only machine is the ignored one. + ignoreName := "ignore" + machines = []clusterv1.Machine{ + { + ObjectMeta: v1.ObjectMeta{ + Name: ignoreName, + }, + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: "1.1.1.1", + }, + }, + }, + }, + } + _, err = clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{IgnoreMachineNames: sets.NewString(ignoreName)}) + + g.Expect(err).To(HaveOccurred()) + }) + + t.Run("CorrectEndpoint", func(t *testing.T) { + g := NewWithT(t) + + port := "30000" + firstAddr := "1.1.1.1" + secondAddr := "2.2.2.2" + thirdAddr := "3.3.3.3" + + ignoreName := "ignore" + ignoreAddr := "8.8.8.8" + machines := []clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: firstAddr, + }, + }, + }, + }, + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: secondAddr, + }, + }, + }, + }, + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: thirdAddr, + }, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: ignoreName, + }, + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: ignoreAddr, + }, + }, + }, + }, + } + + opts := clusteragent.Options{ + IgnoreMachineNames: sets.NewString(ignoreName), + } + + // NOTE(Hue): Repeat the test to make sure the ignored machine's IP is not picked by chance (reduce flakiness). + for i := 0; i < 100; i++ { + machines = shuffleMachines(machines) + c, err := clusteragent.NewClient(machines, port, time.Second, opts) + + g.Expect(err).ToNot(HaveOccurred()) + + // Check if the endpoint is one of the expected ones and not the ignored one. + g.Expect([]string{fmt.Sprintf("https://%s:%s", firstAddr, port), fmt.Sprintf("https://%s:%s", secondAddr, port), fmt.Sprintf("https://%s:%s", thirdAddr, port)}).To(ContainElement(c.Endpoint())) + g.Expect(c.Endpoint()).ToNot(Equal(fmt.Sprintf("https://%s:%s", ignoreAddr, port))) + } + + }) +} + +func TestDo(t *testing.T) { + g := NewWithT(t) + + path := "/random/path" + method := http.MethodPost + resp := map[string]string{ + "key": "value", + } + servM := NewServerMock(method, path, resp) + defer servM.ts.Close() + + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://")) + g.Expect(err).ToNot(HaveOccurred()) + c, err := clusteragent.NewClient([]clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: ip, + }, + }, + }, + }, + }, port, time.Second, clusteragent.Options{InsecureSkipVerify: true}) + + g.Expect(err).ToNot(HaveOccurred()) + + response := make(map[string]string) + req := map[string]string{"req": "value"} + path = strings.TrimPrefix(path, "/") + g.Expect(c.Do(context.Background(), method, path, req, &response)).To(Succeed()) + + g.Expect(response).To(Equal(resp)) +} + +type serverMock struct { + method string + path string + response any + request map[string]any + ts *httptest.Server +} + +// NewServerMock creates a test server that responds with the given response when called with the given method and path. +// Make sure to close the server after the test is done. +// Server will try to decode the request body into a map[string]any. +func NewServerMock(method string, path string, response any) *serverMock { + req := make(map[string]any) + ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != path { + http.NotFound(w, r) + return + } + if r.Method != method { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if response != nil { + if err := json.NewEncoder(w).Encode(response); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + } + w.WriteHeader(http.StatusOK) + })) + + return &serverMock{ + method: method, + path: path, + response: response, + request: req, + ts: ts, + } +} + +func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine { + dest := make([]clusterv1.Machine, len(src)) + perm := rand.Perm(len(src)) + for i, v := range perm { + dest[v] = src[i] + } + return dest +} diff --git a/pkg/clusteragent/remove_node.go b/pkg/clusteragent/remove_node.go new file mode 100644 index 0000000..20e62e8 --- /dev/null +++ b/pkg/clusteragent/remove_node.go @@ -0,0 +1,13 @@ +package clusteragent + +import ( + "context" + "net/http" +) + +// RemoveNodeFromDqlite calls the /v2/dqlite/remove endpoint on cluster agent to remove the given address from Dqlite. +// The endpoint should be in the format of "address:port". +func (p *Client) RemoveNodeFromDqlite(ctx context.Context, removeEp string) error { + request := map[string]string{"removeEndpoint": removeEp} + return p.Do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, nil) +} diff --git a/pkg/clusteragent/remove_node_test.go b/pkg/clusteragent/remove_node_test.go new file mode 100644 index 0000000..961f868 --- /dev/null +++ b/pkg/clusteragent/remove_node_test.go @@ -0,0 +1,42 @@ +package clusteragent_test + +import ( + "context" + "net" + "net/http" + "strings" + "testing" + "time" + + . "github.com/onsi/gomega" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +) + +func TestRemoveFromDqlite(t *testing.T) { + g := NewWithT(t) + + path := "/cluster/api/v2.0/dqlite/remove" + method := http.MethodPost + servM := NewServerMock(method, path, nil) + defer servM.ts.Close() + + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://")) + g.Expect(err).ToNot(HaveOccurred()) + c, err := clusteragent.NewClient([]clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: ip, + }, + }, + }, + }, + }, port, time.Second, clusteragent.Options{InsecureSkipVerify: true}) + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(c.RemoveNodeFromDqlite(context.Background(), "1.1.1.1:1234")).To(Succeed()) + g.Expect(servM.request).To(HaveKeyWithValue("removeEndpoint", "1.1.1.1:1234")) +} From b311e15d7b50f2f9ffab520d49cefb03eaf77b42 Mon Sep 17 00:00:00 2001 From: Homayoon Alimohammadi Date: Thu, 3 Oct 2024 15:43:22 +0400 Subject: [PATCH 2/3] Add pkg to Dockerfile and Fix TLS issue (#67) --- Dockerfile | 1 + pkg/clusteragent/clusteragent.go | 20 +++++++++----------- pkg/clusteragent/clusteragent_test.go | 2 +- pkg/clusteragent/remove_node_test.go | 2 +- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/Dockerfile b/Dockerfile index 9c1de47..352d8c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,7 @@ RUN go mod download COPY main.go main.go COPY api/ api/ COPY controllers/ controllers/ +COPY pkg/ pkg/ # Build RUN CGO_ENABLED=0 GOOS=linux GOARCH=$arch go build -a -ldflags '-s -w' -o manager main.go diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go index 61b3df4..bb1e7a5 100644 --- a/pkg/clusteragent/clusteragent.go +++ b/pkg/clusteragent/clusteragent.go @@ -19,9 +19,6 @@ import ( type Options struct { // IgnoreMachineNames is a set of ignored machine names that we don't want to pick their IP for the cluster agent endpoint. IgnoreMachineNames sets.String - // InsecureSkipVerify skips the verification of the server's certificate chain and host name. - // This is mostly used for testing purposes. - InsecureSkipVerify bool } type Client struct { @@ -51,18 +48,19 @@ func NewClient(machines []clusterv1.Machine, port string, timeout time.Duration, return nil, errors.New("failed to find an IP for cluster agent") } - transport := &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: opts.InsecureSkipVerify, - }, - } - return &Client{ ip: ip, port: port, client: &http.Client{ - Timeout: timeout, - Transport: transport, + Timeout: timeout, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + // TODO(Hue): Workaround for now, address later on + // get the certificate fingerprint from the matching node through a resource in the cluster (TBD), + // and validate it in the TLSClientConfig + InsecureSkipVerify: true, + }, + }, }, }, nil } diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go index adfa196..d87df55 100644 --- a/pkg/clusteragent/clusteragent_test.go +++ b/pkg/clusteragent/clusteragent_test.go @@ -145,7 +145,7 @@ func TestDo(t *testing.T) { }, }, }, - }, port, time.Second, clusteragent.Options{InsecureSkipVerify: true}) + }, port, time.Second, clusteragent.Options{}) g.Expect(err).ToNot(HaveOccurred()) diff --git a/pkg/clusteragent/remove_node_test.go b/pkg/clusteragent/remove_node_test.go index 961f868..c4fd74b 100644 --- a/pkg/clusteragent/remove_node_test.go +++ b/pkg/clusteragent/remove_node_test.go @@ -34,7 +34,7 @@ func TestRemoveFromDqlite(t *testing.T) { }, }, }, - }, port, time.Second, clusteragent.Options{InsecureSkipVerify: true}) + }, port, time.Second, clusteragent.Options{}) g.Expect(err).ToNot(HaveOccurred()) g.Expect(c.RemoveNodeFromDqlite(context.Background(), "1.1.1.1:1234")).To(Succeed()) From 2258ddc1460e3aca06e23b79e81cbfb6bfa44b50 Mon Sep 17 00:00:00 2001 From: Homayoon Alimohammadi Date: Fri, 11 Oct 2024 14:47:12 +0400 Subject: [PATCH 3/3] Add capi-auth-token header to /dqlite/remove request (#68) --- controllers/reconcile.go | 21 +++++--- go.mod | 1 + pkg/clusteragent/clusteragent.go | 6 ++- pkg/clusteragent/clusteragent_test.go | 74 +++++---------------------- pkg/clusteragent/const.go | 5 ++ pkg/clusteragent/remove_node.go | 9 ++-- pkg/clusteragent/remove_node_test.go | 15 +++--- pkg/httptest/httptest.go | 62 ++++++++++++++++++++++ pkg/token/token.go | 47 +++++++++++++++++ pkg/token/token_test.go | 50 ++++++++++++++++++ 10 files changed, 212 insertions(+), 78 deletions(-) create mode 100644 pkg/clusteragent/const.go create mode 100644 pkg/httptest/httptest.go create mode 100644 pkg/token/token.go create mode 100644 pkg/token/token_test.go diff --git a/controllers/reconcile.go b/controllers/reconcile.go index 7d1827b..17d6bf8 100644 --- a/controllers/reconcile.go +++ b/controllers/reconcile.go @@ -11,6 +11,7 @@ import ( clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1" "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token" "golang.org/x/mod/semver" "github.com/pkg/errors" @@ -475,7 +476,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cl } // clean up MicroK8s cluster secrets - for _, secretName := range []string{"kubeconfig", "ca", "jointoken"} { + for _, secretName := range []string{"kubeconfig", "ca", "jointoken", token.AuthTokenNameSuffix} { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: cluster.Namespace, @@ -594,12 +595,14 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte // The issue is that we were not removing the endpoint from dqlite when we were deleting a machine. // This would cause a situation were a joining node failed to join because the endpoint was already in the dqlite cluster. // How? The IP assigned to the joining (new) node, previously belonged to a node that was deleted, but the IP is still there in dqlite. - // If we have 2 or more machines left, get cluster agent client and delete node from dqlite - if len(machines) > 1 { + // If we have 2 machines, deleting one is not safe because it can be the leader and we're not taking care of + // leadership transfers in the cluster-agent for now. Maybe something for later (TODO) + // If we have 3 or more machines left, get cluster agent client and delete node from dqlite. + if len(machines) > 2 { portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil { - if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, deleteMachine, portRemap); err != nil { + if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, cluster, deleteMachine, portRemap); err != nil { logger.Error(err, "failed to remove node from dqlite: %w", "machineName", deleteMachine.Name, "nodeName", node.Name) } } else { @@ -646,7 +649,8 @@ func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Ma } // removeMicrok8sNode removes the node from -func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, delMachine clusterv1.Machine, portRemap bool) error { +func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, + clusterKey client.ObjectKey, delMachine clusterv1.Machine, portRemap bool) error { dqlitePort := defaultDqlitePort if portRemap { // https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102 @@ -665,7 +669,12 @@ func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Contex return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name) } - if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, removeEp); err != nil { + token, err := token.Lookup(ctx, r.Client, clusterKey) + if err != nil { + return fmt.Errorf("failed to lookup token: %w", err) + } + + if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, token, removeEp); err != nil { return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err) } diff --git a/go.mod b/go.mod index ccb6f47..68f5586 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( cloud.google.com/go/compute v1.10.0 // indirect github.com/blang/semver v3.5.1+incompatible // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go index bb1e7a5..dce3dca 100644 --- a/pkg/clusteragent/clusteragent.go +++ b/pkg/clusteragent/clusteragent.go @@ -69,10 +69,10 @@ func (c *Client) Endpoint() string { return fmt.Sprintf("https://%s:%s", c.ip, c.port) } -// Do makes a request to the given endpoint with the given method. It marshals the request and unmarshals +// do makes a request to the given endpoint with the given method. It marshals the request and unmarshals // server response body if the provided response is not nil. // The endpoint should _not_ have a leading slash. -func (c *Client) Do(ctx context.Context, method, endpoint string, request any, response any) error { +func (c *Client) do(ctx context.Context, method, endpoint string, request any, header map[string][]string, response any) error { url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint) requestBody, err := json.Marshal(request) @@ -85,6 +85,8 @@ func (c *Client) Do(ctx context.Context, method, endpoint string, request any, r return fmt.Errorf("failed to create request: %w", err) } + req.Header = http.Header(header) + res, err := c.client.Do(req) if err != nil { return fmt.Errorf("failed to call cluster agent: %w", err) diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go index d87df55..25dfa69 100644 --- a/pkg/clusteragent/clusteragent_test.go +++ b/pkg/clusteragent/clusteragent_test.go @@ -1,13 +1,11 @@ -package clusteragent_test +package clusteragent import ( "context" - "encoding/json" "fmt" "math/rand" "net" "net/http" - "net/http/httptest" "strings" "testing" "time" @@ -15,9 +13,9 @@ import ( . "github.com/onsi/gomega" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - - "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest" ) func TestClient(t *testing.T) { @@ -26,7 +24,7 @@ func TestClient(t *testing.T) { // Machines don't have any addresses. machines := []clusterv1.Machine{{}, {}} - _, err := clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{}) + _, err := NewClient(machines, "25000", time.Second, Options{}) g.Expect(err).To(HaveOccurred()) @@ -46,7 +44,7 @@ func TestClient(t *testing.T) { }, }, } - _, err = clusteragent.NewClient(machines, "25000", time.Second, clusteragent.Options{IgnoreMachineNames: sets.NewString(ignoreName)}) + _, err = NewClient(machines, "25000", time.Second, Options{IgnoreMachineNames: sets.NewString(ignoreName)}) g.Expect(err).To(HaveOccurred()) }) @@ -103,14 +101,14 @@ func TestClient(t *testing.T) { }, } - opts := clusteragent.Options{ + opts := Options{ IgnoreMachineNames: sets.NewString(ignoreName), } // NOTE(Hue): Repeat the test to make sure the ignored machine's IP is not picked by chance (reduce flakiness). for i := 0; i < 100; i++ { machines = shuffleMachines(machines) - c, err := clusteragent.NewClient(machines, port, time.Second, opts) + c, err := NewClient(machines, port, time.Second, opts) g.Expect(err).ToNot(HaveOccurred()) @@ -130,12 +128,12 @@ func TestDo(t *testing.T) { resp := map[string]string{ "key": "value", } - servM := NewServerMock(method, path, resp) - defer servM.ts.Close() + servM := httptest.NewServerMock(method, path, resp) + defer servM.Srv.Close() - ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://")) + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://")) g.Expect(err).ToNot(HaveOccurred()) - c, err := clusteragent.NewClient([]clusterv1.Machine{ + c, err := NewClient([]clusterv1.Machine{ { Status: clusterv1.MachineStatus{ Addresses: clusterv1.MachineAddresses{ @@ -145,64 +143,18 @@ func TestDo(t *testing.T) { }, }, }, - }, port, time.Second, clusteragent.Options{}) + }, port, time.Second, Options{}) g.Expect(err).ToNot(HaveOccurred()) response := make(map[string]string) req := map[string]string{"req": "value"} path = strings.TrimPrefix(path, "/") - g.Expect(c.Do(context.Background(), method, path, req, &response)).To(Succeed()) + g.Expect(c.do(context.Background(), method, path, req, nil, &response)).To(Succeed()) g.Expect(response).To(Equal(resp)) } -type serverMock struct { - method string - path string - response any - request map[string]any - ts *httptest.Server -} - -// NewServerMock creates a test server that responds with the given response when called with the given method and path. -// Make sure to close the server after the test is done. -// Server will try to decode the request body into a map[string]any. -func NewServerMock(method string, path string, response any) *serverMock { - req := make(map[string]any) - ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != path { - http.NotFound(w, r) - return - } - if r.Method != method { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - if response != nil { - if err := json.NewEncoder(w).Encode(response); err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - } - w.WriteHeader(http.StatusOK) - })) - - return &serverMock{ - method: method, - path: path, - response: response, - request: req, - ts: ts, - } -} - func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine { dest := make([]clusterv1.Machine, len(src)) perm := rand.Perm(len(src)) diff --git a/pkg/clusteragent/const.go b/pkg/clusteragent/const.go new file mode 100644 index 0000000..b997955 --- /dev/null +++ b/pkg/clusteragent/const.go @@ -0,0 +1,5 @@ +package clusteragent + +const ( + AuthTokenHeader = "capi-auth-token" +) diff --git a/pkg/clusteragent/remove_node.go b/pkg/clusteragent/remove_node.go index 20e62e8..59ca691 100644 --- a/pkg/clusteragent/remove_node.go +++ b/pkg/clusteragent/remove_node.go @@ -7,7 +7,10 @@ import ( // RemoveNodeFromDqlite calls the /v2/dqlite/remove endpoint on cluster agent to remove the given address from Dqlite. // The endpoint should be in the format of "address:port". -func (p *Client) RemoveNodeFromDqlite(ctx context.Context, removeEp string) error { - request := map[string]string{"removeEndpoint": removeEp} - return p.Do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, nil) +func (p *Client) RemoveNodeFromDqlite(ctx context.Context, token string, removeEp string) error { + request := map[string]string{"remove_endpoint": removeEp} + header := map[string][]string{ + AuthTokenHeader: {token}, + } + return p.do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, header, nil) } diff --git a/pkg/clusteragent/remove_node_test.go b/pkg/clusteragent/remove_node_test.go index c4fd74b..ef79633 100644 --- a/pkg/clusteragent/remove_node_test.go +++ b/pkg/clusteragent/remove_node_test.go @@ -9,20 +9,22 @@ import ( "time" . "github.com/onsi/gomega" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" - clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest" ) func TestRemoveFromDqlite(t *testing.T) { g := NewWithT(t) path := "/cluster/api/v2.0/dqlite/remove" + token := "myRandomToken" method := http.MethodPost - servM := NewServerMock(method, path, nil) - defer servM.ts.Close() + servM := httptest.NewServerMock(method, path, nil) + defer servM.Srv.Close() - ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.ts.URL, "https://")) + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://")) g.Expect(err).ToNot(HaveOccurred()) c, err := clusteragent.NewClient([]clusterv1.Machine{ { @@ -37,6 +39,7 @@ func TestRemoveFromDqlite(t *testing.T) { }, port, time.Second, clusteragent.Options{}) g.Expect(err).ToNot(HaveOccurred()) - g.Expect(c.RemoveNodeFromDqlite(context.Background(), "1.1.1.1:1234")).To(Succeed()) - g.Expect(servM.request).To(HaveKeyWithValue("removeEndpoint", "1.1.1.1:1234")) + g.Expect(c.RemoveNodeFromDqlite(context.Background(), token, "1.1.1.1:1234")).To(Succeed()) + g.Expect(servM.Request).To(HaveKeyWithValue("remove_endpoint", "1.1.1.1:1234")) + g.Expect(servM.Header.Get(clusteragent.AuthTokenHeader)).To(Equal(token)) } diff --git a/pkg/httptest/httptest.go b/pkg/httptest/httptest.go new file mode 100644 index 0000000..055bfe3 --- /dev/null +++ b/pkg/httptest/httptest.go @@ -0,0 +1,62 @@ +package httptest + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" +) + +type serverMock struct { + Method string + Path string + Response any + Request map[string]any + Header http.Header + Srv *httptest.Server +} + +// NewServerMock creates a test server that responds with the given response when called with the given method and path. +// Make sure to close the server after the test is done. +// Server will try to decode the request body into a map[string]any. +func NewServerMock(method string, path string, response any) *serverMock { + req := make(map[string]any) + header := make(map[string][]string) + ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != path { + http.NotFound(w, r) + return + } + if r.Method != method { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + for k, vv := range map[string][]string(r.Header) { + header[k] = vv + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if response != nil { + if err := json.NewEncoder(w).Encode(response); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + } + w.WriteHeader(http.StatusOK) + })) + + fmt.Println("header:", header) + return &serverMock{ + Method: method, + Path: path, + Response: response, + Header: header, + Request: req, + Srv: ts, + } +} diff --git a/pkg/token/token.go b/pkg/token/token.go new file mode 100644 index 0000000..5db3aec --- /dev/null +++ b/pkg/token/token.go @@ -0,0 +1,47 @@ +package token + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + AuthTokenNameSuffix = "capi-auth-token" +) + +// Lookup retrieves the token for the given cluster. +func Lookup(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (string, error) { + secret, err := getSecret(ctx, c, clusterKey) + if err != nil { + return "", fmt.Errorf("failed to get secret: %w", err) + } + + v, ok := secret.Data["token"] + if !ok { + return "", fmt.Errorf("token not found in secret") + } + + return string(v), nil +} + +// getSecret retrieves the token secret for the given cluster. +func getSecret(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (*corev1.Secret, error) { + s := &corev1.Secret{} + key := client.ObjectKey{ + Name: authTokenName(clusterKey.Name), + Namespace: clusterKey.Namespace, + } + if err := c.Get(ctx, key, s); err != nil { + return nil, fmt.Errorf("failed to get secret: %w", err) + } + + return s, nil +} + +// authTokenName returns the name of the auth-token secret, computed by convention using the name of the cluster. +func authTokenName(clusterName string) string { + return fmt.Sprintf("%s-%s", clusterName, AuthTokenNameSuffix) +} diff --git a/pkg/token/token_test.go b/pkg/token/token_test.go new file mode 100644 index 0000000..c1868b4 --- /dev/null +++ b/pkg/token/token_test.go @@ -0,0 +1,50 @@ +package token_test + +import ( + "context" + "fmt" + "testing" + + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token" +) + +func TestReconcile(t *testing.T) { + t.Run("LookupFailsIfNoSecret", func(t *testing.T) { + namespace := "test-namespace" + clusterName := "test-cluster" + c := fake.NewClientBuilder().Build() + + g := NewWithT(t) + + _, err := token.Lookup(context.Background(), c, client.ObjectKey{Name: clusterName, Namespace: namespace}) + g.Expect(err).To(HaveOccurred()) + }) + + t.Run("LookupSucceedsIfSecretExists", func(t *testing.T) { + namespace := "test-namespace" + clusterName := "test-cluster" + expToken := "test-token" + secret := &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", clusterName, token.AuthTokenNameSuffix), + Namespace: namespace, + }, + Data: map[string][]byte{ + "token": []byte(expToken), + }, + } + c := fake.NewClientBuilder().WithObjects(secret).Build() + + g := NewWithT(t) + + token, err := token.Lookup(context.Background(), c, client.ObjectKey{Name: clusterName, Namespace: namespace}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(token).To(Equal(expToken)) + }) +}