Skip to content

Commit

Permalink
feat: validate exported services when handling backend update (#211)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhiying-lin authored Nov 8, 2024
1 parent 5f65940 commit 0cb38e4
Show file tree
Hide file tree
Showing 5 changed files with 511 additions and 18 deletions.
194 changes: 176 additions & 18 deletions pkg/controllers/hub/trafficmanagerbackend/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ import (
"context"
"errors"
"fmt"
"math"
"strings"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/trafficmanager/armtrafficmanager"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -38,6 +41,8 @@ import (
const (
trafficManagerBackendProfileFieldKey = ".spec.profile.name"
trafficManagerBackendBackendFieldKey = ".spec.backend.name"
// field name used to filter resources
exportedServiceFieldNamespacedName = ".spec.serviceReference.namespacedName"

// AzureResourceEndpointNamePrefix is the prefix format of the Azure Traffic Manager Endpoint created by the fleet controller.
// The naming convention of a Traffic Manager Endpoint is fleet-{TrafficManagerBackendUUID}#.
Expand Down Expand Up @@ -231,7 +236,14 @@ func (r *Reconciler) handleUpdate(ctx context.Context, backend *fleetnetv1alpha1
// The controller will retry when err is not nil.
return ctrl.Result{}, err
}

klog.V(2).InfoS("Found the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", klog.KObj(serviceImport), "clusters", serviceImport.Status.Clusters)

desiredEndpointsMaps, invalidServicesMaps, err := r.validateExportedServiceForServiceImport(ctx, backend, serviceImport)
if err != nil {
return ctrl.Result{}, err
}
klog.V(2).InfoS("Found the exported services behind the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", klog.KObj(serviceImport), "numberOfDesiredEndpoints", len(desiredEndpointsMaps), "numberOfInvalidServices", len(invalidServicesMaps))
return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -365,6 +377,100 @@ func (r *Reconciler) updateTrafficManagerBackendStatus(ctx context.Context, back
return nil
}

// desiredEndpoint pairs an Azure Traffic Manager endpoint with the serviceImport cluster status
// entry it was generated for.
type desiredEndpoint struct {
// Endpoint is the Azure Traffic Manager endpoint that should exist for the cluster.
Endpoint armtrafficmanager.Endpoint
// Cluster is the entry of serviceImport.Status.Clusters the endpoint was built from.
Cluster fleetnetv1alpha1.ClusterStatus
}

// validateExportedServiceForServiceImport returns two maps:
// * a map of desired endpoints for the serviceImport (key is the endpoint name).
// * a map of invalid services which cannot be exposed as the trafficManagerEndpoints (key is the cluster name).
//
// Both maps are nil when the validation cannot be completed:
//   - the serviceImport has no clusters yet, or a cluster listed in the serviceImport status has no
//     matching internalServiceExport: the backend condition is set via setUnknownCondition and the
//     result of the status update is returned as the error (nil when the update succeeds);
//   - listing the internalServiceExports fails: the list error is returned, unless the status update
//     itself fails, in which case that error is returned instead.
//
// The backend weight is split evenly (rounded up) across the desired endpoints.
func (r *Reconciler) validateExportedServiceForServiceImport(ctx context.Context, backend *fleetnetv1alpha1.TrafficManagerBackend, serviceImport *fleetnetv1alpha1.ServiceImport) (map[string]desiredEndpoint, map[string]error, error) {
	backendKObj := klog.KObj(backend)
	serviceImportKObj := klog.KObj(serviceImport)

	if len(serviceImport.Status.Clusters) == 0 {
		klog.V(2).InfoS("No clusters found in the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", serviceImportKObj)
		// Controller will only create the serviceImport when there is a cluster exposing their services.
		// Updating the status will be in a separate call and could fail.
		setUnknownCondition(backend, "In the process of exporting the services")
		// We don't need to requeue the request and when the serviceImport status is set, the controller will be re-triggered.
		return nil, nil, r.updateTrafficManagerBackendStatus(ctx, backend)
	}

	internalServiceExportList := &fleetnetv1alpha1.InternalServiceExportList{}
	namespaceName := types.NamespacedName{Namespace: serviceImport.Namespace, Name: serviceImport.Name}
	listOpts := client.MatchingFields{
		exportedServiceFieldNamespacedName: namespaceName.String(),
	}
	if listErr := r.Client.List(ctx, internalServiceExportList, &listOpts); listErr != nil {
		klog.ErrorS(listErr, "Failed to list internalServiceExports used by the serviceImport", "trafficManagerBackend", backendKObj, "serviceImport", serviceImportKObj)
		setUnknownCondition(backend, fmt.Sprintf("Failed to list the exported service %q: %v", namespaceName, listErr))
		if err := r.updateTrafficManagerBackendStatus(ctx, backend); err != nil {
			return nil, nil, err
		}
		return nil, nil, listErr
	}

	// Index the exports by cluster ID; the values point into the list items so nothing is copied.
	internalServiceExportMap := make(map[string]*fleetnetv1alpha1.InternalServiceExport, len(internalServiceExportList.Items))
	for i := range internalServiceExportList.Items {
		export := &internalServiceExportList.Items[i]
		internalServiceExportMap[export.Spec.ServiceReference.ClusterID] = export
	}

	desiredEndpoints := make(map[string]desiredEndpoint, len(serviceImport.Status.Clusters)) // key is the endpoint name
	invalidServices := make(map[string]error, len(serviceImport.Status.Clusters))            // key is cluster name
	for _, clusterStatus := range serviceImport.Status.Clusters {
		internalServiceExport, ok := internalServiceExportMap[clusterStatus.Cluster]
		if !ok {
			getErr := fmt.Errorf("failed to find the internalServiceExport for the cluster %q", clusterStatus.Cluster)
			// Usually controller should update the serviceImport status first before deleting the internalServiceImport.
			// NOTE(review): "internalServiceImport" above presumably means internalServiceExport — confirm.
			// It could happen that the current serviceImport has stale information.
			// The controller will be re-triggered when the serviceImport is updated.
			klog.ErrorS(getErr, "InternalServiceExport not found for the cluster", "trafficManagerBackend", backendKObj, "serviceImport", serviceImportKObj, "clusterID", clusterStatus.Cluster)
			setUnknownCondition(backend, fmt.Sprintf("Failed to find the exported service %q for %q: %v", namespaceName, clusterStatus.Cluster, getErr))
			return nil, nil, r.updateTrafficManagerBackendStatus(ctx, backend)
		}
		if err := isValidTrafficManagerEndpoint(internalServiceExport); err != nil {
			invalidServices[clusterStatus.Cluster] = err
			klog.V(2).InfoS("Invalid service for TrafficManager endpoint", "trafficManagerBackend", backendKObj, "serviceImport", serviceImportKObj, "clusterID", clusterStatus.Cluster, "error", err)
			continue
		}
		endpoint := generateAzureTrafficManagerEndpoint(backend, internalServiceExport)
		desiredEndpoints[*endpoint.Name] = desiredEndpoint{Endpoint: endpoint, Cluster: clusterStatus}
	}

	// Only compute the per-endpoint weight when there is something to weight. With zero valid
	// endpoints the division yields +Inf, and converting +Inf to an integer is
	// implementation-dependent in Go, so the weight stays 0 in that case.
	// Assumes backend.Spec.Weight is non-nil (it is dereferenced unguarded) — TODO confirm it is
	// always defaulted by the API.
	var desiredWeight int64
	if n := len(desiredEndpoints); n > 0 {
		desiredWeight = int64(math.Ceil(float64(*backend.Spec.Weight) / float64(n)))
		for _, dp := range desiredEndpoints {
			// dp is a copy of the map value, but Properties is a pointer, so this write is
			// visible through the map entry.
			dp.Endpoint.Properties.Weight = ptr.To(desiredWeight)
		}
	}
	klog.V(2).InfoS("Finishing validating services", "trafficManagerBackend", backendKObj, "serviceImport", serviceImportKObj, "numberOfDesiredEndpoints", len(desiredEndpoints), "numberOfInvalidServices", len(invalidServices), "desiredWeight", desiredWeight)
	return desiredEndpoints, invalidServices, nil
}

// isValidTrafficManagerEndpoint reports why the exported service cannot back a TrafficManager
// endpoint, or returns nil when it can. The checks run in order and the first failure wins:
// the service must be of type LoadBalancer, must not be an internal load balancer, and must have
// a DNS label configured.
func isValidTrafficManagerEndpoint(export *fleetnetv1alpha1.InternalServiceExport) error {
	spec := &export.Spec
	switch {
	case spec.Type != corev1.ServiceTypeLoadBalancer:
		return fmt.Errorf("unsupported service type %q", spec.Type)
	case spec.IsInternalLoadBalancer:
		return errors.New("internal load balancer is not supported")
	case !spec.IsDNSLabelConfigured:
		return errors.New("DNS label is not configured to the public IP")
	default:
		return nil
	}
}

// generateAzureTrafficManagerEndpoint builds the Azure Traffic Manager endpoint for the exported
// service. The endpoint name is formatted with AzureResourceEndpointNameFormat from the backend
// UID, the backend's spec.backend and the exporting cluster ID; the target is the service's
// public IP resource ID and the endpoint is created enabled. The weight is not set here.
func generateAzureTrafficManagerEndpoint(backend *fleetnetv1alpha1.TrafficManagerBackend, service *fleetnetv1alpha1.InternalServiceExport) armtrafficmanager.Endpoint {
	name := fmt.Sprintf(AzureResourceEndpointNameFormat, backend.UID, backend.Spec.Backend, service.Spec.ServiceReference.ClusterID)
	properties := armtrafficmanager.EndpointProperties{
		TargetResourceID: service.Spec.PublicIPResourceID,
		EndpointStatus:   ptr.To(armtrafficmanager.EndpointStatusEnabled),
	}
	endpointType := string(armtrafficmanager.EndpointTypeAzureEndpoints)
	return armtrafficmanager.Endpoint{
		Name:       &name,
		Type:       &endpointType,
		Properties: &properties,
	}
}

// SetupWithManager sets up the controller with the Manager.
func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
// set up an index for efficient trafficManagerBackend lookup
Expand Down Expand Up @@ -392,6 +498,19 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) err
return err
}

// add index to quickly query internalServiceExport list by service
internalServiceExportIndexerFunc := func(o client.Object) []string {
name, ok := o.(*fleetnetv1alpha1.InternalServiceExport)
if !ok {
return []string{}
}
return []string{name.Spec.ServiceReference.NamespacedName}
}
if err := mgr.GetFieldIndexer().IndexField(ctx, &fleetnetv1alpha1.InternalServiceExport{}, exportedServiceFieldNamespacedName, internalServiceExportIndexerFunc); err != nil {
klog.ErrorS(err, "Failed to create index", "field", exportedServiceFieldNamespacedName)
return err
}

return ctrl.NewControllerManagedBy(mgr).
For(&fleetnetv1alpha1.TrafficManagerBackend{}).
Watches(
Expand All @@ -402,6 +521,10 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) err
&fleetnetv1alpha1.ServiceImport{},
handler.EnqueueRequestsFromMapFunc(r.serviceImportEventHandler()),
).
Watches(
&fleetnetv1alpha1.InternalServiceExport{},
handler.EnqueueRequestsFromMapFunc(r.internalServiceExportEventHandler()),
).
Complete(r)
}

