Call Dqlite remove endpoint for cleanup (KU-1719) (#69)
* Call node removal endpoint on machine deletion (#66)
* Add pkg to Dockerfile and Fix TLS issue (#67)
* Add capi-auth-token header to /dqlite/remove request (#68)
HomayoonAlimohammadi authored Oct 14, 2024
1 parent 4f3a678 commit ac3d9e3
Showing 11 changed files with 594 additions and 1 deletion.
1 change: 1 addition & 0 deletions Dockerfile
@@ -15,6 +15,7 @@ RUN go mod download
COPY main.go main.go
COPY api/ api/
COPY controllers/ controllers/
COPY pkg/ pkg/

# Build
RUN CGO_ENABLED=0 GOOS=linux GOARCH=$arch go build -a -ldflags '-s -w' -o manager main.go
88 changes: 87 additions & 1 deletion controllers/reconcile.go
@@ -4,18 +4,22 @@ import (
"context"
"fmt"
"math/rand"
"net"
"sort"
"strings"
"time"

clusterv1beta1 "github.com/canonical/cluster-api-control-plane-provider-microk8s/api/v1beta1"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/clusteragent"
"github.com/canonical/cluster-api-control-plane-provider-microk8s/pkg/token"
"golang.org/x/mod/semver"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/storage/names"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/external"
@@ -28,6 +32,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

const (
defaultClusterAgentPort string = "25000"
defaultDqlitePort string = "19001"
defaultClusterAgentClientTimeout time.Duration = 10 * time.Second
)

type errServiceUnhealthy struct {
service string
reason string
@@ -466,7 +476,7 @@ func (r *MicroK8sControlPlaneReconciler) reconcileDelete(ctx context.Context, cl
}

// clean up MicroK8s cluster secrets
for _, secretName := range []string{"kubeconfig", "ca", "jointoken"} {
for _, secretName := range []string{"kubeconfig", "ca", "jointoken", token.AuthTokenNameSuffix} {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
@@ -578,6 +588,28 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
node := deleteMachine.Status.NodeRef

logger = logger.WithValues("machineName", deleteMachine.Name, "nodeName", node.Name)

logger.Info("deleting node from dqlite", "machineName", deleteMachine.Name, "nodeName", node.Name)

// NOTE(Hue): We do this step as a best effort, since this whole logic is implemented to prevent a not-yet-reported bug.
// The issue is that we were not removing the endpoint from dqlite when deleting a machine.
// This could cause a joining node to fail to join because its endpoint was already in the dqlite cluster:
// the IP assigned to the joining (new) node previously belonged to a node that was deleted, but was still registered in dqlite.
// If we have only 2 machines, deleting one is not safe because it might be the leader, and we're not taking care of
// leadership transfers in the cluster-agent for now. Maybe something for later (TODO).
// If we have 3 or more machines left, get a cluster agent client and delete the node from dqlite.
if len(machines) > 2 {
portRemap := tcp != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration != nil && tcp.Spec.ControlPlaneConfig.ClusterConfiguration.PortCompatibilityRemap

if clusterAgentClient, err := getClusterAgentClient(machines, deleteMachine, portRemap); err == nil {
if err := r.removeNodeFromDqlite(ctx, clusterAgentClient, cluster, deleteMachine, portRemap); err != nil {
logger.Error(err, "failed to remove node from dqlite: %w", "machineName", deleteMachine.Name, "nodeName", node.Name)
}
} else {
logger.Error(err, "failed to get cluster agent client")
}
}

logger.Info("deleting machine")

err = r.Client.Delete(ctx, &deleteMachine)
@@ -595,6 +627,60 @@ func (r *MicroK8sControlPlaneReconciler) scaleDownControlPlane(ctx context.Conte
return ctrl.Result{Requeue: true}, nil
}

func getClusterAgentClient(machines []clusterv1.Machine, delMachine clusterv1.Machine, portRemap bool) (*clusteragent.Client, error) {
opts := clusteragent.Options{
// NOTE(hue): We want to pick a random machine's IP and call POST /dqlite/remove on its cluster agent endpoint.
// This machine should preferably not be the <delMachine> itself, although MicroK8s does not enforce this.
IgnoreMachineNames: sets.NewString(delMachine.Name),
}

port := defaultClusterAgentPort
if portRemap {
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
port = "30000"
}

clusterAgentClient, err := clusteragent.NewClient(machines, port, defaultClusterAgentClientTimeout, opts)
if err != nil {
return nil, fmt.Errorf("failed to initialize cluster agent client: %w", err)
}

return clusterAgentClient, nil
}

// removeNodeFromDqlite removes the node corresponding to delMachine from the dqlite cluster
// by calling the cluster agent's remove endpoint.
func (r *MicroK8sControlPlaneReconciler) removeNodeFromDqlite(ctx context.Context, clusterAgentClient *clusteragent.Client,
clusterKey client.ObjectKey, delMachine clusterv1.Machine, portRemap bool) error {
dqlitePort := defaultDqlitePort
if portRemap {
// https://github.com/canonical/cluster-api-control-plane-provider-microk8s/blob/v0.6.10/control-plane-components.yaml#L96-L102
dqlitePort = "2379"
}

var removeEp string
for _, addr := range delMachine.Status.Addresses {
if net.ParseIP(addr.Address) != nil {
removeEp = fmt.Sprintf("%s:%s", addr.Address, dqlitePort)
break
}
}

if removeEp == "" {
return fmt.Errorf("failed to extract endpoint of the deleting machine %q", delMachine.Name)
}

token, err := token.Lookup(ctx, r.Client, clusterKey)
if err != nil {
return fmt.Errorf("failed to lookup token: %w", err)
}

if err := clusterAgentClient.RemoveNodeFromDqlite(ctx, token, removeEp); err != nil {
return fmt.Errorf("failed to remove node %q from dqlite: %w", removeEp, err)
}

return nil
}

func createUpgradePod(ctx context.Context, kubeclient *kubernetesClient, nodeName string, nodeVersion string) (*corev1.Pod, error) {
nodeVersion = strings.TrimPrefix(semver.MajorMinor(nodeVersion), "v")

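The reconcile logic above calls token.Lookup and references token.AuthTokenNameSuffix from the new pkg/token package, whose diff is not shown in this excerpt. A minimal sketch of the shape such a helper could take, assuming the token is stored in a per-cluster Secret named <cluster-name>-<suffix> under a "token" data key (the suffix value, the data key, and the exact signature are assumptions, not confirmed by this diff):

package token

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// AuthTokenNameSuffix is the suffix of the per-cluster Secret that stores the auth token.
// The value here is a guess based on the capi-auth-token header added in #68.
const AuthTokenNameSuffix = "capi-auth-token"

// Lookup fetches the auth token for the given cluster from its Secret.
func Lookup(ctx context.Context, c client.Client, clusterKey client.ObjectKey) (string, error) {
	secret := &corev1.Secret{}
	name := fmt.Sprintf("%s-%s", clusterKey.Name, AuthTokenNameSuffix)
	if err := c.Get(ctx, client.ObjectKey{Namespace: clusterKey.Namespace, Name: name}, secret); err != nil {
		return "", fmt.Errorf("failed to get secret %s/%s: %w", clusterKey.Namespace, name, err)
	}
	token, ok := secret.Data["token"]
	if !ok {
		return "", fmt.Errorf("secret %s/%s has no %q key", clusterKey.Namespace, name, "token")
	}
	return string(token), nil
}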
1 change: 1 addition & 0 deletions go.mod
@@ -17,6 +17,7 @@ require (
cloud.google.com/go/compute v1.10.0 // indirect
github.com/blang/semver v3.5.1+incompatible // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
115 changes: 115 additions & 0 deletions pkg/clusteragent/clusteragent.go
@@ -0,0 +1,115 @@
package clusteragent

import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"time"

"k8s.io/apimachinery/pkg/util/sets"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
)

// Options should be used when initializing a new client.
type Options struct {
// IgnoreMachineNames is a set of machine names whose IPs we should not pick for the cluster agent endpoint.
IgnoreMachineNames sets.String
}

type Client struct {
ip, port string
client *http.Client
}

// NewClient picks an IP from one of the given machines and creates a new client for the cluster agent
// with that IP.
func NewClient(machines []clusterv1.Machine, port string, timeout time.Duration, opts Options) (*Client, error) {
var ip string
for _, m := range machines {
if opts.IgnoreMachineNames.Has(m.Name) {
continue
}

for _, addr := range m.Status.Addresses {
if net.ParseIP(addr.Address) != nil {
ip = addr.Address
break
}
}
// Fall through to the next machine if this one has no usable IP address.
if ip != "" {
break
}
}

if ip == "" {
return nil, errors.New("failed to find an IP for cluster agent")
}

return &Client{
ip: ip,
port: port,
client: &http.Client{
Timeout: timeout,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
// TODO(Hue): This is a workaround for now; address it later by getting the certificate
// fingerprint from the matching node through a resource in the cluster (TBD)
// and validating it in the TLSClientConfig.
InsecureSkipVerify: true,
},
},
},
}, nil
}

func (c *Client) Endpoint() string {
return fmt.Sprintf("https://%s:%s", c.ip, c.port)
}

// do makes a request to the given endpoint with the given method. It marshals the request body and,
// if the provided response is not nil, unmarshals the server's response body into it.
// The endpoint should _not_ have a leading slash.
func (c *Client) do(ctx context.Context, method, endpoint string, request any, header map[string][]string, response any) error {
url := fmt.Sprintf("https://%s:%s/%s", c.ip, c.port, endpoint)

requestBody, err := json.Marshal(request)
if err != nil {
return fmt.Errorf("failed to prepare worker info request: %w", err)
}

req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(requestBody))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

req.Header = http.Header(header)

res, err := c.client.Do(req)
if err != nil {
return fmt.Errorf("failed to call cluster agent: %w", err)
}
defer res.Body.Close()

if res.StatusCode != http.StatusOK {
// NOTE(hue): Decode and re-marshal any response body we got and include it in the error,
// since it might contain valuable information about why the request failed.
// Ignore JSON errors to avoid unnecessarily complicated error handling.
anyResp := make(map[string]any)
_ = json.NewDecoder(res.Body).Decode(&anyResp)
b, _ := json.Marshal(anyResp)
resStr := string(b)

return fmt.Errorf("HTTP request to cluster agent failed with status code %d, got response: %q", res.StatusCode, resStr)
}

if response != nil {
if err := json.NewDecoder(res.Body).Decode(response); err != nil {
return fmt.Errorf("failed to decode response: %w", err)
}
}

return nil
}
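
The RemoveNodeFromDqlite method called from controllers/reconcile.go is added elsewhere in this commit and is not among the hunks shown above. A minimal sketch of how it could be built on the do helper, given the commit message (POST to /dqlite/remove carrying the capi-auth-token header); the JSON field name "remove_endpoint" is an assumption for illustration:

// RemoveNodeFromDqlite asks the cluster agent to remove the given endpoint from dqlite.
func (c *Client) RemoveNodeFromDqlite(ctx context.Context, authToken, removeEndpoint string) error {
	// The request carries the address of the node to drop; the field name is assumed.
	request := map[string]string{"remove_endpoint": removeEndpoint}
	// Authenticate with the capi-auth-token header added in #68.
	header := map[string][]string{"capi-auth-token": {authToken}}
	if err := c.do(ctx, http.MethodPost, "dqlite/remove", request, header, nil); err != nil {
		return fmt.Errorf("failed to call dqlite remove endpoint: %w", err)
	}
	return nil
}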