From 2b1e9fb2be5aa1023ea306576f53f20351c99f67 Mon Sep 17 00:00:00 2001
From: darox
Date: Fri, 11 Oct 2024 15:24:13 +0200
Subject: [PATCH] Add multicluster support

This commit introduces multicluster support, which allows specifying
separate K8s contexts for the iperf3 client and server. An example usage is:

```
k8s-iperf run --k8s-server-context kind-1 --k8s-client-context kind-2 --k8s-multi-cluster=true --k8s-service-annotation "service.cilium.io/global=true"
```

This uses Cilium Cluster Mesh to test cross-cluster throughput.

Signed-off-by: darox
---
 pkg/cli/commands.go |  33 ++++++---
 pkg/iperf/test.go   | 160 ++++++++++++++++++++++++++++++++-----------
 pkg/k8s/client.go   |  19 +++++-
 3 files changed, 154 insertions(+), 58 deletions(-)

diff --git a/pkg/cli/commands.go b/pkg/cli/commands.go
index fcad9f6..efb0767 100644
--- a/pkg/cli/commands.go
+++ b/pkg/cli/commands.go
@@ -25,8 +25,16 @@ var runCmd = &cobra.Command{
 		clientNode, _ := cmd.Flags().GetString("k8s-client-node")
 		domain, _ := cmd.Flags().GetString("k8s-domain")
 		cleanup, _ := cmd.Flags().GetBool("k8s-cleanup")
+		serverContext, _ := cmd.Flags().GetString("k8s-server-context")
+		clientContext, _ := cmd.Flags().GetString("k8s-client-context")
+		multiCluster, _ := cmd.Flags().GetBool("k8s-multi-cluster")
+		serviceAnnotation, _ := cmd.Flags().GetString("k8s-service-annotation")
+		serverClientSet, err := k8s.NewClient(serverContext)
+		if err != nil {
+			return fmt.Errorf("failed to create Kubernetes client: %w", err)
+		}
 
-		client, err := k8s.NewClient()
+		clientClientSet, err := k8s.NewClient(clientContext)
 		if err != nil {
 			return fmt.Errorf("failed to create Kubernetes client: %w", err)
 		}
@@ -38,14 +46,17 @@ var runCmd = &cobra.Command{
 		}
 
 		config := iperf.TestConfig{
-			Client:     client,
-			Namespace:  namespace,
-			Image:      image,
-			IperfArgs:  iperfArgs,
-			ServerNode: serverNode,
-			ClientNode: clientNode,
-			Domain:     domain,
-			Cleanup:    cleanup,
+			ClientClientSet:   clientClientSet,
+			ServerClientSet:   serverClientSet,
+			Namespace:         namespace,
+			Image:             image,
+			IperfArgs:         iperfArgs,
+			ServerNode:        serverNode,
+			ClientNode:        clientNode,
+			Domain:            domain,
+			Cleanup:           cleanup,
+			MultiCluster:      multiCluster,
+			ServiceAnnotation: serviceAnnotation,
 		}
 
 		return iperf.RunTest(config)
@@ -59,6 +70,10 @@ func init() {
 	runCmd.Flags().StringP("k8s-client-node", "", "", "Client node to use for the test")
 	runCmd.Flags().StringP("k8s-domain", "", "cluster.local", "Kubernetes domain to use for the test")
 	runCmd.Flags().BoolP("k8s-cleanup", "", true, "Cleanup resources after the test")
+	runCmd.Flags().StringP("k8s-server-context", "", "", "Kubernetes server context to use for the test")
+	runCmd.Flags().StringP("k8s-client-context", "", "", "Kubernetes client context to use for the test")
+	runCmd.Flags().BoolP("k8s-multi-cluster", "", false, "Run the test in multi-cluster mode")
+	runCmd.Flags().StringP("k8s-service-annotation", "", "", "Service annotation to use for the test")
 
 	rootCmd.AddCommand(runCmd)
 }
diff --git a/pkg/iperf/test.go b/pkg/iperf/test.go
index e2deeba..43bd27f 100644
--- a/pkg/iperf/test.go
+++ b/pkg/iperf/test.go
@@ -22,10 +22,10 @@ import (
 	"github.com/fatih/color"
 )
 
-// At the package level, add this static error:
+// errIperf3ClientPodFailed is returned when the iperf3 client pod
+// fails to run to completion
 var errIperf3ClientPodFailed = errors.New("iperf3 client pod failed")
 
-// Add static error variables
 var (
 	errEmptyLogs         = errors.New("iperf3 client returned empty logs")
 	errMissingStartField = errors.New("missing or invalid 'start' field in JSON data")
@@ -34,16 +34,21 @@ var (
 	errInvalidConnectedData = errors.New("invalid 'connected' data structure in JSON data")
 )
 
+var errCleanup = errors.New("cleanup errors occurred")
+
 // TestConfig holds the configuration for the iperf3 test
 type TestConfig struct {
-	Client     *kubernetes.Clientset
-	Namespace  string
-	Domain     string
-	Image      string
-	IperfArgs  []string
-	ServerNode string
-	ClientNode string
-	Cleanup    bool
+	ServerClientSet   *kubernetes.Clientset
+	ClientClientSet   *kubernetes.Clientset
+	Namespace         string
+	Domain            string
+	Image             string
+	IperfArgs         []string
+	ServerNode        string
+	ClientNode        string
+	Cleanup           bool
+	MultiCluster      bool
+	ServiceAnnotation string
 }
 
 func RunTest(config TestConfig) error {
@@ -66,10 +71,10 @@ func RunTest(config TestConfig) error {
 	case <-sigChan:
 		fmt.Println("Received interrupt signal, cleaning up resources...")
 		cancel() // Cancel the context to stop ongoing operations
-		return cleanup(config.Client, config.Namespace)
+		return cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster)
 	case <-ctx.Done():
 		fmt.Println("Test cancelled, cleaning up resources...")
-		return cleanup(config.Client, config.Namespace)
+		return cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster)
 	}
 }
@@ -77,7 +82,7 @@ func runTestInternal(ctx context.Context, config TestConfig) error {
 	// Defer cleanup if Cleanup is set to true
 	if config.Cleanup {
 		defer func() {
-			if err := cleanup(config.Client, config.Namespace); err != nil {
+			if err := cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster); err != nil {
 				fmt.Printf("Failed to cleanup resources: %v\n", err)
 			}
 		}()
@@ -88,23 +93,23 @@ func runTestInternal(ctx context.Context, config TestConfig) error {
 		color.Red("✘ Failed to deploy iperf3 server: %v", err)
 		return fmt.Errorf("failed to deploy iperf3 server: %w", err)
 	}
-	color.Green("✔ iperf3 server deployed successfully")
+	color.Green("✔ iperf3 server created successfully")
 
 	// Create service for iperf3 server
-	if err := createIperf3Service(config.Client, config.Namespace); err != nil {
+	if err := createIperf3Service(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster, config.ServiceAnnotation); err != nil {
 		color.Red("✘ Failed to create iperf3 service: %v", err)
 		return fmt.Errorf("failed to create iperf3 service: %w", err)
 	}
 	color.Green("✔ iperf3 service created successfully")
 
 	// Wait for server to be ready
-	if err := waitForDeploymentReady(config.Client, config.Namespace, "iperf3-server", 60*time.Second); err != nil {
+	if err := waitForDeploymentReady(config.ServerClientSet, config.Namespace, "iperf3-server", 60*time.Second); err != nil {
 		color.Red("✘ iperf3 server failed to become ready: %v", err)
 		return fmt.Errorf("iperf3 server failed to become ready: %w", err)
 	}
 
-	// Add this: Wait for the iperf3 server pod to be ready
-	if err := waitForPodReady(ctx, config.Client, config.Namespace, "app=iperf3-server", 60*time.Second); err != nil {
+	// Wait for the iperf3 server pod to be ready
+	if err := waitForPodReady(ctx, config.ServerClientSet, config.Namespace, "app=iperf3-server", 60*time.Second); err != nil {
 		color.Red("✘ iperf3 server pod failed to become ready: %v", err)
 		return fmt.Errorf("iperf3 server pod failed to become ready: %w", err)
 	}
@@ -185,11 +190,11 @@ func deployIperf3Server(config TestConfig) error {
 		}
 	}
 
-	_, err := config.Client.AppsV1().Deployments(config.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
+	_, err := config.ServerClientSet.AppsV1().Deployments(config.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
 	return err
 }
 
-func createIperf3Service(client *kubernetes.Clientset, namespace string) error {
+func createIperf3Service(clientSetServer *kubernetes.Clientset, clientSetClient *kubernetes.Clientset, namespace string, multiCluster bool, serviceAnnotation string) error {
 	service := &corev1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "iperf3-server",
@@ -210,7 +215,31 @@ func createIperf3Service(client *kubernetes.Clientset, namespace string) error {
 		},
 	}
 
-	_, err := client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
+	// serviceAnnotation has the format key1=value1,key2=value2. This is useful for adding the Cilium Cluster Mesh annotation
+	if serviceAnnotation != "" {
+
+		// init map for annotations
+		service.Annotations = make(map[string]string)
+
+		for _, v := range strings.Split(serviceAnnotation, ",") {
+			kv := strings.Split(v, "=")
+			if len(kv) == 2 {
+				service.Annotations[kv[0]] = kv[1]
+			}
+		}
+	}
+
+	if multiCluster {
+		for _, client := range []*kubernetes.Clientset{clientSetServer, clientSetClient} {
+			_, err := client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
+			if err != nil {
+				return fmt.Errorf("failed to create service in cluster: %w", err)
+			}
+		}
+		return nil
+	}
+
+	_, err := clientSetServer.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
 	return err
 }
@@ -255,12 +284,12 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
 		}
 	}
 
-	_, err := config.Client.CoreV1().Pods(config.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	_, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
 	if err != nil {
 		return err
 	}
 
-	color.Green("✔ iperf3 client pod created successfully")
+	color.Green("✔ iperf3 client created successfully")
 
 	color.Green("► Starting iperf3 test")
@@ -271,7 +300,7 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
-			pod, err := config.Client.CoreV1().Pods(config.Namespace).Get(ctx, "iperf3-client", metav1.GetOptions{})
+			pod, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).Get(ctx, "iperf3-client", metav1.GetOptions{})
 			if err != nil {
 				return err
 			}
@@ -287,7 +316,7 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
 		PodCompleted:
 
 			// Get and print the logs
-			logs, err := config.Client.CoreV1().Pods(config.Namespace).GetLogs("iperf3-client", &corev1.PodLogOptions{}).Do(ctx).Raw()
+			logs, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).GetLogs("iperf3-client", &corev1.PodLogOptions{}).Do(ctx).Raw()
 			if err != nil {
 				return err
 			}
@@ -352,18 +381,18 @@ func printIperfSummary(jsonData []byte) error {
 
 	// Prepare all lines
 	lines := []string{
-		fmt.Sprintf("Connection Details:"),
-		fmt.Sprintf(" Local: %s:%d", connectedMap["local_host"], int(connectedMap["local_port"].(float64))),
-		fmt.Sprintf(" Remote: %s:%d", connectedMap["remote_host"], int(connectedMap["remote_port"].(float64))),
-		fmt.Sprintf(""),
-		fmt.Sprintf("Test Configuration:"),
-		fmt.Sprintf(" Protocol: %s", testStart["protocol"]),
-		fmt.Sprintf(" Duration: %.2f seconds", sumSent["seconds"]),
-		fmt.Sprintf(" Parallel Streams: %d", int(testStart["num_streams"].(float64))),
-		fmt.Sprintf(""),
-		fmt.Sprintf("Results:"),
-		fmt.Sprintf(" Sent: %.2f Mbits/sec", sumSent["bits_per_second"].(float64)/1e6),
-		fmt.Sprintf(" Received: %.2f Mbits/sec", sumReceived["bits_per_second"].(float64)/1e6),
+		"Connection Details:",
+		fmt.Sprintf(" Local: %s:%d", getStringValue(connectedMap, "local_host"), getIntValue(connectedMap, "local_port")),
+		fmt.Sprintf(" Remote: %s:%d", getStringValue(connectedMap, "remote_host"), getIntValue(connectedMap, "remote_port")),
+		"",
+		"Test Configuration:",
+		fmt.Sprintf(" Protocol: %s", getStringValue(testStart, "protocol")),
+		fmt.Sprintf(" Duration: %.2f seconds", getFloatValue(sumSent, "seconds")),
+		fmt.Sprintf(" Parallel Streams: %d", getIntValue(testStart, "num_streams")),
+		"",
+		"Results:",
+		fmt.Sprintf(" Sent: %.2f Mbits/sec", getFloatValue(sumSent, "bits_per_second")/1e6),
+		fmt.Sprintf(" Received: %.2f Mbits/sec", getFloatValue(sumReceived, "bits_per_second")/1e6),
 	}
 
 	if retransmits, ok := sumSent["retransmits"]; ok {
@@ -371,10 +400,10 @@ func printIperfSummary(jsonData []byte) error {
 	}
 
 	lines = append(lines,
-		fmt.Sprintf(""),
-		fmt.Sprintf("CPU Utilization:"),
-		fmt.Sprintf(" Local: %.2f%%", cpuUtil["host_total"].(float64)),
-		fmt.Sprintf(" Remote: %.2f%%", cpuUtil["remote_total"].(float64)),
+		"",
+		"CPU Utilization:",
+		fmt.Sprintf(" Local: %.2f%%", getFloatValue(cpuUtil, "host_total")),
+		fmt.Sprintf(" Remote: %.2f%%", getFloatValue(cpuUtil, "remote_total")),
 	)
 
 	// Calculate the length of the longest line
@@ -403,16 +432,32 @@ func printIperfSummary(jsonData []byte) error {
 	return nil
 }
 
-func cleanup(client *kubernetes.Clientset, namespace string) error {
-	if err := client.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
-		return err
+func cleanup(serverClientSet, clientClientSet *kubernetes.Clientset, namespace string, multiCluster bool) error {
+	var errs []error
+
+	if err := serverClientSet.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
+		errs = append(errs, fmt.Errorf("failed to delete iperf3-server deployment: %w", err))
 	}
 
-	if err := client.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
-		return err
+	if err := clientClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", metav1.DeleteOptions{}); err != nil {
+		errs = append(errs, fmt.Errorf("failed to delete iperf3-client pod: %w", err))
 	}
 
-	return client.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", metav1.DeleteOptions{})
+	if multiCluster {
+		for _, client := range []*kubernetes.Clientset{serverClientSet, clientClientSet} {
+			if err := client.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
+				errs = append(errs, fmt.Errorf("failed to delete iperf3-server service in cluster: %w", err))
+			}
+		}
+	} else if err := serverClientSet.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
+		errs = append(errs, fmt.Errorf("failed to delete iperf3-server service: %w", err))
+	}
+
+	if len(errs) > 0 {
+		return fmt.Errorf("%w: %v", errCleanup, errs)
+	}
+
+	return nil
 }
 
 func int32Ptr(i int32) *int32 {
@@ -463,3 +508,24 @@ func waitForPodReady(ctx context.Context, client *kubernetes.Clientset, namespac
 		return false, nil
 	})
 }
+
+func getStringValue(m map[string]interface{}, key string) string {
+	if value, ok := m[key].(string); ok {
+		return value
+	}
+	return "N/A"
+}
+
+func getIntValue(m map[string]interface{}, key string) int {
+	if value, ok := m[key].(float64); ok {
+		return int(value)
+	}
+	return 0
+}
+
+func getFloatValue(m map[string]interface{}, key string) float64 {
+	if value, ok := m[key].(float64); ok {
+		return value
+	}
+	return 0.0
+}
diff --git a/pkg/k8s/client.go b/pkg/k8s/client.go
index aa4e8dc..d2f91a2 100644
--- a/pkg/k8s/client.go
+++ b/pkg/k8s/client.go
@@ -8,14 +8,29 @@ import (
 	"k8s.io/client-go/tools/clientcmd"
 )
 
-func NewClient() (*kubernetes.Clientset, error) {
+func NewClient(context string) (*kubernetes.Clientset, error) {
 	homeDir, err := os.UserHomeDir()
 	if err != nil {
 		return nil, err
 	}
 
 	kubeconfig := filepath.Join(homeDir, ".kube", "config")
-	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
+
+	// Create loading rules that point at the kubeconfig path
+	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+	loadingRules.ExplicitPath = kubeconfig
+
+	// Create a config overrides struct and set the context if provided
+	overrides := &clientcmd.ConfigOverrides{}
+	if context != "" {
+		overrides.CurrentContext = context
+	}
+
+	// Create a client config using the loading rules and overrides
+	clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
+
+	// Get the rest config
+	config, err := clientConfig.ClientConfig()
 	if err != nil {
 		return nil, err
 	}
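
For reference, the `--k8s-service-annotation` value is parsed as comma-separated `key=value` pairs by the loop added in `createIperf3Service`. A minimal standalone sketch of that parsing (the `parseAnnotations` helper is hypothetical, not part of this patch):

```
package main

import (
	"fmt"
	"strings"
)

// parseAnnotations mirrors the loop in createIperf3Service: the input is a
// comma-separated list of key=value pairs; entries that do not contain
// exactly one "=" are silently skipped.
func parseAnnotations(s string) map[string]string {
	annotations := make(map[string]string)
	for _, pair := range strings.Split(s, ",") {
		kv := strings.Split(pair, "=")
		if len(kv) == 2 {
			annotations[kv[0]] = kv[1]
		}
	}
	return annotations
}

func main() {
	// The Cilium Cluster Mesh global-service annotation from the usage example.
	fmt.Println(parseAnnotations("service.cilium.io/global=true"))
	// Output: map[service.cilium.io/global:true]
}
```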
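
The context override in `pkg/k8s/client.go` follows the standard client-go `clientcmd` pattern. A self-contained sketch of the same idea, assuming the default loading rules (`KUBECONFIG` or `~/.kube/config`) instead of the hard-coded path used in the patch; `newClientForContext` is a hypothetical name:

```
package main

import (
	"fmt"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// newClientForContext builds a Clientset for a named kubeconfig context and
// falls back to the current context when name is empty, like NewClient above.
func newClientForContext(name string) (*kubernetes.Clientset, error) {
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}
	if name != "" {
		overrides.CurrentContext = name
	}
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides).ClientConfig()
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(config)
}

func main() {
	// "kind-1" is the example context name from the commit message.
	clientset, err := newClientForContext("kind-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(clientset != nil)
}
```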