Expand Down Expand Up @@ -434,27 +557,62 @@ func (r *Reconciler) trafficManagerProfileEventHandler() handler.MapFunc {

func (r *Reconciler) serviceImportEventHandler() handler.MapFunc {
return func(ctx context.Context, object client.Object) []reconcile.Request {
trafficManagerBackendList := &fleetnetv1alpha1.TrafficManagerBackendList{}
fieldMatcher := client.MatchingFields{
trafficManagerBackendBackendFieldKey: object.GetName(),
}
// ServiceImport and TrafficManagerBackend should be in the same namespace.
if err := r.Client.List(ctx, trafficManagerBackendList, client.InNamespace(object.GetNamespace()), fieldMatcher); err != nil {
klog.ErrorS(err,
"Failed to list trafficManagerBackends for the serviceImport",
"serviceImport", klog.KObj(object))
return r.enqueueTrafficManagerBackendByServiceImport(ctx, object)
}
}

// enqueueTrafficManagerBackendByServiceImport lists the trafficManagerBackends in the object's
// namespace whose backend field index matches the object's name and returns one reconcile request
// per backend. A list failure is logged and yields an empty (non-nil) slice.
func (r *Reconciler) enqueueTrafficManagerBackendByServiceImport(ctx context.Context, object client.Object) []reconcile.Request {
	// ServiceImport and TrafficManagerBackend should be in the same namespace.
	listOpts := []client.ListOption{
		client.InNamespace(object.GetNamespace()),
		client.MatchingFields{trafficManagerBackendBackendFieldKey: object.GetName()},
	}

	var backends fleetnetv1alpha1.TrafficManagerBackendList
	if err := r.Client.List(ctx, &backends, listOpts...); err != nil {
		klog.ErrorS(err,
			"Failed to list trafficManagerBackends for the serviceImport",
			"serviceImport", klog.KObj(object))
		return []reconcile.Request{}
	}

	requests := make([]reconcile.Request, len(backends.Items))
	for i := range backends.Items {
		item := &backends.Items[i]
		requests[i].NamespacedName = types.NamespacedName{Namespace: item.Namespace, Name: item.Name}
	}
	return requests
}

func (r *Reconciler) internalServiceExportEventHandler() handler.MapFunc {
return func(ctx context.Context, object client.Object) []reconcile.Request {
internalServiceExport, ok := object.(*fleetnetv1alpha1.InternalServiceExport)
if !ok {
return []reconcile.Request{}
}

res := make([]reconcile.Request, 0, len(trafficManagerBackendList.Items))
for _, backend := range trafficManagerBackendList.Items {
res = append(res, reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: backend.Namespace,
Name: backend.Name,
},
})
serviceImport := &fleetnetv1alpha1.ServiceImport{}
serviceImportName := types.NamespacedName{Namespace: internalServiceExport.Spec.ServiceReference.Namespace, Name: internalServiceExport.Spec.ServiceReference.Name}
serviceImportKRef := klog.KRef(serviceImportName.Namespace, serviceImportName.Name)
if err := r.Client.Get(ctx, serviceImportName, serviceImport); err != nil {
klog.ErrorS(err, "Failed to get serviceImport", "serviceImport", serviceImportKRef, "internalServiceExport", klog.KObj(internalServiceExport))
return []reconcile.Request{}
}
return res
for _, cs := range serviceImport.Status.Clusters {
// When the cluster exposes the service, first we will check whether the cluster can be exposed or not.
// For example, whether the service spec conflicts with other existing services.
// If the cluster is not in the serviceImport status, there are two possibilities:
// * the controller is still in the processing of this cluster.
// * the cluster cannot be exposed because of the conflicted spec, which will be clearly indicated in the
// serviceExport status.
// For the first case, when the processing is finished, serviceImport will be updated so that this controller
// will be triggered again.
if cs.Cluster == internalServiceExport.Spec.ServiceReference.ClusterID {
return r.enqueueTrafficManagerBackendByServiceImport(ctx, serviceImport)
}
}
return []reconcile.Request{}
}
}
Loading

0 comments on commit 0cb38e4

Please sign in to comment.