Skip to content

Commit

Permalink
generate a node join token (#4072)
Browse files Browse the repository at this point in the history
* generate a node join token

* two mutexes, and do not restart successful pods

* generate the full node join command

* all controllers are also workers

* allow arbitrary node roles

* role is controller not controller+worker
  • Loading branch information
laverya committed Oct 22, 2023
1 parent efa325b commit 4740d84
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pkg/handlers/helmvm_node_join_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (h *Handler) GenerateHelmVMNodeJoinCommandSecondary(w http.ResponseWriter,
return
}

command, expiry, err := helmvm.GenerateAddNodeCommand(client, false)
command, expiry, err := helmvm.GenerateAddNodeCommand(r.Context(), client, "worker")
if err != nil {
logger.Error(err)
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -42,7 +42,7 @@ func (h *Handler) GenerateHelmVMNodeJoinCommandPrimary(w http.ResponseWriter, r
return
}

command, expiry, err := helmvm.GenerateAddNodeCommand(client, true)
command, expiry, err := helmvm.GenerateAddNodeCommand(r.Context(), client, "controller")
if err != nil {
logger.Error(err)
w.WriteHeader(http.StatusInternalServerError)
Expand Down
25 changes: 25 additions & 0 deletions pkg/helmvm/jointoken.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package helmvm

import (
"encoding/base64"
"encoding/json"

"github.com/google/uuid"
)

// joinToken is a struct that holds both the actual token and the cluster id. This is marshaled
// and base64 encoded and used as argument to the join command in the other nodes.
type joinToken struct {
ClusterID uuid.UUID `json:"clusterID"`
Token string `json:"token"`
Role string `json:"role"`
}

// Encode encodes a JoinToken to base64.
func (j *joinToken) Encode() (string, error) {
b, err := json.Marshal(j)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(b), nil
}
239 changes: 233 additions & 6 deletions pkg/helmvm/node_join.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,244 @@
package helmvm

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/google/uuid"
corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)

type joinCommandEntry struct {
Command []string
Creation *time.Time
Mut sync.Mutex
}

var joinCommandMapMut = sync.Mutex{}
var joinCommandMap = map[string]*joinCommandEntry{}

// GenerateAddNodeCommand will generate the HelmVM node add command for a primary or secondary node
func GenerateAddNodeCommand(client kubernetes.Interface, primary bool) ([]string, *time.Time, error) {
tomorrow := time.Now().Add(time.Hour * 24)
if primary {
return []string{"this is a primary join command string", "that can be multiple strings"}, &tomorrow, nil
} else {
return []string{"this is a secondary join command string", "that can be multiple strings"}, &tomorrow, nil
// join commands will last for 24 hours, and will be cached for 1 hour after first generation
func GenerateAddNodeCommand(ctx context.Context, client kubernetes.Interface, nodeRole string) ([]string, *time.Time, error) {
// get the joinCommand struct entry for this node role
joinCommandMapMut.Lock()
if _, ok := joinCommandMap[nodeRole]; !ok {
joinCommandMap[nodeRole] = &joinCommandEntry{}
}
joinCommand := joinCommandMap[nodeRole]
joinCommandMapMut.Unlock()

// lock the joinCommand struct entry
joinCommand.Mut.Lock()
defer joinCommand.Mut.Unlock()

// if the joinCommand has been generated in the past hour, return it
if joinCommand.Creation != nil && time.Now().Before(joinCommand.Creation.Add(time.Hour)) {
expiry := joinCommand.Creation.Add(time.Hour * 24)
return joinCommand.Command, &expiry, nil
}

newToken, err := runAddNodeCommandPod(ctx, client, nodeRole)
if err != nil {
return nil, nil, fmt.Errorf("failed to run add node command pod: %w", err)
}

newCmd, err := generateAddNodeCommand(ctx, client, nodeRole, newToken)
if err != nil {
return nil, nil, fmt.Errorf("failed to generate add node command: %w", err)
}

now := time.Now()
joinCommand.Command = newCmd
joinCommand.Creation = &now

expiry := now.Add(time.Hour * 24)
return newCmd, &expiry, nil
}

// run a pod that will generate the add node token
func runAddNodeCommandPod(ctx context.Context, client kubernetes.Interface, nodeRole string) (string, error) {
podName := "k0s-token-generator-"
suffix := strings.Replace(nodeRole, "+", "-", -1)
podName += suffix

// cleanup the pod if it already exists
err := client.CoreV1().Pods("kube-system").Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil {
if !kuberneteserrors.IsNotFound(err) {
return "", fmt.Errorf("failed to delete pod: %w", err)
}
}

hostPathFile := corev1.HostPathFile
hostPathDir := corev1.HostPathDirectory
_, err = client.CoreV1().Pods("kube-system").Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: "kube-system",
Labels: map[string]string{
"replicated.app/embedded-cluster": "true",
},
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
HostNetwork: true,
Volumes: []corev1.Volume{
{
Name: "bin",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/usr/local/bin/k0s",
Type: &hostPathFile,
},
},
},
{
Name: "lib",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/var/lib/k0s",
Type: &hostPathDir,
},
},
},
{
Name: "etc",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/etc/k0s",
Type: &hostPathDir,
},
},
},
{
Name: "run",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/run/k0s",
Type: &hostPathDir,
},
},
},
},
Affinity: &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &corev1.NodeSelector{
NodeSelectorTerms: []corev1.NodeSelectorTerm{
{
MatchExpressions: []corev1.NodeSelectorRequirement{
{
Key: "node.k0sproject.io/role",
Operator: corev1.NodeSelectorOpIn,
Values: []string{
"control-plane",
},
},
},
},
},
},
},
},
Containers: []corev1.Container{
{
Name: "k0s-token-generator",
Image: "ubuntu:latest", // TODO use the kotsadm image here as we'll know it exists
Command: []string{"/mnt/k0s"},
Args: []string{
"token",
"create",
"--expiry",
"12h",
"--role",
nodeRole,
},
VolumeMounts: []corev1.VolumeMount{
{
Name: "bin",
MountPath: "/mnt/k0s",
},
{
Name: "lib",
MountPath: "/var/lib/k0s",
},
{
Name: "etc",
MountPath: "/etc/k0s",
},
{
Name: "run",
MountPath: "/run/k0s",
},
},
},
},
},
}, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("failed to create pod: %w", err)
}

