Skip to content

Commit

Permalink
feat: add leader election to allow for multiple replicas. (#27)
Browse files Browse the repository at this point in the history
* feat: add leader election to allow for multiple replicas.

* chore: run tidy and remove unneeded package.

* fix: reverse order of arguments to be more intuitive.

* feat: allow identity to be set via config.

This allows for the use of the downward API to set the identity as the pod name.

* fix: missed variable rename.

* fix: set correct defaults.
  • Loading branch information
Justin Bertrand authored Nov 3, 2023
1 parent 0bb13c2 commit 16c285e
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 97 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/golang/protobuf v1.4.3 // indirect
github.com/google/go-cmp v0.5.2 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.1.2 // indirect
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/imdario/mergo v0.3.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
Expand Down Expand Up @@ -202,6 +203,7 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
107 changes: 100 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"github.com/StatCan/ingress-istio-controller/pkg/controller"
"github.com/StatCan/ingress-istio-controller/pkg/signals"
istio "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/transport"
"k8s.io/klog"
)

Expand All @@ -23,14 +33,15 @@ var (
scopedGateways bool
ingressClass string
defaultWeight int
lockName string
lockNamespace string
lockIdentity string
)

func main() {
klog.InitFlags(nil)
flag.Parse()

stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
klog.Fatalf("error building kubeconfig: %v", err)
Expand Down Expand Up @@ -63,12 +74,71 @@ func main() {
istioInformerFactory.Networking().V1beta1().VirtualServices(),
istioInformerFactory.Networking().V1beta1().Gateways())

kubeInformerFactory.Start(stopCh)
istioInformerFactory.Start(stopCh)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

wait := make(chan os.Signal, 1)
signal.Notify(wait, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-wait
klog.Info("received signal, shutting down")
cancel()
}()

kubeInformerFactory.Start(ctx.Done())
istioInformerFactory.Start(ctx.Done())

runWithLeaderElection(ctlr, cfg, kubeclient, ctx)
}

if err = ctlr.Run(2, stopCh); err != nil {
klog.Fatalf("error running controller: %v", err)
func runWithLeaderElection(ctlr *controller.Controller, cfg *rest.Config, kubeclient *kubernetes.Clientset, ctx context.Context) {

// Acquire a lock
// Identity used to distinguish between multiple cloud controller manager instances
klog.Infof("leader identity id: %s", lockIdentity)

var lock resourcelock.Interface

lock = &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: lockName,
Namespace: lockNamespace,
},
Client: kubeclient.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: lockIdentity,
},
}

cfg.Wrap(transport.ContextCanceller(ctx, fmt.Errorf("the leader is shutting down")))

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
if err := ctlr.Run(2, ctx); err != nil {
if err != context.Canceled {
klog.Errorf("error running controller: %v", err)
}
}
},
OnStoppedLeading: func() {
klog.Info("stopped leading")
},
OnNewLeader: func(identity string) {
if identity == lockIdentity {
// We just acquired the lock
return
}

klog.Infof("new leader elected: %v", identity)
},
},
})
}

func init() {
Expand All @@ -79,4 +149,27 @@ func init() {
flag.BoolVar(&scopedGateways, "scoped-gateways", false, "Gateways are scoped to the same namespace they exist within. This will limit the Service search for Load Balancer status. In istiod, this is controlled via the PILOT_SCOPE_GATEWAY_TO_NAMESPACE environment variable.")
flag.StringVar(&ingressClass, "ingress-class", "", "The ingress class annotation to monitor (empty string to skip checking annotation)")
flag.IntVar(&defaultWeight, "virtual-service-weight", 100, "The weight of the Virtual Service destination.")
flag.StringVar(&lockName, "lock-name", getEnvVarOrDefault("LOCK_NAME", "ingress-istio-controller"), "The name of the leader lock.")
flag.StringVar(&lockNamespace, "lock-namespace", getEnvVarOrDefault("LOCK_NAMESPACE", "ingress-istio-controller-system"), "The namespace where the leader lock resides.")
flag.StringVar(&lockIdentity, "lock-identity", getEnvVarOrDefault("LOCK_IDENTITY", createIdentity()), "The unique identity of the replica. (Pod name is best)")
}

// Returns an environment variables value if set, otherwise returns dflt.
func getEnvVarOrDefault(envVar, dflt string) string {
val, ok := os.LookupEnv(envVar)
if ok {
return val
} else {
return dflt
}
}

// Creates a unique identity.
func createIdentity() string {
hostname, err := os.Hostname()
if err != nil {
klog.Fatal(err)
}
// add a uniquifier so that two processes on the same host don't accidentally both become active
return hostname + "_" + string(uuid.NewUUID())
}
9 changes: 5 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"context"
"fmt"
"time"

Expand Down Expand Up @@ -131,24 +132,24 @@ func NewController(
}

// Run runs the controller.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
func (c *Controller) Run(threadiness int, ctx context.Context) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()

klog.Info("starting controller")

klog.Info("waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
if ok := cache.WaitForCacheSync(ctx.Done(), c.ingressesSynched, c.ingressClassesSynched, c.servicesSynched, c.virtualServicesSynched, c.gatewaysSynched); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}

klog.Info("starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
}

klog.Info("started workers")
<-stopCh
<-ctx.Done()
klog.Info("shutting down workers")

return nil
Expand Down
43 changes: 0 additions & 43 deletions pkg/signals/signal.go

This file was deleted.

23 changes: 0 additions & 23 deletions pkg/signals/signal_posix.go

This file was deleted.

20 changes: 0 additions & 20 deletions pkg/signals/signal_windows.go

This file was deleted.

0 comments on commit 16c285e

Please sign in to comment.