Support multiple replicas of ASO pod
 - Use the Deployment strategy Recreate, which ensures that all webhook pods
   serving CRD conversion requests during an upgrade are running the latest
   version of ASO (see the sketch below).
 - Take a lease during CRD management, to ensure that ASO doesn't fight
   with itself during CRD installation.
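
For context, a Deployment using the Recreate strategy looks like the sketch below. The strategy and selector labels match this commit's diff; the name, image, and other pod details are placeholders for illustration only. With Recreate, Kubernetes terminates every existing pod before starting any new one, so during an ASO upgrade there is never a mix of old and new webhook pods answering CRD conversion requests.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: aso-controller-manager        # placeholder name
spec:
  replicas: 2
  selector:
    matchLabels:
      control-plane: controller-manager
  # Recreate scales the old ReplicaSet to zero before the new one starts,
  # so no old-version pod can serve conversion webhooks mid-upgrade.
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        control-plane: controller-manager
    spec:
      containers:
        - name: manager
          image: example.invalid/aso:placeholder   # placeholder image
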
matthchr committed Nov 26, 2024
1 parent 204237c commit faea3bf
Showing 6 changed files with 265 additions and 68 deletions.
@@ -17,6 +17,8 @@ spec:
  selector:
    matchLabels:
      control-plane: controller-manager
  strategy:
    type: Recreate
  template:
    metadata:
      annotations:
158 changes: 128 additions & 30 deletions v2/cmd/controller/app/setup.go
@@ -12,6 +12,7 @@ import (
"net/http"
"net/http/pprof"
"os"
"sync"
"time"

. "github.com/Azure/azure-service-operator/v2/internal/logging"
@@ -25,12 +25,14 @@ import (
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/util/workqueue"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/healthz"
ctrlleader "sigs.k8s.io/controller-runtime/pkg/leaderelection"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
@@ -49,6 +52,7 @@ import (
"github.com/Azure/azure-service-operator/v2/internal/util/interval"
"github.com/Azure/azure-service-operator/v2/internal/util/kubeclient"
"github.com/Azure/azure-service-operator/v2/internal/util/lockedrand"
"github.com/Azure/azure-service-operator/v2/internal/util/to"
common "github.com/Azure/azure-service-operator/v2/pkg/common/config"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/conditions"
@@ -92,11 +96,17 @@ func SetupControllerManager(ctx context.Context, setupLog logr.Logger, flgs *Fla
}

k8sConfig := ctrl.GetConfigOrDie()
mgr, err := ctrl.NewManager(k8sConfig, ctrl.Options{
ctrlOptions := ctrl.Options{
Scheme: scheme,
NewCache: cacheFunc,
LeaderElection: flgs.EnableLeaderElection,
LeaderElectionID: "controllers-leader-election-azinfra-generated",
// Manually set lease duration (to default) so that we can use it for our leader elector too.
// See https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/manager/internal.go#L52
LeaseDuration: to.Ptr(15 * time.Second),
RenewDeadline: to.Ptr(10 * time.Second),
RetryPeriod: to.Ptr(2 * time.Second),
GracefulShutdownTimeout: to.Ptr(30 * time.Second),
// It's only safe to set LeaderElectionReleaseOnCancel to true if the manager binary ends
// when the manager exits. This is the case with us today, so we set this to true whenever
// flgs.EnableLeaderElection is true.
@@ -107,7 +117,8 @@ func SetupControllerManager(ctx context.Context, setupLog logr.Logger, flgs *Fla
Port: flgs.WebhookPort,
CertDir: flgs.WebhookCertDir,
}),
})
}
mgr, err := ctrl.NewManager(k8sConfig, ctrlOptions)
if err != nil {
setupLog.Error(err, "unable to create manager")
os.Exit(1)
@@ -119,45 +130,38 @@ func SetupControllerManager(ctx context.Context, setupLog logr.Logger, flgs *Fla
os.Exit(1)
}

// TODO: Put all of the CRD stuff into a method?
crdManager, err := newCRDManager(clients.log, mgr.GetConfig())
var leaderElector *LeaderElector
if flgs.EnableLeaderElection {
// nolint: contextcheck // false positive?
leaderElector, err = newLeaderElector(k8sConfig, setupLog, ctrlOptions, mgr)
if err != nil {
setupLog.Error(err, "failed to initialize leader elector")
os.Exit(1)
}
}

crdManager, err := newCRDManager(clients.log, mgr.GetConfig(), leaderElector)
if err != nil {
setupLog.Error(err, "failed to initialize CRD client")
os.Exit(1)
}
existingCRDs, err := crdManager.ListOperatorCRDs(ctx)
existingCRDs, err := crdManager.ListCRDs(ctx)
if err != nil {
setupLog.Error(err, "failed to list current CRDs")
os.Exit(1)
}

switch flgs.CRDManagementMode {
case "auto":
var goalCRDs []apiextensions.CustomResourceDefinition
goalCRDs, err = crdManager.LoadOperatorCRDs(crdmanagement.CRDLocation, cfg.PodNamespace)
if err != nil {
setupLog.Error(err, "failed to load CRDs from disk")
os.Exit(1)
}

// We only apply CRDs if we're in webhooks mode. No other mode will have CRD CRUD permissions
if cfg.OperatorMode.IncludesWebhooks() {
var installationInstructions []*crdmanagement.CRDInstallationInstruction
installationInstructions, err = crdManager.DetermineCRDsToInstallOrUpgrade(goalCRDs, existingCRDs, flgs.CRDPatterns)
if err != nil {
setupLog.Error(err, "failed to determine CRDs to apply")
os.Exit(1)
}

included := crdmanagement.IncludedCRDs(installationInstructions)
if len(included) == 0 {
err = eris.New("No existing CRDs in cluster and no --crd-pattern specified")
setupLog.Error(err, "failed to apply CRDs")
os.Exit(1)
}

// Note that this step will restart the pod when it succeeds
err = crdManager.ApplyCRDs(ctx, installationInstructions)
err = crdManager.Install(ctx, crdmanagement.Options{
CRDPatterns: flgs.CRDPatterns,
ExistingCRDs: existingCRDs,
Path: crdmanagement.CRDLocation,
Namespace: cfg.PodNamespace,
})
if err != nil {
setupLog.Error(err, "failed to apply CRDs")
os.Exit(1)
@@ -172,7 +176,7 @@ func SetupControllerManager(ctx context.Context, setupLog logr.Logger, flgs *Fla

// There are 3 possibilities once we reach here:
// 1. Webhooks mode + crd-management-mode=auto: existingCRDs will be up to date (upgraded, crd-pattern applied, etc)
// by the time we get here as the pod will keep exiting until it is so (see crdManager.ApplyCRDs above).
// by the time we get here as the pod will keep exiting until it is so (see crdManager.applyCRDs above).
// 2. Non-webhooks mode + auto: As outlined in https://azure.github.io/azure-service-operator/guide/authentication/multitenant-deployment/#upgrading
// the webhooks mode pod must be upgraded first, so there's not really much practical difference between this and
// crd-management-mode=none (see below).
@@ -458,14 +462,108 @@ func makeControllerOptions(log logr.Logger, cfg config.Values) generic.Options {
}
}

func newCRDManager(logger logr.Logger, k8sConfig *rest.Config) (*crdmanagement.Manager, error) {
type LeaderElector struct {
elector *leaderelection.LeaderElector
leaseAcquired *sync.WaitGroup
leaseReleased *sync.WaitGroup
}

func (l *LeaderElector) Elector() *leaderelection.LeaderElector {
return l.elector
}

func (l *LeaderElector) LeaseAcquired() *sync.WaitGroup {
return l.leaseAcquired
}

func (l *LeaderElector) LeaseReleased() *sync.WaitGroup {
return l.leaseReleased
}

func newLeaderElector(
k8sConfig *rest.Config,
log logr.Logger,
ctrlOptions ctrl.Options,
mgr ctrl.Manager,
) (*LeaderElector, error) {
resourceLock, err := ctrlleader.NewResourceLock(
k8sConfig,
mgr,
ctrlleader.Options{
LeaderElection: ctrlOptions.LeaderElection,
LeaderElectionResourceLock: ctrlOptions.LeaderElectionResourceLock,
LeaderElectionID: ctrlOptions.LeaderElectionID,
})
if err != nil {
return nil, err
}

log = log.WithName("crdManagementLeaderElector")
leaseAcquiredWait := &sync.WaitGroup{}
leaseAcquiredWait.Add(1)
leaseReleasedWait := &sync.WaitGroup{}
leaseReleasedWait.Add(1)

// My assumption is that OnStoppedLeading is guaranteed to
// be called after OnStartedLeading and we don't need to protect this
// shared state with a mutex.
var leaderContext context.Context

leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: resourceLock,
LeaseDuration: *ctrlOptions.LeaseDuration,
RenewDeadline: *ctrlOptions.RenewDeadline,
RetryPeriod: *ctrlOptions.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
log.V(Status).Info("Elected leader")
leaseAcquiredWait.Done()
leaderContext = ctx
},
OnStoppedLeading: func() {
leaseReleasedWait.Done()

exitCode := 1
select {
case <-leaderContext.Done():
exitCode = 0 // done is closed
default:
}

if exitCode == 0 {
log.V(Status).Info("Lost leader due to cooperative lease release")
} else {
log.V(Status).Info("Lost leader")
}
os.Exit(exitCode)
},
},
ReleaseOnCancel: ctrlOptions.LeaderElectionReleaseOnCancel,
Name: ctrlOptions.LeaderElectionID,
})
if err != nil {
return nil, err
}

return &LeaderElector{
elector: leaderElector,
leaseAcquired: leaseAcquiredWait,
leaseReleased: leaseReleasedWait,
}, nil
}

func newCRDManager(
logger logr.Logger,
k8sConfig *rest.Config,
leaderElection crdmanagement.LeaderElector,
) (*crdmanagement.Manager, error) {
crdScheme := runtime.NewScheme()
_ = apiextensions.AddToScheme(crdScheme)
crdClient, err := client.New(k8sConfig, client.Options{Scheme: crdScheme})
if err != nil {
return nil, eris.Wrap(err, "unable to create CRD client")
}

crdManager := crdmanagement.NewManager(logger, kubeclient.NewClient(crdClient))
crdManager := crdmanagement.NewManager(logger, kubeclient.NewClient(crdClient), leaderElection)
return crdManager, nil
}
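
The hunk above defines the elector and its two WaitGroups, but the code that consumes them lives in the crdmanagement package, which is collapsed in this view. A minimal sketch of the intended pattern, under the assumption that the caller starts the elector itself, could look like the following; installCRDsWhenLeader is a hypothetical helper, not part of this commit.

package app

import "context"

// installCRDsWhenLeader is a hypothetical helper (not in this commit) showing
// how the LeaderElector above can gate CRD installation so that multiple ASO
// replicas don't apply CRDs at the same time.
func installCRDsWhenLeader(ctx context.Context, le *LeaderElector, apply func(context.Context) error) error {
    if le == nil {
        // Leader election disabled: a single replica applies CRDs directly.
        return apply(ctx)
    }
    // Run acquires and renews the lease; it blocks until the lease is lost or
    // ctx is cancelled, so it runs in the background.
    go le.Elector().Run(ctx)
    // OnStartedLeading calls Done on this WaitGroup, so Wait returns once the
    // lease is held by this replica.
    le.LeaseAcquired().Wait()
    return apply(ctx)
}
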
2 changes: 2 additions & 0 deletions v2/config/manager/manager.yaml
@@ -25,6 +25,8 @@ spec:
    matchLabels:
      control-plane: controller-manager
  replicas: 1
  strategy:
    type: Recreate
  revisionHistoryLimit: 10
  template:
    metadata:
2 changes: 1 addition & 1 deletion v2/internal/crdmanagement/helpers_test.go
@@ -114,7 +114,7 @@ func testSetup(t *testing.T) *testData {
logger := testcommon.NewTestLogger(t)
cfg := config.Values{}

crdManager := crdmanagement.NewManager(logger, kubeClient)
crdManager := crdmanagement.NewManager(logger, kubeClient, nil)

return &testData{
cfg: cfg,