diff --git a/Dockerfile b/Dockerfile index 9c1de47..352d8c4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,7 @@ RUN go mod download COPY main.go main.go COPY api/ api/ COPY controllers/ controllers/ +COPY pkg/ pkg/ # Build RUN CGO_ENABLED=0 GOOS=linux GOARCH=$arch go build -a -ldflags '-s -w' -o manager main.go diff --git a/controllers/reconcile.go b/controllers/reconcile.go index b6cb197..17d6bf8 100644 --- a/controllers/reconcile.go +++ b/controllers/reconcile.go @@ -4,11 +4,14 @@ import ( "context" "fmt" "math/rand" + "net" "sort" "strings" "time" clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token" "golang.org/x/mod/semver" "github.com/pkg/errors" @@ -16,6 +19,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/storage/names" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" "sigs.k8s.io/cluster-api/controllers/external" @@ -28,6 +32,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +const ( + defaultClusterAgentPort string = "25000" + defaultDqlitePort string = "19001" + defaultClusterAgentClientTimeout time.Duration = 10 * time.Second +) + type errServiceUnhealthy struct { service string reason string @@ -466,7 +476,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cl } // clean up MicroK8s cluster secrets - for _, secretName := range []string{"kubeconfig", "ca", "jointoken"} { + for _, secretName := range []string{"kubeconfig", "ca", "jointoken", token.AuthTokenNameSuffix} { secret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Namespace: cluster.Namespace, @@ -578,6 +588,28 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx 
context.Conte node := deleteMachine.Status.NodeRef logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name) + + logger.Info("deleting node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name) + + // NOTE(Hue): We do this step as a best effort since this whole logic is implemented to prevent a not-yet-reported bug. + // The issue is that we were not removing the endpoint from dqlite when we were deleting a machine. + // This would cause a situation where a joining node failed to join because the endpoint was already in the dqlite cluster. + // How? The IP assigned to the joining (new) node, previously belonged to a node that was deleted, but the IP is still there in dqlite. + // If we have 2 machines, deleting one is not safe because it can be the leader and we're not taking care of + // leadership transfers in the cluster-agent for now. Maybe something for later (TODO). + // If we have 3 or more machines left, get cluster agent client and delete node from dqlite. 
+ if len(machines) > 2 { + portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap + + if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil { + if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, cluster, deleteMachine, portRemap); err != nil { + logger.Error(err, "failed to remove node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name) + } + } else { + logger.Error(err, "failed to get cluster agent client") + } + } + logger.Info("deleting machine") err = r.Client.Delete(ctx, &deleteMachine) @@ -595,6 +627,60 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte return ctrl.Result{Requeue: true}, nil } +func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine, portRemap bool) (*clusteragent.Client, error) { + opts := clusteragent.Options{ + // NOTE(hue): We want to pick a random machine's IP to call POST /dqlite/remove on its cluster agent endpoint. + // This machine should preferably not be the deleted machine itself, although this is not forced by Microk8s. 
+ IgnoreMachineNames: sets.NewString(delMachine.Name), + } + + port := defaultClusterAgentPort + if portRemap { + // https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102 + port = "30000" + } + + clusterAgentClient, err := clusteragent.NewClient(machines, port, defaultClusterAgentClientTimeout, opts) + if err != nil { + return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err) + } + + return clusterAgentClient, nil +} + +// removeNodeFromDqlite removes the endpoint of the given machine from the dqlite cluster by calling the cluster agent. +func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client, + clusterKey client.ObjectKey, delMachine clusterv1.Machine, portRemap bool) error { + dqlitePort := defaultDqlitePort + if portRemap { + // https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102 + dqlitePort = "2379" + } + + var removeEp string + for _, addr := range delMachine.Status.Addresses { + if net.ParseIP(addr.Address) != nil { + removeEp = fmt.Sprintf("%s:%s", addr.Address, dqlitePort) + break + } + } + + if removeEp == "" { + return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name) + } + + token, err := token.Lookup(ctx, r.Client, clusterKey) + if err != nil { + return fmt.Errorf("failed to lookup token: %w", err) + } + + if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, token, removeEp); err != nil { + return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err) + } + + return nil +} + func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) { nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v") diff --git a/go.mod b/go.mod index ccb6f47..68f5586 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( cloud.google.com/go/compute v1.10.0 // indirect 
github.com/blang/semver v3.5.1+incompatible // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect + github.com/evanphx/json-patch v5.6.0+incompatible // indirect github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/go-openapi/jsonpointer v0.19.5 // indirect github.com/go-openapi/jsonreference v0.20.0 // indirect diff --git a/pkg/clusteragent/clusteragent.go b/pkg/clusteragent/clusteragent.go new file mode 100644 index 0000000..dce3dca --- /dev/null +++ b/pkg/clusteragent/clusteragent.go @@ -0,0 +1,115 @@ +package clusteragent + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "net" + "net/http" + "time" + + "k8s.io/apimachinery/pkg/util/sets" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" +) + +// Options should be used when initializing a new client. +type Options struct { + // IgnoreMachineNames is a set of ignored machine names that we don't want to pick their IP for the cluster agent endpoint. + IgnoreMachineNames sets.String +} + +type Client struct { + ip, port string + client *http.Client +} + +// NewClient picks an IP from one of the given machines and creates a new client for the cluster agent +// with that IP. 
+func NewClient(machines []clusterv1.Machine, port string, timeout time.Duration, opts Options) (*Client, error) { + var ip string + for _, m := range machines { + if opts.IgnoreMachineNames.Has(m.Name) { + continue + } + + for _, addr := range m.Status.Addresses { + if net.ParseIP(addr.Address) != nil { + ip = addr.Address + break + } + } + if ip != "" { break } // keep scanning machines until one yields a parseable IP address + } + + if ip == "" { + return nil, errors.New("failed to find an IP for cluster agent") + } + + return &Client{ + ip: ip, + port: port, + client: &http.Client{ + Timeout: timeout, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + // TODO(Hue): Workaround for now, address later on + // get the certificate fingerprint from the matching node through a resource in the cluster (TBD), + // and validate it in the TLSClientConfig + InsecureSkipVerify: true, + }, + }, + }, + }, nil +} + +func (c *Client) Endpoint() string { + return fmt.Sprintf("https://%s:%s", c.ip, c.port) +} + +// do makes a request to the given endpoint with the given method. It marshals the request and unmarshals +// server response body if the provided response is not nil. +// The endpoint should _not_ have a leading slash. 
+func (c *Client) do(ctx context.Context, method, endpoint string, request any, header map[string][]string, response any) error { + url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint) + + requestBody, err := json.Marshal(request) + if err != nil { + return fmt.Errorf("failed to marshal request body: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header = http.Header(header) + + res, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("failed to call cluster agent: %w", err) + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + // NOTE(hue): Marshal and print any response that we got since it might contain valuable information + // on why the request failed. + // Ignore JSON errors to prevent unnecessarily complicated error handling. + anyResp := make(map[string]any) + _ = json.NewDecoder(res.Body).Decode(&anyResp) + b, _ := json.Marshal(anyResp) + resStr := string(b) + + return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr) + } + + if response != nil { + if err := json.NewDecoder(res.Body).Decode(response); err != nil { + return fmt.Errorf("failed to decode response: %w", err) + } + } + + return nil +} diff --git a/pkg/clusteragent/clusteragent_test.go b/pkg/clusteragent/clusteragent_test.go new file mode 100644 index 0000000..25dfa69 --- /dev/null +++ b/pkg/clusteragent/clusteragent_test.go @@ -0,0 +1,165 @@ +package clusteragent + +import ( + "context" + "fmt" + "math/rand" + "net" + "net/http" + "strings" + "testing" + "time" + + . 
"github.com/onsi/gomega" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest" +) + +func TestClient(t *testing.T) { + t.Run("CanNotFindAddress", func(t *testing.T) { + g := NewWithT(t) + + // Machines don't have any addresses. + machines := []clusterv1.Machine{{}, {}} + _, err := NewClient(machines, "25000", time.Second, Options{}) + + g.Expect(err).To(HaveOccurred()) + + // The only machine is the ignored one. + ignoreName := "ignore" + machines = []clusterv1.Machine{ + { + ObjectMeta: v1.ObjectMeta{ + Name: ignoreName, + }, + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: "1.1.1.1", + }, + }, + }, + }, + } + _, err = NewClient(machines, "25000", time.Second, Options{IgnoreMachineNames: sets.NewString(ignoreName)}) + + g.Expect(err).To(HaveOccurred()) + }) + + t.Run("CorrectEndpoint", func(t *testing.T) { + g := NewWithT(t) + + port := "30000" + firstAddr := "1.1.1.1" + secondAddr := "2.2.2.2" + thirdAddr := "3.3.3.3" + + ignoreName := "ignore" + ignoreAddr := "8.8.8.8" + machines := []clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: firstAddr, + }, + }, + }, + }, + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: secondAddr, + }, + }, + }, + }, + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: thirdAddr, + }, + }, + }, + }, + { + ObjectMeta: v1.ObjectMeta{ + Name: ignoreName, + }, + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: ignoreAddr, + }, + }, + }, + }, + } + + opts := Options{ + IgnoreMachineNames: sets.NewString(ignoreName), + } + + // NOTE(Hue): Repeat the test to make sure the ignored machine's IP is not picked by chance (reduce flakiness). 
+ for i := 0; i < 100; i++ { + machines = shuffleMachines(machines) + c, err := NewClient(machines, port, time.Second, opts) + + g.Expect(err).ToNot(HaveOccurred()) + + // Check if the endpoint is one of the expected ones and not the ignored one. + g.Expect([]string{fmt.Sprintf("https://%s:%s", firstAddr, port), fmt.Sprintf("https://%s:%s", secondAddr, port), fmt.Sprintf("https://%s:%s", thirdAddr, port)}).To(ContainElement(c.Endpoint())) + g.Expect(c.Endpoint()).ToNot(Equal(fmt.Sprintf("https://%s:%s", ignoreAddr, port))) + } + + }) +} + +func TestDo(t *testing.T) { + g := NewWithT(t) + + path := "/random/path" + method := http.MethodPost + resp := map[string]string{ + "key": "value", + } + servM := httptest.NewServerMock(method, path, resp) + defer servM.Srv.Close() + + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://")) + g.Expect(err).ToNot(HaveOccurred()) + c, err := NewClient([]clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: ip, + }, + }, + }, + }, + }, port, time.Second, Options{}) + + g.Expect(err).ToNot(HaveOccurred()) + + response := make(map[string]string) + req := map[string]string{"req": "value"} + path = strings.TrimPrefix(path, "/") + g.Expect(c.do(context.Background(), method, path, req, nil, &response)).To(Succeed()) + + g.Expect(response).To(Equal(resp)) +} + +func shuffleMachines(src []clusterv1.Machine) []clusterv1.Machine { + dest := make([]clusterv1.Machine, len(src)) + perm := rand.Perm(len(src)) + for i, v := range perm { + dest[v] = src[i] + } + return dest +} diff --git a/pkg/clusteragent/const.go b/pkg/clusteragent/const.go new file mode 100644 index 0000000..b997955 --- /dev/null +++ b/pkg/clusteragent/const.go @@ -0,0 +1,5 @@ +package clusteragent + +const ( + AuthTokenHeader = "capi-auth-token" +) diff --git a/pkg/clusteragent/remove_node.go b/pkg/clusteragent/remove_node.go new file mode 100644 index 0000000..59ca691 --- /dev/null +++ 
b/pkg/clusteragent/remove_node.go @@ -0,0 +1,16 @@ +package clusteragent + +import ( + "context" + "net/http" +) + +// RemoveNodeFromDqlite calls the /v2/dqlite/remove endpoint on cluster agent to remove the given address from Dqlite. +// The endpoint should be in the format of "address:port". +func (p *Client) RemoveNodeFromDqlite(ctx context.Context, token string, removeEp string) error { + request := map[string]string{"remove_endpoint": removeEp} + header := map[string][]string{ + AuthTokenHeader: {token}, + } + return p.do(ctx, http.MethodPost, "cluster/api/v2.0/dqlite/remove", request, header, nil) +} diff --git a/pkg/clusteragent/remove_node_test.go b/pkg/clusteragent/remove_node_test.go new file mode 100644 index 0000000..ef79633 --- /dev/null +++ b/pkg/clusteragent/remove_node_test.go @@ -0,0 +1,45 @@ +package clusteragent_test + +import ( + "context" + "net" + "net/http" + "strings" + "testing" + "time" + + . "github.com/onsi/gomega" + clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent" + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/httptest" +) + +func TestRemoveFromDqlite(t *testing.T) { + g := NewWithT(t) + + path := "/cluster/api/v2.0/dqlite/remove" + token := "myRandomToken" + method := http.MethodPost + servM := httptest.NewServerMock(method, path, nil) + defer servM.Srv.Close() + + ip, port, err := net.SplitHostPort(strings.TrimPrefix(servM.Srv.URL, "https://")) + g.Expect(err).ToNot(HaveOccurred()) + c, err := clusteragent.NewClient([]clusterv1.Machine{ + { + Status: clusterv1.MachineStatus{ + Addresses: clusterv1.MachineAddresses{ + { + Address: ip, + }, + }, + }, + }, + }, port, time.Second, clusteragent.Options{}) + + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(c.RemoveNodeFromDqlite(context.Background(), token, "1.1.1.1:1234")).To(Succeed()) + g.Expect(servM.Request).To(HaveKeyWithValue("remove_endpoint", "1.1.1.1:1234")) + 
g.Expect(servM.Header.Get(clusteragent.AuthTokenHeader)).To(Equal(token)) +} diff --git a/pkg/httptest/httptest.go b/pkg/httptest/httptest.go new file mode 100644 index 0000000..055bfe3 --- /dev/null +++ b/pkg/httptest/httptest.go @@ -0,0 +1,60 @@ +package httptest + +import ( + "encoding/json" + "net/http" + "net/http/httptest" +) + +type serverMock struct { + Method string + Path string + Response any + Request map[string]any + Header http.Header + Srv *httptest.Server +} + +// NewServerMock creates a test server that responds with the given response when called with the given method and path. +// Make sure to close the server after the test is done. +// Server will try to decode the request body into a map[string]any. +func NewServerMock(method string, path string, response any) *serverMock { + req := make(map[string]any) + header := make(map[string][]string) + ts := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != path { + http.NotFound(w, r) + return + } + if r.Method != method { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + for k, vv := range map[string][]string(r.Header) { + header[k] = vv + } + + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if response != nil { + if err := json.NewEncoder(w).Encode(response); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + } + w.WriteHeader(http.StatusOK) + })) + + return &serverMock{ + Method: method, + Path: path, + Response: response, + Header: header, + Request: req, + Srv: ts, + } +} diff --git a/pkg/token/token.go b/pkg/token/token.go new file mode 100644 index 0000000..5db3aec --- /dev/null +++ b/pkg/token/token.go @@ -0,0 +1,47 @@ +package token + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + AuthTokenNameSuffix = 
"capi-auth-token" +) + +// Lookup retrieves the token for the given cluster. +func Lookup(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (string, error) { + secret, err := getSecret(ctx, c, clusterKey) + if err != nil { + return "", fmt.Errorf("failed to get secret: %w", err) + } + + v, ok := secret.Data["token"] + if !ok { + return "", fmt.Errorf("token not found in secret") + } + + return string(v), nil +} + +// getSecret retrieves the token secret for the given cluster. +func getSecret(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (*corev1.Secret, error) { + s := &corev1.Secret{} + key := client.ObjectKey{ + Name: authTokenName(clusterKey.Name), + Namespace: clusterKey.Namespace, + } + if err := c.Get(ctx, key, s); err != nil { + return nil, fmt.Errorf("failed to get secret: %w", err) + } + + return s, nil +} + +// authTokenName returns the name of the auth-token secret, computed by convention using the name of the cluster. +func authTokenName(clusterName string) string { + return fmt.Sprintf("%s-%s", clusterName, AuthTokenNameSuffix) +} diff --git a/pkg/token/token_test.go b/pkg/token/token_test.go new file mode 100644 index 0000000..c1868b4 --- /dev/null +++ b/pkg/token/token_test.go @@ -0,0 +1,50 @@ +package token_test + +import ( + "context" + "fmt" + "testing" + + . 
"github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token" +) + +func TestReconcile(t *testing.T) { + t.Run("LookupFailsIfNoSecret", func(t *testing.T) { + namespace := "test-namespace" + clusterName := "test-cluster" + c := fake.NewClientBuilder().Build() + + g := NewWithT(t) + + _, err := token.Lookup(context.Background(), c, client.ObjectKey{Name: clusterName, Namespace: namespace}) + g.Expect(err).To(HaveOccurred()) + }) + + t.Run("LookupSucceedsIfSecretExists", func(t *testing.T) { + namespace := "test-namespace" + clusterName := "test-cluster" + expToken := "test-token" + secret := &corev1.Secret{ + ObjectMeta: v1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", clusterName, token.AuthTokenNameSuffix), + Namespace: namespace, + }, + Data: map[string][]byte{ + "token": []byte(expToken), + }, + } + c := fake.NewClientBuilder().WithObjects(secret).Build() + + g := NewWithT(t) + + token, err := token.Lookup(context.Background(), c, client.ObjectKey{Name: clusterName, Namespace: namespace}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(token).To(Equal(expToken)) + }) +}