diff --git a/.github/workflows/ccm-integration-tests.yml b/.github/workflows/ccm-integration-tests.yml
index 7413053..857b1e2 100644
--- a/.github/workflows/ccm-integration-tests.yml
+++ b/.github/workflows/ccm-integration-tests.yml
@@ -129,6 +129,7 @@ jobs:
         env:
           CLOUDSCALE_API_TOKEN: ${{ secrets.CLOUDSCALE_API_TOKEN }}
+          HTTP_ECHO_BRANCH: ${{ vars.HTTP_ECHO_BRANCH }}
           KUBERNETES: '${{ matrix.kubernetes }}'
           SUBNET: '${{ matrix.subnet }}'
           CLUSTER_PREFIX: '${{ matrix.cluster_prefix }}'
diff --git a/cmd/http-echo/go.mod b/cmd/http-echo/go.mod
new file mode 100644
index 0000000..bc48f99
--- /dev/null
+++ b/cmd/http-echo/go.mod
@@ -0,0 +1,5 @@
+module github.com/cloudscale-ch/cloudscale-cloud-controller-manager/cmd/http-echo
+
+go 1.23.0
+
+require github.com/pires/go-proxyproto v0.7.0
diff --git a/cmd/http-echo/go.sum b/cmd/http-echo/go.sum
new file mode 100644
index 0000000..0633665
--- /dev/null
+++ b/cmd/http-echo/go.sum
@@ -0,0 +1,2 @@
+github.com/pires/go-proxyproto v0.7.0 h1:IukmRewDQFWC7kfnb66CSomk2q/seBuilHBYFwyq0Hs=
+github.com/pires/go-proxyproto v0.7.0/go.mod h1:Vz/1JPY/OACxWGQNIRY2BeyDmpoaWmEP40O9LbuiFR4=
diff --git a/cmd/http-echo/main.go b/cmd/http-echo/main.go
new file mode 100644
index 0000000..1436eb7
--- /dev/null
+++ b/cmd/http-echo/main.go
@@ -0,0 +1,73 @@
+// An HTTP echo server that reports information about connections made to it.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"net"
+	"net/http"
+	"time"
+
+	proxyproto "github.com/pires/go-proxyproto"
+)
+
+func main() {
+	host := flag.String("host", "127.0.0.1", "Host to listen on")
+	port := flag.Int("port", 8080, "Port to listen on")
+
+	flag.Parse()
+
+	serve(*host, *port)
+}
+
+// log HTTP requests in a basic fashion
+func log(h http.Handler) http.Handler {
+	return http.HandlerFunc(
+		func(w http.ResponseWriter, r *http.Request) {
+			h.ServeHTTP(w, r)
+			fmt.Printf("%s %s (%s)\n", r.Method, r.RequestURI, r.RemoteAddr)
+		})
+}
+
+// serve the HTTP API on the given host and port
+func serve(host string, port int) {
+	router := http.NewServeMux()
+
+	// Returns 'true' if the PROXY protocol was used for the given connection
+	router.HandleFunc("GET /proxy-protocol/used",
+		func(w http.ResponseWriter, r *http.Request) {
+			fmt.Fprintln(w, r.Context().Value("HasProxyHeader"))
+		})
+
+	addr := fmt.Sprintf("%s:%d", host, port)
+
+	server := http.Server{
+		Addr:    addr,
+		Handler: log(router),
+		ConnContext: func(ctx context.Context, c net.Conn) context.Context {
+			hasProxyHeader := false
+
+			if c, ok := c.(*proxyproto.Conn); ok {
+				hasProxyHeader = c.ProxyHeader() != nil
+			}
+
+			return context.WithValue(ctx, "HasProxyHeader", hasProxyHeader)
+		},
+	}
+
+	listener, err := net.Listen("tcp", server.Addr)
+	if err != nil {
+		panic(err)
+	}
+
+	fmt.Printf("Listening on %s\n", addr)
+
+	proxyListener := &proxyproto.Listener{
+		Listener:          listener,
+		ReadHeaderTimeout: 10 * time.Second,
+	}
+	defer proxyListener.Close()
+
+	server.Serve(proxyListener)
+}
diff --git a/pkg/cloudscale_ccm/loadbalancer.go b/pkg/cloudscale_ccm/loadbalancer.go
index 20bb182..e2048d3 100644
--- a/pkg/cloudscale_ccm/loadbalancer.go
+++ b/pkg/cloudscale_ccm/loadbalancer.go
@@ -12,6 +12,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
+	"k8s.io/utils/ptr"
 )
 
 // Annotations used by the loadbalancer integration of cloudscale_ccm. Those
@@ -133,6 +134,60 @@ const (
 	// as all pools have to be recreated.
 	LoadBalancerPoolProtocol = "k8s.cloudscale.ch/loadbalancer-pool-protocol"
 
+	// LoadBalancerForceHostname forces the CCM to report a specific hostname
+	// to Kubernetes when returning the loadbalancer status, instead of
+	// reporting the IP address(es).
+	//
+	// The hostname used should point to the same IP address that would
+	// otherwise be reported. This is used as a workaround for clusters that
+	// do not support status.loadBalancer.ingress.ipMode, and use the `proxy`
+	// or `proxyv2` protocol.
+	//
+	// For newer clusters, .status.loadBalancer.ingress.ipMode is automatically
+	// set to "Proxy", unless LoadBalancerIPMode is set to "VIP".
+	//
+	// For more information about this workaround, see
+	// https://kubernetes.io/blog/2023/12/18/kubernetes-1-29-feature-loadbalancer-ip-mode-alpha/
+	//
+	// To illustrate, here's an example of a load balancer status shown on
+	// a Kubernetes 1.29 service with default settings:
+	//
+	//   apiVersion: v1
+	//   kind: Service
+	//   ...
+	//   status:
+	//     loadBalancer:
+	//       ingress:
+	//       - ip: 45.81.71.1
+	//       - ip: 2a06:c00::1
+	//
+	// Using the annotation causes the status to use the given value instead:
+	//
+	//   apiVersion: v1
+	//   kind: Service
+	//   metadata:
+	//     annotations:
+	//       k8s.cloudscale.ch/loadbalancer-force-hostname: example.org
+	//   status:
+	//     loadBalancer:
+	//       ingress:
+	//       - hostname: example.org
+	//
+	// If you are not using the `proxy` or `proxyv2` protocol, or if you are
+	// on Kubernetes 1.30 or newer, you probably do not need this setting.
+	//
+	// See `LoadBalancerIPMode` below.
+	LoadBalancerForceHostname = "k8s.cloudscale.ch/loadbalancer-force-hostname"
+
+	// LoadBalancerIPMode defines the IP mode reported to Kubernetes for the
+	// loadbalancers managed by this CCM. It defaults to "Proxy", where all
+	// traffic destined for the load balancer is sent through the load
+	// balancer, even if coming from inside the cluster.
+	//
+	// The older behavior, where traffic inside the cluster is directly
+	// sent to the backend service, can be activated by using "VIP" instead.
+	LoadBalancerIPMode = "k8s.cloudscale.ch/loadbalancer-ip-mode"
+
 	// LoadBalancerHealthMonitorDelayS is the delay between two successive
 	// checks, in seconds. Defaults to 2.
 	//
@@ -269,7 +324,13 @@ func (l *loadbalancer) GetLoadBalancer(
 		return nil, false, nil
 	}
 
-	return loadBalancerStatus(instance), true, nil
+	result, err := l.loadBalancerStatus(serviceInfo, instance)
+	if err != nil {
+		return nil, true, fmt.Errorf(
+			"unable to get load balancer state for %s: %w", service.Name, err)
+	}
+
+	return result, true, nil
 }
 
 // GetLoadBalancerName returns the name of the load balancer. Implementations
@@ -361,7 +422,13 @@ func (l *loadbalancer) EnsureLoadBalancer(
 			"unable to annotate service %s: %w", service.Name, err)
 	}
 
-	return loadBalancerStatus(actual.lb), nil
+	result, err := l.loadBalancerStatus(serviceInfo, actual.lb)
+	if err != nil {
+		return nil, fmt.Errorf(
+			"unable to get load balancer state for %s: %w", service.Name, err)
+	}
+
+	return result, nil
 }
 
 // UpdateLoadBalancer updates hosts under the specified load balancer.
@@ -432,6 +499,53 @@ func (l *loadbalancer) EnsureLoadBalancerDeleted(
 	})
 }
 
+// loadBalancerStatus generates the v1.LoadBalancerStatus for the given
+// loadbalancer, as required by Kubernetes.
+func (l *loadbalancer) loadBalancerStatus(
+	serviceInfo *serviceInfo,
+	lb *cloudscale.LoadBalancer,
+) (*v1.LoadBalancerStatus, error) {
+
+	status := v1.LoadBalancerStatus{}
+
+	// When forcing the use of a hostname, there's exactly one ingress item
+	hostname := serviceInfo.annotation(LoadBalancerForceHostname)
+	if len(hostname) > 0 {
+		status.Ingress = []v1.LoadBalancerIngress{{Hostname: hostname}}
+		return &status, nil
+	}
+
+	// Otherwise there are as many items as there are addresses
+	status.Ingress = make([]v1.LoadBalancerIngress, len(lb.VIPAddresses))
+
+	var ipmode *v1.LoadBalancerIPMode
+	switch mode := serviceInfo.annotation(LoadBalancerIPMode); mode {
+	case "Proxy":
+		ipmode = ptr.To(v1.LoadBalancerIPModeProxy)
+	case "VIP":
+		ipmode = ptr.To(v1.LoadBalancerIPModeVIP)
+	default:
+		return nil, fmt.Errorf(
+			"unsupported IP mode: '%s', must be 'Proxy' or 'VIP'", mode)
+	}
+
+	// On newer releases, we explicitly configure the IP mode
+	supportsIPMode, err := kubeutil.IsKubernetesReleaseOrNewer(l.k8s, 1, 30)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get load balancer status: %w", err)
+	}
+
+	for i, address := range lb.VIPAddresses {
+		status.Ingress[i].IP = address.Address
+
+		if supportsIPMode {
+			status.Ingress[i].IPMode = ipmode
+		}
+	}
+
+	return &status, nil
+}
+
 // ensureValidConfig ensures that the configuration can be applied at all,
 // acting as a gate that ensures certain invariants before any changes are
 // made.
@@ -545,17 +659,3 @@ func (l *loadbalancer) findIPsAssignedElsewhere(
 
 	return conflicts, nil
 }
-
-// loadBalancerStatus generates the v1.LoadBalancerStatus for the given
-// loadbalancer, as required by Kubernetes.
-func loadBalancerStatus(lb *cloudscale.LoadBalancer) *v1.LoadBalancerStatus {
-
-	status := v1.LoadBalancerStatus{}
-	status.Ingress = make([]v1.LoadBalancerIngress, len(lb.VIPAddresses))
-
-	for i, address := range lb.VIPAddresses {
-		status.Ingress[i].IP = address.Address
-	}
-
-	return &status
-}
diff --git a/pkg/cloudscale_ccm/service_info.go b/pkg/cloudscale_ccm/service_info.go
index 10f0603..9379294 100644
--- a/pkg/cloudscale_ccm/service_info.go
+++ b/pkg/cloudscale_ccm/service_info.go
@@ -82,6 +82,10 @@ func (s serviceInfo) annotation(key string) string {
 		return s.annotationOrDefault(key, "")
 	case LoadBalancerPoolProtocol:
 		return s.annotationOrDefault(key, "tcp")
+	case LoadBalancerForceHostname:
+		return s.annotationOrDefault(key, "")
+	case LoadBalancerIPMode:
+		return s.annotationOrDefault(key, "Proxy")
 	case LoadBalancerFlavor:
 		return s.annotationOrDefault(key, "lb-standard")
 	case LoadBalancerVIPAddresses:
diff --git a/pkg/internal/integration/service_test.go b/pkg/internal/integration/service_test.go
index aecfaa9..1dc67dd 100644
--- a/pkg/internal/integration/service_test.go
+++ b/pkg/internal/integration/service_test.go
@@ -6,7 +6,9 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math/rand"
 	"net/netip"
+	"os"
 	"os/exec"
 	"strings"
 	"time"
@@ -16,15 +18,24 @@ import (
 	"github.com/cloudscale-ch/cloudscale-cloud-controller-manager/pkg/internal/testkit"
 	cloudscale "github.com/cloudscale-ch/cloudscale-go-sdk/v4"
 	appsv1 "k8s.io/api/apps/v1"
+	batchv1 "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 func (s *IntegrationTestSuite) CreateDeployment(
 	name string, image string, replicas int32, port int32, args ...string) {
 
+	var command []string
+
+	if len(args) > 0 {
+		command = args[:1]
+		args = args[1:]
+	}
+
 	spec := appsv1.DeploymentSpec{
 		Replicas: &replicas,
 		Selector: &metav1.LabelSelector{
@@ -41,9 +52,10 @@
 			Spec: v1.PodSpec{
 				Containers: []v1.Container{
 					{
-						Name:  name,
-						Image: image,
-						Args:  args,
+						Name:    name,
+						Image:   image,
+						Command: command,
+						Args:    args,
 						Ports: []v1.ContainerPort{
 							{ContainerPort: port},
 						},
@@ -82,19 +94,105 @@ func (s *IntegrationTestSuite) ExposeDeployment(
 		},
 	}
 
-	_, err := s.k8s.CoreV1().Services(s.ns).Create(
-		context.Background(),
-		&v1.Service{
-			ObjectMeta: metav1.ObjectMeta{
-				Name:        name,
-				Annotations: annotations,
+	service, err := s.k8s.CoreV1().Services(s.ns).Get(
+		context.Background(), name, metav1.GetOptions{},
+	)
+
+	if err != nil {
+		_, err = s.k8s.CoreV1().Services(s.ns).Create(
+			context.Background(),
+			&v1.Service{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:        name,
+					Annotations: annotations,
+				},
+				Spec: spec,
 			},
-			Spec: spec,
+			metav1.CreateOptions{},
+		)
+		s.Require().NoError(err)
+	} else {
+		service.Spec = spec
+		service.ObjectMeta.Annotations = annotations
+
+		_, err = s.k8s.CoreV1().Services(s.ns).Update(
+			context.Background(),
+			service,
+			metav1.UpdateOptions{},
+		)
+		s.Require().NoError(err)
+	}
+}
+
+// RunJob starts a single job, awaits the result, and returns it as a string.
+func (s *IntegrationTestSuite) RunJob(
+	image string, timeout time.Duration, cmd ...string) string {
+
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	name := fmt.Sprintf("job-%08x", rand.Uint32())
+
+	// Specify the job
+	spec := batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
 		},
+		Spec: batchv1.JobSpec{
+			Template: v1.PodTemplateSpec{
+				Spec: v1.PodSpec{
+					RestartPolicy: v1.RestartPolicyNever,
+					Containers: []v1.Container{
+						{
+							Name:    name,
+							Image:   image,
+							Command: cmd,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	// Start it
+	_, err := s.k8s.BatchV1().Jobs(s.ns).Create(
+		ctx,
+		&spec,
 		metav1.CreateOptions{},
 	)
 	s.Require().NoError(err)
+
+	// Wait for completion
+	var job *batchv1.Job
+	err = wait.PollUntilContextTimeout(ctx, 1*time.Second, timeout, true,
+		func(ctx context.Context) (bool, error) {
+			job, err = s.k8s.BatchV1().Jobs(s.ns).Get(
+				ctx, name, metav1.GetOptions{})
+
+			if err != nil {
+				return false, err
+			}
+			return job.Status.Succeeded > 0, nil
+		},
+	)
+
+	s.Require().NoError(err)
+
+	// Get pod
+	pods, err := s.k8s.CoreV1().Pods(s.ns).List(
+		ctx, metav1.ListOptions{
+			LabelSelector: fmt.Sprintf("job-name=%s", name),
+		},
+	)
+
+	s.Require().NoError(err)
+	s.Require().Len(pods.Items, 1)
+
+	logs, err := s.k8s.CoreV1().Pods(s.ns).GetLogs(
+		pods.Items[0].Name, &v1.PodLogOptions{}).Do(ctx).Raw()
+
+	s.Require().NoError(err)
+
+	return string(logs)
 }
 
 // CCMLogs returns all the logs of the CCM since the given time.
@@ -493,3 +591,98 @@ func (s *IntegrationTestSuite) TestFloatingIPConflicts() {
 	lines := s.CCMLogs(start)
 	s.Assert().Contains(lines, "assigned to another service")
 }
+
+func (s *IntegrationTestSuite) TestServiceProxyProtocol() {
+
+	// Get the branch to run http-echo with (in the future, we might
+	// offer this in a separate container).
+	branch := os.Getenv("HTTP_ECHO_BRANCH")
+	if len(branch) == 0 {
+		branch = "main"
+	}
+
+	// Deploy our http-echo server to check for proxy connections
+	s.T().Log("Creating http-echo deployment", "branch", branch)
+	s.CreateDeployment("http-echo", "golang", 2, 80, "bash", "-c", fmt.Sprintf(`
+		git clone https://github.com/cloudscale-ch/cloudscale-cloud-controller-manager ccm;
+		cd ccm;
+		git checkout %s || exit 1;
+		cd cmd/http-echo;
+		go run main.go -host 0.0.0.0 -port 80
+	`, branch))
+
+	// Expose the deployment using a LoadBalancer service
+	s.ExposeDeployment("http-echo", 80, 80, map[string]string{
+		"k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy",
+
+		// Make sure to get the default behavior of older Kubernetes releases,
+		// even on newer releases.
+		"k8s.cloudscale.ch/loadbalancer-ip-mode": "VIP",
+	})
+
+	// Wait for the service to be ready
+	s.T().Log("Waiting for http-echo service to be ready")
+	service := s.AwaitServiceReady("http-echo", 180*time.Second)
+	s.Require().NotNil(service)
+
+	addr := service.Status.LoadBalancer.Ingress[0].IP
+	url := fmt.Sprintf("http://%s/proxy-protocol/used", addr)
+
+	// Wait for responses to work
+	s.T().Log("Waiting for http-echo responses")
+	failures := 0
+
+	for i := 0; i < 100; i++ {
+		_, err := testkit.HTTPRead(url)
+
+		if err == nil {
+			break
+		} else {
+			s.T().Logf("Request %d failed: %s", i, err)
+			failures++
+		}
+
+		time.Sleep(5 * time.Millisecond)
+	}
+
+	// Make sure our HTTP requests get wrapped in the PROXY protocol
+	s.T().Log("Testing PROXY protocol from outside")
+
+	used, err := testkit.HTTPRead(url)
+	s.Assert().NoError(err)
+	s.Assert().Equal("true\n", used)
+
+	// Sending a request from inside the cluster does not work, unless we
+	// use a workaround.
+	s.T().Log("Testing PROXY protocol from inside")
+	used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url)
+	s.Assert().Equal("false\n", used)
+
+	// The workaround works by reporting a hostname that resolves to the IP
+	s.ExposeDeployment("http-echo", 80, 80, map[string]string{
+		"k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy",
+		"k8s.cloudscale.ch/loadbalancer-ip-mode": "VIP",
+		"k8s.cloudscale.ch/loadbalancer-force-hostname": fmt.Sprintf(
+			"%s.cust.cloudscale.ch",
+			strings.ReplaceAll(addr, ".", "-"),
+		),
+	})
+
+	s.T().Log("Testing PROXY protocol from inside with workaround")
+	used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url)
+	s.Assert().Equal("true\n", used)
+
+	// On newer Kubernetes releases, the defaults just work
+	newer, err := kubeutil.IsKubernetesReleaseOrNewer(s.k8s, 1, 30)
+	s.Assert().NoError(err)
+
+	if newer {
+		s.ExposeDeployment("http-echo", 80, 80, map[string]string{
+			"k8s.cloudscale.ch/loadbalancer-pool-protocol": "proxy",
+		})
+
+		s.T().Log("Testing PROXY protocol on newer Kubernetes releases")
+		used = s.RunJob("curlimages/curl", 90*time.Second, "curl", "-s", url)
+		s.Assert().Equal("true\n", used)
+	}
+}
diff --git a/pkg/internal/kubeutil/annotate.go b/pkg/internal/kubeutil/annotate.go
index 3e9b8f7..3ecc220 100644
--- a/pkg/internal/kubeutil/annotate.go
+++ b/pkg/internal/kubeutil/annotate.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"strconv"
 	"strings"
 
 	v1 "k8s.io/api/core/v1"
@@ -129,3 +130,30 @@ func PatchServiceExternalTrafficPolicy(
 
 	return PatchService(ctx, client, service, operations)
 }
+
+// IsKubernetesReleaseOrNewer fetches the Kubernetes release and returns
+// true if it matches the given major.minor release, or is newer.
+func IsKubernetesReleaseOrNewer(
+	client kubernetes.Interface,
+	major int,
+	minor int,
+) (bool, error) {
+	release, err := client.Discovery().ServerVersion()
+	if err != nil {
+		return false, fmt.Errorf("failed to read kubernetes version: %w", err)
+	}
+
+	k8sMajor, err := strconv.Atoi(release.Major)
+	if err != nil {
+		return false, fmt.Errorf(
+			"failed to convert '%s' to int: %w", release.Major, err)
+	}
+
+	k8sMinor, err := strconv.Atoi(release.Minor)
+	if err != nil {
+		return false, fmt.Errorf(
+			"failed to convert '%s' to int: %w", release.Minor, err)
+	}
+
+	return k8sMajor > major || (k8sMajor == major && k8sMinor >= minor), nil
+}
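
The annotations documented in pkg/cloudscale_ccm/loadbalancer.go above are read from the Service object. A Service combining them could look like the sketch below; the service name, selector, ports, and hostname are illustrative values and are not part of this patch:

    apiVersion: v1
    kind: Service
    metadata:
      name: http-echo
      annotations:
        # Wrap incoming connections in the PROXY protocol.
        k8s.cloudscale.ch/loadbalancer-pool-protocol: proxy
        # "Proxy" is the default; "VIP" keeps the older in-cluster behavior.
        k8s.cloudscale.ch/loadbalancer-ip-mode: VIP
        # Workaround for clusters without ipMode support: report a hostname
        # that resolves to the load balancer instead of its IP addresses.
        k8s.cloudscale.ch/loadbalancer-force-hostname: echo.example.org
    spec:
      type: LoadBalancer
      selector:
        app: http-echo
      ports:
        - port: 80
          targetPort: 80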