Initial review changes
Signed-off-by: Kanha gupta <[email protected]>

kanha-gupta committed Mar 23, 2024
1 parent 6c238fe commit 2a26ae3
Showing 3 changed files with 54 additions and 92 deletions.
18 changes: 3 additions & 15 deletions .github/workflows/kind.yml
@@ -769,29 +769,17 @@ jobs:
sudo mv kind /usr/local/bin
- name: Create Kind Cluster
run: |
cat <<EOF > kind-config.yml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
networking:
disableDefaultCNI: true
podSubnet: 10.10.0.0/16
nodes:
- role: control-plane
- role: worker
- role: worker
EOF
kind create cluster --config kind-config.yml
kind create cluster --config ci/kind/config-3nodes.yml
- name: load docker image into kind
run: |
kind load docker-image antrea/antrea-controller-ubuntu-coverage:latest antrea/antrea-agent-ubuntu-coverage:latest
kubectl apply -f build/yamls/antrea.yml
- name: build antctl binaries
run: |
make antctl
sudo cp bin/antctl-linux /usr/local/bin/antctl
make antctl-linux
- name: run antctl command
run: |
antctl test
./bin/antctl-linux test
validate-prometheus-metrics-doc:
name: Validate metrics in Prometheus document match running deployment's
24 changes: 15 additions & 9 deletions pkg/antctl/raw/test/client.go
@@ -18,21 +18,23 @@ import (
"bytes"
"context"
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/wait"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)

type Client struct {
Clientset kubernetes.Interface
Config *rest.Config
RawConfig clientcmdapi.Config
contextName string
clusterName string
}

func NewClient(contextName, kubeconfig string) (*Client, error) {
@@ -63,11 +65,16 @@ func NewClient(contextName, kubeconfig string) (*Client, error) {
contextName = rawConfig.CurrentContext
}

clusterName := ""
if context, ok := rawConfig.Contexts[contextName]; ok {
clusterName = context.Cluster
}

return &Client{
Clientset: clientset,
Config: config,
RawConfig: rawConfig,
contextName: contextName,
clusterName: clusterName,
}, nil
}

@@ -76,10 +83,7 @@ func (c *Client) ContextName() (name string) {
}

func (c *Client) ClusterName() (name string) {
if context, ok := c.RawConfig.Contexts[c.ContextName()]; ok {
name = context.Cluster
}
return
return c.clusterName
}

func (c *Client) CreateService(ctx context.Context, namespace string, service *corev1.Service, opts metav1.CreateOptions) (*corev1.Service, error) {
@@ -181,8 +185,10 @@ func (c *Client) DeleteDaemonSet(ctx context.Context, namespace, name string, op
return c.Clientset.AppsV1().DaemonSets(namespace).Delete(ctx, name, opts)
}

type Kind int

func (c *Client) ListNodes(ctx context.Context, options metav1.ListOptions) (*corev1.NodeList, error) {
return c.Clientset.CoreV1().Nodes().List(ctx, options)
}

func (c *Client) WaitForDeployment(ctx context.Context, interval, timeout time.Duration, immediate bool, condition wait.ConditionWithContextFunc) error {
return wait.PollUntilContextTimeout(ctx, interval, timeout, immediate, condition)
}
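
An aside on the client.go change above: ClusterName() no longer walks RawConfig on every call; NewClient now resolves the cluster name for the active context once and caches it in the clusterName field. A minimal standalone sketch of that lookup, assuming default kubeconfig loading rules and no explicit context or kubeconfig path (the real NewClient honors both):

package main

import (
	"fmt"

	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Load the kubeconfig with default loading rules, roughly as NewClient does.
	rules := clientcmd.NewDefaultClientConfigLoadingRules()
	rawConfig, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, &clientcmd.ConfigOverrides{}).RawConfig()
	if err != nil {
		panic(err)
	}
	// Resolve the cluster name for the current context up front; this is the
	// value the Client now caches and returns from ClusterName().
	contextName := rawConfig.CurrentContext
	clusterName := ""
	if context, ok := rawConfig.Contexts[contextName]; ok {
		clusterName = context.Cluster
	}
	fmt.Printf("context %q maps to cluster %q\n", contextName, clusterName)
}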
104 changes: 36 additions & 68 deletions pkg/antctl/raw/test/command.go
@@ -22,6 +22,8 @@ import (
"os"
"time"

"k8s.io/apimachinery/pkg/util/wait"

"github.com/spf13/cobra"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -54,6 +56,10 @@ func init() {
return check.Run(context.Background())
},
}
Command.Flags().StringVarP(&parameters.antreaNamespace, "namespace", "n", "kube-system", "Configure namespace in which Antrea is running")
Command.Flags().BoolVar(&parameters.ForceDeploy, "force-deploy", false, "Force deploys test deployments")
//TODO
Command.Flags().BoolVar(&parameters.SingleNode, "single-node", false, "Runs tests applicable only on single node environment")
}
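
An aside on the flags registered in init() above: they give the test command its namespace, force-deploy, and single-node switches. A self-contained sketch of the same cobra wiring, using a stand-in struct in place of the package's Parameters type (field names here are illustrative):

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// Stand-in for the package's Parameters struct; only the flag-backed fields are shown.
type testParams struct {
	antreaNamespace string
	forceDeploy     bool
	singleNode      bool
}

func main() {
	var p testParams
	cmd := &cobra.Command{
		Use: "test",
		RunE: func(cmd *cobra.Command, args []string) error {
			fmt.Printf("running connectivity test with %+v\n", p)
			return nil
		},
	}
	// Same flag names, shorthand, and defaults as in the diff above.
	cmd.Flags().StringVarP(&p.antreaNamespace, "namespace", "n", "kube-system", "Configure namespace in which Antrea is running")
	cmd.Flags().BoolVar(&p.forceDeploy, "force-deploy", false, "Force deploys test deployments")
	cmd.Flags().BoolVar(&p.singleNode, "single-node", false, "Runs tests applicable only on single node environment")

	cmd.SetArgs([]string{"--single-node"})
	_ = cmd.Execute()
}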

const (
@@ -93,12 +99,8 @@ func (k *k8sConnectivityParams) Run(ctx context.Context) error {
return nil
}

func curlCommand(target string) []string {
return []string{"curl", "-sS", "--fail", "--connect-timeout", "5", "-o", "/dev/null", target}
}

func curlCommandforInternet(target string) []string {
return []string{"curl", "-s", "--fail", target}
func agnhostConnectCommand(target string) []string {
return []string{"/agnhost", "connect", target, "--timeout=5s"}
}

func newService(name string, selector map[string]string, portName string, port int) *corev1.Service {
@@ -206,7 +208,7 @@ func (k *k8sConnectivityParams) validatePodToPod(ctx context.Context) {
success = true
)
k.Header("Validating from pod %s to pod %s...", srcPod, dstPod)
_, err := k.client.ExecInPod(ctx, connectivityCheckNamespace, clientPod.Name, "", curlCommand(echoIP+":80"))
_, err := k.client.ExecInPod(ctx, connectivityCheckNamespace, clientPod.Name, "", agnhostConnectCommand(echoIP+":80"))
if err != nil {
k.Log("curl connectivity check command failed: %s", err)
success = false
@@ -231,7 +233,7 @@ func (k *k8sConnectivityParams) validatePodInternetConnectivity(ctx context.Cont
)

k.Header("Validating connectivity from pod %s to the world (google.com)...", srcPod)
_, err := k.client.ExecInPod(ctx, connectivityCheckNamespace, clientPod.Name, clientDeploymentName, curlCommandforInternet("https://google.com"))
_, err := k.client.ExecInPod(ctx, connectivityCheckNamespace, clientPod.Name, clientDeploymentName, agnhostConnectCommand("google.com:80"))
if err != nil {
k.Log("Connectivity test from pod %s to google.com failed: %s", srcPod, err)
success = false
@@ -254,13 +256,10 @@ type Parameters struct {
antreaNamespace string
SingleNode bool
ForceDeploy bool
MultiCluster string
Writer io.Writer
}

func (p Parameters) podReadyTimeout() time.Duration {
return 5 * time.Minute
}
const podReadyTimeout = 5 * time.Minute

func (k *k8sConnectivityParams) deleteDeployments(ctx context.Context, client k8sClientOperations) error {
k.Log("[%s] Deleting connectivity check deployments...", client.ClusterName())
@@ -286,23 +285,19 @@ func (k *k8sConnectivityParams) deleteDeployments(ctx context.Context, client k8
func (k *k8sConnectivityParams) deploymentList() (srcList []string, dstList []string) {
srcList = []string{clientDeploymentName, echoSameNodeDeploymentName}

if k.params.MultiCluster != "" || !k.params.SingleNode {
if !k.params.SingleNode {
dstList = append(dstList, echoOtherNodeDeploymentName)
}

return srcList, dstList
}

type deploymentClients struct {
destinationInOtherCluster bool
source k8sClientOperations
destination k8sClientOperations
source k8sClientOperations
destination k8sClientOperations
}

func (d *deploymentClients) clients() []k8sClientOperations {
if d.destinationInOtherCluster {
return []k8sClientOperations{d.source, d.destination}
}
return []k8sClientOperations{d.source}
}

@@ -312,26 +307,15 @@ func (k *k8sConnectivityParams) initClients(ctx context.Context) (*deploymentCli
destination: k.client,
}

// This automatically detects a single-node environment so we can skip deploying tests of multiple nodes environment
if k.params.MultiCluster == "" {
daemonSet, err := k.client.GetDaemonSet(ctx, k.params.antreaNamespace, AgentDaemonSetName, metav1.GetOptions{})
if err != nil {
k.Log("Unable to determine status of Antrea DaemonSet.")
return nil, fmt.Errorf("Unable to determine status of antrea DaemonSet: %w", err)
}

if daemonSet.Status.DesiredNumberScheduled == 1 && !k.params.SingleNode {
k.Log("Single node environment detected, enabling single node connectivity test")
k.params.SingleNode = true
}
} else {
dst, err := NewClient(k.params.MultiCluster, "")
if err != nil {
return nil, fmt.Errorf("unable to create Kubernetes client for remote cluster %q: %w", k.params.MultiCluster, err)
}
daemonSet, err := k.client.GetDaemonSet(ctx, k.params.antreaNamespace, AgentDaemonSetName, metav1.GetOptions{})
if err != nil {
k.Log("Unable to determine status of Antrea DaemonSet.")
return nil, fmt.Errorf("Unable to determine status of antrea DaemonSet: %w", err)
}

c.destination = dst
c.destinationInOtherCluster = true
if daemonSet.Status.DesiredNumberScheduled == 1 && !k.params.SingleNode {
k.Log("Single node environment detected, enabling single node connectivity test")
k.params.SingleNode = true
}

return c, nil
@@ -349,34 +333,15 @@ func (k *k8sConnectivityParams) deploy(ctx context.Context) error {
_, err := k.clients.source.GetNamespace(ctx, connectivityCheckNamespace, metav1.GetOptions{})
if err != nil {
srcDeploymentNeeded = true
if k.params.MultiCluster == "" {
dstDeploymentNeeded = true
}
dstDeploymentNeeded = true

k.Log("[%s] Creating namespace for connectivity check...", k.clients.source.ClusterName())
_, err = k.clients.source.CreateNamespace(ctx, connectivityCheckNamespace, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("unable to create namespace %s: %s", connectivityCheckNamespace, err)
}
}

if k.params.MultiCluster != "" {
if k.params.ForceDeploy {
if err := k.deleteDeployments(ctx, k.clients.destination); err != nil {
return err
}
}

_, err = k.clients.destination.GetNamespace(ctx, connectivityCheckNamespace, metav1.GetOptions{})
if err != nil {
dstDeploymentNeeded = true
k.Log("[%s] Creating namespace for connectivity check...", k.clients.destination.ClusterName())
_, err = k.clients.destination.CreateNamespace(ctx, connectivityCheckNamespace, metav1.CreateOptions{})
if err != nil {
return fmt.Errorf("unable to create namespace %s: %s", connectivityCheckNamespace, err)
}
}
}

if srcDeploymentNeeded {
k.Log("[%s] Deploying echo-same-node service...", k.clients.source.ClusterName())
svc := newService(echoSameNodeDeploymentName, map[string]string{"name": echoSameNodeDeploymentName}, "http", 80)
Expand Down Expand Up @@ -480,20 +445,22 @@ func (k *k8sConnectivityParams) deploy(ctx context.Context) error {
}

func (k *k8sConnectivityParams) waitForDeploymentsReady(ctx context.Context, client k8sClientOperations, deployments []string) error {
k.Log("[%s] Waiting for deployments %s to become ready...", client.ClusterName(), deployments)
for _, deployment := range deployments {
k.Log("[%s] Waiting for deployments %s to become ready...", client.ClusterName(), deployment)
}

ctx, cancel := context.WithTimeout(ctx, k.params.podReadyTimeout())
defer cancel()
for _, name := range deployments {
for client.DeploymentIsReady(ctx, connectivityCheckNamespace, name) != nil {
select {
case <-time.After(time.Second):
case <-ctx.Done():
return fmt.Errorf("waiting for deployment %s to become ready has been interrupted: %w", name, ctx.Err())
err := k.client.WaitForDeployment(ctx, time.Second, podReadyTimeout, false, func(ctx context.Context) (bool, error) {
err := client.DeploymentIsReady(ctx, connectivityCheckNamespace, name)
if err != nil {
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("waiting for deployment %s to become ready has been interrupted: %w", name, err)
}
}

return nil
}
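
An aside on the rewritten waitForDeploymentsReady: the hand-rolled select/timer loop is replaced with the new WaitForDeployment helper, itself a thin wrapper over wait.PollUntilContextTimeout from k8s.io/apimachinery. A minimal runnable sketch of the same polling pattern, with a fake readiness check standing in for DeploymentIsReady:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx := context.Background()
	start := time.Now()
	// Poll once per second, give up after ten seconds, and skip the immediate
	// first evaluation (immediate=false), mirroring the call in waitForDeploymentsReady.
	err := wait.PollUntilContextTimeout(ctx, time.Second, 10*time.Second, false, func(ctx context.Context) (bool, error) {
		// Stand-in for client.DeploymentIsReady: report ready after three seconds.
		// Returning (false, nil) means "not ready yet, try again on the next tick".
		return time.Since(start) > 3*time.Second, nil
	})
	fmt.Println("wait finished, err:", err)
}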

@@ -551,4 +518,5 @@ type k8sClientOperations interface {
ListPods(ctx context.Context, namespace string, options metav1.ListOptions) (*corev1.PodList, error)
ExecInPod(ctx context.Context, namespace, pod, container string, command []string) (bytes.Buffer, error)
ClusterName() (name string)
WaitForDeployment(ctx context.Context, interval, timeout time.Duration, immediate bool, condition wait.ConditionWithContextFunc) error
}
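
One more aside: the connectivity probes now exec agnhost's connect subcommand in the client pod instead of curl, with a 5-second timeout, for both the pod-to-pod and pod-to-internet checks. A small sketch of the argv the helper builds (the service IP below is hypothetical; in the test the target is the echo service IP on port 80, or google.com:80 for the internet check):

package main

import "fmt"

// Copy of the helper added in command.go: build the argv handed to ExecInPod.
func agnhostConnectCommand(target string) []string {
	return []string{"/agnhost", "connect", target, "--timeout=5s"}
}

func main() {
	// Hypothetical ClusterIP; the client pod image is assumed to ship the agnhost binary at /agnhost.
	fmt.Println(agnhostConnectCommand("10.96.0.10:80"))
	fmt.Println(agnhostConnectCommand("google.com:80"))
}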
