From 9b4e9801b43b448b5829d995cc6e2d32da34f065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Oliver=20B=C3=A4hler?= Date: Fri, 22 Mar 2024 13:37:24 +0100 Subject: [PATCH] feat: add flag --workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Oliver Bähler --- controllers/pod/metadata.go | 4 ++- controllers/pv/controller.go | 4 ++- controllers/rbac/manager.go | 4 ++- controllers/resources/global.go | 4 ++- controllers/resources/namespaced.go | 4 ++- controllers/servicelabels/endpoint.go | 4 ++- controllers/servicelabels/service.go | 4 ++- controllers/tenant/manager.go | 48 +++++++++++++++++++++++++-- go.mod | 2 ++ go.sum | 4 +++ main.go | 26 ++++++++------- 11 files changed, 86 insertions(+), 22 deletions(-) diff --git a/controllers/pod/metadata.go b/controllers/pod/metadata.go index 9c9d772f..787790ce 100644 --- a/controllers/pod/metadata.go +++ b/controllers/pod/metadata.go @@ -15,6 +15,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -123,8 +124,9 @@ func (m *MetadataReconciler) isNamespaceInTenant(ctx context.Context, namespace return len(tl.Items) > 0 } -func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error { return ctrl.NewControllerManagedBy(mgr). For(&corev1.Pod{}, m.forOptionPerInstanceName(ctx)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(m) } diff --git a/controllers/pv/controller.go b/controllers/pv/controller.go index b4753b74..e743d417 100644 --- a/controllers/pv/controller.go +++ b/controllers/pv/controller.go @@ -12,6 +12,7 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" log2 "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -88,7 +89,7 @@ func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) ( return reconcile.Result{}, nil } -func (c *Controller) SetupWithManager(mgr ctrl.Manager) error { +func (c *Controller) SetupWithManager(mgr ctrl.Manager, workerCount int) error { label, err := capsuleutils.GetTypeLabel(&capsulev1beta2.Tenant{}) if err != nil { return err @@ -113,5 +114,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error { return !ok }))). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(c) } diff --git a/controllers/rbac/manager.go b/controllers/rbac/manager.go index ce431915..24a7e36a 100644 --- a/controllers/rbac/manager.go +++ b/controllers/rbac/manager.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/util/workqueue" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -32,11 +33,12 @@ type Manager struct { } //nolint:revive -func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string) (err error) { +func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string, workerCount int) (err error) { namesPredicate := utils.NamesMatchingPredicate(ProvisionerRoleName, DeleterRoleName) crErr := ctrl.NewControllerManagedBy(mgr). For(&rbacv1.ClusterRole{}, namesPredicate). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) if crErr != nil { err = multierror.Append(err, crErr) diff --git a/controllers/resources/global.go b/controllers/resources/global.go index 4e8fa7d3..88ba1809 100644 --- a/controllers/resources/global.go +++ b/controllers/resources/global.go @@ -16,6 +16,7 @@ import ( "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/handler" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" @@ -63,7 +64,7 @@ func (r *Global) enqueueRequestFromTenant(ctx context.Context, object client.Obj return reqs } -func (r *Global) SetupWithManager(mgr ctrl.Manager) error { +func (r *Global) SetupWithManager(mgr ctrl.Manager, workerCount int) error { r.client = mgr.GetClient() r.processor = Processor{ client: mgr.GetClient(), @@ -72,6 +73,7 @@ func (r *Global) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&capsulev1beta2.GlobalTenantResource{}). Watches(&capsulev1beta2.Tenant{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRequestFromTenant)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/resources/namespaced.go b/controllers/resources/namespaced.go index 60eadcbd..f2cb2750 100644 --- a/controllers/resources/namespaced.go +++ b/controllers/resources/namespaced.go @@ -14,6 +14,7 @@ import ( "sigs.k8s.io/cluster-api/util/patch" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" ctrllog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -26,7 +27,7 @@ type Namespaced struct { processor Processor } -func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error { +func (r *Namespaced) SetupWithManager(mgr ctrl.Manager, workerCount int) error { r.client = mgr.GetClient() r.processor = Processor{ client: mgr.GetClient(), @@ -34,6 +35,7 @@ func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&capsulev1beta2.TenantResource{}). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/servicelabels/endpoint.go b/controllers/servicelabels/endpoint.go index 60a45184..db877ac4 100644 --- a/controllers/servicelabels/endpoint.go +++ b/controllers/servicelabels/endpoint.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" ) type EndpointsLabelsReconciler struct { @@ -17,7 +18,7 @@ type EndpointsLabelsReconciler struct { Log logr.Logger } -func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error { r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{ obj: &corev1.Endpoints{}, client: mgr.GetClient(), @@ -26,5 +27,6 @@ func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ct return ctrl.NewControllerManagedBy(mgr). For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/servicelabels/service.go b/controllers/servicelabels/service.go index 74d1368b..e1ff8dfa 100644 --- a/controllers/servicelabels/service.go +++ b/controllers/servicelabels/service.go @@ -9,6 +9,7 @@ import ( "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/controller" ) type ServicesLabelsReconciler struct { @@ -17,7 +18,7 @@ type ServicesLabelsReconciler struct { Log logr.Logger } -func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error { +func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error { r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{ obj: &corev1.Service{}, client: mgr.GetClient(), @@ -26,5 +27,6 @@ func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctr return ctrl.NewControllerManagedBy(mgr). For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)). + WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}). Complete(r) } diff --git a/controllers/tenant/manager.go b/controllers/tenant/manager.go index f797bfa5..03814829 100644 --- a/controllers/tenant/manager.go +++ b/controllers/tenant/manager.go @@ -5,8 +5,13 @@ package tenant import ( "context" + "errors" + "fmt" + "strings" + "time" "github.com/go-logr/logr" + "github.com/juju/mutex/v2" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -14,8 +19,10 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" + "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/reconcile" capsulev1beta2 "github.com/projectcapsule/capsule/api/v1beta2" @@ -23,12 +30,16 @@ import ( type Manager struct { client.Client - Log logr.Logger - Recorder record.EventRecorder - RESTConfig *rest.Config + Log logr.Logger + Recorder record.EventRecorder + RESTConfig *rest.Config + MaxConcurrentReconciles int + clock mutex.Clock } func (r *Manager) SetupWithManager(mgr ctrl.Manager) error { + r.clock = clock.RealClock{} + return ctrl.NewControllerManagedBy(mgr). For(&capsulev1beta2.Tenant{}). Owns(&corev1.Namespace{}). @@ -36,6 +47,7 @@ func (r *Manager) SetupWithManager(mgr ctrl.Manager) error { Owns(&corev1.LimitRange{}). Owns(&corev1.ResourceQuota{}). Owns(&rbacv1.RoleBinding{}). + WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}). Complete(r) } @@ -55,6 +67,26 @@ func (r Manager) Reconcile(ctx context.Context, request ctrl.Request) (result ct return } + + releaser, err := mutex.Acquire(r.mutexSpec(instance)) + if err != nil { + switch { + case errors.As(err, &mutex.ErrTimeout): + r.Log.Info("acquire timed out, current process is blocked by another reconciliation") + + return ctrl.Result{Requeue: true}, nil + case errors.As(err, &mutex.ErrCancelled): + r.Log.Info("acquire cancelled") + + return ctrl.Result{Requeue: true}, nil + default: + r.Log.Error(err, "acquire failed") + + return ctrl.Result{}, err + } + } + defer releaser.Release() + // Ensuring the Tenant Status if err = r.updateTenantStatus(ctx, instance); err != nil { r.Log.Error(err, "Cannot update Tenant status") @@ -138,6 +170,16 @@ func (r Manager) Reconcile(ctx context.Context, request ctrl.Request) (result ct return ctrl.Result{}, err } +func (r *Manager) mutexSpec(obj client.Object) mutex.Spec { + return mutex.Spec{ + Name: strings.ReplaceAll(fmt.Sprintf("capsule%s", obj.GetUID()), "-", ""), + Clock: r.clock, + Delay: 2 * time.Millisecond, + Timeout: time.Second, + Cancel: nil, + } +} + func (r *Manager) updateTenantStatus(ctx context.Context, tnt *capsulev1beta2.Tenant) error { return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) { if tnt.Spec.Cordoned { diff --git a/go.mod b/go.mod index b2b91828..25b6c510 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.21 require ( github.com/go-logr/logr v1.4.1 github.com/hashicorp/go-multierror v1.1.1 + github.com/juju/mutex/v2 v2.0.0 github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.32.0 github.com/pkg/errors v0.9.1 @@ -49,6 +50,7 @@ require ( github.com/imdario/mergo v0.3.13 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect diff --git a/go.sum b/go.sum index d845df48..fd5498ec 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,10 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 h1:EJHbsNpQyupmMeWTq7inn+5L/WZ7JfzCVPJ+DP9McCQ= +github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9/go.mod h1:TRm7EVGA3mQOqSVcBySRY7a9Y1/gyVhh/WTCnc5sD4U= +github.com/juju/mutex/v2 v2.0.0 h1:rVmJdOaXGWF8rjcFHBNd4x57/1tks5CgXHx55O55SB0= +github.com/juju/mutex/v2 v2.0.0/go.mod h1:jwCfBs/smYDaeZLqeaCi8CB8M+tOes4yf827HoOEoqk= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= diff --git a/main.go b/main.go index f4918fbd..c63ffaa0 100644 --- a/main.go +++ b/main.go @@ -83,7 +83,7 @@ func main() { var metricsAddr, namespace, configurationName string - var webhookPort int + var webhookPort, workerCount int var goFlagSet goflag.FlagSet @@ -94,6 +94,7 @@ func main() { "Enabling this will ensure there is only one active controller manager.") flag.BoolVar(&version, "version", false, "Print the Capsule version and exit") flag.StringVar(&configurationName, "configuration-name", "default", "The CapsuleConfiguration resource name to use") + flag.IntVar(&workerCount, "workers", 1, "Defines the number of concurrent reconciles the controller can handle") opts := zap.Options{ EncoderConfigOptions: append([]zap.EncoderConfigOption{}, func(config *zapcore.EncoderConfig) { @@ -190,10 +191,11 @@ func main() { } if err = (&tenantcontroller.Manager{ - RESTConfig: manager.GetConfig(), - Client: manager.GetClient(), - Log: ctrl.Log.WithName("controllers").WithName("Tenant"), - Recorder: manager.GetEventRecorderFor("tenant-controller"), + RESTConfig: manager.GetConfig(), + Client: manager.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("Tenant"), + Recorder: manager.GetEventRecorderFor("tenant-controller"), + MaxConcurrentReconciles: workerCount, }).SetupWithManager(manager); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Tenant") os.Exit(1) @@ -254,21 +256,21 @@ func main() { os.Exit(1) } - if err = rbacManager.SetupWithManager(ctx, manager, configurationName); err != nil { + if err = rbacManager.SetupWithManager(ctx, manager, configurationName, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Rbac") os.Exit(1) } if err = (&servicelabelscontroller.ServicesLabelsReconciler{ Log: ctrl.Log.WithName("controllers").WithName("ServiceLabels"), - }).SetupWithManager(ctx, manager); err != nil { + }).SetupWithManager(ctx, manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ServiceLabels") os.Exit(1) } if err = (&servicelabelscontroller.EndpointsLabelsReconciler{ Log: ctrl.Log.WithName("controllers").WithName("EndpointLabels"), - }).SetupWithManager(ctx, manager); err != nil { + }).SetupWithManager(ctx, manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "EndpointLabels") os.Exit(1) } @@ -281,12 +283,12 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "EndpointSliceLabels") } - if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager); err != nil { + if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PodLabels") os.Exit(1) } - if err = (&pv.Controller{}).SetupWithManager(manager); err != nil { + if err = (&pv.Controller{}).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "PersistentVolume") os.Exit(1) } @@ -298,12 +300,12 @@ func main() { os.Exit(1) } - if err = (&resources.Global{}).SetupWithManager(manager); err != nil { + if err = (&resources.Global{}).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "resources.Global") os.Exit(1) } - if err = (&resources.Namespaced{}).SetupWithManager(manager); err != nil { + if err = (&resources.Namespaced{}).SetupWithManager(manager, workerCount); err != nil { setupLog.Error(err, "unable to create controller", "controller", "resources.Namespaced") os.Exit(1) }