Merge pull request #1 from darox/add-multi-contex
Add multicluster support
darox authored Oct 11, 2024
2 parents 668ec0f + 2b1e9fb commit 8ec72b5
Showing 3 changed files with 152 additions and 58 deletions.
33 changes: 24 additions & 9 deletions pkg/cli/commands.go
@@ -25,8 +25,16 @@ var runCmd = &cobra.Command{
clientNode, _ := cmd.Flags().GetString("k8s-client-node")
domain, _ := cmd.Flags().GetString("k8s-domain")
cleanup, _ := cmd.Flags().GetBool("k8s-cleanup")
serverContext, _ := cmd.Flags().GetString("k8s-server-context")
clientContext, _ := cmd.Flags().GetString("k8s-client-context")
multiCluster, _ := cmd.Flags().GetBool("k8s-multi-cluster")
serviceAnnotation, _ := cmd.Flags().GetString("k8s-service-annotation")
serverClientSet, err := k8s.NewClient(serverContext)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}

client, err := k8s.NewClient()
clientClientSet, err := k8s.NewClient(clientContext)
if err != nil {
return fmt.Errorf("failed to create Kubernetes client: %w", err)
}
@@ -38,14 +46,17 @@ var runCmd = &cobra.Command{
}

config := iperf.TestConfig{
Client: client,
Namespace: namespace,
Image: image,
IperfArgs: iperfArgs,
ServerNode: serverNode,
ClientNode: clientNode,
Domain: domain,
Cleanup: cleanup,
ClientClientSet: clientClientSet,
ServerClientSet: serverClientSet,
Namespace: namespace,
Image: image,
IperfArgs: iperfArgs,
ServerNode: serverNode,
ClientNode: clientNode,
Domain: domain,
Cleanup: cleanup,
MultiCluster: multiCluster,
ServiceAnnotation: serviceAnnotation,
}

return iperf.RunTest(config)
@@ -59,6 +70,10 @@ func init() {
runCmd.Flags().StringP("k8s-client-node", "", "", "Client node to use for the test")
runCmd.Flags().StringP("k8s-domain", "", "cluster.local", "Kubernetes domain to use for the test")
runCmd.Flags().BoolP("k8s-cleanup", "", true, "Cleanup resources after the test")
runCmd.Flags().StringP("k8s-server-context", "", "", "Kubernetes server context to use for the test")
runCmd.Flags().StringP("k8s-client-context", "", "", "Kubernetes client context to use for the test")
runCmd.Flags().BoolP("k8s-multi-cluster", "", false, "Run the test in multi-cluster mode")
runCmd.Flags().StringP("k8s-service-annotation", "", "", "Service annotation to use for the test")
rootCmd.AddCommand(runCmd)
}

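The new flags are registered but not cross-validated. As a hedged sketch (not part of this commit), the command could guard against a multi-cluster run that is missing one of the two contexts; the helper name and error messages below are illustrative only:

```go
package cli

import "fmt"

// validateMultiClusterFlags is an illustrative guard, not part of this PR:
// multi-cluster mode only makes sense when two explicit, distinct kubeconfig
// contexts are supplied via --k8s-server-context and --k8s-client-context.
func validateMultiClusterFlags(multiCluster bool, serverContext, clientContext string) error {
	if !multiCluster {
		return nil
	}
	if serverContext == "" || clientContext == "" {
		return fmt.Errorf("multi-cluster mode requires both --k8s-server-context and --k8s-client-context")
	}
	if serverContext == clientContext {
		return fmt.Errorf("multi-cluster mode expects two different contexts, got %q for both", serverContext)
	}
	return nil
}
```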
158 changes: 111 additions & 47 deletions pkg/iperf/test.go
@@ -22,10 +22,10 @@ import (
"github.com/fatih/color"
)

// At the package level, add this static error:
// Used for Cilium Cluster Mesh to test across two different clusters

var errIperf3ClientPodFailed = errors.New("iperf3 client pod failed")

// Add static error variables
var (
errEmptyLogs = errors.New("iperf3 client returned empty logs")
errMissingStartField = errors.New("missing or invalid 'start' field in JSON data")
@@ -34,16 +34,21 @@ var (
errInvalidConnectedData = errors.New("invalid 'connected' data structure in JSON data")
)

var errCleanup = errors.New("cleanup errors occurred")

// TestConfig holds the configuration for the iperf3 test
type TestConfig struct {
Client *kubernetes.Clientset
Namespace string
Domain string
Image string
IperfArgs []string
ServerNode string
ClientNode string
Cleanup bool
ServerClientSet *kubernetes.Clientset
ClientClientSet *kubernetes.Clientset
Namespace string
Domain string
Image string
IperfArgs []string
ServerNode string
ClientNode string
Cleanup bool
MultiCluster bool
ServiceAnnotation string
}

func RunTest(config TestConfig) error {
@@ -66,18 +71,18 @@ func RunTest(config TestConfig) error {
case <-sigChan:
fmt.Println("Received interrupt signal, cleaning up resources...")
cancel() // Cancel the context to stop ongoing operations
return cleanup(config.Client, config.Namespace)
return cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster)
case <-ctx.Done():
fmt.Println("Test cancelled, cleaning up resources...")
return cleanup(config.Client, config.Namespace)
return cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster)
}
}

func runTestInternal(ctx context.Context, config TestConfig) error {
// Defer cleanup if Cleanup is set to true
if config.Cleanup {
defer func() {
if err := cleanup(config.Client, config.Namespace); err != nil {
if err := cleanup(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster); err != nil {
fmt.Printf("Failed to cleanup resources: %v\n", err)
}
}()
@@ -88,23 +93,23 @@ func runTestInternal(ctx context.Context, config TestConfig) error {
color.Red("✘ Failed to deploy iperf3 server: %v", err)
return fmt.Errorf("failed to deploy iperf3 server: %w", err)
}
color.Green("✔ iperf3 server deployed successfully")
color.Green("✔ iperf3 server created successfully")

// Create service for iperf3 server
if err := createIperf3Service(config.Client, config.Namespace); err != nil {
if err := createIperf3Service(config.ServerClientSet, config.ClientClientSet, config.Namespace, config.MultiCluster, config.ServiceAnnotation); err != nil {
color.Red("✘ Failed to create iperf3 service: %v", err)
return fmt.Errorf("failed to create iperf3 service: %w", err)
}
color.Green("✔ iperf3 service created successfully")

// Wait for server to be ready
if err := waitForDeploymentReady(config.Client, config.Namespace, "iperf3-server", 60*time.Second); err != nil {
if err := waitForDeploymentReady(config.ServerClientSet, config.Namespace, "iperf3-server", 60*time.Second); err != nil {
color.Red("✘ iperf3 server failed to become ready: %v", err)
return fmt.Errorf("iperf3 server failed to become ready: %w", err)
}

// Add this: Wait for the iperf3 server pod to be ready
if err := waitForPodReady(ctx, config.Client, config.Namespace, "app=iperf3-server", 60*time.Second); err != nil {
// Wait for the iperf3 server pod to be ready
if err := waitForPodReady(ctx, config.ServerClientSet, config.Namespace, "app=iperf3-server", 60*time.Second); err != nil {
color.Red("✘ iperf3 server pod failed to become ready: %v", err)
return fmt.Errorf("iperf3 server pod failed to become ready: %w", err)
}
@@ -185,11 +190,11 @@ func deployIperf3Server(config TestConfig) error {
}
}

_, err := config.Client.AppsV1().Deployments(config.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
_, err := config.ServerClientSet.AppsV1().Deployments(config.Namespace).Create(context.TODO(), deployment, metav1.CreateOptions{})
return err
}

func createIperf3Service(client *kubernetes.Clientset, namespace string) error {
func createIperf3Service(clientSetServer *kubernetes.Clientset, clientSetClient *kubernetes.Clientset, namespace string, multiCluster bool, serviceAnnotation string) error {
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "iperf3-server",
@@ -210,7 +215,31 @@ func createIperf3Service(client *kubernetes.Clientset, namespace string) error {
},
}

_, err := client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
// serviceAnnotation uses the format key1=value1,key2=value2. This is useful for adding the Cilium Cluster Mesh annotation to the service
if serviceAnnotation != "" {

// init map for annotations
service.Annotations = make(map[string]string)

for _, v := range strings.Split(serviceAnnotation, ",") {
kv := strings.Split(v, "=")
if len(kv) == 2 {
service.Annotations[kv[0]] = kv[1]
}
}
}

if multiCluster {
for _, client := range []*kubernetes.Clientset{clientSetServer, clientSetClient} {
_, err := client.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create service in cluster: %w", err)
}
}
return nil
}

_, err := clientSetServer.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
return err
}
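For illustration, here is how a flag value in the documented key1=value1,key2=value2 format is parsed into service annotations. The Cilium Cluster Mesh global-service annotations shown are assumed example values, not something this code requires:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Assumed example flag value; "service.cilium.io/global=true" is the
	// Cilium Cluster Mesh global-service annotation, used purely to
	// illustrate the key1=value1,key2=value2 format.
	serviceAnnotation := "service.cilium.io/global=true,service.cilium.io/shared=true"

	annotations := make(map[string]string)
	for _, pair := range strings.Split(serviceAnnotation, ",") {
		kv := strings.Split(pair, "=")
		if len(kv) == 2 {
			annotations[kv[0]] = kv[1]
		}
	}
	fmt.Println(annotations)
	// map[service.cilium.io/global:true service.cilium.io/shared:true]
}
```

Note that a value containing its own "=" would be silently dropped by the len check; strings.SplitN(pair, "=", 2) would preserve it.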

@@ -255,12 +284,12 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
}
}

_, err := config.Client.CoreV1().Pods(config.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
_, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
if err != nil {
return err
}

color.Green("✔ iperf3 client pod created successfully")
color.Green("✔ iperf3 client created successfully")

color.Green("► Starting iperf3 test")

@@ -271,7 +300,7 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {
case <-ctx.Done():
return ctx.Err()
default:
pod, err := config.Client.CoreV1().Pods(config.Namespace).Get(ctx, "iperf3-client", metav1.GetOptions{})
pod, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).Get(ctx, "iperf3-client", metav1.GetOptions{})
if err != nil {
return err
}
@@ -287,7 +316,7 @@ func runIperf3Client(ctx context.Context, config TestConfig) error {

PodCompleted:
// Get and print the logs
logs, err := config.Client.CoreV1().Pods(config.Namespace).GetLogs("iperf3-client", &corev1.PodLogOptions{}).Do(ctx).Raw()
logs, err := config.ClientClientSet.CoreV1().Pods(config.Namespace).GetLogs("iperf3-client", &corev1.PodLogOptions{}).Do(ctx).Raw()
if err != nil {
return err
}
@@ -352,29 +381,29 @@ func printIperfSummary(jsonData []byte) error {

// Prepare all lines
lines := []string{
fmt.Sprintf("Connection Details:"),
fmt.Sprintf(" Local: %s:%d", connectedMap["local_host"], int(connectedMap["local_port"].(float64))),
fmt.Sprintf(" Remote: %s:%d", connectedMap["remote_host"], int(connectedMap["remote_port"].(float64))),
fmt.Sprintf(""),
fmt.Sprintf("Test Configuration:"),
fmt.Sprintf(" Protocol: %s", testStart["protocol"]),
fmt.Sprintf(" Duration: %.2f seconds", sumSent["seconds"]),
fmt.Sprintf(" Parallel Streams: %d", int(testStart["num_streams"].(float64))),
fmt.Sprintf(""),
fmt.Sprintf("Results:"),
fmt.Sprintf(" Sent: %.2f Mbits/sec", sumSent["bits_per_second"].(float64)/1e6),
fmt.Sprintf(" Received: %.2f Mbits/sec", sumReceived["bits_per_second"].(float64)/1e6),
"Connection Details:",
fmt.Sprintf(" Local: %s:%d", getStringValue(connectedMap, "local_host"), getIntValue(connectedMap, "local_port")),
fmt.Sprintf(" Remote: %s:%d", getStringValue(connectedMap, "remote_host"), getIntValue(connectedMap, "remote_port")),
"",
"Test Configuration:",
fmt.Sprintf(" Protocol: %s", getStringValue(testStart, "protocol")),
fmt.Sprintf(" Duration: %.2f seconds", getFloatValue(sumSent, "seconds")),
fmt.Sprintf(" Parallel Streams: %d", getIntValue(testStart, "num_streams")),
"",
"Results:",
fmt.Sprintf(" Sent: %.2f Mbits/sec", getFloatValue(sumSent, "bits_per_second")/1e6),
fmt.Sprintf(" Received: %.2f Mbits/sec", getFloatValue(sumReceived, "bits_per_second")/1e6),
}

if retransmits, ok := sumSent["retransmits"]; ok {
lines = append(lines, fmt.Sprintf(" Retransmits: %d", int(retransmits.(float64))))
}

lines = append(lines,
fmt.Sprintf(""),
fmt.Sprintf("CPU Utilization:"),
fmt.Sprintf(" Local: %.2f%%", cpuUtil["host_total"].(float64)),
fmt.Sprintf(" Remote: %.2f%%", cpuUtil["remote_total"].(float64)),
"",
"CPU Utilization:",
fmt.Sprintf(" Local: %.2f%%", getFloatValue(cpuUtil, "host_total")),
fmt.Sprintf(" Remote: %.2f%%", getFloatValue(cpuUtil, "remote_total")),
)

// Calculate the length of the longest line
@@ -403,16 +432,30 @@ func printIperfSummary(jsonData []byte) error {
return nil
}

func cleanup(client *kubernetes.Clientset, namespace string) error {
if err := client.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
return err
func cleanup(serverClientSet, clientClientSet *kubernetes.Clientset, namespace string, multiCluster bool) error {
var errs []error

if err := serverClientSet.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
errs = append(errs, fmt.Errorf("failed to delete iperf3-server deployment: %w", err))
}

if err := client.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
return err
if err := clientClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", metav1.DeleteOptions{}); err != nil {
errs = append(errs, fmt.Errorf("failed to delete iperf3-client pod: %w", err))
}

return client.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", metav1.DeleteOptions{})
if multiCluster {
for _, client := range []*kubernetes.Clientset{serverClientSet, clientClientSet} {
if err := client.CoreV1().Services(namespace).Delete(context.TODO(), "iperf3-server", metav1.DeleteOptions{}); err != nil {
errs = append(errs, fmt.Errorf("failed to delete iperf3-server service in cluster: %w", err))
}
}
}

if len(errs) > 0 {
return fmt.Errorf("%w: %v", errCleanup, errs)
}

return nil
}
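As a side note on the aggregation pattern above: on Go 1.20+, errors.Join expresses the same idea, discarding nil errors and returning nil when every deletion succeeds. A minimal sketch for the single-cluster path (an alternative, not what this PR implements; the wrapped errCleanup sentinel would be lost):

```go
package iperf

import (
	"context"
	"errors"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// cleanupJoined is an illustrative alternative to the manual error slice:
// errors.Join drops nil values and returns nil when all deletions succeed.
// The multi-cluster service deletions could be appended the same way.
func cleanupJoined(serverClientSet, clientClientSet *kubernetes.Clientset, namespace string) error {
	opts := metav1.DeleteOptions{}
	return errors.Join(
		serverClientSet.AppsV1().Deployments(namespace).Delete(context.TODO(), "iperf3-server", opts),
		clientClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), "iperf3-client", opts),
	)
}
```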

func int32Ptr(i int32) *int32 {
@@ -463,3 +506,24 @@ func waitForPodReady(ctx context.Context, client *kubernetes.Clientset, namespac
return false, nil
})
}

func getStringValue(m map[string]interface{}, key string) string {
if value, ok := m[key].(string); ok {
return value
}
return "N/A"
}

func getIntValue(m map[string]interface{}, key string) int {
if value, ok := m[key].(float64); ok {
return int(value)
}
return 0
}

func getFloatValue(m map[string]interface{}, key string) float64 {
if value, ok := m[key].(float64); ok {
return value
}
return 0.0
}
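A short illustration of the helpers' fallback behaviour, assuming the functions above are in scope (the map values are made up). The float64 assertions reflect the fact that encoding/json decodes every JSON number into float64 when unmarshalling into interface{}:

```go
m := map[string]interface{}{
	"local_host": "10.0.0.1",
	"local_port": 5201.0, // JSON numbers arrive as float64
}
fmt.Println(getStringValue(m, "local_host"))  // 10.0.0.1
fmt.Println(getStringValue(m, "remote_host")) // N/A  (missing key)
fmt.Println(getIntValue(m, "local_port"))     // 5201
fmt.Println(getFloatValue(m, "seconds"))      // 0    (missing key)
```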
19 changes: 17 additions & 2 deletions pkg/k8s/client.go
@@ -8,14 +8,29 @@ import (
"k8s.io/client-go/tools/clientcmd"
)

func NewClient() (*kubernetes.Clientset, error) {
func NewClient(context string) (*kubernetes.Clientset, error) {
homeDir, err := os.UserHomeDir()
if err != nil {
return nil, err
}

kubeconfig := filepath.Join(homeDir, ".kube", "config")
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)

// Create a config loading rule that prefers the provided context
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
loadingRules.ExplicitPath = kubeconfig

// Create a config overrides struct and set the context if provided
overrides := &clientcmd.ConfigOverrides{}
if context != "" {
overrides.CurrentContext = context
}

// Create a client config using the loading rules and overrides
clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)

// Get the rest config
config, err := clientConfig.ClientConfig()
if err != nil {
return nil, err
}
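As with the previous BuildConfigFromFlags call, pinning ExplicitPath to ~/.kube/config means the KUBECONFIG environment variable is not consulted. If honouring it were desired, a minimal sketch (an alternative, not part of this change) could leave the default loading rules untouched:

```go
package k8s

import (
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// newClientDefaultRules is an illustrative variant: without ExplicitPath,
// the default loading rules honour $KUBECONFIG and fall back to
// ~/.kube/config, while still allowing a context override.
func newClientDefaultRules(context string) (*kubernetes.Clientset, error) {
	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
	overrides := &clientcmd.ConfigOverrides{}
	if context != "" {
		overrides.CurrentContext = context
	}
	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides).ClientConfig()
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(config)
}
```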