// wait for the pod to complete
for {
pod, err := client.CoreV1().Pods("kube-system").Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("failed to get pod: %w", err)
}

if pod.Status.Phase == corev1.PodSucceeded {
break
}

if pod.Status.Phase == corev1.PodFailed {
return "", fmt.Errorf("pod failed")
}

time.Sleep(time.Second)
}

// get the logs from the completed pod
podLogs, err := client.CoreV1().Pods("kube-system").GetLogs(podName, &corev1.PodLogOptions{}).DoRaw(ctx)
if err != nil {
return "", fmt.Errorf("failed to get pod logs: %w", err)
}

// the logs are just a join token, which needs to be added to other things to get a join command
return string(podLogs), nil
}

// generate the add node command from the join token, the node roles, and info from the embedded-cluster-config configmap
func generateAddNodeCommand(ctx context.Context, client kubernetes.Interface, nodeRole string, token string) ([]string, error) {
cm, err := ReadConfigMap(client)
if err != nil {
return nil, fmt.Errorf("failed to read configmap: %w", err)
}

clusterID := cm.Data["embedded-cluster-id"]
binaryName := cm.Data["embedded-binary-name"]

clusterUUID := uuid.UUID{}
err = clusterUUID.UnmarshalText([]byte(clusterID))
if err != nil {
return nil, fmt.Errorf("failed to unmarshal cluster id %s: %w", clusterID, err)
}

fullToken := joinToken{
ClusterID: clusterUUID,
Token: token,
Role: nodeRole,
}

b64token, err := fullToken.Encode()
if err != nil {
return nil, fmt.Errorf("unable to encode token: %w", err)
}

return []string{binaryName + " node join", b64token}, nil
}
2 changes: 1 addition & 1 deletion pkg/helmvm/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package helmvm
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"

corev1 "k8s.io/api/core/v1"
kuberneteserrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand Down

0 comments on commit 4740d84

Please sign in to comment.