Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flag --workers #1013

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion controllers/pod/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
Expand Down Expand Up @@ -123,8 +124,9 @@ func (m *MetadataReconciler) isNamespaceInTenant(ctx context.Context, namespace
return len(tl.Items) > 0
}

func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (m *MetadataReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}, m.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(m)
}
4 changes: 3 additions & 1 deletion controllers/pv/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
log2 "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -88,7 +89,7 @@ func (c *Controller) Reconcile(ctx context.Context, request reconcile.Request) (
return reconcile.Result{}, nil
}

func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
func (c *Controller) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
label, err := capsuleutils.GetTypeLabel(&capsulev1beta2.Tenant{})
if err != nil {
return err
Expand All @@ -113,5 +114,6 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {

return !ok
}))).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(c)
}
4 changes: 3 additions & 1 deletion controllers/rbac/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -32,11 +33,12 @@ type Manager struct {
}

//nolint:revive
func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string) (err error) {
func (r *Manager) SetupWithManager(ctx context.Context, mgr ctrl.Manager, configurationName string, workerCount int) (err error) {
namesPredicate := utils.NamesMatchingPredicate(ProvisionerRoleName, DeleterRoleName)

crErr := ctrl.NewControllerManagedBy(mgr).
For(&rbacv1.ClusterRole{}, namesPredicate).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
if crErr != nil {
err = multierror.Append(err, crErr)
Expand Down
4 changes: 3 additions & 1 deletion controllers/resources/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -63,7 +64,7 @@ func (r *Global) enqueueRequestFromTenant(ctx context.Context, object client.Obj
return reqs
}

func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
func (r *Global) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
Expand All @@ -72,6 +73,7 @@ func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.GlobalTenantResource{}).
Watches(&capsulev1beta2.Tenant{}, handler.EnqueueRequestsFromMapFunc(r.enqueueRequestFromTenant)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/resources/namespaced.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -26,14 +27,15 @@ type Namespaced struct {
processor Processor
}

func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error {
func (r *Namespaced) SetupWithManager(mgr ctrl.Manager, workerCount int) error {
r.client = mgr.GetClient()
r.processor = Processor{
client: mgr.GetClient(),
}

return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.TenantResource{}).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}

Expand Down
4 changes: 3 additions & 1 deletion controllers/servicelabels/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type EndpointsLabelsReconciler struct {
Expand All @@ -17,7 +18,7 @@ type EndpointsLabelsReconciler struct {
Log logr.Logger
}

func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{
obj: &corev1.Endpoints{},
client: mgr.GetClient(),
Expand All @@ -26,5 +27,6 @@ func (r *EndpointsLabelsReconciler) SetupWithManager(ctx context.Context, mgr ct

return ctrl.NewControllerManagedBy(mgr).
For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}
4 changes: 3 additions & 1 deletion controllers/servicelabels/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
)

type ServicesLabelsReconciler struct {
Expand All @@ -17,7 +18,7 @@ type ServicesLabelsReconciler struct {
Log logr.Logger
}

func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager) error {
func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, workerCount int) error {
r.abstractServiceLabelsReconciler = abstractServiceLabelsReconciler{
obj: &corev1.Service{},
client: mgr.GetClient(),
Expand All @@ -26,5 +27,6 @@ func (r *ServicesLabelsReconciler) SetupWithManager(ctx context.Context, mgr ctr

return ctrl.NewControllerManagedBy(mgr).
For(r.abstractServiceLabelsReconciler.obj, r.abstractServiceLabelsReconciler.forOptionPerInstanceName(ctx)).
WithOptions(controller.Options{MaxConcurrentReconciles: workerCount}).
Complete(r)
}
48 changes: 45 additions & 3 deletions controllers/tenant/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,49 @@

import (
"context"
"errors"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
"github.com/juju/mutex/v2"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

capsulev1beta2 "github.com/projectcapsule/capsule/api/v1beta2"
)

type Manager struct {
client.Client
Log logr.Logger
Recorder record.EventRecorder
RESTConfig *rest.Config
Log logr.Logger
Recorder record.EventRecorder
RESTConfig *rest.Config
MaxConcurrentReconciles int
clock mutex.Clock
}

func (r *Manager) SetupWithManager(mgr ctrl.Manager) error {
r.clock = clock.RealClock{}

return ctrl.NewControllerManagedBy(mgr).
For(&capsulev1beta2.Tenant{}).
Owns(&corev1.Namespace{}).
Owns(&networkingv1.NetworkPolicy{}).
Owns(&corev1.LimitRange{}).
Owns(&corev1.ResourceQuota{}).
Owns(&rbacv1.RoleBinding{}).
WithOptions(controller.Options{MaxConcurrentReconciles: r.MaxConcurrentReconciles}).
Complete(r)
}

Expand All @@ -55,6 +67,26 @@

return
}

releaser, err := mutex.Acquire(r.mutexSpec(instance))
if err != nil {
switch {
case errors.As(err, &mutex.ErrTimeout):

Check failure on line 74 in controllers/tenant/manager.go

View workflow job for this annotation

GitHub Actions / lint

errorsas: second argument to errors.As should not be *error (govet)
r.Log.Info("acquire timed out, current process is blocked by another reconciliation")

return ctrl.Result{Requeue: true}, nil
case errors.As(err, &mutex.ErrCancelled):

Check failure on line 78 in controllers/tenant/manager.go

View workflow job for this annotation

GitHub Actions / lint

errorsas: second argument to errors.As should not be *error (govet)
r.Log.Info("acquire cancelled")

return ctrl.Result{Requeue: true}, nil
default:
r.Log.Error(err, "acquire failed")

return ctrl.Result{}, err
}
}
defer releaser.Release()

// Ensuring the Tenant Status
if err = r.updateTenantStatus(ctx, instance); err != nil {
r.Log.Error(err, "Cannot update Tenant status")
Expand Down Expand Up @@ -138,6 +170,16 @@
return ctrl.Result{}, err
}

func (r *Manager) mutexSpec(obj client.Object) mutex.Spec {
return mutex.Spec{
Name: strings.ReplaceAll(fmt.Sprintf("capsule%s", obj.GetUID()), "-", ""),
Clock: r.clock,
Delay: 2 * time.Millisecond,
Timeout: time.Second,
Cancel: nil,
}
}

func (r *Manager) updateTenantStatus(ctx context.Context, tnt *capsulev1beta2.Tenant) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
if tnt.Spec.Cordoned {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/go-logr/logr v1.4.1
github.com/hashicorp/go-multierror v1.1.1
github.com/juju/mutex/v2 v2.0.0
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -49,6 +50,7 @@ require (
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9 h1:EJHbsNpQyupmMeWTq7inn+5L/WZ7JfzCVPJ+DP9McCQ=
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9/go.mod h1:TRm7EVGA3mQOqSVcBySRY7a9Y1/gyVhh/WTCnc5sD4U=
github.com/juju/mutex/v2 v2.0.0 h1:rVmJdOaXGWF8rjcFHBNd4x57/1tks5CgXHx55O55SB0=
github.com/juju/mutex/v2 v2.0.0/go.mod h1:jwCfBs/smYDaeZLqeaCi8CB8M+tOes4yf827HoOEoqk=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down
26 changes: 14 additions & 12 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func main() {

var metricsAddr, namespace, configurationName string

var webhookPort int
var webhookPort, workerCount int

var goFlagSet goflag.FlagSet

Expand All @@ -94,6 +94,7 @@ func main() {
"Enabling this will ensure there is only one active controller manager.")
flag.BoolVar(&version, "version", false, "Print the Capsule version and exit")
flag.StringVar(&configurationName, "configuration-name", "default", "The CapsuleConfiguration resource name to use")
flag.IntVar(&workerCount, "workers", 1, "Defines the number of concurrent reconciles the controller can handle")

opts := zap.Options{
EncoderConfigOptions: append([]zap.EncoderConfigOption{}, func(config *zapcore.EncoderConfig) {
Expand Down Expand Up @@ -190,10 +191,11 @@ func main() {
}

if err = (&tenantcontroller.Manager{
RESTConfig: manager.GetConfig(),
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
RESTConfig: manager.GetConfig(),
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
MaxConcurrentReconciles: workerCount,
}).SetupWithManager(manager); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Tenant")
os.Exit(1)
Expand Down Expand Up @@ -254,21 +256,21 @@ func main() {
os.Exit(1)
}

if err = rbacManager.SetupWithManager(ctx, manager, configurationName); err != nil {
if err = rbacManager.SetupWithManager(ctx, manager, configurationName, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Rbac")
os.Exit(1)
}

if err = (&servicelabelscontroller.ServicesLabelsReconciler{
Log: ctrl.Log.WithName("controllers").WithName("ServiceLabels"),
}).SetupWithManager(ctx, manager); err != nil {
}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ServiceLabels")
os.Exit(1)
}

if err = (&servicelabelscontroller.EndpointsLabelsReconciler{
Log: ctrl.Log.WithName("controllers").WithName("EndpointLabels"),
}).SetupWithManager(ctx, manager); err != nil {
}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "EndpointLabels")
os.Exit(1)
}
Expand All @@ -281,12 +283,12 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "EndpointSliceLabels")
}

if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager); err != nil {
if err = (&podlabelscontroller.MetadataReconciler{Client: manager.GetClient()}).SetupWithManager(ctx, manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PodLabels")
os.Exit(1)
}

if err = (&pv.Controller{}).SetupWithManager(manager); err != nil {
if err = (&pv.Controller{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "PersistentVolume")
os.Exit(1)
}
Expand All @@ -298,12 +300,12 @@ func main() {
os.Exit(1)
}

if err = (&resources.Global{}).SetupWithManager(manager); err != nil {
if err = (&resources.Global{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "resources.Global")
os.Exit(1)
}

if err = (&resources.Namespaced{}).SetupWithManager(manager); err != nil {
if err = (&resources.Namespaced{}).SetupWithManager(manager, workerCount); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "resources.Namespaced")
os.Exit(1)
}
Expand Down
Loading