diff --git a/pkg/handlers/handlers.go b/pkg/handlers/handlers.go index aa711b7b89..aca714d3b2 100644 --- a/pkg/handlers/handlers.go +++ b/pkg/handlers/handlers.go @@ -287,6 +287,8 @@ func RegisterSessionAuthRoutes(r *mux.Router, kotsStore store.Store, handler KOT HandlerFunc(middleware.EnforceAccess(policy.ClusterWrite, handler.DeleteHelmVMNode)) r.Name("GetHelmVMNodes").Path("/api/v1/helmvm/nodes").Methods("GET"). HandlerFunc(middleware.EnforceAccess(policy.ClusterRead, handler.GetHelmVMNodes)) + r.Name("GetHelmVMNode").Path("/api/v1/helmvm/node/{nodeName}").Methods("GET"). + HandlerFunc(middleware.EnforceAccess(policy.ClusterRead, handler.GetHelmVMNode)) // Prometheus r.Name("SetPrometheusAddress").Path("/api/v1/prometheus").Methods("POST"). diff --git a/pkg/handlers/helmvm_get.go b/pkg/handlers/helmvm_get.go index cd440d116f..4f736996ed 100644 --- a/pkg/handlers/helmvm_get.go +++ b/pkg/handlers/helmvm_get.go @@ -3,6 +3,7 @@ package handlers import ( "net/http" + "github.com/gorilla/mux" "github.com/replicatedhq/kots/pkg/helmvm" "github.com/replicatedhq/kots/pkg/k8sutil" "github.com/replicatedhq/kots/pkg/logger" @@ -16,7 +17,7 @@ func (h *Handler) GetHelmVMNodes(w http.ResponseWriter, r *http.Request) { return } - nodes, err := helmvm.GetNodes(client) + nodes, err := helmvm.GetNodes(r.Context(), client) if err != nil { logger.Error(err) w.WriteHeader(http.StatusInternalServerError) @@ -24,3 +25,21 @@ func (h *Handler) GetHelmVMNodes(w http.ResponseWriter, r *http.Request) { } JSON(w, http.StatusOK, nodes) } + +func (h *Handler) GetHelmVMNode(w http.ResponseWriter, r *http.Request) { + client, err := k8sutil.GetClientset() + if err != nil { + logger.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + nodeName := mux.Vars(r)["nodeName"] + node, err := helmvm.GetNode(r.Context(), client, nodeName) + if err != nil { + logger.Error(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + JSON(w, http.StatusOK, node) +} diff --git 
a/pkg/handlers/interface.go b/pkg/handlers/interface.go index c6cb2a00db..b69ae161fd 100644 --- a/pkg/handlers/interface.go +++ b/pkg/handlers/interface.go @@ -144,6 +144,7 @@ type KOTSHandler interface { DrainHelmVMNode(w http.ResponseWriter, r *http.Request) DeleteHelmVMNode(w http.ResponseWriter, r *http.Request) GetHelmVMNodes(w http.ResponseWriter, r *http.Request) + GetHelmVMNode(w http.ResponseWriter, r *http.Request) // Prometheus SetPrometheusAddress(w http.ResponseWriter, r *http.Request) diff --git a/pkg/handlers/mock/mock.go b/pkg/handlers/mock/mock.go index cf9fe09ede..f496224c74 100644 --- a/pkg/handlers/mock/mock.go +++ b/pkg/handlers/mock/mock.go @@ -778,6 +778,18 @@ func (mr *MockKOTSHandlerMockRecorder) GetHelmVMNodes(w, r interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHelmVMNodes", reflect.TypeOf((*MockKOTSHandler)(nil).GetHelmVMNodes), w, r) } +// GetHelmVMNode mocks base method. +func (m *MockKOTSHandler) GetHelmVMNode(w http.ResponseWriter, r *http.Request) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "GetHelmVMNode", w, r) +} + +// GetHelmVMNode indicates an expected call of GetHelmVMNode. +func (mr *MockKOTSHandlerMockRecorder) GetHelmVMNode(w, r interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHelmVMNode", reflect.TypeOf((*MockKOTSHandler)(nil).GetHelmVMNode), w, r) +} + // GetIdentityServiceConfig mocks base method. 
func (m *MockKOTSHandler) GetIdentityServiceConfig(w http.ResponseWriter, r *http.Request) { m.ctrl.T.Helper() diff --git a/pkg/helmvm/helmvm_node.go b/pkg/helmvm/helmvm_node.go new file mode 100644 index 0000000000..7805400c1d --- /dev/null +++ b/pkg/helmvm/helmvm_node.go @@ -0,0 +1,105 @@ +package helmvm + +import ( + "context" + "fmt" + "math" + "strconv" + + "github.com/replicatedhq/kots/pkg/helmvm/types" + "github.com/replicatedhq/kots/pkg/k8sutil" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + metricsv "k8s.io/metrics/pkg/client/clientset/versioned" +) + +// GetNode will get a node with stats and podlist +func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string) (*types.Node, error) { + node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("get node %s: %w", nodeName, err) + } + + clientConfig, err := k8sutil.GetClusterConfig() + if err != nil { + return nil, fmt.Errorf("failed to get cluster config: %w", err) + } + + metricsClient, err := metricsv.NewForConfig(clientConfig) + if err != nil { + return nil, fmt.Errorf("failed to create metrics client: %w", err) + } + + nodePods, err := podsOnNode(ctx, client, nodeName) + if err != nil { + return nil, fmt.Errorf("pods per node: %w", err) + } + + cpuCapacity := types.CapacityAvailable{} + memoryCapacity := types.CapacityAvailable{} + podCapacity := types.CapacityAvailable{} + + memoryCapacity.Capacity = float64(node.Status.Capacity.Memory().Value()) / math.Pow(2, 30) // capacity in GB + + cpuCapacity.Capacity, err = strconv.ParseFloat(node.Status.Capacity.Cpu().String(), 64) + if err != nil { + return nil, fmt.Errorf("parse CPU capacity %q for node %s: %w", node.Status.Capacity.Cpu().String(), node.Name, err) + } + + podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value()) + + nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, 
node.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("list pod metrics: %w", err) + } + + if nodeMetrics.Usage.Memory() != nil { + memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeMetrics.Usage.Memory().Value())/math.Pow(2, 30) + } + + if nodeMetrics.Usage.Cpu() != nil { + cpuCapacity.Available = cpuCapacity.Capacity - nodeMetrics.Usage.Cpu().AsApproximateFloat64() + } + + podCapacity.Available = podCapacity.Capacity - float64(len(nodePods)) + + nodeLabelArray := []string{} + for k, v := range node.Labels { + nodeLabelArray = append(nodeLabelArray, fmt.Sprintf("%s:%s", k, v)) + } + + return &types.Node{ + Name: node.Name, + IsConnected: isConnected(*node), + IsReady: isReady(*node), + IsPrimaryNode: isPrimary(*node), + CanDelete: node.Spec.Unschedulable && !isConnected(*node), + KubeletVersion: node.Status.NodeInfo.KubeletVersion, + CPU: cpuCapacity, + Memory: memoryCapacity, + Pods: podCapacity, + Labels: nodeLabelArray, + Conditions: findNodeConditions(node.Status.Conditions), + PodList: nodePods, + }, nil +} + +func podsOnNode(ctx context.Context, client kubernetes.Interface, nodeName string) ([]corev1.Pod, error) { + namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, fmt.Errorf("list namespaces: %w", err) + } + + toReturn := []corev1.Pod{} + + for _, ns := range namespaces.Items { + nsPods, err := client.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)}) + if err != nil { + return nil, fmt.Errorf("list pods on %s in namespace %s: %w", nodeName, ns.Name, err) + } + + toReturn = append(toReturn, nsPods.Items...) 
+ } + return toReturn, nil +} diff --git a/pkg/helmvm/helmvm_nodes.go b/pkg/helmvm/helmvm_nodes.go index e00dca2108..f8bfefff4b 100644 --- a/pkg/helmvm/helmvm_nodes.go +++ b/pkg/helmvm/helmvm_nodes.go @@ -2,34 +2,43 @@ package helmvm import ( "context" - "crypto/tls" - "encoding/json" "fmt" - "io" "math" - "net/http" - "os" "strconv" - "time" "github.com/pkg/errors" "github.com/replicatedhq/kots/pkg/helmvm/types" - "github.com/replicatedhq/kots/pkg/logger" + "github.com/replicatedhq/kots/pkg/k8sutil" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - statsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1" + metricsv "k8s.io/metrics/pkg/client/clientset/versioned" ) // GetNodes will get a list of nodes with stats -func GetNodes(client kubernetes.Interface) (*types.HelmVMNodes, error) { - nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) +func GetNodes(ctx context.Context, client kubernetes.Interface) (*types.HelmVMNodes, error) { + nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return nil, errors.Wrap(err, "list nodes") } + clientConfig, err := k8sutil.GetClusterConfig() + if err != nil { + return nil, errors.Wrap(err, "failed to get cluster config") + } + + metricsClient, err := metricsv.NewForConfig(clientConfig) + if err != nil { + return nil, errors.Wrap(err, "failed to create metrics client") + } + toReturn := types.HelmVMNodes{} + nodePods, err := podsPerNode(ctx, client) + if err != nil { + return nil, errors.Wrap(err, "pods per node") + } + for _, node := range nodes.Items { cpuCapacity := types.CapacityAvailable{} memoryCapacity := types.CapacityAvailable{} @@ -44,32 +53,21 @@ func GetNodes(client kubernetes.Interface) (*types.HelmVMNodes, error) { podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value()) - nodeIP := "" - for _, address := range node.Status.Addresses { - if address.Type == corev1.NodeInternalIP { - nodeIP = 
address.Address - } + nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{}) + if err != nil { + return nil, errors.Wrap(err, "list pod metrics") } - if nodeIP == "" { - logger.Infof("Did not find address for node %s, %+v", node.Name, node.Status.Addresses) - } else { - nodeMetrics, err := getNodeMetrics(nodeIP) - if err != nil { - logger.Infof("Got error retrieving stats for node %q: %v", node.Name, err) - } else { - if nodeMetrics.Node.Memory != nil && nodeMetrics.Node.Memory.AvailableBytes != nil { - memoryCapacity.Available = float64(*nodeMetrics.Node.Memory.AvailableBytes) / math.Pow(2, 30) - } - - if nodeMetrics.Node.CPU != nil && nodeMetrics.Node.CPU.UsageNanoCores != nil { - cpuCapacity.Available = cpuCapacity.Capacity - (float64(*nodeMetrics.Node.CPU.UsageNanoCores) / math.Pow(10, 9)) - } - - podCapacity.Available = podCapacity.Capacity - float64(len(nodeMetrics.Pods)) - } + if nodeMetrics.Usage.Memory() != nil { + memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeMetrics.Usage.Memory().Value())/math.Pow(2, 30) } + if nodeMetrics.Usage.Cpu() != nil { + cpuCapacity.Available = cpuCapacity.Capacity - nodeMetrics.Usage.Cpu().AsApproximateFloat64() + } + + podCapacity.Available = podCapacity.Capacity - float64(nodePods[node.Name]) + nodeLabelArray := []string{} for k, v := range node.Labels { nodeLabelArray = append(nodeLabelArray, fmt.Sprintf("%s:%s", k, v)) @@ -124,49 +122,36 @@ func findNodeConditions(conditions []corev1.NodeCondition) types.NodeConditions return discoveredConditions } -// get kubelet PKI info from /etc/kubernetes/pki/kubelet, use it to hit metrics server at `http://${nodeIP}:10255/stats/summary` -func getNodeMetrics(nodeIP string) (*statsv1alpha1.Summary, error) { - client := http.Client{ - Timeout: time.Second, +// podsPerNode returns a map of node names to the number of pods, across all namespaces +func podsPerNode(ctx context.Context, client kubernetes.Interface) 
(map[string]int, error) { + namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, errors.Wrap(err, "list namespaces") } - port := 10255 - // only use mutual TLS if client cert exists - _, err := os.ReadFile("/etc/kubernetes/pki/kubelet/client.crt") - if err == nil { - cert, err := tls.LoadX509KeyPair("/etc/kubernetes/pki/kubelet/client.crt", "/etc/kubernetes/pki/kubelet/client.key") - if err != nil { - return nil, errors.Wrap(err, "get client keypair") - } + toReturn := map[string]int{} - // this will leak memory - client.Transport = &http.Transport{ - TLSClientConfig: &tls.Config{ - Certificates: []tls.Certificate{cert}, - InsecureSkipVerify: true, - }, + for _, ns := range namespaces.Items { + nsPods, err := client.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "list pods in namespace %s", ns.Name) } - port = 10250 - } - r, err := client.Get(fmt.Sprintf("https://%s:%d/stats/summary", nodeIP, port)) - if err != nil { - return nil, errors.Wrapf(err, "get node %s stats", nodeIP) - } - defer r.Body.Close() + for _, pod := range nsPods.Items { + pod := pod + if pod.Spec.NodeName == "" { + continue + } - body, err := io.ReadAll(r.Body) - if err != nil { - return nil, errors.Wrapf(err, "read node %s stats response", nodeIP) - } + if _, ok := toReturn[pod.Spec.NodeName]; !ok { + toReturn[pod.Spec.NodeName] = 0 + } - summary := statsv1alpha1.Summary{} - err = json.Unmarshal(body, &summary) - if err != nil { - return nil, errors.Wrapf(err, "parse node %s stats response", nodeIP) + toReturn[pod.Spec.NodeName]++ + } } - return &summary, nil + return toReturn, nil } func isConnected(node corev1.Node) bool { @@ -201,12 +186,3 @@ func isPrimary(node corev1.Node) bool { return false } - -func internalIP(node corev1.Node) string { - for _, address := range node.Status.Addresses { - if address.Type == corev1.NodeInternalIP { - return address.Address - } - } - return 
"" -} diff --git a/pkg/helmvm/node_join.go b/pkg/helmvm/node_join.go index 6aad6255a9..4f47a5128f 100644 --- a/pkg/helmvm/node_join.go +++ b/pkg/helmvm/node_join.go @@ -8,5 +8,10 @@ import ( // GenerateAddNodeCommand will generate the HelmVM node add command for a primary or secondary node func GenerateAddNodeCommand(client kubernetes.Interface, primary bool) ([]string, *time.Time, error) { - return nil, nil, nil + tomorrow := time.Now().Add(time.Hour * 24) + if primary { + return []string{"this is a primary join command string", "that can be multiple strings"}, &tomorrow, nil + } else { + return []string{"this is a secondary join command string", "that can be multiple strings"}, &tomorrow, nil + } } diff --git a/pkg/helmvm/types/types.go b/pkg/helmvm/types/types.go index c298dfbd93..78ea23f248 100644 --- a/pkg/helmvm/types/types.go +++ b/pkg/helmvm/types/types.go @@ -1,5 +1,7 @@ package types +import corev1 "k8s.io/api/core/v1" + type HelmVMNodes struct { Nodes []Node `json:"nodes"` HA bool `json:"ha"` @@ -18,6 +20,7 @@ type Node struct { Pods CapacityAvailable `json:"pods"` Labels []string `json:"labels"` Conditions NodeConditions `json:"conditions"` + PodList []corev1.Pod `json:"podList"` } type CapacityAvailable struct { diff --git a/pkg/helmvm/util.go b/pkg/helmvm/util.go index 3c2e4fbe66..5dd2cdc11f 100644 --- a/pkg/helmvm/util.go +++ b/pkg/helmvm/util.go @@ -41,5 +41,5 @@ func IsHelmVM(clientset kubernetes.Interface) (bool, error) { } func IsHA(clientset kubernetes.Interface) (bool, error) { - return false, nil + return true, nil }