diff --git a/cmd/vsphere-cloud-controller-manager/main.go b/cmd/vsphere-cloud-controller-manager/main.go index 44bf2429c..38767db43 100644 --- a/cmd/vsphere-cloud-controller-manager/main.go +++ b/cmd/vsphere-cloud-controller-manager/main.go @@ -25,7 +25,9 @@ import ( "fmt" "math/rand" "os" + "os/signal" "strings" + "syscall" "time" cloudprovider "k8s.io/cloud-provider" @@ -200,11 +202,24 @@ func main() { _ = watch.Close() // ignore explicitly when the watch closes }(watch) + // Notify the stop channel on SIGTERM or SIGINT in order + // to run cleanup such as logout of VSphere sessions + cancelChan := make(chan os.Signal, 1) + signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT) + go func() { + sig := <-cancelChan + klog.Infof("Signal received: %s. Stopping...\n", sig) + close(stop) + }() + if err := app.Run(completedConfig, cloud, controllerInitializers, webhookHandlers, stop); err != nil { // explicitly ignore the error by Fprintf, exiting anyway due to app error + // We don't call SessionLogout here since errors after initialization aren't bubbled up to here _, _ = fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } + // Log out of all sessions on exit or signal + vsphere.SessionLogout() } command.Run = func(cmd *cobra.Command, args []string) { diff --git a/pkg/cloudprovider/vsphere/cloud.go b/pkg/cloudprovider/vsphere/cloud.go index d9b12a1d2..364a2ce1f 100644 --- a/pkg/cloudprovider/vsphere/cloud.go +++ b/pkg/cloudprovider/vsphere/cloud.go @@ -21,6 +21,7 @@ import ( "io" "os" "runtime" + "sync" v1 "k8s.io/api/core/v1" klog "k8s.io/klog/v2" @@ -55,7 +56,13 @@ const ( dualStackFeatureGateEnv string = "ENABLE_ALPHA_DUAL_STACK" ) +var ( + logoutCh chan struct{} + logoutWG sync.WaitGroup +) + func init() { + logoutCh = make(chan struct{}) cloudprovider.RegisterCloudProvider(RegisteredProviderName, func(config io.Reader) (cloudprovider.Interface, error) { byConfig, err := io.ReadAll(config) if err != nil { @@ -101,6 +108,14 @@ func newVSphere(cfg *ccfg.CPIConfig, nsxtcfg *ncfg.Config, lbcfg *lcfg.LBConfig, return vs, nil } +// SessionLogout signals all VSphere sessions to logout and waits before returning +func SessionLogout() { + if logoutCh != nil { + close(logoutCh) + logoutWG.Wait() + } +} + // Initialize initializes the cloud provider. func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilder, stop <-chan struct{}) { client, err := clientBuilder.Client(ClientName) @@ -113,6 +128,18 @@ func (vs *VSphere) Initialize(clientBuilder cloudprovider.ControllerClientBuilde vs.connectionManager = connMgr vs.nodeManager.connectionManager = connMgr + logoutWG.Add(1) + // Gracefully logout of all VSphere sessions if the stop channel is signaled + go func() { + // TODO: The stop channel is unusable since the cloud-provider module + // passes in a context.TODO() in for the context: + // https://github.com/kubernetes/cloud-provider/blob/1bae60eb89ced16795f81a900b79cb55524ba6f0/app/controllermanager.go#L536 + // <-stop + <-logoutCh + logout(vs) + logoutWG.Done() + }() + vs.informMgr.AddNodeListener(vs.nodeAdded, vs.nodeDeleted, nil) vs.informMgr.Listen() @@ -261,7 +288,9 @@ func validateDualStack(cfg *ccfg.CPIConfig) error { } func logout(vs *VSphere) { + klog.Info("logout: ending session to vSphere") vs.connectionManager.Logout() + klog.Info("logout: finished") } // Notification handler when node is added into k8s cluster.